Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partial support for arbitrary key column names. #4701

Merged
merged 3 commits into from
Mar 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import io.confluent.ksql.util.HandlerMaps.ClassHandlerMap1;
import io.confluent.ksql.util.HandlerMaps.Handler1;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.SchemaUtil;
import io.confluent.ksql.util.TabularRow;
import java.io.Closeable;
import java.io.File;
Expand Down Expand Up @@ -454,11 +455,11 @@ private static String formatFieldType(
final Optional<WindowType> windowType,
final String keyField
) {
if (field.getName().equals("ROWTIME")) {
if (field.getName().equals(SchemaUtil.ROWTIME_NAME.text())) {
return String.format("%-16s %s", field.getSchema().toTypeString(), "(system)");
}

if (field.getName().equals("ROWKEY")) {
if (field.getName().equals(SchemaUtil.ROWKEY_NAME.text())) {
final String wt = windowType
.map(v -> " (Window type: " + v + ")")
.orElse("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ public class KsqlConfig extends AbstractConfig {
+ "behavior, and instead throw an exception to ensure that no data is missed, set "
+ "ksql.timestamp.skip.invalid to true.";

public static final String KSQL_ANY_KEY_NAME_ENABLED = "ksql.any.key.name.enabled";

private enum ConfigGeneration {
LEGACY,
CURRENT
Expand Down Expand Up @@ -623,6 +625,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.MEDIUM,
KSQL_TIMESTAMP_THROW_ON_INVALID_DOC
)
.define(
KSQL_ANY_KEY_NAME_ENABLED,
Type.BOOLEAN,
false,
Importance.LOW,
"Feature flag for removing restriction on key names - WIP, do not enable."
)
.withClientSslSupport();

for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ private void visitSelectStar(final AllColumns allColumns) {
// See https://github.com/confluentinc/ksql/issues/3731 for more info
final List<Column> valueColumns = persistent && !analysis.isJoin()
? schema.value()
: systemColumnsToTheFront(schema.withMetaAndKeyColsInValue(windowed).value());
: orderColumns(schema.withMetaAndKeyColsInValue(windowed).value(), schema);

for (final Column column : valueColumns) {

Expand All @@ -596,12 +596,15 @@ private void visitSelectStar(final AllColumns allColumns) {
}
}

private List<Column> systemColumnsToTheFront(final List<Column> columns) {
// When doing a `select *` the system columns should be at the front of the column list
private List<Column> orderColumns(
final List<Column> columns,
final LogicalSchema schema
) {
// When doing a `select *` system and key columns should be at the front of the column list
// but are added at the back during processing for performance reasons.
// Switch them around here:
final Map<Boolean, List<Column>> partitioned = columns.stream()
.collect(Collectors.groupingBy(c -> SchemaUtil.isSystemColumn(c.name())));
final Map<Boolean, List<Column>> partitioned = columns.stream().collect(Collectors
.groupingBy(c -> SchemaUtil.isSystemColumn(c.name()) || schema.isKeyColumn(c.name())));

final List<Column> all = partitioned.get(true);
all.addAll(partitioned.get(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public CreateStreamCommand createStreamCommand(
) {
final SourceName sourceName = statement.getName();
final KsqlTopic topic = buildTopic(statement.getProperties(), serviceContext);
final LogicalSchema schema = buildSchema(statement.getElements());
final LogicalSchema schema = buildSchema(statement.getElements(), ksqlConfig);
final Optional<ColumnName> keyFieldName = buildKeyFieldName(statement, schema);
final Optional<TimestampColumn> timestampColumn = buildTimestampColumn(
ksqlConfig,
Expand Down Expand Up @@ -121,7 +121,7 @@ public CreateTableCommand createTableCommand(
) {
final SourceName sourceName = statement.getName();
final KsqlTopic topic = buildTopic(statement.getProperties(), serviceContext);
final LogicalSchema schema = buildSchema(statement.getElements());
final LogicalSchema schema = buildSchema(statement.getElements(), ksqlConfig);
final Optional<ColumnName> keyFieldName = buildKeyFieldName(statement, schema);
final Optional<TimestampColumn> timestampColumn = buildTimestampColumn(
ksqlConfig,
Expand Down Expand Up @@ -165,7 +165,10 @@ private static Optional<ColumnName> buildKeyFieldName(
}
}

private static LogicalSchema buildSchema(final TableElements tableElements) {
private static LogicalSchema buildSchema(
final TableElements tableElements,
final KsqlConfig ksqlConfig
) {
if (Iterables.isEmpty(tableElements)) {
throw new KsqlException("The statement does not define any columns.");
}
Expand All @@ -177,14 +180,21 @@ private static LogicalSchema buildSchema(final TableElements tableElements) {
throw new KsqlException("'" + e.getName().text() + "' is a reserved column name.");
}

if (e.getNamespace() == Namespace.KEY) {
if (!isRowKey) {
throw new KsqlException("'" + e.getName().text() + "' is an invalid KEY column name. "
+ "KSQL currently only supports KEY columns named ROWKEY.");
if (ksqlConfig.getBoolean(KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED)) {
if (isRowKey && e.getNamespace() != Namespace.KEY) {
throw new KsqlException("'" + e.getName().text() + "' is a reserved column name. "
+ "It can only be used for KEY columns.");
}
} else {
if (e.getNamespace() == Namespace.KEY) {
if (!isRowKey) {
throw new KsqlException("'" + e.getName().text() + "' is an invalid KEY column name. "
+ "KSQL currently only supports KEY columns named ROWKEY.");
}
} else if (isRowKey) {
throw new KsqlException("'" + e.getName().text() + "' is a reserved column name. "
+ "It can only be used for KEY columns.");
}
} else if (isRowKey) {
throw new KsqlException("'" + e.getName().text() + "' is a reserved column name. "
+ "It can only be used for KEY columns.");
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import io.confluent.ksql.rest.SessionProperties;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.DefaultSqlValueCoercer;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.SchemaConverters;
Expand All @@ -61,7 +60,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -142,6 +140,7 @@ private InsertValuesExecutor(
this.valueSerdeFactory = Objects.requireNonNull(valueSerdeFactory, "valueSerdeFactory");
}

@SuppressWarnings("unused") // Part of required API.
public void execute(
final ConfiguredStatement<InsertValues> statement,
final SessionProperties sessionProperties,
Expand Down Expand Up @@ -175,7 +174,7 @@ public void execute(
}
}

private DataSource getDataSource(
private static DataSource getDataSource(
final KsqlConfig ksqlConfig,
final MetaStore metaStore,
final InsertValues insertValues
Expand Down Expand Up @@ -266,11 +265,23 @@ private RowData extractRow(
final Map<ColumnName, Object> values = resolveValues(
insertValues, columns, schema, functionRegistry, config);

handleExplicitKeyField(values, dataSource.getKeyField());
handleExplicitKeyField(
values,
dataSource.getKeyField(),
Iterables.getOnlyElement(dataSource.getSchema().key())
);

if (dataSource.getDataSourceType() == DataSourceType.KTABLE) {
final String noValue = dataSource.getSchema().key().stream()
.map(Column::name)
.filter(colName -> !values.containsKey(colName))
.map(ColumnName::text)
.collect(Collectors.joining(", "));

if (dataSource.getDataSourceType() == DataSourceType.KTABLE
&& values.get(SchemaUtil.ROWKEY_NAME) == null) {
throw new KsqlException("Value for ROWKEY is required for tables");
if (!noValue.isEmpty()) {
throw new KsqlException("Value for primary key column(s) "
+ noValue + " is required for tables");
}
}

final long ts = (long) values.getOrDefault(SchemaUtil.ROWTIME_NAME, clock.getAsLong());
Expand Down Expand Up @@ -357,26 +368,29 @@ private static Map<ColumnName, Object> resolveValues(

private static void handleExplicitKeyField(
final Map<ColumnName, Object> values,
final KeyField keyField
final KeyField keyField,
final Column keyColumn
) {
final Optional<ColumnName> keyFieldName = keyField.ref();
if (keyFieldName.isPresent()) {
final ColumnName key = keyFieldName.get();
final Object keyValue = values.get(key);
final Object rowKeyValue = values.get(SchemaUtil.ROWKEY_NAME);

if (keyValue != null ^ rowKeyValue != null) {
if (keyValue == null) {
values.put(key, rowKeyValue);
// key column: the key column in the source's schema.
// key field: the column identified in the WITH clause as being an alias to the key column.

keyField.ref().ifPresent(keyFieldName -> {
final ColumnName keyColumnName = keyColumn.name();
final Object keyFieldValue = values.get(keyFieldName);
final Object keyColumnValue = values.get(keyColumnName);

if (keyFieldValue != null ^ keyColumnValue != null) {
if (keyFieldValue == null) {
values.put(keyFieldName, keyColumnValue);
} else {
values.put(SchemaUtil.ROWKEY_NAME, keyValue);
values.put(keyColumnName, keyFieldValue);
}
} else if (keyValue != null && !Objects.equals(keyValue, rowKeyValue)) {
throw new KsqlException(String.format(
"Expected ROWKEY and %s to match but got %s and %s respectively.",
key.toString(FormatOptions.noEscape()), rowKeyValue, keyValue));
} else if (keyFieldValue != null && !Objects.equals(keyFieldValue, keyColumnValue)) {
throw new KsqlException(
"Expected " + keyColumnName.text() + " and " + keyFieldName.text() + " to match "
+ "but got " + keyColumnValue + " and " + keyFieldValue + " respectively.");
}
}
});
}

private static SqlType columnType(final ColumnName column, final LogicalSchema schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.confluent.ksql.planner.plan.ProjectNode;
import io.confluent.ksql.planner.plan.RepartitionNode;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.Column.Namespace;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.LogicalSchema.Builder;
Expand Down Expand Up @@ -173,8 +174,11 @@ private AggregateNode buildAggregateNode(final PlanNode sourcePlanNode) {
? groupByExps.get(0)
: null;

final Optional<ColumnName> keyFieldName = getSelectAliasMatching((expression, alias) ->
expression.equals(groupBy) && !SchemaUtil.isSystemColumn(alias),
final Optional<ColumnName> keyFieldName = getSelectAliasMatching(
(expression, alias) ->
expression.equals(groupBy)
&& !SchemaUtil.isSystemColumn(alias)
&& !schema.isKeyColumn(alias),
sourcePlanNode.getSelectExpressions());

return new AggregateNode(
Expand Down Expand Up @@ -389,9 +393,7 @@ private LogicalSchema buildProjectionSchema(
final Builder builder = LogicalSchema.builder()
.withRowTime();

final List<Column> keyColumns = schema.key();

builder.keyColumns(keyColumns);
builder.keyColumns(schema.key());

for (int i = 0; i < projection.size(); i++) {
final SelectExpression select = projection.get(i);
Expand All @@ -409,26 +411,36 @@ private LogicalSchema buildAggregateSchema(
final PlanNode sourcePlanNode,
final List<Expression> groupByExps
) {
final LogicalSchema sourceSchema = sourcePlanNode.getSchema();

final ColumnName keyName;
final SqlType keyType;
if (groupByExps.size() != 1) {
keyName = SchemaUtil.ROWKEY_NAME;
keyType = SqlTypes.STRING;
} else {
final Expression expression = groupByExps.get(0);

keyName = exactlyMatchesKeyColumns(expression, sourceSchema)
? ((ColumnReferenceExp) expression).getColumnName()
: SchemaUtil.ROWKEY_NAME;

final ExpressionTypeManager typeManager =
new ExpressionTypeManager(sourcePlanNode.getSchema(), functionRegistry);
new ExpressionTypeManager(sourceSchema, functionRegistry);

keyType = typeManager.getExpressionSqlType(groupByExps.get(0));
keyType = typeManager.getExpressionSqlType(expression);
}

final LogicalSchema sourceSchema = buildProjectionSchema(
sourcePlanNode.getSchema()
final LogicalSchema projectionSchema = buildProjectionSchema(
sourceSchema
.withMetaAndKeyColsInValue(analysis.getWindowExpression().isPresent()),
sourcePlanNode.getSelectExpressions()
);

return LogicalSchema.builder()
.withRowTime()
.keyColumn(SchemaUtil.ROWKEY_NAME, keyType)
.valueColumns(sourceSchema.value())
.keyColumn(keyName, keyType)
.valueColumns(projectionSchema.value())
.build();
}

Expand All @@ -438,6 +450,11 @@ private LogicalSchema buildRepartitionedSchema(
) {
final LogicalSchema sourceSchema = sourceNode.getSchema();

if (exactlyMatchesKeyColumns(partitionBy, sourceSchema)) {
// No-op:
return sourceSchema;
}

final ExpressionTypeManager typeManager =
new ExpressionTypeManager(sourceSchema, functionRegistry);

Expand All @@ -450,6 +467,30 @@ private LogicalSchema buildRepartitionedSchema(
.build();
}

/**
 * Tests whether an expression is a plain reference to the schema's sole key column.
 *
 * <p>The result is {@code true} only when all of the following hold:
 * <ul>
 *   <li>the schema has exactly one key column (multi-column keys are not yet supported);</li>
 *   <li>the expression is a {@link ColumnReferenceExp};</li>
 *   <li>the referenced column, as resolved by {@code schema.findColumn}, is in the
 *       {@link Namespace#KEY} namespace.</li>
 * </ul>
 *
 * @param expression the expression to inspect.
 * @param schema the schema the expression is resolved against.
 * @return {@code true} if the expression exactly matches the key column, {@code false} otherwise.
 */
private static boolean exactlyMatchesKeyColumns(
    final Expression expression,
    final LogicalSchema schema
) {
  // Currently only a single key column is supported, and anything that is
  // not a column reference can never be a match.
  final boolean hasSingleKeyColumn = schema.key().size() == 1;
  if (!hasSingleKeyColumn || !(expression instanceof ColumnReferenceExp)) {
    return false;
  }

  final ColumnReferenceExp columnRef = (ColumnReferenceExp) expression;

  // An unknown column is treated the same as a non-key column: no match.
  return schema
      .findColumn(columnRef.getColumnName())
      .map(Column::namespace)
      .filter(namespace -> namespace == Namespace.KEY)
      .isPresent();
}

private static List<SelectExpression> selectWithPrependAlias(
final SourceName alias,
final LogicalSchema schema
Expand Down
Loading