From 3a24637261d1a433145fafe4d7028016b1ca1f6a Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Tue, 23 Jul 2019 21:04:32 +0100 Subject: [PATCH 1/6] feat(data-gen): support KAFKA format in DataGen ksql-data-gen now supports a `key-format` and `value-format`, rather than the previous `format`. `format` is still accepted to maintain backwards compatibility, however, its use is deprecated. `key-format` supports the existing formats and the new `KAFKA` format that writes data using Kafka's inbuilt serializers. `value-format` won't work with the `KAFKA` format as the value schemas have multiple fields. `key-format` defaults to `KAFKA`. This is backwards compatible as keys were previously serialized using `StringSerializer`. `value-format` defaults to `JSON`, as was the case for the old `format` option. --- docs/tutorials/generate-custom-test-data.rst | 6 +- .../integration/IntegrationTestHarness.java | 82 +++----- .../KafkaConsumerGroupClientTest.java | 13 +- .../confluent/ksql/datagen/AvroProducer.java | 71 ------- .../io/confluent/ksql/datagen/DataGen.java | 51 +++-- .../ksql/datagen/DataGenProducer.java | 78 +++++-- .../ksql/datagen/DelimitedProducer.java | 52 ----- .../confluent/ksql/datagen/JsonProducer.java | 52 ----- .../ksql/datagen/ProducerFactory.java | 124 +++++++++-- .../datagen/SchemaRegistryClientFactory.java | 52 +++++ .../ksql/datagen/DataGenFunctionalTest.java | 197 ++++++++++++++++++ .../confluent/ksql/datagen/DataGenTest.java | 24 ++- .../ksql/datagen/JsonProducerTest.java | 79 ------- .../util/EmbeddedSingleNodeKafkaCluster.java | 76 +++++++ 14 files changed, 597 insertions(+), 360 deletions(-) delete mode 100644 ksql-examples/src/main/java/io/confluent/ksql/datagen/AvroProducer.java delete mode 100644 ksql-examples/src/main/java/io/confluent/ksql/datagen/DelimitedProducer.java delete mode 100644 ksql-examples/src/main/java/io/confluent/ksql/datagen/JsonProducer.java create mode 100644 ksql-examples/src/main/java/io/confluent/ksql/datagen/SchemaRegistryClientFactory.java create mode 100644 ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenFunctionalTest.java delete mode 100644 ksql-examples/src/test/java/io/confluent/ksql/datagen/JsonProducerTest.java diff --git a/docs/tutorials/generate-custom-test-data.rst b/docs/tutorials/generate-custom-test-data.rst index 33abefdcd697..191026ae281d 100644 --- a/docs/tutorials/generate-custom-test-data.rst +++ b/docs/tutorials/generate-custom-test-data.rst @@ -47,7 +47,8 @@ Required Arguments Name Default Description ========================================== ======= =========================================================================================== ``schema=`` Path to an Avro schema file. Requires the ``format``, ``topic``, and ``key`` options. -``format=`` json Format of generated records: one of ``avro``, ``json``, or ``delimited``. Case-insensitive. +``key-format=`` kafka Format of generated record keys: one of ``avro``, ``json``, ``delimited``, ``kafka``. Case-insensitive. +``value-format=`` json Format of generated record values: one of ``avro``, ``json``, ``delimited``. Case-insensitive. ``topic=`` Name of the topic that receives generated records. ``key=`` Field to use as the key for generated records. ``quickstart=`` Generate records from a preset schema: ``orders``, ``users``, or ``pageviews``. Case-insensitive. @@ -70,7 +71,8 @@ The following options apply to both the ``schema`` and ``quickstart`` options. 
Name Default Description ============================================ =================================================== ========================================================================================= ``bootstrap-server=:`` localhost:9092 IP address and port for the Kafka server to connect to. -``format=`` json Format of generated records: ``avro``, ``json``, or ``delimited``. Case-insensitive. Required by the ``schema`` option. +``key-format=`` kafka Format of generated record keys: ``avro``, ``json``, ``delimited`` or ``kafka``. Case-insensitive. Required by the ``schema`` option. +``value-format=`` json Format of generated record values: ``avro``, ``json``, or ``delimited``. Case-insensitive. Required by the ``schema`` option. ``topic=`` Name of the topic that receives generated records. Required by the ``schema`` option. ``key=`` Field to use as the key for generated records. Required by the ``schema`` option. ``iterations=`` 1,000,000 The maximum number of records to generate. diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java index f20e815809b2..2358bf153590 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/IntegrationTestHarness.java @@ -48,17 +48,14 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; -import java.util.UUID; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.Deserializer; @@ -72,7 +69,7 @@ import org.slf4j.LoggerFactory; @SuppressWarnings("WeakerAccess") -public class IntegrationTestHarness extends ExternalResource { +public final class IntegrationTestHarness extends ExternalResource { private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestHarness.class); private static final int DEFAULT_PARTITION_COUNT = 1; @@ -166,8 +163,11 @@ public void ensureTopics(final int partitionCount, final String... 
topicNames) { */ public void produceRecord(final String topicName, final String key, final String data) { try { - try (final KafkaProducer producer = - new KafkaProducer<>(producerConfig(), new StringSerializer(), new StringSerializer())) { + try (KafkaProducer producer = new KafkaProducer<>( + kafkaCluster.producerConfig(), + new StringSerializer(), + new StringSerializer()) + ) { producer.send(new ProducerRecord<>(topicName, key, data)).get(); } } catch (final Exception e) { @@ -235,9 +235,11 @@ public Map produceRows( ) { ensureTopics(topic); - try (KafkaProducer producer = - new KafkaProducer<>(producerConfig(), new StringSerializer(), valueSerializer)) { - + try (KafkaProducer producer = new KafkaProducer<>( + kafkaCluster.producerConfig(), + new StringSerializer(), + valueSerializer + )) { final Map> futures = recordsToPublish.entrySet().stream() .collect(Collectors.toMap(Entry::getKey, entry -> { final String key = entry.getKey(); @@ -272,12 +274,12 @@ public List> verifyAvailableRecords( final String topic, final int expectedCount ) { - try (final KafkaConsumer consumer = - new KafkaConsumer<>(consumerConfig(), new StringDeserializer(), new StringDeserializer())) { - consumer.subscribe(Collections.singleton(topic.toUpperCase())); - - return ConsumerTestUtil.verifyAvailableRecords(consumer, expectedCount); - } + return kafkaCluster.verifyAvailableRecords( + topic, + expectedCount, + new StringDeserializer(), + new StringDeserializer() + ); } /** @@ -298,13 +300,12 @@ public List> verifyAvailableRows( final Deserializer valueDeserializer = getDeserializer(valueFormat, schema); - try (final KafkaConsumer consumer - = new KafkaConsumer<>(consumerConfig(), new StringDeserializer(), valueDeserializer)) { - - consumer.subscribe(Collections.singleton(topic)); - - return ConsumerTestUtil.verifyAvailableRecords(consumer, expectedCount); - } + return kafkaCluster.verifyAvailableRecords( + topic, + expectedCount, + new StringDeserializer(), + valueDeserializer + ); } /** @@ -371,9 +372,11 @@ public List> verifyAvailableRows( final Deserializer valueDeserializer = getDeserializer(valueFormat, schema); - try (final KafkaConsumer consumer - = new KafkaConsumer<>(consumerConfig(), keyDeserializer, valueDeserializer)) { - + try (KafkaConsumer consumer = new KafkaConsumer<>( + kafkaCluster.consumerConfig(), + keyDeserializer, + valueDeserializer + )) { consumer.subscribe(Collections.singleton(topic)); return ConsumerTestUtil.verifyAvailableRecords(consumer, expected, timeout); @@ -442,9 +445,11 @@ public Map verifyAvailableUniqueRows( final Deserializer valueDeserializer = getDeserializer(valueFormat, schema); - try (final KafkaConsumer consumer - = new KafkaConsumer<>(consumerConfig(), keyDeserializer, valueDeserializer)) { - + try (KafkaConsumer consumer = new KafkaConsumer<>( + kafkaCluster.consumerConfig(), + keyDeserializer, + valueDeserializer + )) { consumer.subscribe(Collections.singleton(topic)); final List> consumerRecords = ConsumerTestUtil @@ -540,27 +545,6 @@ protected void after() { kafkaCluster.stop(); } - private Map clientConfig() { - return new HashMap<>(kafkaCluster.getClientProperties()); - } - - private Map producerConfig() { - final Map config = clientConfig(); - config.put(ProducerConfig.ACKS_CONFIG, "all"); - config.put(ProducerConfig.RETRIES_CONFIG, 0); - return config; - } - - Map consumerConfig() { - final Map config = clientConfig(); - config.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); - config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest"); - // Try to keep consumer groups stable: - config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10_000); - config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30_000); - return config; - } - private Serializer getSerializer( final Format format, final PhysicalSchema schema diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/KafkaConsumerGroupClientTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/KafkaConsumerGroupClientTest.java index e0d5c3972677..1d7332732174 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/KafkaConsumerGroupClientTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/KafkaConsumerGroupClientTest.java @@ -15,7 +15,6 @@ package io.confluent.ksql.integration; - import static io.confluent.ksql.serde.Format.JSON; import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually; import static org.hamcrest.Matchers.hasItems; @@ -103,9 +102,9 @@ public void shouldListConsumerGroupsWhenTheyExist() throws InterruptedException @Test public void shouldDescribeGroup() throws InterruptedException { givenTopicExistsWithData(); - try (final KafkaConsumer c1 = createConsumer(group0)) { + try (KafkaConsumer c1 = createConsumer(group0)) { verifyDescribeGroup(1, group0, ImmutableList.of(c1)); - try (final KafkaConsumer c2 = createConsumer(group0)) { + try (KafkaConsumer c2 = createConsumer(group0)) { verifyDescribeGroup(2, group0, ImmutableList.of(c1, c2)); } } @@ -135,7 +134,7 @@ private void verifyDescribeGroup( private void verifyListsGroups(final String newGroup, final List consumerGroups) { - try(final KafkaConsumer consumer = createConsumer(newGroup)) { + try (KafkaConsumer consumer = createConsumer(newGroup)) { final Supplier> pollAndGetGroups = () -> { consumer.poll(Duration.ofMillis(1)); @@ -152,7 +151,7 @@ private void givenTopicExistsWithData() { } private KafkaConsumer createConsumer(final String group) { - final Map consumerConfigs = TEST_HARNESS.consumerConfig(); + final Map consumerConfigs = TEST_HARNESS.getKafkaCluster().consumerConfig(); consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, group); final KafkaConsumer consumer = new KafkaConsumer<>( @@ -183,8 +182,8 @@ public boolean equals(final Object o) { return false; } final ConsumerAndPartitionCount that = (ConsumerAndPartitionCount) o; - return consumerCount == that.consumerCount && - partitionCount == that.partitionCount; + return consumerCount == that.consumerCount + && partitionCount == that.partitionCount; } @Override diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/AvroProducer.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/AvroProducer.java deleted file mode 100644 index ca95bf023fda..000000000000 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/AvroProducer.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2018 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- */ - - -package io.confluent.ksql.datagen; - -import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.ksql.GenericRow; -import io.confluent.ksql.logging.processing.NoopProcessingLogContext; -import io.confluent.ksql.schema.ksql.LogicalSchema; -import io.confluent.ksql.schema.ksql.PhysicalSchema; -import io.confluent.ksql.serde.GenericRowSerDe; -import io.confluent.ksql.serde.SerdeOption; -import io.confluent.ksql.serde.avro.KsqlAvroSerdeFactory; -import io.confluent.ksql.util.KsqlConfig; -import io.confluent.ksql.util.KsqlConstants; -import io.confluent.ksql.util.KsqlException; -import org.apache.avro.Schema; -import org.apache.kafka.common.serialization.Serializer; - -public class AvroProducer extends DataGenProducer { - - private final KsqlConfig ksqlConfig; - private final SchemaRegistryClient schemaRegistryClient; - - public AvroProducer(final KsqlConfig ksqlConfig) { - if (ksqlConfig.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY) == null) { - throw new KsqlException("Schema registry url is not set."); - } - this.ksqlConfig = ksqlConfig; - this.schemaRegistryClient = new CachedSchemaRegistryClient( - ksqlConfig.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY), - 100, - ksqlConfig.originalsWithPrefix(KsqlConfig.KSQL_SCHEMA_REGISTRY_PREFIX) - ); - } - - @Override - protected Serializer getSerializer( - final Schema avroSchema, - final org.apache.kafka.connect.data.Schema kafkaSchema, - final String topicName - ) { - final PhysicalSchema physicalSchema = PhysicalSchema.from( - LogicalSchema.of(KEY_SCHEMA, kafkaSchema), - SerdeOption.none() - ); - - return GenericRowSerDe.from( - new KsqlAvroSerdeFactory(KsqlConstants.DEFAULT_AVRO_SCHEMA_FULL_NAME), - physicalSchema, - ksqlConfig, - () -> schemaRegistryClient, - "", - NoopProcessingLogContext.INSTANCE - ).serializer(); - } -} diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java index 3192ea9b7b6d..6adf47d04c5c 100644 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGen.java @@ -60,7 +60,8 @@ static void run(final String... 
args) throws IOException { final Generator generator = new Generator(arguments.schemaFile, new Random()); final Properties props = getProperties(arguments); - final DataGenProducer dataProducer = new ProducerFactory().getProducer(arguments.format, props); + final DataGenProducer dataProducer = ProducerFactory + .getProducer(arguments.keyFormat, arguments.valueFormat, props); dataProducer.populateTopic( props, @@ -96,8 +97,10 @@ private static void usage() { + "schema= " + newLine + "[schemaRegistryUrl= " + "(defaults to http://localhost:8081)] " + newLine - + "format= (case-insensitive; one of 'avro', 'json', or " + + "key-format= (case-insensitive; one of 'avro', 'json', 'kafka' or " + "'delimited') " + newLine + + "value-format= (case-insensitive; one of 'avro', 'json' or " + + "'delimited') " + newLine + "topic= " + newLine + "key= " + newLine + "[iterations= (defaults to 1,000,000)] " + newLine @@ -111,7 +114,8 @@ static class Arguments { private final boolean help; private final String bootstrapServer; private final InputStream schemaFile; - private final Format format; + private final Format keyFormat; + private final Format valueFormat; private final String topicName; private final String keyName; private final int iterations; @@ -119,11 +123,13 @@ static class Arguments { private final String schemaRegistryUrl; private final InputStream propertiesFile; + // CHECKSTYLE_RULES.OFF: ParameterNumberCheck Arguments( final boolean help, final String bootstrapServer, final InputStream schemaFile, - final Format format, + final Format keyFormat, + final Format valueFormat, final String topicName, final String keyName, final int iterations, @@ -131,10 +137,12 @@ static class Arguments { final String schemaRegistryUrl, final InputStream propertiesFile ) { + // CHECKSTYLE_RULES.ON: ParameterNumberCheck this.help = help; this.bootstrapServer = bootstrapServer; this.schemaFile = schemaFile; - this.format = format; + this.keyFormat = keyFormat; + this.valueFormat = valueFormat; this.topicName = topicName; this.keyName = keyName; this.iterations = iterations; @@ -157,7 +165,10 @@ private static final class Builder { .put("quickstart", (builder, argVal) -> builder.quickstart = parseQuickStart(argVal)) .put("bootstrap-server", (builder, argVal) -> builder.bootstrapServer = argVal) .put("schema", (builder, argVal) -> builder.schemaFile = toFileInputStream(argVal)) - .put("format", (builder, argVal) -> builder.format = parseFormat(argVal)) + .put("key-format", (builder, arg) -> builder.keyFormat = parseFormat(arg)) + .put("value-format", (builder, arg) -> builder.valueFormat = parseFormat(arg)) + // "format" is maintained for backwards compatibility, but should be removed later. 
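+        // It sets only the value format; the key format falls back to its KAFKA default.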
+ .put("format", (builder, argVal) -> builder.valueFormat = parseFormat(argVal)) .put("topic", (builder, argVal) -> builder.topicName = argVal) .put("key", (builder, argVal) -> builder.keyName = argVal) .put("iterations", (builder, argVal) -> builder.iterations = parseIterations(argVal)) @@ -173,7 +184,8 @@ private static final class Builder { private boolean help; private String bootstrapServer; private InputStream schemaFile; - private Format format; + private Format keyFormat; + private Format valueFormat; private String topicName; private String keyName; private int iterations; @@ -186,7 +198,8 @@ private Builder() { help = false; bootstrapServer = "localhost:9092"; schemaFile = null; - format = null; + keyFormat = Format.KAFKA; + valueFormat = null; topicName = null; keyName = null; iterations = 1000000; @@ -227,27 +240,32 @@ public String getKeyName() { return keyName; } - public Format getFormat() { - return Format.JSON; + public Format getKeyFormat() { + return Format.KAFKA; } + public Format getValueFormat() { + return Format.JSON; + } } Arguments build() { if (help) { - return new Arguments(true, null, null, null, null, null, 0, -1, null, null); + return new Arguments(true, null, null, null, null,null, null, 0, -1, null, null); } if (quickstart != null) { schemaFile = Optional.ofNullable(schemaFile).orElse(quickstart.getSchemaFile()); - format = Optional.ofNullable(format).orElse(quickstart.getFormat()); - topicName = Optional.ofNullable(topicName).orElse(quickstart.getTopicName(format)); + keyFormat = Optional.ofNullable(keyFormat).orElse(quickstart.getKeyFormat()); + valueFormat = Optional.ofNullable(valueFormat).orElse(quickstart.getValueFormat()); + topicName = Optional.ofNullable(topicName).orElse(quickstart.getTopicName(valueFormat)); keyName = Optional.ofNullable(keyName).orElse(quickstart.getKeyName()); } try { Objects.requireNonNull(schemaFile, "Schema file not provided"); - Objects.requireNonNull(format, "Message format not provided"); + Objects.requireNonNull(keyFormat, "Message key format not provided"); + Objects.requireNonNull(valueFormat, "Message value format not provided"); Objects.requireNonNull(topicName, "Kafka topic name not provided"); Objects.requireNonNull(keyName, "Name of key column not provided"); } catch (final NullPointerException exception) { @@ -257,7 +275,8 @@ Arguments build() { help, bootstrapServer, schemaFile, - format, + keyFormat, + valueFormat, topicName, keyName, iterations, @@ -349,7 +368,7 @@ private static Format parseFormat(final String formatString) { return Format.valueOf(formatString.toUpperCase()); } catch (final IllegalArgumentException exception) { throw new ArgumentParseException(String.format( - "Invalid format in '%s'; was expecting one of AVRO, JSON, or DELIMITED " + "Invalid format in '%s'; was expecting one of AVRO, JSON, KAFKA or DELIMITED " + "(case-insensitive)", formatString )); diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java index 40a777f1ce33..0e1e07260964 100644 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DataGenProducer.java @@ -15,31 +15,57 @@ package io.confluent.ksql.datagen; +import static java.util.Objects.requireNonNull; + import io.confluent.avro.random.generator.Generator; import io.confluent.connect.avro.AvroData; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import 
io.confluent.ksql.GenericRow;
+import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
+import io.confluent.ksql.schema.ksql.LogicalSchema;
+import io.confluent.ksql.schema.ksql.PersistenceSchema;
+import io.confluent.ksql.schema.ksql.PhysicalSchema;
+import io.confluent.ksql.serde.GenericRowSerDe;
+import io.confluent.ksql.serde.KsqlSerdeFactory;
+import io.confluent.ksql.serde.SerdeOption;
+import io.confluent.ksql.util.KsqlConfig;
 import io.confluent.ksql.util.Pair;
 import io.confluent.ksql.util.SchemaUtil;
 import java.util.Objects;
 import java.util.Properties;
+import java.util.function.Supplier;
 import org.apache.avro.Schema;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.connect.data.ConnectSchema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 
-public abstract class DataGenProducer {
+public class DataGenProducer {
 
-  static final org.apache.kafka.connect.data.Schema KEY_SCHEMA = SchemaBuilder.struct()
+  static final ConnectSchema KEY_SCHEMA = (ConnectSchema) SchemaBuilder.struct()
       .field(SchemaUtil.ROWKEY_NAME, org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA)
       .build();
 
   // Max 500 ms between messages.
   public static final long INTER_MESSAGE_MAX_INTERVAL = 500;
 
+  private final KsqlSerdeFactory keySerializerFactory;
+  private final KsqlSerdeFactory valueSerializerFactory;
+  private final Supplier<SchemaRegistryClient> srClientSupplier;
+
+  public DataGenProducer(
+      final KsqlSerdeFactory keySerializerFactory,
+      final KsqlSerdeFactory valueSerializerFactory,
+      final Supplier<SchemaRegistryClient> srClientSupplier
+  ) {
+    this.keySerializerFactory = requireNonNull(keySerializerFactory, "keySerializerFactory");
+    this.valueSerializerFactory = requireNonNull(valueSerializerFactory, "valueSerializerFactory");
+    this.srClientSupplier = requireNonNull(srClientSupplier, "srClientSupplier");
+  }
+
   public void populateTopic(
       final Properties props,
       final Generator generator,
@@ -57,12 +83,21 @@ public void populateTopic(
     final org.apache.kafka.connect.data.Schema ksqlSchema =
         DataGenSchemaUtil.getOptionalSchema(avroData.toConnectSchema(avroSchema));
 
-    final Serializer<GenericRow> serializer = getSerializer(avroSchema, ksqlSchema, kafkaTopicName);
+    final PhysicalSchema physicalSchema = PhysicalSchema.from(
+        LogicalSchema.of(KEY_SCHEMA, ksqlSchema),
+        SerdeOption.none()
+    );
+
+    final KsqlConfig ksqConfig = new KsqlConfig(props);
+
+    final Serializer<String> keySerializer = getKeySerializer(ksqConfig);
+
+    final Serializer<GenericRow> valueSerializer = getValueSerializer(physicalSchema, ksqConfig);
 
     final KafkaProducer<String, GenericRow> producer = new KafkaProducer<>(
         props,
-        new StringSerializer(),
-        serializer
+        keySerializer,
+        valueSerializer
     );
 
     final SessionManager sessionManager = new SessionManager();
@@ -96,6 +131,31 @@ public void populateTopic(
     producer.close();
   }
 
+  @SuppressWarnings("unchecked")
+  private Serializer<String> getKeySerializer(final KsqlConfig ksqConfig) {
+    final PersistenceSchema schema = keySerializerFactory.getFormat().supportsUnwrapping()
+        ? 
PersistenceSchema.of((ConnectSchema) KEY_SCHEMA.fields().get(0).schema()) + : PersistenceSchema.of(KEY_SCHEMA); + + return (Serializer) (Serializer)keySerializerFactory + .createSerde(schema, ksqConfig, srClientSupplier) + .serializer(); + } + + private Serializer getValueSerializer( + final PhysicalSchema physicalSchema, + final KsqlConfig ksqConfig + ) { + return GenericRowSerDe.from( + valueSerializerFactory, + physicalSchema, + ksqConfig, + srClientSupplier, + "", + NoopProcessingLogContext.INSTANCE + ).serializer(); + } + private static class ErrorLoggingCallback implements Callback { private final String topic; @@ -122,10 +182,4 @@ public void onCompletion(final RecordMetadata metadata, final Exception e) { } } } - - protected abstract Serializer getSerializer( - Schema avroSchema, - org.apache.kafka.connect.data.Schema kafkaSchema, - String topicName - ); } diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DelimitedProducer.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/DelimitedProducer.java deleted file mode 100644 index 5d2fc6972961..000000000000 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/DelimitedProducer.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2018 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.datagen; - -import com.google.common.collect.ImmutableMap; -import io.confluent.ksql.GenericRow; -import io.confluent.ksql.logging.processing.NoopProcessingLogContext; -import io.confluent.ksql.schema.ksql.LogicalSchema; -import io.confluent.ksql.schema.ksql.PhysicalSchema; -import io.confluent.ksql.serde.GenericRowSerDe; -import io.confluent.ksql.serde.SerdeOption; -import io.confluent.ksql.serde.delimited.KsqlDelimitedSerdeFactory; -import io.confluent.ksql.util.KsqlConfig; -import org.apache.avro.Schema; -import org.apache.kafka.common.serialization.Serializer; - -public class DelimitedProducer extends DataGenProducer { - - @Override - protected Serializer getSerializer( - final Schema avroSchema, - final org.apache.kafka.connect.data.Schema kafkaSchema, - final String topicName - ) { - final PhysicalSchema physicalSchema = PhysicalSchema.from( - LogicalSchema.of(KEY_SCHEMA, kafkaSchema), - SerdeOption.none() - ); - - return GenericRowSerDe.from( - new KsqlDelimitedSerdeFactory(), - physicalSchema, - new KsqlConfig(ImmutableMap.of()), - () -> null, - "", - NoopProcessingLogContext.INSTANCE - ).serializer(); - } -} diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/JsonProducer.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/JsonProducer.java deleted file mode 100644 index cf26d1e59ba4..000000000000 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/JsonProducer.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2018 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. 
You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.datagen; - -import com.google.common.collect.ImmutableMap; -import io.confluent.ksql.GenericRow; -import io.confluent.ksql.logging.processing.NoopProcessingLogContext; -import io.confluent.ksql.schema.ksql.LogicalSchema; -import io.confluent.ksql.schema.ksql.PhysicalSchema; -import io.confluent.ksql.serde.GenericRowSerDe; -import io.confluent.ksql.serde.SerdeOption; -import io.confluent.ksql.serde.json.KsqlJsonSerdeFactory; -import io.confluent.ksql.util.KsqlConfig; -import org.apache.avro.Schema; -import org.apache.kafka.common.serialization.Serializer; - -public class JsonProducer extends DataGenProducer { - - @Override - protected Serializer getSerializer( - final Schema avroSchema, - final org.apache.kafka.connect.data.Schema kafkaSchema, - final String topicName - ) { - final PhysicalSchema physicalSchema = PhysicalSchema.from( - LogicalSchema.of(KEY_SCHEMA, kafkaSchema), - SerdeOption.none() - ); - - return GenericRowSerDe.from( - new KsqlJsonSerdeFactory(), - physicalSchema, - new KsqlConfig(ImmutableMap.of()), - () -> null, - "", - NoopProcessingLogContext.INSTANCE - ).serializer(); - } -} diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/ProducerFactory.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/ProducerFactory.java index 40a4d40d814b..2a12f45bb037 100644 --- a/ksql-examples/src/main/java/io/confluent/ksql/datagen/ProducerFactory.java +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/ProducerFactory.java @@ -15,26 +15,124 @@ package io.confluent.ksql.datagen; +import static java.util.Objects.requireNonNull; + +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.ksql.schema.ksql.PersistenceSchema; import io.confluent.ksql.serde.Format; +import io.confluent.ksql.serde.KsqlSerdeFactories; +import io.confluent.ksql.serde.KsqlSerdeFactory; import io.confluent.ksql.util.KsqlConfig; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.Properties; +import java.util.function.Supplier; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.connect.data.ConnectSchema; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Struct; + +final class ProducerFactory { + + private static final KsqlSerdeFactories SERDE_FACTORIES = new KsqlSerdeFactories(); + + private ProducerFactory() { + } + + static DataGenProducer getProducer( + final Format keyFormat, + final Format valueFormat, + final Properties props + ) { + final KsqlSerdeFactory keyFactory = createSerdeFactory(keyFormat); + final KsqlSerdeFactory valueFactory = createSerdeFactory(valueFormat); + + final Optional srClient = SchemaRegistryClientFactory + .getSrClient(keyFormat, valueFormat, props); + + return new DataGenProducer(keyFactory, valueFactory, srClient::get); + } + + private static KsqlSerdeFactory createSerdeFactory(final Format format) { + final KsqlSerdeFactory structSerdeFactory = 
SERDE_FACTORIES.create(format, Optional.empty()); + + if (format.supportsUnwrapping()) { + return structSerdeFactory; + } + + return new UnwrappedSerdeFactory(structSerdeFactory); + } + + private static class UnwrappedSerdeFactory implements KsqlSerdeFactory { + + private final KsqlSerdeFactory delegate; -class ProducerFactory { - DataGenProducer getProducer(final Format format, - final Properties props) { - switch (format) { - case AVRO: - return new AvroProducer(new KsqlConfig(props)); + UnwrappedSerdeFactory(final KsqlSerdeFactory delegate) { + this.delegate = Objects.requireNonNull(delegate, "delegate"); + } + + @Override + public Format getFormat() { + return delegate.getFormat(); + } + + @Override + public void validate(final PersistenceSchema schema) { + delegate.validate(schema); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + public Serde createSerde( + final PersistenceSchema schema, + final KsqlConfig ksqlConfig, + final Supplier schemaRegistryClientFactory + ) { + final Serde serde = delegate + .createSerde(schema, ksqlConfig, schemaRegistryClientFactory); + + final Serializer serializer = serde.serializer(); + return Serdes.serdeFrom( + new WrappedSerializer((Serializer)serializer, schema.getConnectSchema()), + serde.deserializer() + ); + } + } + + private static class WrappedSerializer implements Serializer { + + private final Serializer inner; + private final ConnectSchema schema; + private final Field field; + + WrappedSerializer(final Serializer inner, final ConnectSchema schema) { + this.inner = requireNonNull(inner, "inner"); + this.schema = requireNonNull(schema, "schema"); + this.field = schema.fields().get(0); + + if (schema.fields().size() != 1) { + throw new IllegalArgumentException("Expecting only single field"); + } + } + + @Override + public void configure(final Map configs, final boolean isKey) { + inner.configure(configs, isKey); + } - case JSON: - return new JsonProducer(); + @Override + public byte[] serialize(final String topic, final Object data) { + if (data == null) { + return inner.serialize(topic, null); + } - case DELIMITED: - return new DelimitedProducer(); + final Struct struct = new Struct(schema); + struct.put(field, data); - default: - throw new IllegalArgumentException("Invalid format in '" + format - + "'; was expecting one of AVRO, JSON, or DELIMITED%n"); + return inner.serialize(topic, struct); } } } diff --git a/ksql-examples/src/main/java/io/confluent/ksql/datagen/SchemaRegistryClientFactory.java b/ksql-examples/src/main/java/io/confluent/ksql/datagen/SchemaRegistryClientFactory.java new file mode 100644 index 000000000000..1ecb6c3e2f52 --- /dev/null +++ b/ksql-examples/src/main/java/io/confluent/ksql/datagen/SchemaRegistryClientFactory.java @@ -0,0 +1,52 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.datagen; + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.ksql.serde.Format; +import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.KsqlException; +import java.util.Optional; +import java.util.Properties; + +final class SchemaRegistryClientFactory { + + private SchemaRegistryClientFactory() { + } + + static Optional getSrClient( + final Format keyFormat, + final Format valueFormat, + final Properties props + ) { + if (keyFormat != Format.AVRO && valueFormat != Format.AVRO) { + return Optional.empty(); + } + + final KsqlConfig ksqlConfig = new KsqlConfig(props); + if (ksqlConfig.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY) == null) { + throw new KsqlException("Schema registry url is not set."); + } + + return Optional.of(new CachedSchemaRegistryClient( + ksqlConfig.getString(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY), + 100, + ksqlConfig.originalsWithPrefix(KsqlConfig.KSQL_SCHEMA_REGISTRY_PREFIX) + )); + } + +} diff --git a/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenFunctionalTest.java b/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenFunctionalTest.java new file mode 100644 index 000000000000..3e7b8cab46ae --- /dev/null +++ b/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenFunctionalTest.java @@ -0,0 +1,197 @@ +/* + * Copyright 2019 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.datagen; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.startsWith; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import io.confluent.common.utils.IntegrationTest; +import io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster; +import io.confluent.ksql.test.util.TopicTestUtil; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({IntegrationTest.class}) +public class DataGenFunctionalTest { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final Deserializer BYTE_DESERIALIZER = new ByteArrayDeserializer(); + private static final Deserializer KAFKA_STRING_DESERIALIZER = new StringDeserializer(); + + private static final int DEFAULT_MESSAGE_COUNT = 5; + + private static final Map DEFAULT_ARGS = ImmutableMap.of( + "quickstart", "users", + "key-format", "kafka", + "value-format", "json", + "maxInterval", "1", + "iterations", "" + DEFAULT_MESSAGE_COUNT + ); + + @ClassRule + public static final EmbeddedSingleNodeKafkaCluster CLUSTER = + EmbeddedSingleNodeKafkaCluster.build(); + + private String topicName; + + @Before + public void setUp() { + topicName = TopicTestUtil.uniqueTopicName(); + CLUSTER.createTopic(topicName, 1, 1); + } + + @Test + public void shouldWorkWithoutAnyFormatSupplied() throws Exception { + // Given: + final Map args = new HashMap<>(DEFAULT_ARGS); + args.remove("key-format"); + args.remove("value-format"); + + // When: + runWithExactArgs(args); + + // Then: + final List> records = CLUSTER + .verifyAvailableRecords( + topicName, + DEFAULT_MESSAGE_COUNT, + KAFKA_STRING_DESERIALIZER, + KAFKA_STRING_DESERIALIZER + ); + + assertKafkaKeys(records); + assertJsonValues(records); + } + + @Test + public void shouldProduceDataWithKafkaFormatKeys() throws Exception { + // When: + runWithArgOverrides(ImmutableMap.of( + "key-format", "kafka" + )); + + // Then: + final List> records = CLUSTER + .verifyAvailableRecords( + topicName, + DEFAULT_MESSAGE_COUNT, + KAFKA_STRING_DESERIALIZER, + BYTE_DESERIALIZER + ); + + assertKafkaKeys(records); + } + + @Test + public void shouldProduceDataWithJsonFormatKeys() throws Exception { + // When: + runWithArgOverrides(ImmutableMap.of( + "key-format", "json" + )); + + // Then: + final List> records = CLUSTER + .verifyAvailableRecords( + topicName, + DEFAULT_MESSAGE_COUNT, + KAFKA_STRING_DESERIALIZER, + BYTE_DESERIALIZER + ); + + assertJsonKeys(records); + } + + @Test + public void shouldProduceDataWithKJsonFormatValues() throws Exception { + // When: + runWithArgOverrides(ImmutableMap.of( + "value-format", "json" + )); + + // Then: + final List> records = CLUSTER + .verifyAvailableRecords( + topicName, + DEFAULT_MESSAGE_COUNT, + BYTE_DESERIALIZER, + KAFKA_STRING_DESERIALIZER + ); + + assertJsonValues(records); + } + + private static void assertKafkaKeys( + final List> records + ) { + records.forEach(r -> { + assertThat(r.key(), startsWith("User_")); + }); + } + + private static 
void assertJsonKeys( + final List> records + ) { + records.forEach(r -> { + assertThat(r.key(), startsWith("\"User_")); + }); + } + + private static void assertJsonValues( + final List> records + ) throws IOException { + for (final ConsumerRecord r : records) { + final Map value = OBJECT_MAPPER.readValue(r.value(), Map.class); + assertThat(value.keySet(), containsInAnyOrder( + "registertime", + "userid", + "regionid", + "gender" + )); + } + } + + private void runWithArgOverrides(final Map additionalArgs) throws IOException { + final Map args = new HashMap<>(DEFAULT_ARGS); + args.putAll(additionalArgs); + + runWithExactArgs(args); + } + + private void runWithExactArgs(final Map args) throws IOException { + args.put("topic", topicName); + args.put("bootstrap-server", CLUSTER.bootstrapServers()); + + final String[] argArray = args.entrySet().stream() + .map(e -> e.getKey() + "=" + e.getValue()) + .toArray(String[]::new); + + DataGen.run(argArray); + } +} diff --git a/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenTest.java b/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenTest.java index 0e8efb80e401..4a7f0493d5a2 100644 --- a/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenTest.java +++ b/ksql-examples/src/test/java/io/confluent/ksql/datagen/DataGenTest.java @@ -19,14 +19,12 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; +import io.confluent.ksql.util.KsqlConfig; +import java.util.Properties; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import java.util.Properties; - -import io.confluent.ksql.util.KsqlConfig; - public class DataGenTest { @Rule public final ExpectedException expectedException = ExpectedException.none(); @@ -71,9 +69,21 @@ public void shouldThrowOnUnknownQuickStart() throws Exception { @Test public void shouldPassSchemaRegistryUrl() throws Exception { - DataGen.Arguments args = new DataGen.Arguments( - false, "bootstrap", null, null, "topic", "key", 0, 0L, "srUrl", null); - Properties props = DataGen.getProperties(args); + final DataGen.Arguments args = new DataGen.Arguments( + false, + "bootstrap", + null, + null, + null, + "topic", + "key", + 0, + 0L, + "srUrl", + null + ); + + final Properties props = DataGen.getProperties(args); assertThat(props.getProperty(KsqlConfig.SCHEMA_REGISTRY_URL_PROPERTY), equalTo("srUrl")); } } \ No newline at end of file diff --git a/ksql-examples/src/test/java/io/confluent/ksql/datagen/JsonProducerTest.java b/ksql-examples/src/test/java/io/confluent/ksql/datagen/JsonProducerTest.java deleted file mode 100644 index c2f0f1e645df..000000000000 --- a/ksql-examples/src/test/java/io/confluent/ksql/datagen/JsonProducerTest.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright 2019 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- */ - -package io.confluent.ksql.datagen; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.nullValue; - -import io.confluent.ksql.GenericRow; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class JsonProducerTest { - - private static final String TOPIC_NAME = "some-topic"; - - private static final org.apache.kafka.connect.data.Schema KSQL_SCHEMA = SchemaBuilder - .struct() - .field("f0", Schema.OPTIONAL_INT32_SCHEMA) - .field("f1", Schema.OPTIONAL_INT64_SCHEMA) - .field("f2", Schema.OPTIONAL_INT64_SCHEMA) - .field("f3", Schema.OPTIONAL_FLOAT64_SCHEMA) - .field("f4", Schema.OPTIONAL_STRING_SCHEMA) - .build(); - - @Mock - private org.apache.avro.Schema avroSchema; - - private Serializer serializer; - - @Before - public void setUp() { - serializer = new JsonProducer().getSerializer(avroSchema, KSQL_SCHEMA, TOPIC_NAME); - } - - @Test - public void shouldSerializeNull() { - assertThat(serializer.serialize("topic", null), is(nullValue())); - } - - @Test - public void shouldSerializeRow() { - // Given: - final GenericRow row = new GenericRow(0, 1L, null, 3.0, "four"); - - // When: - final byte[] result = serializer.serialize("topic", row); - - // Then: - assertThat(result, is(new byte[]{ - 123, 34, 102, 48, 34, 58, 48, 44, - 34, 102, 49, 34, 58, 49, 44, 34, - 102, 50, 34, 58, 110, 117, 108, 108, - 44, 34, 102, 51, 34, 58, 51, 46, - 48, 44, 34, 102, 52, 34, 58, 34, - 102, 111, 117, 114, 34, 125 - })); - } -} \ No newline at end of file diff --git a/ksql-test-util/src/main/java/io/confluent/ksql/test/util/EmbeddedSingleNodeKafkaCluster.java b/ksql-test-util/src/main/java/io/confluent/ksql/test/util/EmbeddedSingleNodeKafkaCluster.java index 6214f1c0f3e9..37017a01f9c6 100644 --- a/ksql-test-util/src/main/java/io/confluent/ksql/test/util/EmbeddedSingleNodeKafkaCluster.java +++ b/ksql-test-util/src/main/java/io/confluent/ksql/test/util/EmbeddedSingleNodeKafkaCluster.java @@ -34,6 +34,7 @@ import java.util.Objects; import java.util.Properties; import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.security.auth.login.Configuration; @@ -45,6 +46,9 @@ import kafka.security.auth.SimpleAclAuthorizer; import kafka.server.KafkaConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.config.SslConfigs; @@ -55,6 +59,8 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.security.plain.PlainLoginModule; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.test.TestUtils; import org.junit.rules.ExternalResource; import org.junit.rules.TemporaryFolder; @@ -193,6 +199,33 @@ public Map getClientProperties() { return 
Collections.unmodifiableMap(clientConfig); } + /** + * Common consumer properties that tests will need. + * + * @return base set of consumer properties. + */ + public Map consumerConfig() { + final Map config = new HashMap<>(getClientProperties()); + config.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); + config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + // Try to keep consumer groups stable: + config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 7_000); + config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20_000); + return config; + } + + /** + * Common producer properties that tests will need. + * + * @return base set of producer properties. + */ + public Map producerConfig() { + final Map config = new HashMap<>(getClientProperties()); + config.put(ProducerConfig.ACKS_CONFIG, "all"); + config.put(ProducerConfig.RETRIES_CONFIG, 10); + return config; + } + /** * This cluster's ZK connection string aka `zookeeper.connect` in `hostnameOrIp:port` format. * Example: `127.0.0.1:2181`. @@ -240,6 +273,49 @@ public void createTopic( broker.createTopic(topic, partitions, replication, topicConfig); } + /** + * Verify there are {@code expectedCount} records available on the supplied {@code topic}. + * + * @param topic the name of the topic to check. + * @param expectedCount the expected number of records. + * @return the list of consumed records. + */ + public List> verifyAvailableRecords( + final String topic, + final int expectedCount + ) { + return verifyAvailableRecords( + topic, + expectedCount, + new ByteArrayDeserializer(), + new ByteArrayDeserializer() + ); + } + + /** + * Verify there are {@code expectedCount} records available on the supplied {@code topic}. + * + * @param topic the name of the topic to check. + * @param expectedCount the expected number of records. + * @return the list of consumed records. + */ + public List> verifyAvailableRecords( + final String topic, + final int expectedCount, + final Deserializer keyDeserializer, + final Deserializer valueDeserializer + ) { + try (KafkaConsumer consumer = new KafkaConsumer<>( + consumerConfig(), + keyDeserializer, + valueDeserializer) + ) { + consumer.subscribe(Collections.singleton(topic)); + + return ConsumerTestUtil.verifyAvailableRecords(consumer, expectedCount); + } + } + /** * Writes the supplied ACL information to ZK, where it will be picked up by the brokes authorizer. * From 3fdb9bf26f532382685e6de13c1805bdbd0dd350 Mon Sep 17 00:00:00 2001 From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> Date: Wed, 24 Jul 2019 14:07:14 +0100 Subject: [PATCH 2/6] Update docs/tutorials/generate-custom-test-data.rst Co-Authored-By: Jim Galasyn --- docs/tutorials/generate-custom-test-data.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/tutorials/generate-custom-test-data.rst b/docs/tutorials/generate-custom-test-data.rst index 191026ae281d..fc85f38674a5 100644 --- a/docs/tutorials/generate-custom-test-data.rst +++ b/docs/tutorials/generate-custom-test-data.rst @@ -47,7 +47,7 @@ Required Arguments Name Default Description ========================================== ======= =========================================================================================== ``schema=`` Path to an Avro schema file. Requires the ``format``, ``topic``, and ``key`` options. -``key-format=`` kafka Format of generated record keys: one of ``avro``, ``json``, ``delimited``, ``kafka``. Case-insensitive. 
+``key-format=`` Kafka format of generated record keys: one of ``avro``, ``json``, ``delimited``, ``kafka``. Case-insensitive. ``value-format=`` json Format of generated record values: one of ``avro``, ``json``, ``delimited``. Case-insensitive. ``topic=`` Name of the topic that receives generated records. ``key=`` Field to use as the key for generated records. From 45fdc810f5c34142704cd966a20c26d43a995b66 Mon Sep 17 00:00:00 2001 From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> Date: Wed, 24 Jul 2019 14:07:22 +0100 Subject: [PATCH 3/6] Update docs/tutorials/generate-custom-test-data.rst Co-Authored-By: Jim Galasyn --- docs/tutorials/generate-custom-test-data.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/tutorials/generate-custom-test-data.rst b/docs/tutorials/generate-custom-test-data.rst index fc85f38674a5..1306de95247c 100644 --- a/docs/tutorials/generate-custom-test-data.rst +++ b/docs/tutorials/generate-custom-test-data.rst @@ -48,7 +48,7 @@ Name Default Description ========================================== ======= =========================================================================================== ``schema=`` Path to an Avro schema file. Requires the ``format``, ``topic``, and ``key`` options. ``key-format=`` Kafka format of generated record keys: one of ``avro``, ``json``, ``delimited``, ``kafka``. Case-insensitive. -``value-format=`` json Format of generated record values: one of ``avro``, ``json``, ``delimited``. Case-insensitive. +``value-format=`` JSON format of generated record values: one of ``avro``, ``json``, ``delimited``. Case-insensitive. ``topic=`` Name of the topic that receives generated records. ``key=`` Field to use as the key for generated records. ``quickstart=`` Generate records from a preset schema: ``orders``, ``users``, or ``pageviews``. Case-insensitive. From 0c64e3bca45bb78e1e121f290f0bfb9feaa1f286 Mon Sep 17 00:00:00 2001 From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com> Date: Wed, 24 Jul 2019 14:07:31 +0100 Subject: [PATCH 4/6] Update docs/tutorials/generate-custom-test-data.rst Co-Authored-By: Jim Galasyn --- docs/tutorials/generate-custom-test-data.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/tutorials/generate-custom-test-data.rst b/docs/tutorials/generate-custom-test-data.rst index 1306de95247c..1eb1d01638e2 100644 --- a/docs/tutorials/generate-custom-test-data.rst +++ b/docs/tutorials/generate-custom-test-data.rst @@ -71,7 +71,7 @@ The following options apply to both the ``schema`` and ``quickstart`` options. Name Default Description ============================================ =================================================== ========================================================================================= ``bootstrap-server=:`` localhost:9092 IP address and port for the Kafka server to connect to. -``key-format=`` kafka Format of generated record keys: ``avro``, ``json``, ``delimited`` or ``kafka``. Case-insensitive. Required by the ``schema`` option. +``key-format=`` Kafka format of generated record keys: ``avro``, ``json``, ``delimited`` or ``kafka``. Case-insensitive. Required by the ``schema`` option. ``value-format=`` json Format of generated record values: ``avro``, ``json``, or ``delimited``. Case-insensitive. Required by the ``schema`` option. ``topic=`` Name of the topic that receives generated records. Required by the ``schema`` option. ``key=`` Field to use as the key for generated records. Required by the ``schema`` option. 
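For anyone trying the new options out, a couple of example invocations (the topic names here are illustrative; ``bootstrap-server`` falls back to its localhost:9092 default):

```bash
# New-style arguments: KAFKA-serialized keys (the default) and JSON values.
bin/ksql-datagen quickstart=users key-format=kafka value-format=json topic=users_topic maxInterval=100 iterations=10

# The legacy 'format' argument still parses, but now sets only the value format;
# the key format falls back to its KAFKA default.
bin/ksql-datagen quickstart=users format=json topic=users_topic maxInterval=100 iterations=10
```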
From 1daa1a4b1a906c3678802405b5732233999c561f Mon Sep 17 00:00:00 2001
From: Andy Coates <8012398+big-andy-coates@users.noreply.github.com>
Date: Wed, 24 Jul 2019 14:07:38 +0100
Subject: [PATCH 5/6] Update docs/tutorials/generate-custom-test-data.rst

Co-Authored-By: Jim Galasyn
---
 docs/tutorials/generate-custom-test-data.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/tutorials/generate-custom-test-data.rst b/docs/tutorials/generate-custom-test-data.rst
index 1eb1d01638e2..184516104eec 100644
--- a/docs/tutorials/generate-custom-test-data.rst
+++ b/docs/tutorials/generate-custom-test-data.rst
@@ -72,7 +72,7 @@ Name Default
 ============================================ =================================================== =========================================================================================
 ``bootstrap-server=:`` localhost:9092 IP address and port for the Kafka server to connect to.
 ``key-format=`` Kafka format of generated record keys: ``avro``, ``json``, ``delimited`` or ``kafka``. Case-insensitive. Required by the ``schema`` option.
-``value-format=`` json Format of generated record values: ``avro``, ``json``, or ``delimited``. Case-insensitive. Required by the ``schema`` option.
+``value-format=`` JSON format of generated record values: ``avro``, ``json``, or ``delimited``. Case-insensitive. Required by the ``schema`` option.
 ``topic=`` Name of the topic that receives generated records. Required by the ``schema`` option.
 ``key=`` Field to use as the key for generated records. Required by the ``schema`` option.
 ``iterations=`` 1,000,000 The maximum number of records to generate.

From a5d2558dcd14d8644d64f2c5ec9a7557ae7fe971 Mon Sep 17 00:00:00 2001
From: Andy Coates
Date: Tue, 6 Aug 2019 16:16:51 +0100
Subject: [PATCH 6/6] docs: update to use value-format

---
 ksql-clickstream-demo/non-docker-clickstream.md | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/ksql-clickstream-demo/non-docker-clickstream.md b/ksql-clickstream-demo/non-docker-clickstream.md
index 31c2fe8f9964..5767fc520718 100644
--- a/ksql-clickstream-demo/non-docker-clickstream.md
+++ b/ksql-clickstream-demo/non-docker-clickstream.md
@@ -64,7 +64,7 @@ These steps will guide you through how to setup your environment and run the cli
    **Tip:** Because of shell redirection, this command does not print a newline and so it might look like it's still in the foreground. The process is running as a daemon, so just press return again to see the shell prompt.

    ```bash
-   $ bin/ksql-datagen -daemon quickstart=clickstream format=json topic=clickstream maxInterval=100 iterations=500000
+   $ bin/ksql-datagen -daemon quickstart=clickstream value-format=json topic=clickstream maxInterval=100 iterations=500000
    ```

    Your output should resemble:
@@ -76,7 +76,7 @@ These steps will guide you through how to setup your environment and run the cli
 1. From your terminal, create the status codes using the ksql-datagen utility. This stream runs once to populate the table.

    ```bash
-   $ bin/ksql-datagen quickstart=clickstream_codes format=json topic=clickstream_codes maxInterval=20 iterations=100
+   $ bin/ksql-datagen quickstart=clickstream_codes value-format=json topic=clickstream_codes maxInterval=20 iterations=100
    ```

    Your output should resemble:
@@ -92,7 +92,7 @@ These steps will guide you through how to setup your environment and run the cli
 1. From your terminal, create a set of users using ksql-datagen utility. This stream runs once to populate the table.
    ```bash
-   $ bin/ksql-datagen quickstart=clickstream_users format=json topic=clickstream_users maxInterval=10 iterations=1000
+   $ bin/ksql-datagen quickstart=clickstream_users value-format=json topic=clickstream_users maxInterval=10 iterations=1000
    ```

    Your output should resemble:
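To sanity-check what any of these commands produced, the standard Kafka console consumer works; the invocation below is illustrative (it assumes Kafka's CLI tools are on the path and the default localhost:9092 broker):

```bash
# Print keys as well as values, so the key serialization is visible too.
kafka-console-consumer --bootstrap-server localhost:9092 --topic clickstream_users \
  --from-beginning --max-messages 5 --property print.key=true
```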