Skip to content

Commit

Permalink
[FLINK-27806][table] Support binary & varbinary types for datagen con…
Browse files Browse the repository at this point in the history
…nector

This closes apache#19827.
  • Loading branch information
chucheng92 committed Jul 6, 2022
1 parent b0f0144 commit ef9ce85
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 6 deletions.
4 changes: 2 additions & 2 deletions docs/content.zh/docs/connectors/table/datagen.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ DataGen 连接器是内置的。

每个列,都有两种生成数据的方法:

- 随机生成器是默认的生成器,您可以指定随机生成的最大和最小值。char、varchar、string (类型)可以指定长度。它是无界的生成器。
- 随机生成器是默认的生成器,您可以指定随机生成的最大和最小值。char、varchar、binary、varbinary、string (类型)可以指定长度。它是无界的生成器。

- 序列生成器,您可以指定序列的起始和结束值。它是有界的生成器,当序列数字达到结束值,读取结束。

Expand Down Expand Up @@ -136,7 +136,7 @@ CREATE TABLE datagen (
<td>可选</td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
<td>随机生成器生成字符的长度,适用于 char、varchar、string。</td>
<td>随机生成器生成字符的长度,适用于 char、varchar、binary、varbinary、string。</td>
</tr>
<tr>
<td><h5>fields.#.start</h5></td>
Expand Down
14 changes: 12 additions & 2 deletions docs/content/docs/connectors/table/datagen.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Usage
-----

By default, a DataGen table will create an unbounded number of rows with a random value for each column.
For variable sized types, char/varchar/string/array/map/multiset, the length can be specified.
For variable sized types, char/varchar/binary/varbinary/string/array/map/multiset, the length can be specified.
Additionally, a total number of rows can be specified, resulting in a bounded table.

There also exists a sequence generator, where users specify a sequence of start and end values.
Expand Down Expand Up @@ -104,6 +104,16 @@ Types
<td>random / sequence</td>
<td></td>
</tr>
<tr>
<td>BINARY</td>
<td>random / sequence</td>
<td></td>
</tr>
<tr>
<td>VARBINARY</td>
<td>random / sequence</td>
<td></td>
</tr>
<tr>
<td>STRING</td>
<td>random / sequence</td>
Expand Down Expand Up @@ -271,7 +281,7 @@ Connector Options
<td>optional</td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
<td>Size or length of the collection for generating char/varchar/string/array/map/multiset types.</td>
<td>Size or length of the collection for generating char/varchar/binary/varbinary/string/array/map/multiset types.</td>
</tr>
<tr>
<td><h5>fields.#.start</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DayTimeIntervalType;
Expand All @@ -49,6 +50,7 @@
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.YearMonthIntervalType;
import org.apache.flink.table.types.logical.ZonedTimestampType;
Expand All @@ -68,6 +70,8 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase {

public static final int RANDOM_STRING_LENGTH_DEFAULT = 100;

public static final int RANDOM_BYTES_LENGTH_DEFAULT = 100;

private static final int RANDOM_COLLECTION_LENGTH_DEFAULT = 3;

private final ConfigOptions.OptionBuilder minKey;
Expand Down Expand Up @@ -135,6 +139,32 @@ public DataGeneratorContainer visit(VarCharType varCharType) {
getRandomStringGenerator(config.get(lenOption)), lenOption);
}

/**
 * Creates the random generator container for a {@code BINARY} column.
 *
 * <p>The size of every generated array is read from the per-field length option
 * ({@code FIELDS.<name>.LENGTH}), which defaults to {@link #RANDOM_BYTES_LENGTH_DEFAULT}.
 *
 * <p>NOTE(review): the length declared on {@code binaryType} is not consulted here; only the
 * configured option decides the array size. Confirm this is intended.
 *
 * @param binaryType the logical type being visited
 * @return a container holding the random bytes generator and the length option it reads
 */
@Override
public DataGeneratorContainer visit(BinaryType binaryType) {
    final String lengthKey =
            DataGenConnectorOptionsUtil.FIELDS
                    + "."
                    + name
                    + "."
                    + DataGenConnectorOptionsUtil.LENGTH;
    final ConfigOption<Integer> lengthOption =
            key(lengthKey).intType().defaultValue(RANDOM_BYTES_LENGTH_DEFAULT);

    final RandomGenerator<byte[]> bytesGenerator =
            getRandomBytesGenerator(config.get(lengthOption));
    return DataGeneratorContainer.of(bytesGenerator, lengthOption);
}

/**
 * Creates the random generator container for a {@code VARBINARY} column.
 *
 * <p>The size of every generated array is read from the per-field length option
 * ({@code FIELDS.<name>.LENGTH}), which defaults to {@link #RANDOM_BYTES_LENGTH_DEFAULT}.
 *
 * <p>NOTE(review): the maximum length declared on {@code varBinaryType} is not consulted
 * here; only the configured option decides the array size. Confirm this is intended.
 *
 * @param varBinaryType the logical type being visited
 * @return a container holding the random bytes generator and the length option it reads
 */
@Override
public DataGeneratorContainer visit(VarBinaryType varBinaryType) {
    final String lengthKey =
            DataGenConnectorOptionsUtil.FIELDS
                    + "."
                    + name
                    + "."
                    + DataGenConnectorOptionsUtil.LENGTH;
    final ConfigOption<Integer> lengthOption =
            key(lengthKey).intType().defaultValue(RANDOM_BYTES_LENGTH_DEFAULT);

    final RandomGenerator<byte[]> bytesGenerator =
            getRandomBytesGenerator(config.get(lengthOption));
    return DataGeneratorContainer.of(bytesGenerator, lengthOption);
}

@Override
public DataGeneratorContainer visit(TinyIntType tinyIntType) {
ConfigOption<Integer> min = minKey.intType().defaultValue((int) Byte.MIN_VALUE);
Expand Down Expand Up @@ -377,4 +407,15 @@ public TimestampData next() {
}
};
}

