
feat(data-gen): support KAFKA format in DataGen #3120

Merged
merged 8 commits on Aug 6, 2019
6 changes: 4 additions & 2 deletions docs/tutorials/generate-custom-test-data.rst
@@ -47,7 +47,8 @@ Required Arguments
Name Default Description
========================================== ======= ===========================================================================================
``schema=<avro schema file>`` Path to an Avro schema file. Requires the ``format``, ``topic``, and ``key`` options.
``format=<record format>`` json Format of generated records: one of ``avro``, ``json``, or ``delimited``. Case-insensitive.
``key-format=<key format>`` kafka Format of generated record keys: one of ``avro``, ``json``, ``delimited``, or ``kafka``. Case-insensitive.
``value-format=<value format>`` json Format of generated record values: one of ``avro``, ``json``, ``delimited``. Case-insensitive.
``topic=<kafka topic name>`` Name of the topic that receives generated records.
``key=<name of key column>`` Field to use as the key for generated records.
``quickstart=<quickstart preset>`` Generate records from a preset schema: ``orders``, ``users``, or ``pageviews``. Case-insensitive.
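
For example, a minimal invocation combining the new ``key-format`` option with the ``orders`` quickstart might look like the following sketch (the topic name and record count are illustrative, not taken from the tutorial)::

    ksql-datagen quickstart=orders key-format=kafka value-format=json \
        topic=orders_topic iterations=100
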
@@ -70,7 +71,8 @@ The following options apply to both the ``schema`` and ``quickstart`` options.
Name Default Description
============================================ =================================================== =========================================================================================
``bootstrap-server=<kafka-server>:<port>`` localhost:9092 IP address and port for the Kafka server to connect to.
``format=<record format>`` json Format of generated records: ``avro``, ``json``, or ``delimited``. Case-insensitive. Required by the ``schema`` option.
``key-format=<key format>`` kafka Format of generated record keys: ``avro``, ``json``, ``delimited``, or ``kafka``. Case-insensitive. Required by the ``schema`` option.
``value-format=<value format>`` json Format of generated record values: ``avro``, ``json``, or ``delimited``. Case-insensitive. Required by the ``schema`` option.
``topic=<kafka topic name>`` Name of the topic that receives generated records. Required by the ``schema`` option.
``key=<name of key column>`` Field to use as the key for generated records. Required by the ``schema`` option.
``iterations=<number of records>`` 1,000,000 The maximum number of records to generate.
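
As a sketch of the ``schema`` path (the schema file, topic, key column, and record count below are placeholders), generating records from a custom Avro schema with a KAFKA-format key might look like::

    ksql-datagen schema=./impressions.avro key-format=kafka value-format=avro \
        topic=impressions key=impressionid iterations=1000 \
        bootstrap-server=localhost:9092
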
IntegrationTestHarness.java
@@ -48,17 +48,14 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Deserializer;
@@ -72,7 +69,7 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings("WeakerAccess")
public class IntegrationTestHarness extends ExternalResource {
public final class IntegrationTestHarness extends ExternalResource {

private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestHarness.class);
private static final int DEFAULT_PARTITION_COUNT = 1;
@@ -166,8 +163,11 @@ public void ensureTopics(final int partitionCount, final String... topicNames) {
*/
public void produceRecord(final String topicName, final String key, final String data) {
try {
try (final KafkaProducer<String, String> producer =
new KafkaProducer<>(producerConfig(), new StringSerializer(), new StringSerializer())) {
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
kafkaCluster.producerConfig(),
new StringSerializer(),
new StringSerializer())
) {
producer.send(new ProducerRecord<>(topicName, key, data)).get();
}
} catch (final Exception e) {
@@ -235,9 +235,11 @@ public Map<String, RecordMetadata> produceRows(
) {
ensureTopics(topic);

try (KafkaProducer<String, GenericRow> producer =
new KafkaProducer<>(producerConfig(), new StringSerializer(), valueSerializer)) {

try (KafkaProducer<String, GenericRow> producer = new KafkaProducer<>(
kafkaCluster.producerConfig(),
new StringSerializer(),
valueSerializer
)) {
final Map<String, Future<RecordMetadata>> futures = recordsToPublish.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, entry -> {
final String key = entry.getKey();
@@ -272,12 +274,12 @@ public List<ConsumerRecord<String, String>> verifyAvailableRecords(
final String topic,
final int expectedCount
) {
try (final KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(consumerConfig(), new StringDeserializer(), new StringDeserializer())) {
consumer.subscribe(Collections.singleton(topic.toUpperCase()));

return ConsumerTestUtil.verifyAvailableRecords(consumer, expectedCount);
}
return kafkaCluster.verifyAvailableRecords(
topic,
expectedCount,
new StringDeserializer(),
new StringDeserializer()
);
}

/**
@@ -298,13 +300,12 @@ public List<ConsumerRecord<String, GenericRow>> verifyAvailableRows(
final Deserializer<GenericRow> valueDeserializer =
getDeserializer(valueFormat, schema);

try (final KafkaConsumer<String, GenericRow> consumer
= new KafkaConsumer<>(consumerConfig(), new StringDeserializer(), valueDeserializer)) {

consumer.subscribe(Collections.singleton(topic));

return ConsumerTestUtil.verifyAvailableRecords(consumer, expectedCount);
}
return kafkaCluster.verifyAvailableRecords(
topic,
expectedCount,
new StringDeserializer(),
valueDeserializer
);
}

/**
@@ -371,9 +372,11 @@ public <K> List<ConsumerRecord<K, GenericRow>> verifyAvailableRows(
final Deserializer<GenericRow> valueDeserializer =
getDeserializer(valueFormat, schema);

try (final KafkaConsumer<K, GenericRow> consumer
= new KafkaConsumer<>(consumerConfig(), keyDeserializer, valueDeserializer)) {

try (KafkaConsumer<K, GenericRow> consumer = new KafkaConsumer<>(
kafkaCluster.consumerConfig(),
keyDeserializer,
valueDeserializer
)) {
consumer.subscribe(Collections.singleton(topic));

return ConsumerTestUtil.verifyAvailableRecords(consumer, expected, timeout);
@@ -442,9 +445,11 @@ public <K> Map<K, GenericRow> verifyAvailableUniqueRows(
final Deserializer<GenericRow> valueDeserializer =
getDeserializer(valueFormat, schema);

try (final KafkaConsumer<K, GenericRow> consumer
= new KafkaConsumer<>(consumerConfig(), keyDeserializer, valueDeserializer)) {

try (KafkaConsumer<K, GenericRow> consumer = new KafkaConsumer<>(
kafkaCluster.consumerConfig(),
keyDeserializer,
valueDeserializer
)) {
consumer.subscribe(Collections.singleton(topic));

final List<ConsumerRecord<K, GenericRow>> consumerRecords = ConsumerTestUtil
@@ -540,27 +545,6 @@ protected void after() {
kafkaCluster.stop();
}

private Map<String, Object> clientConfig() {
return new HashMap<>(kafkaCluster.getClientProperties());
}

private Map<String, Object> producerConfig() {
final Map<String, Object> config = clientConfig();
config.put(ProducerConfig.ACKS_CONFIG, "all");
config.put(ProducerConfig.RETRIES_CONFIG, 0);
return config;
}

Map<String, Object> consumerConfig() {
final Map<String, Object> config = clientConfig();
config.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Try to keep consumer groups stable:
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10_000);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30_000);
return config;
}

private Serializer<GenericRow> getSerializer(
final Format format,
final PhysicalSchema schema
@@ -15,7 +15,6 @@

package io.confluent.ksql.integration;


import static io.confluent.ksql.serde.Format.JSON;
import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually;
import static org.hamcrest.Matchers.hasItems;
@@ -103,9 +102,9 @@ public void shouldListConsumerGroupsWhenTheyExist() throws InterruptedException
@Test
public void shouldDescribeGroup() throws InterruptedException {
givenTopicExistsWithData();
try (final KafkaConsumer<String, byte[]> c1 = createConsumer(group0)) {
try (KafkaConsumer<String, byte[]> c1 = createConsumer(group0)) {
verifyDescribeGroup(1, group0, ImmutableList.of(c1));
try (final KafkaConsumer<String, byte[]> c2 = createConsumer(group0)) {
try (KafkaConsumer<String, byte[]> c2 = createConsumer(group0)) {
verifyDescribeGroup(2, group0, ImmutableList.of(c1, c2));
}
}
@@ -135,7 +134,7 @@ private void verifyDescribeGroup(

private void verifyListsGroups(final String newGroup, final List<String> consumerGroups) {

try(final KafkaConsumer<String, byte[]> consumer = createConsumer(newGroup)) {
try (KafkaConsumer<String, byte[]> consumer = createConsumer(newGroup)) {

final Supplier<List<String>> pollAndGetGroups = () -> {
consumer.poll(Duration.ofMillis(1));
@@ -152,7 +151,7 @@ private void givenTopicExistsWithData() {
}

private KafkaConsumer<String, byte[]> createConsumer(final String group) {
final Map<String, Object> consumerConfigs = TEST_HARNESS.consumerConfig();
final Map<String, Object> consumerConfigs = TEST_HARNESS.getKafkaCluster().consumerConfig();
consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, group);

final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(
@@ -183,8 +182,8 @@ public boolean equals(final Object o) {
return false;
}
final ConsumerAndPartitionCount that = (ConsumerAndPartitionCount) o;
return consumerCount == that.consumerCount &&
partitionCount == that.partitionCount;
return consumerCount == that.consumerCount
&& partitionCount == that.partitionCount;
}

@Override

This file was deleted.
