From 6181d37f8902fccca16ee01f951f21d2ae2ffa4c Mon Sep 17 00:00:00 2001 From: Andy Coates Date: Fri, 17 Apr 2020 12:16:51 +0100 Subject: [PATCH] feat: remove restriction on key column name fixes: https://github.com/confluentinc/ksql/issues/3536 Previously, stream `KEY` columns and table `PRIMARY KEY` columns had to be named `ROWKEY`. This change removes this restriction. Key columns can now have _any_ name. For example, a table of users with the `BIGINT` user id stored in the Kafka record's key may have previously been defined as: ```sql CREATE TABLE USERS (ROWKEY BIGINT PRIMARY KEY, NAME STRING) WITH('KAFKA_TOPIC'='users', 'VALUE_FORMAT'='JSON'); ``` With this change, this can now be defined with a more appropriate name for the table's primary key: ```sql CREATE TABLE USERS (ID BIGINT PRIMARY KEY, NAME STRING) WITH('KAFKA_TOPIC'='users', 'VALUE_FORMAT'='JSON'); ``` To get around the `ROWKEY` column name requirement, previous versions of ksqlDB allowed an alias to the key to be defined in the WITH clause, for example: ```sql CREATE TABLE USERS (ROWKEY BIGINT PRIMARY KEY, NAME STRING, ID BIGINT) WITH('KEY'='ID', 'KAFKA_TOPIC'='users', 'VALUE_FORMAT'='JSON'); ``` The downside of this approach is that it required a _copy_ of the key column in the value of the Kafka record, which was often not present. This is no longer required. The above statement can now also be simplified to: ```sql CREATE TABLE USERS (ID BIGINT PRIMARY KEY, NAME STRING) WITH('KAFKA_TOPIC'='users', 'VALUE_FORMAT'='JSON'); ``` --- .../io/confluent/ksql/util/KsqlConfig.java | 9 --- .../ddl/commands/CreateSourceFactory.java | 13 ----- .../confluent/ksql/engine/EngineExecutor.java | 7 ++- .../ksql/planner/LogicalPlanner.java | 45 ++++----------- .../ksql/structured/SchemaKStream.java | 6 +- .../streams/GroupByParamsFactory.java | 55 +++++++------------ .../execution/streams/StepSchemaResolver.java | 4 +- .../streams/StreamGroupByBuilder.java | 6 +- .../streams/TableGroupByBuilder.java | 6 +- 9 files changed, 45 insertions(+), 106 deletions(-) diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index e462a2f71fa5..1f53c588050f 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -267,8 +267,6 @@ public class KsqlConfig extends AbstractConfig { + "behavior, and instead throw an exception to ensure that no data is missed, set " + "ksql.timestamp.skip.invalid to true."; - public static final String KSQL_ANY_KEY_NAME_ENABLED = "ksql.any.key.name.enabled"; - private enum ConfigGeneration { LEGACY, CURRENT @@ -623,13 +621,6 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { Importance.MEDIUM, KSQL_TIMESTAMP_THROW_ON_INVALID_DOC ) - .define( - KSQL_ANY_KEY_NAME_ENABLED, - Type.BOOLEAN, - false, - Importance.LOW, - "Feature flag for removing restriction on key names - WIP, do not enable." - ) .define( KSQL_QUERY_PULL_METRICS_ENABLED, Type.BOOLEAN, diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java b/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java index 7c4efaf2d034..282653c8a9cb 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceFactory.java @@ -176,19 +176,6 @@ private static LogicalSchema buildSchema( if (SchemaUtil.isSystemColumn(e.getName())) { throw new KsqlException("'" + e.getName().text() + "' is a reserved column name."); } - - if (!ksqlConfig.getBoolean(KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED)) { - final boolean isRowKey = e.getName().equals(SchemaUtil.ROWKEY_NAME); - if (e.getNamespace().isKey()) { - if (!isRowKey) { - throw new KsqlException("'" + e.getName().text() + "' is an invalid KEY column name. " - + "KSQL currently only supports KEY columns named ROWKEY."); - } - } else if (isRowKey) { - throw new KsqlException("'" + e.getName().text() + "' is a reserved column name. " - + "It can only be used for KEY columns."); - } - } }); return tableElements.toLogicalSchema(true); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java index 46c4ecd17e9c..8ad1d804b193 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java @@ -318,9 +318,12 @@ private void validateExistingSink( final LogicalSchema existingSchema = existing.getSchema(); if (!resultSchema.equals(existingSchema)) { - throw new KsqlException("Incompatible schema between results and sink. " + throw new KsqlException("Incompatible schema between results and sink." + + System.lineSeparator() + "Result schema is " + resultSchema - + ", but the sink schema is " + existingSchema + "."); + + System.lineSeparator() + + "Sink schema is " + existingSchema + ); } enforceKeyEquivalence( diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java index 34426aaffe05..ddfb83b29e50 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java @@ -564,34 +564,25 @@ private LogicalSchema buildAggregateSchema( final SqlType keyType; if (groupByExps.size() != 1) { - if (ksqlConfig.getBoolean(KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED)) { - keyName = groupBy.getAlias() - .orElseGet(keyColNameGen::nextKsqlColAlias); - } else { - keyName = SchemaUtil.ROWKEY_NAME; - } keyType = SqlTypes.STRING; + keyName = groupBy.getAlias() + .orElseGet(keyColNameGen::nextKsqlColAlias); + } else { final Expression expression = groupByExps.get(0); - if (ksqlConfig.getBoolean(KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED)) { - if (groupBy.getAlias().isPresent()) { - keyName = groupBy.getAlias().get(); - } else if (expression instanceof ColumnReferenceExp) { - keyName = ((ColumnReferenceExp) expression).getColumnName(); - } else { - keyName = keyColNameGen.uniqueAliasFor(expression); - } - } else { - keyName = exactlyMatchesKeyColumns(expression, sourceSchema) - ? ((ColumnReferenceExp) expression).getColumnName() - : SchemaUtil.ROWKEY_NAME; - } - final ExpressionTypeManager typeManager = new ExpressionTypeManager(sourceSchema, functionRegistry); keyType = typeManager.getExpressionSqlType(expression); + + if (groupBy.getAlias().isPresent()) { + keyName = groupBy.getAlias().get(); + } else if (expression instanceof ColumnReferenceExp) { + keyName = ((ColumnReferenceExp) expression).getColumnName(); + } else { + keyName = keyColNameGen.uniqueAliasFor(expression); + } } return LogicalSchema.builder() @@ -607,20 +598,6 @@ private LogicalSchema buildRepartitionedSchema( ) { final LogicalSchema sourceSchema = sourceNode.getSchema(); - if (!ksqlConfig.getBoolean(KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED)) { - final ExpressionTypeManager expressionTypeManager = - new ExpressionTypeManager(sourceSchema, functionRegistry); - - final SqlType keyType = expressionTypeManager - .getExpressionSqlType(partitionBy.getExpression()); - - return LogicalSchema.builder() - .withRowTime() - .keyColumn(SchemaUtil.ROWKEY_NAME, keyType) - .valueColumns(sourceSchema.value()) - .build(); - } - return PartitionByParamsFactory.buildSchema( sourceSchema, partitionBy.getExpression(), diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java b/ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java index 3fa4217cfc4a..484c7c10c492 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java @@ -337,10 +337,8 @@ public SchemaKStream selectKey( + "See https://github.com/confluentinc/ksql/issues/4385."); } - final ExecutionStep> step = ksqlConfig - .getBoolean(KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED) - ? ExecutionStepFactory.streamSelectKey(contextStacker, sourceStep, keyExpression, alias) - : ExecutionStepFactory.streamSelectKeyV1(contextStacker, sourceStep, keyExpression); + final ExecutionStep> step = ExecutionStepFactory + .streamSelectKey(contextStacker, sourceStep, keyExpression, alias); return new SchemaKStream<>( step, diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/GroupByParamsFactory.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/GroupByParamsFactory.java index 0496906f5735..f4269933c677 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/GroupByParamsFactory.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/GroupByParamsFactory.java @@ -35,8 +35,6 @@ import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.types.SqlType; import io.confluent.ksql.schema.ksql.types.SqlTypes; -import io.confluent.ksql.util.KsqlConfig; -import io.confluent.ksql.util.SchemaUtil; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -55,14 +53,13 @@ private GroupByParamsFactory() { public static LogicalSchema buildSchema( final LogicalSchema sourceSchema, final List groupBys, - final Optional alias, - final KsqlConfig ksqlConfig + final Optional alias ) { Objects.requireNonNull(alias, "alias"); final ProcessingLogger logger = NoopProcessingLogContext.NOOP_LOGGER; - return buildGrouper(sourceSchema, groupBys, alias, logger, ksqlConfig) + return buildGrouper(sourceSchema, groupBys, alias, logger) .getSchema(); } @@ -70,8 +67,7 @@ public static GroupByParams build( final LogicalSchema sourceSchema, final List groupBys, final Optional alias, - final ProcessingLogger logger, - final KsqlConfig ksqlConfig + final ProcessingLogger logger ) { Objects.requireNonNull(alias, "alias"); @@ -79,7 +75,7 @@ public static GroupByParams build( throw new IllegalArgumentException("No GROUP BY groupBys"); } - final Grouper grouper = buildGrouper(sourceSchema, groupBys, alias, logger, ksqlConfig); + final Grouper grouper = buildGrouper(sourceSchema, groupBys, alias, logger); return new GroupByParams(grouper.getSchema(), grouper::apply); } @@ -88,12 +84,11 @@ private static Grouper buildGrouper( final LogicalSchema sourceSchema, final List groupBys, final Optional alias, - final ProcessingLogger logger, - final KsqlConfig ksqlConfig + final ProcessingLogger logger ) { return groupBys.size() == 1 - ? new SingleExpressionGrouper(sourceSchema, groupBys.get(0), alias, logger, ksqlConfig) - : new MultiExpressionGrouper(sourceSchema, groupBys, alias, logger, ksqlConfig); + ? new SingleExpressionGrouper(sourceSchema, groupBys.get(0), alias, logger) + : new MultiExpressionGrouper(sourceSchema, groupBys, alias, logger); } private static LogicalSchema buildSchemaWithKeyType( @@ -158,10 +153,9 @@ private static final class SingleExpressionGrouper implements Grouper { final LogicalSchema sourceSchema, final ExpressionMetadata groupBy, final Optional alias, - final ProcessingLogger logger, - final KsqlConfig ksqlConfig + final ProcessingLogger logger ) { - this.schema = singleExpressionSchema(sourceSchema, groupBy, alias, ksqlConfig); + this.schema = singleExpressionSchema(sourceSchema, groupBy, alias); this.groupBy = requireNonNull(groupBy, "groupBy"); this.keyBuilder = keyBuilder(schema); this.logger = Objects.requireNonNull(logger, "logger"); @@ -184,8 +178,7 @@ public Struct apply(final GenericRow row) { private static LogicalSchema singleExpressionSchema( final LogicalSchema sourceSchema, final ExpressionMetadata groupBy, - final Optional alias, - final KsqlConfig ksqlConfig + final Optional alias ) { final SqlType keyType = groupBy.getExpressionType(); final Expression groupByExp = groupBy.getExpression(); @@ -195,16 +188,12 @@ private static LogicalSchema singleExpressionSchema( final ColumnName singleColumnName; - if (ksqlConfig.getBoolean(KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED)) { - if (alias.isPresent()) { - singleColumnName = alias.get(); - } else if (groupByExp instanceof ColumnReferenceExp) { - singleColumnName = ((ColumnReferenceExp) groupByExp).getColumnName(); - } else { - singleColumnName = keyAliasGenerator.uniqueAliasFor(groupByExp); - } + if (alias.isPresent()) { + singleColumnName = alias.get(); + } else if (groupByExp instanceof ColumnReferenceExp) { + singleColumnName = ((ColumnReferenceExp) groupByExp).getColumnName(); } else { - singleColumnName = SchemaUtil.ROWKEY_NAME; + singleColumnName = keyAliasGenerator.uniqueAliasFor(groupByExp); } return buildSchemaWithKeyType(sourceSchema, singleColumnName, keyType); @@ -222,10 +211,9 @@ private static final class MultiExpressionGrouper implements Grouper { final LogicalSchema sourceSchema, final List groupBys, final Optional alias, - final ProcessingLogger logger, - final KsqlConfig ksqlConfig + final ProcessingLogger logger ) { - this.schema = multiExpressionSchema(sourceSchema, alias, ksqlConfig); + this.schema = multiExpressionSchema(sourceSchema, alias); this.groupBys = ImmutableList.copyOf(requireNonNull(groupBys, "groupBys")); this.keyBuilder = keyBuilder(schema); this.logger = Objects.requireNonNull(logger, "logger"); @@ -262,13 +250,12 @@ public Struct apply(final GenericRow row) { private static LogicalSchema multiExpressionSchema( final LogicalSchema sourceSchema, - final Optional alias, - final KsqlConfig ksqlConfig + final Optional alias ) { final ColumnName keyName = alias - .orElseGet(() -> ksqlConfig.getBoolean(KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED) - ? ColumnNames.columnAliasGenerator(Stream.of(sourceSchema)).nextKsqlColAlias() - : SchemaUtil.ROWKEY_NAME); + .orElseGet(() -> ColumnNames + .columnAliasGenerator(Stream.of(sourceSchema)) + .nextKsqlColAlias()); return buildSchemaWithKeyType(sourceSchema, keyName, SqlTypes.STRING); } diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java index 32ae18ead12a..20cf1591aa88 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java @@ -185,7 +185,7 @@ private LogicalSchema handleStreamGroupBy( ); return GroupByParamsFactory - .buildSchema(sourceSchema, compiledGroupBy, streamGroupBy.getAlias(), ksqlConfig); + .buildSchema(sourceSchema, compiledGroupBy, streamGroupBy.getAlias()); } private LogicalSchema handleTableGroupBy( @@ -201,7 +201,7 @@ private LogicalSchema handleTableGroupBy( ); return GroupByParamsFactory - .buildSchema(sourceSchema, compiledGroupBy, tableGroupBy.getAlias(), ksqlConfig); + .buildSchema(sourceSchema, compiledGroupBy, tableGroupBy.getAlias()); } private LogicalSchema handleStreamSelect( diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamGroupByBuilder.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamGroupByBuilder.java index a7c1702f2a04..7426ca25d461 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamGroupByBuilder.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/StreamGroupByBuilder.java @@ -32,7 +32,6 @@ import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; -import io.confluent.ksql.util.KsqlConfig; import java.util.List; import java.util.Optional; import org.apache.kafka.common.serialization.Serde; @@ -100,7 +99,7 @@ public KGroupedStreamHolder build( final ProcessingLogger logger = queryBuilder.getProcessingLogger(queryContext); final GroupByParams params = paramsFactory - .build(sourceSchema, groupBy, step.getAlias(), logger, queryBuilder.getKsqlConfig()); + .build(sourceSchema, groupBy, step.getAlias(), logger); final Grouped grouped = buildGrouped( formats, @@ -147,8 +146,7 @@ GroupByParams build( LogicalSchema sourceSchema, List groupBys, Optional alias, - ProcessingLogger logger, - KsqlConfig ksqlConfig + ProcessingLogger logger ); } } diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/TableGroupByBuilder.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/TableGroupByBuilder.java index 5e3d42e1cef1..72054fc4687b 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/TableGroupByBuilder.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/TableGroupByBuilder.java @@ -31,7 +31,6 @@ import io.confluent.ksql.name.ColumnName; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.PhysicalSchema; -import io.confluent.ksql.util.KsqlConfig; import java.util.List; import java.util.Optional; import java.util.function.Function; @@ -85,7 +84,7 @@ public KGroupedTableHolder build( final ProcessingLogger logger = queryBuilder.getProcessingLogger(queryContext); final GroupByParams params = paramsFactory - .build(sourceSchema, groupBy, step.getAlias(), logger, queryBuilder.getKsqlConfig()); + .build(sourceSchema, groupBy, step.getAlias(), logger); final PhysicalSchema physicalSchema = PhysicalSchema.from( params.getSchema(), @@ -135,8 +134,7 @@ GroupByParams build( LogicalSchema sourceSchema, List groupBys, Optional alias, - ProcessingLogger logger, - KsqlConfig ksqlConfig + ProcessingLogger logger ); } }