feat(data-gen): support KAFKA format in DataGen #3120

Merged · 8 commits · Aug 6, 2019
6 changes: 4 additions & 2 deletions docs/tutorials/generate-custom-test-data.rst
@@ -47,7 +47,8 @@ Required Arguments
Name Default Description
========================================== ======= ===========================================================================================
``schema=<avro schema file>`` Path to an Avro schema file. Requires the ``format``, ``topic``, and ``key`` options.
``format=<record format>`` json Format of generated records: one of ``avro``, ``json``, or ``delimited``. Case-insensitive.
``key-format=<key format>`` Kafka format of generated record keys: one of ``avro``, ``json``, ``delimited``, ``kafka``. Case-insensitive.
``value-format=<value format>`` JSON format of generated record values: one of ``avro``, ``json``, ``delimited``. Case-insensitive.
``topic=<kafka topic name>`` Name of the topic that receives generated records.
``key=<name of key column>`` Field to use as the key for generated records.
``quickstart=<quickstart preset>`` Generate records from a preset schema: ``orders``, ``users``, or ``pageviews``. Case-insensitive.
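
For orientation, a schema-driven invocation built purely from the arguments listed above might look like the following sketch. The schema path, topic name, and key column are placeholders, and the option spellings simply mirror this table rather than any particular release.

```bash
# Hypothetical schema-based run; the file name, topic, and key column are placeholders.
$ <path-to-ksql>bin/ksql-datagen \
    schema=./impressions.avro \
    key-format=kafka \
    value-format=json \
    topic=impressions \
    key=impressionid
```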
@@ -70,7 +71,8 @@ The following options apply to both the ``schema`` and ``quickstart`` options.
Name Default Description
============================================ =================================================== =========================================================================================
``bootstrap-server=<kafka-server>:<port>`` localhost:9092 IP address and port for the Kafka server to connect to.
``format=<record format>`` json Format of generated records: ``avro``, ``json``, or ``delimited``. Case-insensitive. Required by the ``schema`` option.
``key-format=<key format>`` Kafka format of generated record keys: ``avro``, ``json``, ``delimited`` or ``kafka``. Case-insensitive. Required by the ``schema`` option.
``value-format=<value format>`` JSON format of generated record values: ``avro``, ``json``, or ``delimited``. Case-insensitive. Required by the ``schema`` option.
``topic=<kafka topic name>`` Name of the topic that receives generated records. Required by the ``schema`` option.
``key=<name of key column>`` Field to use as the key for generated records. Required by the ``schema`` option.
``iterations=<number of records>`` 1,000,000 The maximum number of records to generate.
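
A quickstart-based run that overrides some of these defaults could look roughly like this; the preset, topic, broker address, and iteration count are illustrative values only, with the option names again taken from the table above.

```bash
# Hypothetical quickstart run; all values shown are illustrative.
$ <path-to-ksql>bin/ksql-datagen \
    quickstart=orders \
    value-format=json \
    topic=orders_topic \
    bootstrap-server=localhost:9092 \
    maxInterval=100 \
    iterations=1000
```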
@@ -15,15 +15,11 @@

package io.confluent.ksql.benchmark;

import static io.confluent.ksql.datagen.DataGenSchemaUtil.getOptionalSchema;

import io.confluent.avro.random.generator.Generator;
import io.confluent.connect.avro.AvroData;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.datagen.RowGenerator;
import io.confluent.ksql.datagen.SessionManager;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
import io.confluent.ksql.serde.Format;
@@ -40,14 +36,14 @@
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.avro.Schema;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -95,20 +91,14 @@ public static class SchemaAndGenericRowState {
public void setUp() throws Exception {
final Generator generator = new Generator(getSchemaStream(), new Random());

final Schema avroSchema = generator.schema();
final AvroData avroData = new AvroData(1);
final org.apache.kafka.connect.data.Schema ksqlSchema =
getOptionalSchema(avroData.toConnectSchema(avroSchema));

// choose arbitrary key
final String key = ksqlSchema.fields().get(0).name();
final String key = generator.schema().getFields().get(0).name();

final RowGenerator rowGenerator = new RowGenerator(generator, key);

final SessionManager sessionManager = new SessionManager();
final RowGenerator rowGenerator =
new RowGenerator(generator, avroData, avroSchema, ksqlSchema, sessionManager, key);
final Pair<String, GenericRow> genericRowPair = rowGenerator.generateRow();
final Pair<Struct, GenericRow> genericRowPair = rowGenerator.generateRow();
row = genericRowPair.getRight();
schema = ksqlSchema;
schema = rowGenerator.schema().valueSchema();
}

private InputStream getSchemaStream() {
@@ -22,7 +22,6 @@
import com.github.rvesse.airline.annotations.restrictions.Once;
import com.github.rvesse.airline.annotations.restrictions.Required;
import com.github.rvesse.airline.annotations.restrictions.ranges.LongRange;

import io.confluent.ksql.cli.console.OutputFormat;
import io.confluent.ksql.rest.util.OptionsParser;
import io.confluent.ksql.util.Pair;
8 changes: 4 additions & 4 deletions ksql-clickstream-demo/non-docker-clickstream.md
@@ -64,7 +64,7 @@ These steps will guide you through how to setup your environment and run the cli
**Tip:** Because of shell redirection, this command does not print a newline and so it might look like it's still in the foreground. The process is running as a daemon, so just press return again to see the shell prompt.

```bash
$ <path-to-ksql>bin/ksql-datagen -daemon quickstart=clickstream format=json topic=clickstream maxInterval=100 iterations=500000
$ <path-to-ksql>bin/ksql-datagen -daemon quickstart=clickstream value_format=json topic=clickstream maxInterval=100 iterations=500000
```

Your output should resemble:
@@ -76,7 +76,7 @@ These steps will guide you through how to setup your environment and run the cli
1. From your terminal, create the status codes using the ksql-datagen utility. This stream runs once to populate the table.

```bash
$ <path-to-ksql>bin/ksql-datagen quickstart=clickstream_codes format=json topic=clickstream_codes maxInterval=20 iterations=100
$ <path-to-ksql>bin/ksql-datagen quickstart=clickstream_codes value_format=json topic=clickstream_codes maxInterval=20 iterations=100
```

Your output should resemble:
@@ -92,7 +92,7 @@ These steps will guide you through how to setup your environment and run the cli
1. From your terminal, create a set of users using ksql-datagen utility. This stream runs once to populate the table.

```bash
$ <path-to-ksql>bin/ksql-datagen quickstart=clickstream_users format=json topic=clickstream_users maxInterval=10 iterations=1000
$ <path-to-ksql>bin/ksql-datagen quickstart=clickstream_users value_format=json topic=clickstream_users maxInterval=10 iterations=1000
```

Your output should resemble:
@@ -107,7 +107,7 @@ These steps will guide you through how to setup your environment and run the cli
1. Launch the KSQL CLI in local mode.

```bash
$ <path-to-ksql>bin/ksql-cli local
$ <path-to-ksql>bin/ksql
```

You should see the KSQL CLI welcome screen.
@@ -47,17 +47,14 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Deserializer;
@@ -71,7 +68,7 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings("WeakerAccess")
public class IntegrationTestHarness extends ExternalResource {
public final class IntegrationTestHarness extends ExternalResource {

private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestHarness.class);
private static final int DEFAULT_PARTITION_COUNT = 1;
@@ -164,8 +161,11 @@ public void ensureTopics(final int partitionCount, final String... topicNames) {
*/
public void produceRecord(final String topicName, final String key, final String data) {
try {
try (final KafkaProducer<String, String> producer =
new KafkaProducer<>(producerConfig(), new StringSerializer(), new StringSerializer())) {
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
kafkaCluster.producerConfig(),
new StringSerializer(),
new StringSerializer())
) {
producer.send(new ProducerRecord<>(topicName, key, data)).get();
}
} catch (final Exception e) {
@@ -233,9 +233,11 @@ public Map<String, RecordMetadata> produceRows(
) {
ensureTopics(topic);

try (KafkaProducer<String, GenericRow> producer =
new KafkaProducer<>(producerConfig(), new StringSerializer(), valueSerializer)) {

try (KafkaProducer<String, GenericRow> producer = new KafkaProducer<>(
kafkaCluster.producerConfig(),
new StringSerializer(),
valueSerializer
)) {
final Map<String, Future<RecordMetadata>> futures = recordsToPublish.entrySet().stream()
.collect(Collectors.toMap(Entry::getKey, entry -> {
final String key = entry.getKey();
@@ -270,12 +272,12 @@ public List<ConsumerRecord<String, String>> verifyAvailableRecords(
final String topic,
final int expectedCount
) {
try (final KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(consumerConfig(), new StringDeserializer(), new StringDeserializer())) {
consumer.subscribe(Collections.singleton(topic.toUpperCase()));

return ConsumerTestUtil.verifyAvailableRecords(consumer, expectedCount);
}
return kafkaCluster.verifyAvailableRecords(
topic,
expectedCount,
new StringDeserializer(),
new StringDeserializer()
);
}

/**
@@ -296,13 +298,12 @@ public List<ConsumerRecord<String, GenericRow>> verifyAvailableRows(
final Deserializer<GenericRow> valueDeserializer =
getDeserializer(valueFormat, schema);

try (final KafkaConsumer<String, GenericRow> consumer
= new KafkaConsumer<>(consumerConfig(), new StringDeserializer(), valueDeserializer)) {

consumer.subscribe(Collections.singleton(topic));

return ConsumerTestUtil.verifyAvailableRecords(consumer, expectedCount);
}
return kafkaCluster.verifyAvailableRecords(
topic,
expectedCount,
new StringDeserializer(),
valueDeserializer
);
}

/**
@@ -369,9 +370,11 @@ public <K> List<ConsumerRecord<K, GenericRow>> verifyAvailableRows(
final Deserializer<GenericRow> valueDeserializer =
getDeserializer(valueFormat, schema);

try (final KafkaConsumer<K, GenericRow> consumer
= new KafkaConsumer<>(consumerConfig(), keyDeserializer, valueDeserializer)) {

try (KafkaConsumer<K, GenericRow> consumer = new KafkaConsumer<>(
kafkaCluster.consumerConfig(),
keyDeserializer,
valueDeserializer
)) {
consumer.subscribe(Collections.singleton(topic));

return ConsumerTestUtil.verifyAvailableRecords(consumer, expected, timeout);
@@ -440,9 +443,11 @@ public <K> Map<K, GenericRow> verifyAvailableUniqueRows(
final Deserializer<GenericRow> valueDeserializer =
getDeserializer(valueFormat, schema);

try (final KafkaConsumer<K, GenericRow> consumer
= new KafkaConsumer<>(consumerConfig(), keyDeserializer, valueDeserializer)) {

try (KafkaConsumer<K, GenericRow> consumer = new KafkaConsumer<>(
kafkaCluster.consumerConfig(),
keyDeserializer,
valueDeserializer
)) {
consumer.subscribe(Collections.singleton(topic));

final List<ConsumerRecord<K, GenericRow>> consumerRecords = ConsumerTestUtil
@@ -538,27 +543,6 @@ protected void after() {
kafkaCluster.stop();
}

private Map<String, Object> clientConfig() {
return new HashMap<>(kafkaCluster.getClientProperties());
}

private Map<String, Object> producerConfig() {
final Map<String, Object> config = clientConfig();
config.put(ProducerConfig.ACKS_CONFIG, "all");
config.put(ProducerConfig.RETRIES_CONFIG, 0);
return config;
}

Map<String, Object> consumerConfig() {
final Map<String, Object> config = clientConfig();
config.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Try to keep consumer groups stable:
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10_000);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30_000);
return config;
}

private Serializer<GenericRow> getSerializer(
final Format format,
final PhysicalSchema schema
@@ -15,7 +15,6 @@

package io.confluent.ksql.integration;


import static io.confluent.ksql.serde.Format.JSON;
import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually;
import static org.hamcrest.Matchers.hasItems;
@@ -103,9 +102,9 @@ public void shouldListConsumerGroupsWhenTheyExist() throws InterruptedException
@Test
public void shouldDescribeGroup() throws InterruptedException {
givenTopicExistsWithData();
try (final KafkaConsumer<String, byte[]> c1 = createConsumer(group0)) {
try (KafkaConsumer<String, byte[]> c1 = createConsumer(group0)) {
verifyDescribeGroup(1, group0, ImmutableList.of(c1));
try (final KafkaConsumer<String, byte[]> c2 = createConsumer(group0)) {
try (KafkaConsumer<String, byte[]> c2 = createConsumer(group0)) {
verifyDescribeGroup(2, group0, ImmutableList.of(c1, c2));
}
}
@@ -135,7 +134,7 @@ private void verifyDescribeGroup(

private void verifyListsGroups(final String newGroup, final List<String> consumerGroups) {

try(final KafkaConsumer<String, byte[]> consumer = createConsumer(newGroup)) {
try (KafkaConsumer<String, byte[]> consumer = createConsumer(newGroup)) {

final Supplier<List<String>> pollAndGetGroups = () -> {
consumer.poll(Duration.ofMillis(1));
@@ -152,7 +151,7 @@ private void givenTopicExistsWithData() {
}

private KafkaConsumer<String, byte[]> createConsumer(final String group) {
final Map<String, Object> consumerConfigs = TEST_HARNESS.consumerConfig();
final Map<String, Object> consumerConfigs = TEST_HARNESS.getKafkaCluster().consumerConfig();
consumerConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, group);

final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(
@@ -183,8 +182,8 @@ public boolean equals(final Object o) {
return false;
}
final ConsumerAndPartitionCount that = (ConsumerAndPartitionCount) o;
return consumerCount == that.consumerCount &&
partitionCount == that.partitionCount;
return consumerCount == that.consumerCount
&& partitionCount == that.partitionCount;
}

@Override
@@ -82,7 +82,6 @@
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;