refactor: Run QTT using the testing tool #3180

Merged · 12 commits · Aug 10, 2019
@@ -75,6 +75,10 @@ public Record build(final Map<String, Topic> topics) {
);
}

public Optional<WindowData> getWindow() {
return window;
}

private Object buildValue(final Topic topic) {
if (value.asText().equals("null")) {
return null;
@@ -26,6 +26,7 @@
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.MetaStoreImpl;
import io.confluent.ksql.metastore.model.KsqlTopic;
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.parser.DefaultKsqlParser;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
@@ -35,8 +36,11 @@
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.SchemaConverters;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.KeyFormat;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.serde.WindowInfo;
import io.confluent.ksql.test.model.WindowData.Type;
import io.confluent.ksql.test.serde.SerdeSupplier;
import io.confluent.ksql.test.tools.Record;
import io.confluent.ksql.test.tools.TestCase;
@@ -50,6 +54,7 @@
import io.confluent.ksql.util.DecimalUtil;
import io.confluent.ksql.util.KsqlConstants;
import java.nio.file.Path;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -226,22 +231,45 @@ private Map<String, Topic> getTestCaseTopics(
throw new InvalidFieldException("statements/topics", "The test does not define any topics");
}

final SerdeSupplier defaultKeySerdeSupplier =
allTopics.values().iterator().next().getKeySerdeSupplier();

final SerdeSupplier defaultValueSerdeSupplier =
allTopics.values().iterator().next().getValueSerdeSupplier();

// Get topics from inputs and outputs fields:
Streams.concat(inputs.stream(), outputs.stream())
.map(RecordNode::topicName)
.map(topicName -> new Topic(topicName, Optional.empty(), defaultKeySerdeSupplier,
defaultValueSerdeSupplier, 4, 1, Optional.empty()))
.map(recordNode -> new Topic(
recordNode.topicName(),
Optional.empty(),
getKeySerdeSupplier(recordNode.getWindow()),
defaultValueSerdeSupplier,
4,
1,
Optional.empty()))
.forEach(topic -> allTopics.putIfAbsent(topic.getName(), topic));

return allTopics;
}

private static SerdeSupplier<?> getKeySerdeSupplier(final Optional<WindowData> windowDataInfo) {
if (windowDataInfo.isPresent()) {
final WindowData windowData = windowDataInfo.get();
// Session windows have no fixed size; any other window type is time-based
// and is handled as TUMBLING for key-serde purposes.
final WindowType windowType = (windowData.type == Type.SESSION)
? WindowType.SESSION
: WindowType.TUMBLING;
final KeyFormat windowKeyFormat = KeyFormat.windowed(
Format.KAFKA,
Optional.empty(),
WindowInfo.of(
windowType,
windowType == WindowType.SESSION
? Optional.empty() : Optional.of(Duration.ofMillis(windowData.size())))
);
return SerdeUtil.getKeySerdeSupplier(windowKeyFormat, () -> LogicalSchema.builder().build());
}
return SerdeUtil.getKeySerdeSupplier(
KeyFormat.nonWindowed(FormatInfo.of(Format.KAFKA)),
() -> LogicalSchema.builder().build());
}
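For reference, the helper above can produce only two kinds of key format, both built from the calls shown in this hunk: session-windowed keys carry no window size, while time-windowed (tumbling or hopping) keys carry a fixed size. A minimal sketch, with an arbitrary 30-second size for illustration:

// Session windows: no fixed size.
final KeyFormat sessionKeys = KeyFormat.windowed(
Format.KAFKA,
Optional.empty(),
WindowInfo.of(WindowType.SESSION, Optional.empty()));

// Tumbling (and hopping) windows: fixed size; 30 seconds is an arbitrary example.
final KeyFormat timeWindowedKeys = KeyFormat.windowed(
Format.KAFKA,
Optional.empty(),
WindowInfo.of(WindowType.TUMBLING, Optional.of(Duration.ofMillis(30_000))));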

private static Topic createTopicFromStatement(
final FunctionRegistry functionRegistry,
final String sql
@@ -26,6 +26,7 @@
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.test.TestFrameworkException;
import io.confluent.ksql.test.serde.SerdeSupplier;
import io.confluent.ksql.test.serde.string.StringSerdeSupplier;
import io.confluent.ksql.test.tools.Topic;
import io.confluent.ksql.test.tools.exceptions.InvalidFieldException;
import io.confluent.ksql.test.utils.SerdeUtil;
@@ -63,10 +64,7 @@ class TopicNode {
Topic build(final String defaultFormat) {
final String formatToUse = format.replace("{FORMAT}", defaultFormat);

final SerdeSupplier<?> keySerdeSupplier = SerdeUtil.getSerdeSupplier(
Format.KAFKA,
this::logicalSchema
);
final SerdeSupplier<?> keySerdeSupplier = new StringSerdeSupplier();

final SerdeSupplier<?> valueSerdeSupplier = SerdeUtil.getSerdeSupplier(
Format.of(formatToUse),
@@ -126,6 +126,20 @@ private static Object valueSpecToAvro(final Object spec, final org.apache.avro.S
case BOOLEAN:
return spec;
case ARRAY:
// Connect serializes maps with non-string keys as an array of map-entry
// records, so a spec Map may arrive here with an ARRAY schema.
// See https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroData.java
// for details.
if (schema.getElementType().getName().equals(AvroData.MAP_ENTRY_TYPE_NAME)
||
Objects.equals(
schema.getElementType().getProp(AvroData.CONNECT_INTERNAL_TYPE_NAME),
AvroData.MAP_ENTRY_TYPE_NAME)
) {
return ((Map<Object, Object>) spec).entrySet().stream()
.map(objectObjectEntry ->
getAvroRecordForMapEntry(objectObjectEntry, schema.getElementType()))
.collect(Collectors.toList());
}
final List<?> list = ((List<?>) spec).stream()
.map(o -> valueSpecToAvro(o, schema.getElementType()))
.collect(Collectors.toList());
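To make the branch above concrete: for a map with non-string keys, the Connect converter produces an array-of-records schema rather than an Avro map. A rough sketch of that shape built with Avro's SchemaBuilder (the record name and namespace here are approximations of what AvroData generates, shown only for illustration):

// Illustrative only: the "map as array of entries" shape for a map<int, string>.
final org.apache.avro.Schema entrySchema = org.apache.avro.SchemaBuilder
.record("MapEntry").namespace("io.confluent.connect.avro")
.fields()
.requiredInt("key")
.requiredString("value")
.endRecord();
final org.apache.avro.Schema mapAsArray =
org.apache.avro.SchemaBuilder.array().items(entrySchema);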
@@ -138,32 +152,70 @@

return new GenericMap(schema, map);
case RECORD:
final GenericRecord record = new GenericData.Record(schema);
final Map<String, String> caseInsensitiveFieldNames
= getCaseInsensitiveMap((Map<String, ?>) spec);
for (final org.apache.avro.Schema.Field field : schema.getFields()) {
record.put(
field.name(),
valueSpecToAvro(((Map<String, ?>) spec)
.get(caseInsensitiveFieldNames.get(field.name().toUpperCase())), field.schema())
);
}
return record;
return getAvroRecord(spec, schema);
case UNION:
for (final org.apache.avro.Schema memberSchema : schema.getTypes()) {
if (!memberSchema.getType().equals(org.apache.avro.Schema.Type.NULL)) {
return valueSpecToAvro(spec, memberSchema);
}
}
throw new RuntimeException("Union must have non-null type: "
+ schema.getType().getName());

return valueSpecUnionToAvro(spec, schema);
default:
throw new RuntimeException(
"This test does not support the data type yet: " + schema.getType().getName());
}
}

@SuppressWarnings("unchecked")
private static Object valueSpecUnionToAvro(
final Object spec,
final org.apache.avro.Schema schema
) {
// If the union is just a nullable single type (e.g. ["null", "int"]),
// serialize the spec with the non-null member schema.
if (schema.getTypes().size() == 2
&& schema.getTypes().stream()
.anyMatch(s -> s.getType() == org.apache.avro.Schema.Type.NULL)) {
if (schema.getTypes().get(0).getType() == org.apache.avro.Schema.Type.NULL) {
return valueSpecToAvro(spec, schema.getTypes().get(1));
}
return valueSpecToAvro(spec, schema.getTypes().get(0));
}
// If the union has more than one non-null member, the spec represents the
// value as a single-entry map keyed by the upper-case type name.
for (final org.apache.avro.Schema memberSchema : schema.getTypes()) {
if (!memberSchema.getType().equals(org.apache.avro.Schema.Type.NULL)) {
final String typeName = memberSchema.getType().getName().toUpperCase();
final Object val = ((Map<String, ?>) spec).get(typeName);
if (val != null) {
return valueSpecToAvro(val, memberSchema);
}
}
}
throw new RuntimeException("Union must have non-null type: "
+ schema.getType().getName());
}
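A small illustration of that map convention, with hypothetical spec data (not taken from the PR):

// Spec value for the union ["null", "int", "string"]: the key names the member type.
final Map<String, Object> unionSpec = Collections.singletonMap("STRING", "hello");
// valueSpecUnionToAvro(unionSpec, unionSchema) finds the "STRING" entry and
// serializes "hello" against the union's string member schema.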

@SuppressWarnings("unchecked")
private static GenericRecord getAvroRecord(final Object spec, final Schema schema) {
final GenericRecord record = new GenericData.Record(schema);
final Map<String, String> caseInsensitiveFieldNames
= getUppercaseKeyToActualKey((Map) spec);
for (final org.apache.avro.Schema.Field field : schema.getFields()) {
record.put(
field.name(),
valueSpecToAvro(((Map<String, ?>) spec)
.get(caseInsensitiveFieldNames.get(field.name().toUpperCase())), field.schema())
);
}
return record;
}

// A map entry is serialized as an Avro record with two fields: "key" and "value".
@SuppressWarnings("unchecked")
private static GenericRecord getAvroRecordForMapEntry(
final Map.Entry<?, ?> spec,
final Schema schema) {
final GenericRecord record = new GenericData.Record(schema);
record.put(AvroData.KEY_FIELD,
valueSpecToAvro(spec.getKey(), schema.getField(AvroData.KEY_FIELD).schema()));
record.put(AvroData.VALUE_FIELD,
valueSpecToAvro(spec.getValue(), schema.getField(AvroData.VALUE_FIELD).schema()));
return record;
}

private static class GenericMap
extends AbstractMap<Object, Object>
implements GenericContainer {
@@ -261,19 +313,23 @@ private static Object avroToValueSpec(final Object avro,
"BYTES must be ByteBuffer, got " + avro.getClass());
return new DecimalConversion().fromBytes((ByteBuffer) avro, schema, logicalType);
case ARRAY:
// Connect serializes maps as an array of MapEntry records to support maps
// with non-string keys, so if the array element is the MapEntry type we
// deserialize it as a map.
// See https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroData.java
// for details.
if (schema.getElementType().getName().equals(AvroData.MAP_ENTRY_TYPE_NAME)
||
Objects.equals(
schema.getElementType().getProp(AvroData.CONNECT_INTERNAL_TYPE_NAME),
AvroData.MAP_ENTRY_TYPE_NAME)
) {
final org.apache.avro.Schema valueSchema
= schema.getElementType().getField("value").schema();

= schema.getElementType().getField(AvroData.VALUE_FIELD).schema();
final Map<String, Object> map = new HashMap<>();
((List<GenericData.Record>) avro).forEach(e -> map.put(
e.get("key").toString(),
avroToValueSpec(e.get("value"), valueSchema, toUpper)
e.get(AvroData.KEY_FIELD).toString(),
avroToValueSpec(e.get(AvroData.VALUE_FIELD), valueSchema, toUpper)
));
return map;
}
@@ -300,30 +356,40 @@ private static Object avroToValueSpec(final Object avro,
);
return recordSpec;
case UNION:
final int pos = GenericData.get().resolveUnion(schema, avro);
final boolean hasNull = schema.getTypes().stream()
.anyMatch(s -> s.getType().equals(org.apache.avro.Schema.Type.NULL));
final Object resolved = avroToValueSpec(avro, schema.getTypes().get(pos), toUpper);
if (schema.getTypes().get(pos).getType().equals(org.apache.avro.Schema.Type.NULL)
|| schema.getTypes().size() == 2 && hasNull) {
return resolved;
}
final Map<String, Object> ret = Maps.newHashMap();
schema.getTypes()
.forEach(
s -> ret.put(s.getName().toUpperCase(), null));
ret.put(schema.getTypes().get(pos).getName().toUpperCase(), resolved);
return ret;
return avroUnionToValueSpec(avro, schema, toUpper);
default:
throw new RuntimeException("Test cannot handle data of type: " + schema.getType());
}
}

private static Object avroUnionToValueSpec(
final Object avroUnion,
final org.apache.avro.Schema schema,
final boolean toUpper
) {
final int pos = GenericData.get().resolveUnion(schema, avroUnion);
final boolean hasNull = schema.getTypes().stream()
.anyMatch(s -> s.getType().equals(org.apache.avro.Schema.Type.NULL));
final Object resolved = avroToValueSpec(avroUnion, schema.getTypes().get(pos), toUpper);
// If there are two types and one is NULL, just return the resolved object
if (schema.getTypes().get(pos).getType().equals(org.apache.avro.Schema.Type.NULL)
|| schema.getTypes().size() == 2 && hasNull) {
return resolved;
}
// Otherwise the union has more than one non-null member: expand it into a
// map with one key per member type; only the resolved member is non-null.
final Map<String, Object> ret = Maps.newHashMap();
schema.getTypes()
.forEach(
s -> ret.put(s.getName().toUpperCase(), null));
ret.put(schema.getTypes().get(pos).getName().toUpperCase(), resolved);
return ret;
}
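Going the other direction, the expansion gives every member type a key and leaves all but the resolved member null. For the union ["null", "int", "string"] and an Avro value of 7, the method above would return a map equivalent to this sketch:

final Map<String, Object> expected = Maps.newHashMap();
expected.put("NULL", null); // every member type gets a key, including "null"
expected.put("INT", 7); // only the resolved member carries the value
expected.put("STRING", null);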
}

private static Map<String, String> getCaseInsensitiveMap(final Map<String, ?> record) {
@SuppressWarnings("unchecked")
private static Map<String, String> getUppercaseKeyToActualKey(final Map<String, ?> record) {
return record.entrySet().stream().collect(Collectors.toMap(
entry -> entry.getKey().toUpperCase(),
Entry::getKey));
}
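The rename from getCaseInsensitiveMap describes the behaviour more precisely: the helper indexes the original spec keys by their upper-case form, which is what lets getAvroRecord match Avro field names case-insensitively. A hypothetical example (assuming Guava's ImmutableMap, which this module already uses):

final Map<String, String> index =
getUppercaseKeyToActualKey(ImmutableMap.of("itemId", 10, "Name", "x"));
// index: {"ITEMID" -> "itemId", "NAME" -> "Name"}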

}
@@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.util.KsqlException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -80,6 +81,10 @@ public Topic getTopic(final String topicName) {
return topicMap.get(topicName);
}

public Collection<Topic> getAllTopics() {
return topicMap.values();
}


Map<String, List<FakeKafkaRecord>> getTopicData() {
return topicData;