/**
 * Creates a generator that returns a newly allocated array of {@code length} random bytes on
 * every call to {@code next()}.
 *
 * <p>The length is validated eagerly so that an invalid value is reported when the generator
 * is built rather than as a {@link NegativeArraySizeException} when the first record is
 * produced.
 *
 * @param length number of bytes in each generated array; zero is allowed, negative is not
 * @return a generator producing random byte arrays of exactly {@code length} bytes
 * @throws IllegalArgumentException if {@code length} is negative
 */
private static RandomGenerator<byte[]> getRandomBytesGenerator(int length) {
    if (length < 0) {
        throw new IllegalArgumentException(
                "Length of randomly generated bytes must not be negative, but was "
                        + length
                        + ".");
    }
    return new RandomGenerator<byte[]>() {
        @Override
        public byte[] next() {
            // Allocate a new array per call and fill it through random.getRandomGenerator().
            final byte[] bytes = new byte[length];
            random.getRandomGenerator().nextBytes(bytes);
            return bytes;
        }
    };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.DecimalType;
Expand All @@ -35,8 +36,11 @@
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;

import org.apache.flink.shaded.guava30.com.google.common.primitives.Longs;

import static org.apache.flink.configuration.ConfigOptions.key;

/** Creates a sequential {@link DataGeneratorContainer} for a particular logical type. */
Expand Down Expand Up @@ -120,6 +124,22 @@ public DataGeneratorContainer visit(VarCharType varCharType) {
longEnd);
}

/**
 * Creates the sequence generator container for a {@code BINARY} column.
 *
 * <p>The sequence bounds are read from the {@code longStart} and {@code longEnd} options, and
 * both options are registered on the returned container.
 *
 * @param binaryType the logical type being visited
 * @return a container holding the sequence bytes generator and its start/end options
 */
@Override
public DataGeneratorContainer visit(BinaryType binaryType) {
    final SequenceGenerator<byte[]> bytesSequence =
            getSequenceBytesGenerator(config.get(longStart), config.get(longEnd));
    return DataGeneratorContainer.of(bytesSequence, longStart, longEnd);
}

/**
 * Creates the sequence generator container for a {@code VARBINARY} column.
 *
 * <p>The sequence bounds are read from the {@code longStart} and {@code longEnd} options, and
 * both options are registered on the returned container.
 *
 * @param varBinaryType the logical type being visited
 * @return a container holding the sequence bytes generator and its start/end options
 */
@Override
public DataGeneratorContainer visit(VarBinaryType varBinaryType) {
    final SequenceGenerator<byte[]> bytesSequence =
            getSequenceBytesGenerator(config.get(longStart), config.get(longEnd));
    return DataGeneratorContainer.of(bytesSequence, longStart, longEnd);
}

