From ce6f3922ca6423a11fa982e02ba5b88fa4afcacf Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Tue, 3 Mar 2020 21:01:15 +0000 Subject: [PATCH] refactor: do not allow duplicates in schemas by default KsqlDB does not support data sources with duplicate column names in the key and value. (Mainly due to the fact that it copies the key columns into the value while processing with Kafka Streams). It therefore makes sense, to me at least, that our `LogicalSchema` rejects duplicates by default, but allows them if explicitly told to do so. Doesn't change anything functionally, but does mean code is more explicit about when it does and does not allow duplicates. --- .../ksql/schema/ksql/LogicalSchema.java | 33 ++++++++++++++++++- .../ksql/schema/ksql/LogicalSchemaTest.java | 29 ++++++++++++++-- .../ksql/planner/LogicalPlanner.java | 4 ++- .../execution/transform/select/Selection.java | 1 + .../model/StructuredDataSourceTest.java | 1 + .../streams/GroupByParamsFactory.java | 1 + .../execution/streams/StepSchemaResolver.java | 1 + .../streams/StreamFlatMapBuilder.java | 1 + .../streams/StreamGroupByBuilderTest.java | 2 ++ .../streams/StreamSelectKeyBuilderTest.java | 1 + .../streams/TableGroupByBuilderTest.java | 2 ++ 11 files changed, 71 insertions(+), 5 deletions(-) diff --git a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java index 7ac62008c929..f32550d68f41 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java +++ b/ksql-common/src/main/java/io/confluent/ksql/schema/ksql/LogicalSchema.java @@ -310,11 +310,27 @@ public static class Builder { private final Set seenKeys = new HashSet<>(); private final Set seenValues = new HashSet<>(); + private boolean failOnDuplicates = true; + public Builder withRowTime() { columns.add(IMPLICIT_TIME_COLUMN); return this; } + /** + * Allows key and value columns with the same name. + * + *
<p>
This should only be used when building the schemas used by Kafka Streams, where + * the key columns are copied into the value schema. KSQL does not support duplicate column + * names in data sources. + * + * @return self. + */ + public Builder allowDuplicates() { + failOnDuplicates = false; + return this; + } + public Builder keyColumns(final Iterable columns) { columns.forEach(this::keyColumn); return this; @@ -344,7 +360,22 @@ public Builder valueColumn(final ColumnName name, final SqlType type) { } public LogicalSchema build() { - return new LogicalSchema(columns.build()); + final LogicalSchema schema = new LogicalSchema(columns.build()); + + if (failOnDuplicates) { + final String duplicates = schema.value().stream() + .map(Column::name) + .filter(schema::isKeyColumn) + .map(ColumnName::toString) + .collect(Collectors.joining(", ")); + + if (!duplicates.isEmpty()) { + throw new IllegalStateException("Value column name(s) " + duplicates + + " clashes with key column name(s)."); + } + } + + return schema; } private void addColumn(final Column column) { diff --git a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java index ae5705ea376f..b1f97399ba42 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/schema/ksql/LogicalSchemaTest.java @@ -233,7 +233,8 @@ public void shouldGetValueColumns() { public void shouldPreferKeyOverValueAndMetaColumns() { // Given: final LogicalSchema schema = LogicalSchema.builder() - // Implicit meta ROWTIME + .allowDuplicates() + .withRowTime() .valueColumn(ROWTIME_NAME, BIGINT) .keyColumn(ROWTIME_NAME, BIGINT) .build(); @@ -412,6 +413,7 @@ public void shouldAddMetaAndKeyColumnsToValue() { // Then: assertThat(result, is(LogicalSchema.builder() .withRowTime() + .allowDuplicates() .keyColumn(K0, INTEGER) .keyColumn(K1, STRING) .valueColumn(F0, 
STRING) @@ -441,6 +443,7 @@ public void shouldAddWindowedMetaAndKeyColumnsToValue() { // Then: assertThat(result, is(LogicalSchema.builder() .withRowTime() + .allowDuplicates() .keyColumn(K0, INTEGER) .keyColumn(K1, STRING) .valueColumn(F0, STRING) @@ -476,6 +479,7 @@ public void shouldRemoveOthersWhenAddingMetasAndKeyColumns() { // Given: final LogicalSchema ksqlSchema = LogicalSchema.builder() .withRowTime() + .allowDuplicates() .keyColumn(K0, INTEGER) .valueColumn(F0, BIGINT) .valueColumn(K0, DOUBLE) @@ -489,6 +493,7 @@ public void shouldRemoveOthersWhenAddingMetasAndKeyColumns() { // Then: assertThat(result, is(LogicalSchema.builder() .withRowTime() + .allowDuplicates() .keyColumn(K0, INTEGER) .valueColumn(F0, BIGINT) .valueColumn(F1, BIGINT) @@ -570,6 +575,7 @@ public void shouldRemoveMetaColumnsWhereEverTheyAre() { public void shouldRemoveKeyColumnsWhereEverTheyAre() { // Given: final LogicalSchema schema = LogicalSchema.builder() + .allowDuplicates() .keyColumn(K0, STRING) .valueColumn(F0, BIGINT) .valueColumn(K0, STRING) @@ -648,7 +654,7 @@ public void shouldGetKeyConnectSchema() { // Given: final LogicalSchema schema = LogicalSchema.builder() .keyColumn(F0, DOUBLE) - .valueColumn(F0, BIGINT) + .valueColumn(F1, BIGINT) .build(); // When: @@ -665,7 +671,7 @@ public void shouldGetKeyConnectSchema() { public void shouldGetValueConnectSchema() { // Given: final LogicalSchema schema = LogicalSchema.builder() - .keyColumn(F0, STRING) + .keyColumn(K0, STRING) .valueColumn(F0, BIGINT) .valueColumn(F1, STRING) .build(); @@ -744,6 +750,23 @@ public void shouldOnlyMatchValueSchema() { assertThat(schema.valueContainsAny(ImmutableSet.of(K0, V0, ROWTIME_NAME)), is(false)); } + @Test(expected = IllegalStateException.class) + public void shouldThrowOnDuplicateColumnName() { + LogicalSchema.builder() + .valueColumn(K0, STRING) + .keyColumn(K0, BIGINT) + .build(); + } + + @Test + public void shouldNotThrowOnDuplicateColumnName() { + LogicalSchema.builder() + .valueColumn(K0, 
STRING) + .keyColumn(K0, BIGINT) + .allowDuplicates() + .build(); + } + private static org.apache.kafka.connect.data.Field connectField( final String fieldName, final int index, diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java index 69b27fbbee0e..166c70d4a368 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java @@ -387,7 +387,8 @@ private LogicalSchema buildProjectionSchema( ); final Builder builder = LogicalSchema.builder() - .withRowTime(); + .withRowTime() + .allowDuplicates(); final List keyColumns = schema.key(); @@ -444,6 +445,7 @@ private LogicalSchema buildRepartitionedSchema( final SqlType keyType = typeManager.getExpressionSqlType(partitionBy); return LogicalSchema.builder() + .allowDuplicates() .withRowTime() .keyColumn(SchemaUtil.ROWKEY_NAME, keyType) .valueColumns(sourceSchema.value()) diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/transform/select/Selection.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/transform/select/Selection.java index 1a1eda9e0574..e2e05b2914ca 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/execution/transform/select/Selection.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/transform/select/Selection.java @@ -52,6 +52,7 @@ private static LogicalSchema buildSchema( final SelectValueMapper mapper ) { final LogicalSchema.Builder schemaBuilder = LogicalSchema.builder() + .allowDuplicates() .withRowTime(); final List keyCols = sourceSchema.key(); diff --git a/ksql-metastore/src/test/java/io/confluent/ksql/metastore/model/StructuredDataSourceTest.java b/ksql-metastore/src/test/java/io/confluent/ksql/metastore/model/StructuredDataSourceTest.java index dfaf0cebe73b..9e5958cc0cbc 100644 --- 
a/ksql-metastore/src/test/java/io/confluent/ksql/metastore/model/StructuredDataSourceTest.java +++ b/ksql-metastore/src/test/java/io/confluent/ksql/metastore/model/StructuredDataSourceTest.java @@ -76,6 +76,7 @@ public void shouldThrowIfSchemaContainsRowTime() { public void shouldThrowIfSchemaContainsRowKey() { // Given: final LogicalSchema schema = LogicalSchema.builder() + .allowDuplicates() .withRowTime() .keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING) .valueColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING) diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/GroupByParamsFactory.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/GroupByParamsFactory.java index 1df35ef2cf1e..fbd2efc00ebb 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/GroupByParamsFactory.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/GroupByParamsFactory.java @@ -85,6 +85,7 @@ private static LogicalSchema buildSchemaWithKeyType( final SqlType rowKeyType ) { return LogicalSchema.builder() + .allowDuplicates() .withRowTime() .keyColumn(SchemaUtil.ROWKEY_NAME, rowKeyType) .valueColumns(sourceSchema.value()) diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java index 3671ea4d6f6f..8cba9347d4fe 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java @@ -218,6 +218,7 @@ private LogicalSchema handleSelectKey( .getExpressionSqlType(step.getKeyExpression()); return LogicalSchema.builder() + .allowDuplicates() .withRowTime() .keyColumn(SchemaUtil.ROWKEY_NAME, keyType) .valueColumns(sourceSchema.value()) diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamFlatMapBuilder.java 
b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamFlatMapBuilder.java index 2064ad582769..f0bf6a285e51 100644 --- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamFlatMapBuilder.java +++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamFlatMapBuilder.java @@ -99,6 +99,7 @@ public static LogicalSchema buildSchema( final FunctionRegistry functionRegistry ) { final LogicalSchema.Builder schemaBuilder = LogicalSchema.builder() + .allowDuplicates() .withRowTime(); final List cols = inputSchema.value(); diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderTest.java index 91946e8ab6d7..33cf8dcd093b 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderTest.java @@ -69,6 +69,7 @@ public class StreamGroupByBuilderTest { .withMetaAndKeyColsInValue(false); private static final LogicalSchema REKEYED_SCHEMA = LogicalSchema.builder() + .allowDuplicates() .withRowTime() .keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING) .valueColumns(SCHEMA.value()) @@ -216,6 +217,7 @@ public void shouldReturnCorrectSchemaForGroupBy() { // Then: assertThat(result.getSchema(), is(LogicalSchema.builder() + .allowDuplicates() .withRowTime() .keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING) .valueColumns(SCHEMA.value()) diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderTest.java index b80151a8370a..26ba754d8076 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamSelectKeyBuilderTest.java 
@@ -74,6 +74,7 @@ public class StreamSelectKeyBuilderTest { new UnqualifiedColumnReferenceExp(ColumnName.of("BOI")); private static final LogicalSchema RESULT_SCHEMA = LogicalSchema.builder() + .allowDuplicates() .withRowTime() .keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.BIGINT) .valueColumn(ColumnName.of("BIG"), SqlTypes.BIGINT) diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableGroupByBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableGroupByBuilderTest.java index 8c1bc47a8580..3f4f3618eb1e 100644 --- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableGroupByBuilderTest.java +++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableGroupByBuilderTest.java @@ -66,6 +66,7 @@ public class TableGroupByBuilderTest { .withMetaAndKeyColsInValue(false); private static final LogicalSchema REKEYED_SCHEMA = LogicalSchema.builder() + .allowDuplicates() .withRowTime() .keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING) .valueColumns(SCHEMA.value()) @@ -190,6 +191,7 @@ public void shouldReturnCorrectSchema() { // Then: assertThat(result.getSchema(), is(is(LogicalSchema.builder() + .allowDuplicates() .withRowTime() .keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING) .valueColumns(SCHEMA.value())