feat: remove restriction on key column name
fixes: confluentinc#3536

Previously, stream `KEY` columns and table `PRIMARY KEY` columns had to be named `ROWKEY`. This change removes that restriction: key columns can now have _any_ name.

For example, a table of users whose `BIGINT` user id is stored in the Kafka record's key might previously have been defined as:

```sql
CREATE TABLE USERS (ROWKEY BIGINT PRIMARY KEY, NAME STRING)
   WITH('KAFKA_TOPIC'='users', 'VALUE_FORMAT'='JSON');
```

With this change, the same table can be defined with a more appropriate name for its primary key:

```sql
CREATE TABLE USERS (ID BIGINT PRIMARY KEY, NAME STRING)
   WITH('KAFKA_TOPIC'='users', 'VALUE_FORMAT'='JSON');
```

To work around the `ROWKEY` column name requirement, previous versions of ksqlDB allowed an alias for the key column to be defined in the WITH clause, for example:

```sql
CREATE TABLE USERS (ROWKEY BIGINT PRIMARY KEY, NAME STRING, ID BIGINT)
   WITH('KEY'='ID', 'KAFKA_TOPIC'='users', 'VALUE_FORMAT'='JSON');
```

The downside of this approach was that it required a _copy_ of the key column to be present in the value of the Kafka record, which often it was not. This workaround is no longer required, and the statement above can now be simplified to:

```sql
CREATE TABLE USERS (ID BIGINT PRIMARY KEY, NAME STRING)
   WITH('KAFKA_TOPIC'='users', 'VALUE_FORMAT'='JSON');
```
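The same now applies to streams: a stream's `KEY` column no longer has to be named `ROWKEY`. A minimal sketch, using a hypothetical `user_updates` topic and stream name:

```sql
CREATE STREAM USER_UPDATES (ID BIGINT KEY, NAME STRING)
   WITH('KAFKA_TOPIC'='user_updates', 'VALUE_FORMAT'='JSON');
```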
big-andy-coates committed Apr 17, 2020
1 parent 1cd2a28 commit 6181d37
Showing 9 changed files with 45 additions and 106 deletions.
@@ -267,8 +267,6 @@ public class KsqlConfig extends AbstractConfig {
+ "behavior, and instead throw an exception to ensure that no data is missed, set "
+ "ksql.timestamp.skip.invalid to true.";

public static final String KSQL_ANY_KEY_NAME_ENABLED = "ksql.any.key.name.enabled";

private enum ConfigGeneration {
LEGACY,
CURRENT
@@ -623,13 +621,6 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.MEDIUM,
KSQL_TIMESTAMP_THROW_ON_INVALID_DOC
)
.define(
KSQL_ANY_KEY_NAME_ENABLED,
Type.BOOLEAN,
false,
Importance.LOW,
"Feature flag for removing restriction on key names - WIP, do not enable."
)
.define(
KSQL_QUERY_PULL_METRICS_ENABLED,
Type.BOOLEAN,
@@ -176,19 +176,6 @@ private static LogicalSchema buildSchema(
if (SchemaUtil.isSystemColumn(e.getName())) {
throw new KsqlException("'" + e.getName().text() + "' is a reserved column name.");
}

if (!ksqlConfig.getBoolean(KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED)) {
final boolean isRowKey = e.getName().equals(SchemaUtil.ROWKEY_NAME);
if (e.getNamespace().isKey()) {
if (!isRowKey) {
throw new KsqlException("'" + e.getName().text() + "' is an invalid KEY column name. "
+ "KSQL currently only supports KEY columns named ROWKEY.");
}
} else if (isRowKey) {
throw new KsqlException("'" + e.getName().text() + "' is a reserved column name. "
+ "It can only be used for KEY columns.");
}
}
});

return tableElements.toLogicalSchema(true);
@@ -318,9 +318,12 @@ private void validateExistingSink(
final LogicalSchema existingSchema = existing.getSchema();

if (!resultSchema.equals(existingSchema)) {
throw new KsqlException("Incompatible schema between results and sink. "
throw new KsqlException("Incompatible schema between results and sink."
+ System.lineSeparator()
+ "Result schema is " + resultSchema
+ ", but the sink schema is " + existingSchema + ".");
+ System.lineSeparator()
+ "Sink schema is " + existingSchema
);
}

enforceKeyEquivalence(
@@ -564,34 +564,25 @@ private LogicalSchema buildAggregateSchema(
final SqlType keyType;

if (groupByExps.size() != 1) {
if (ksqlConfig.getBoolean(KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED)) {
keyName = groupBy.getAlias()
.orElseGet(keyColNameGen::nextKsqlColAlias);
} else {
keyName = SchemaUtil.ROWKEY_NAME;
}
keyType = SqlTypes.STRING;
keyName = groupBy.getAlias()
.orElseGet(keyColNameGen::nextKsqlColAlias);

} else {
final Expression expression = groupByExps.get(0);

if (ksqlConfig.getBoolean(KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED)) {
if (groupBy.getAlias().isPresent()) {
keyName = groupBy.getAlias().get();
} else if (expression instanceof ColumnReferenceExp) {
keyName = ((ColumnReferenceExp) expression).getColumnName();
} else {
keyName = keyColNameGen.uniqueAliasFor(expression);
}
} else {
keyName = exactlyMatchesKeyColumns(expression, sourceSchema)
? ((ColumnReferenceExp) expression).getColumnName()
: SchemaUtil.ROWKEY_NAME;
}

final ExpressionTypeManager typeManager =
new ExpressionTypeManager(sourceSchema, functionRegistry);

keyType = typeManager.getExpressionSqlType(expression);

if (groupBy.getAlias().isPresent()) {
keyName = groupBy.getAlias().get();
} else if (expression instanceof ColumnReferenceExp) {
keyName = ((ColumnReferenceExp) expression).getColumnName();
} else {
keyName = keyColNameGen.uniqueAliasFor(expression);
}
}

return LogicalSchema.builder()
@@ -607,20 +598,6 @@ private LogicalSchema buildRepartitionedSchema(
) {
final LogicalSchema sourceSchema = sourceNode.getSchema();

if (!ksqlConfig.getBoolean(KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED)) {
final ExpressionTypeManager expressionTypeManager =
new ExpressionTypeManager(sourceSchema, functionRegistry);

final SqlType keyType = expressionTypeManager
.getExpressionSqlType(partitionBy.getExpression());

return LogicalSchema.builder()
.withRowTime()
.keyColumn(SchemaUtil.ROWKEY_NAME, keyType)
.valueColumns(sourceSchema.value())
.build();
}

return PartitionByParamsFactory.buildSchema(
sourceSchema,
partitionBy.getExpression(),
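The new key-naming logic in `buildAggregateSchema` above also means that tables derived via `GROUP BY` get sensibly named key columns. A rough SQL-level sketch of the behaviour, using a hypothetical `PAGEVIEWS` stream with `USER_ID` and `PAGE` columns (the generated name `KSQL_COL_0` is illustrative, not guaranteed):

```sql
-- Grouping by a column: the resulting table's key column takes that column's name (USER_ID).
CREATE TABLE VIEWS_BY_USER AS
   SELECT USER_ID, COUNT(*) AS VIEW_COUNT
   FROM PAGEVIEWS
   GROUP BY USER_ID;

-- Grouping by an expression: a key column name is generated (e.g. KSQL_COL_0).
CREATE TABLE VIEWS_BY_PAGE_PREFIX AS
   SELECT SUBSTRING(PAGE, 1, 5), COUNT(*) AS VIEW_COUNT
   FROM PAGEVIEWS
   GROUP BY SUBSTRING(PAGE, 1, 5);
```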
@@ -337,10 +337,8 @@ public SchemaKStream<Struct> selectKey(
+ "See https://github.com/confluentinc/ksql/issues/4385.");
}

final ExecutionStep<KStreamHolder<Struct>> step = ksqlConfig
.getBoolean(KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED)
? ExecutionStepFactory.streamSelectKey(contextStacker, sourceStep, keyExpression, alias)
: ExecutionStepFactory.streamSelectKeyV1(contextStacker, sourceStep, keyExpression);
final ExecutionStep<KStreamHolder<Struct>> step = ExecutionStepFactory
.streamSelectKey(contextStacker, sourceStep, keyExpression, alias);

return new SchemaKStream<>(
step,
@@ -35,8 +35,6 @@
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.SchemaUtil;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -55,31 +53,29 @@ private GroupByParamsFactory() {
public static LogicalSchema buildSchema(
final LogicalSchema sourceSchema,
final List<ExpressionMetadata> groupBys,
final Optional<ColumnName> alias,
final KsqlConfig ksqlConfig
final Optional<ColumnName> alias
) {
Objects.requireNonNull(alias, "alias");

final ProcessingLogger logger = NoopProcessingLogContext.NOOP_LOGGER;

return buildGrouper(sourceSchema, groupBys, alias, logger, ksqlConfig)
return buildGrouper(sourceSchema, groupBys, alias, logger)
.getSchema();
}

public static GroupByParams build(
final LogicalSchema sourceSchema,
final List<ExpressionMetadata> groupBys,
final Optional<ColumnName> alias,
final ProcessingLogger logger,
final KsqlConfig ksqlConfig
final ProcessingLogger logger
) {
Objects.requireNonNull(alias, "alias");

if (groupBys.isEmpty()) {
throw new IllegalArgumentException("No GROUP BY groupBys");
}

final Grouper grouper = buildGrouper(sourceSchema, groupBys, alias, logger, ksqlConfig);
final Grouper grouper = buildGrouper(sourceSchema, groupBys, alias, logger);

return new GroupByParams(grouper.getSchema(), grouper::apply);
}
@@ -88,12 +84,11 @@ private static Grouper buildGrouper(
final LogicalSchema sourceSchema,
final List<ExpressionMetadata> groupBys,
final Optional<ColumnName> alias,
final ProcessingLogger logger,
final KsqlConfig ksqlConfig
final ProcessingLogger logger
) {
return groupBys.size() == 1
? new SingleExpressionGrouper(sourceSchema, groupBys.get(0), alias, logger, ksqlConfig)
: new MultiExpressionGrouper(sourceSchema, groupBys, alias, logger, ksqlConfig);
? new SingleExpressionGrouper(sourceSchema, groupBys.get(0), alias, logger)
: new MultiExpressionGrouper(sourceSchema, groupBys, alias, logger);
}

private static LogicalSchema buildSchemaWithKeyType(
@@ -158,10 +153,9 @@ private static final class SingleExpressionGrouper implements Grouper {
final LogicalSchema sourceSchema,
final ExpressionMetadata groupBy,
final Optional<ColumnName> alias,
final ProcessingLogger logger,
final KsqlConfig ksqlConfig
final ProcessingLogger logger
) {
this.schema = singleExpressionSchema(sourceSchema, groupBy, alias, ksqlConfig);
this.schema = singleExpressionSchema(sourceSchema, groupBy, alias);
this.groupBy = requireNonNull(groupBy, "groupBy");
this.keyBuilder = keyBuilder(schema);
this.logger = Objects.requireNonNull(logger, "logger");
@@ -184,8 +178,7 @@ public Struct apply(final GenericRow row) {
private static LogicalSchema singleExpressionSchema(
final LogicalSchema sourceSchema,
final ExpressionMetadata groupBy,
final Optional<ColumnName> alias,
final KsqlConfig ksqlConfig
final Optional<ColumnName> alias
) {
final SqlType keyType = groupBy.getExpressionType();
final Expression groupByExp = groupBy.getExpression();
@@ -195,16 +188,12 @@ private static LogicalSchema singleExpressionSchema(

final ColumnName singleColumnName;

if (ksqlConfig.getBoolean(KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED)) {
if (alias.isPresent()) {
singleColumnName = alias.get();
} else if (groupByExp instanceof ColumnReferenceExp) {
singleColumnName = ((ColumnReferenceExp) groupByExp).getColumnName();
} else {
singleColumnName = keyAliasGenerator.uniqueAliasFor(groupByExp);
}
if (alias.isPresent()) {
singleColumnName = alias.get();
} else if (groupByExp instanceof ColumnReferenceExp) {
singleColumnName = ((ColumnReferenceExp) groupByExp).getColumnName();
} else {
singleColumnName = SchemaUtil.ROWKEY_NAME;
singleColumnName = keyAliasGenerator.uniqueAliasFor(groupByExp);
}

return buildSchemaWithKeyType(sourceSchema, singleColumnName, keyType);
@@ -222,10 +211,9 @@ private static final class MultiExpressionGrouper implements Grouper {
final LogicalSchema sourceSchema,
final List<ExpressionMetadata> groupBys,
final Optional<ColumnName> alias,
final ProcessingLogger logger,
final KsqlConfig ksqlConfig
final ProcessingLogger logger
) {
this.schema = multiExpressionSchema(sourceSchema, alias, ksqlConfig);
this.schema = multiExpressionSchema(sourceSchema, alias);
this.groupBys = ImmutableList.copyOf(requireNonNull(groupBys, "groupBys"));
this.keyBuilder = keyBuilder(schema);
this.logger = Objects.requireNonNull(logger, "logger");
@@ -262,13 +250,12 @@ public Struct apply(final GenericRow row) {

private static LogicalSchema multiExpressionSchema(
final LogicalSchema sourceSchema,
final Optional<ColumnName> alias,
final KsqlConfig ksqlConfig
final Optional<ColumnName> alias
) {
final ColumnName keyName = alias
.orElseGet(() -> ksqlConfig.getBoolean(KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED)
? ColumnNames.columnAliasGenerator(Stream.of(sourceSchema)).nextKsqlColAlias()
: SchemaUtil.ROWKEY_NAME);
.orElseGet(() -> ColumnNames
.columnAliasGenerator(Stream.of(sourceSchema))
.nextKsqlColAlias());

return buildSchemaWithKeyType(sourceSchema, keyName, SqlTypes.STRING);
}
@@ -185,7 +185,7 @@ private LogicalSchema handleStreamGroupBy(
);

return GroupByParamsFactory
.buildSchema(sourceSchema, compiledGroupBy, streamGroupBy.getAlias(), ksqlConfig);
.buildSchema(sourceSchema, compiledGroupBy, streamGroupBy.getAlias());
}

private LogicalSchema handleTableGroupBy(
@@ -201,7 +201,7 @@ private LogicalSchema handleTableGroupBy(
);

return GroupByParamsFactory
.buildSchema(sourceSchema, compiledGroupBy, tableGroupBy.getAlias(), ksqlConfig);
.buildSchema(sourceSchema, compiledGroupBy, tableGroupBy.getAlias());
}

private LogicalSchema handleStreamSelect(
@@ -32,7 +32,6 @@
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.util.KsqlConfig;
import java.util.List;
import java.util.Optional;
import org.apache.kafka.common.serialization.Serde;
@@ -100,7 +99,7 @@ public <K> KGroupedStreamHolder build(
final ProcessingLogger logger = queryBuilder.getProcessingLogger(queryContext);

final GroupByParams params = paramsFactory
.build(sourceSchema, groupBy, step.getAlias(), logger, queryBuilder.getKsqlConfig());
.build(sourceSchema, groupBy, step.getAlias(), logger);

final Grouped<Struct, GenericRow> grouped = buildGrouped(
formats,
@@ -147,8 +146,7 @@ GroupByParams build(
LogicalSchema sourceSchema,
List<ExpressionMetadata> groupBys,
Optional<ColumnName> alias,
ProcessingLogger logger,
KsqlConfig ksqlConfig
ProcessingLogger logger
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.util.KsqlConfig;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
@@ -85,7 +84,7 @@ public <K> KGroupedTableHolder build(
final ProcessingLogger logger = queryBuilder.getProcessingLogger(queryContext);

final GroupByParams params = paramsFactory
.build(sourceSchema, groupBy, step.getAlias(), logger, queryBuilder.getKsqlConfig());
.build(sourceSchema, groupBy, step.getAlias(), logger);

final PhysicalSchema physicalSchema = PhysicalSchema.from(
params.getSchema(),
@@ -135,8 +134,7 @@ GroupByParams build(
LogicalSchema sourceSchema,
List<ExpressionMetadata> groupBys,
Optional<ColumnName> alias,
ProcessingLogger logger,
KsqlConfig ksqlConfig
ProcessingLogger logger
);
}
}