@Override
public DataGeneratorContainer visit(TinyIntType tinyIntType) {
return DataGeneratorContainer.of(
Expand Down Expand Up @@ -191,4 +211,18 @@ public StringData next() {
}
};
}

/**
 * Creates a sequence generator over {@code [start, end]} that emits each sequence value as a
 * byte array.
 *
 * <p>Each value polled from {@code valuesToEmit} is converted with
 * {@link Longs#toByteArray(long)}, i.e. its 8-byte big-endian representation. When the poll
 * yields {@code null}, an empty array is returned instead.
 *
 * @param start first value passed to the underlying sequence generator
 * @param end last value passed to the underlying sequence generator
 * @return a generator producing the byte-array form of the sequence values
 */
private static SequenceGenerator<byte[]> getSequenceBytesGenerator(long start, long end) {
    return new SequenceGenerator<byte[]>(start, end) {
        @Override
        public byte[] next() {
            final Long polled = valuesToEmit.poll();
            if (polled == null) {
                // Nothing left to emit: hand back an empty array rather than null.
                return new byte[0];
            }
            return Longs.toByteArray(polled);
        }
    };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ class DataGenTableSourceFactoryTest {
Column.physical("f0", DataTypes.STRING()),
Column.physical("f1", DataTypes.BIGINT()),
Column.physical("f2", DataTypes.BIGINT()),
Column.physical("f3", DataTypes.TIMESTAMP()));
Column.physical("f3", DataTypes.TIMESTAMP()),
Column.physical("f4", DataTypes.BINARY(2)),
Column.physical("f5", DataTypes.VARBINARY(4)));

@Test
void testDataTypeCoverage() throws Exception {
Expand Down Expand Up @@ -94,7 +96,10 @@ void testDataTypeCoverage() throws Exception {
"c",
DataTypes.ROW(
DataTypes.FIELD(
"d", DataTypes.TIMESTAMP()))))));
"d", DataTypes.TIMESTAMP()))))),
Column.physical("f21", DataTypes.BINARY(2)),
Column.physical("f22", DataTypes.BYTES()),
Column.physical("f23", DataTypes.VARBINARY(4)));

DescriptorProperties descriptor = new DescriptorProperties();
descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
Expand Down Expand Up @@ -165,6 +170,20 @@ void testSource() throws Exception {
DataGenConnectorOptionsUtil.FIELDS + ".f3." + DataGenConnectorOptionsUtil.MAX_PAST,
"5s");

descriptor.putString(
DataGenConnectorOptionsUtil.FIELDS + ".f4." + DataGenConnectorOptionsUtil.KIND,
DataGenConnectorOptionsUtil.RANDOM);
descriptor.putLong(
DataGenConnectorOptionsUtil.FIELDS + ".f4." + DataGenConnectorOptionsUtil.LENGTH,
2);
descriptor.putString(
DataGenConnectorOptionsUtil.FIELDS + ".f5." + DataGenConnectorOptionsUtil.KIND,
DataGenConnectorOptionsUtil.SEQUENCE);
descriptor.putLong(
DataGenConnectorOptionsUtil.FIELDS + ".f5." + DataGenConnectorOptionsUtil.START, 1);
descriptor.putLong(
DataGenConnectorOptionsUtil.FIELDS + ".f5." + DataGenConnectorOptionsUtil.END, 11);

final long begin = System.currentTimeMillis();
List<RowData> results = runGenerator(SCHEMA, descriptor);
final long end = System.currentTimeMillis();
Expand All @@ -176,6 +195,10 @@ void testSource() throws Exception {
assertThat(row.getLong(1)).isBetween(10L, 100L);
assertThat(row.getLong(2)).isEqualTo(i + 50);
assertThat(row.getTimestamp(3, 3).getMillisecond()).isBetween(begin - 5000, end);
assertThat(row.getBinary(4).length).isEqualTo(2);
// f5 is sequence bytes produced in sequence long [1, 11]
assertThat(row.getBinary(5).length).isEqualTo(8);
assertThat(row.getBinary(5)[row.getBinary(5).length - 1]).isEqualTo((byte) (i + 1));
}
}

Expand Down

0 comments on commit ef9ce85

Please sign in to comment.