Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: do not allow duplicates in schemas by default #4697

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -310,11 +310,27 @@ public static class Builder {
private final Set<ColumnName> seenKeys = new HashSet<>();
private final Set<ColumnName> seenValues = new HashSet<>();

private boolean failOnDuplicates = true;

/**
 * Adds the implicit {@code ROWTIME} metadata column to the schema being built.
 *
 * @return self.
 */
public Builder withRowTime() {
columns.add(IMPLICIT_TIME_COLUMN);
return this;
}

/**
* Allows key and value columns with the same name.
*
* <p>This should only be used when building the schemas used by Kafka Streams, where
* the key columns are copied into the value schema. KSQL does not support duplicate column
* names in data sources.
*
* @return self.
*/
public Builder allowDuplicates() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as discussed offline, I think having this makes things a little more confusing. If we wanted to make it strongly typed, I think we should have a different class that allows duplicates but I think that's overkill

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, cool, my thinking was going down the same route.

failOnDuplicates = false;
return this;
}

public Builder keyColumns(final Iterable<? extends SimpleColumn> columns) {
columns.forEach(this::keyColumn);
return this;
Expand Down Expand Up @@ -344,7 +360,22 @@ public Builder valueColumn(final ColumnName name, final SqlType type) {
}

/**
 * Builds the schema, enforcing (unless {@code allowDuplicates()} was called) that no
 * value column shares a name with a key column.
 *
 * @return the built schema.
 * @throws IllegalStateException if duplicate checking is enabled and any value column
 *         name clashes with a key column name.
 */
public LogicalSchema build() {
  final LogicalSchema schema = new LogicalSchema(columns.build());

  if (failOnDuplicates) {
    // A value column duplicates a key column when its name is also a key column name.
    final String duplicates = schema.value().stream()
        .map(Column::name)
        .filter(schema::isKeyColumn)
        .map(ColumnName::toString)
        .collect(Collectors.joining(", "));

    if (!duplicates.isEmpty()) {
      throw new IllegalStateException("Value column name(s) " + duplicates
          + " clashes with key column name(s).");
    }
  }

  return schema;
}

private void addColumn(final Column column) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ public void shouldGetValueColumns() {
public void shouldPreferKeyOverValueAndMetaColumns() {
// Given:
final LogicalSchema schema = LogicalSchema.builder()
// Implicit meta ROWTIME
.allowDuplicates()
.withRowTime()
.valueColumn(ROWTIME_NAME, BIGINT)
.keyColumn(ROWTIME_NAME, BIGINT)
.build();
Expand Down Expand Up @@ -412,6 +413,7 @@ public void shouldAddMetaAndKeyColumnsToValue() {
// Then:
assertThat(result, is(LogicalSchema.builder()
.withRowTime()
.allowDuplicates()
.keyColumn(K0, INTEGER)
.keyColumn(K1, STRING)
.valueColumn(F0, STRING)
Expand Down Expand Up @@ -441,6 +443,7 @@ public void shouldAddWindowedMetaAndKeyColumnsToValue() {
// Then:
assertThat(result, is(LogicalSchema.builder()
.withRowTime()
.allowDuplicates()
.keyColumn(K0, INTEGER)
.keyColumn(K1, STRING)
.valueColumn(F0, STRING)
Expand Down Expand Up @@ -476,6 +479,7 @@ public void shouldRemoveOthersWhenAddingMetasAndKeyColumns() {
// Given:
final LogicalSchema ksqlSchema = LogicalSchema.builder()
.withRowTime()
.allowDuplicates()
.keyColumn(K0, INTEGER)
.valueColumn(F0, BIGINT)
.valueColumn(K0, DOUBLE)
Expand All @@ -489,6 +493,7 @@ public void shouldRemoveOthersWhenAddingMetasAndKeyColumns() {
// Then:
assertThat(result, is(LogicalSchema.builder()
.withRowTime()
.allowDuplicates()
.keyColumn(K0, INTEGER)
.valueColumn(F0, BIGINT)
.valueColumn(F1, BIGINT)
Expand Down Expand Up @@ -570,6 +575,7 @@ public void shouldRemoveMetaColumnsWhereEverTheyAre() {
public void shouldRemoveKeyColumnsWhereEverTheyAre() {
// Given:
final LogicalSchema schema = LogicalSchema.builder()
.allowDuplicates()
.keyColumn(K0, STRING)
.valueColumn(F0, BIGINT)
.valueColumn(K0, STRING)
Expand Down Expand Up @@ -648,7 +654,7 @@ public void shouldGetKeyConnectSchema() {
// Given:
final LogicalSchema schema = LogicalSchema.builder()
.keyColumn(F0, DOUBLE)
.valueColumn(F0, BIGINT)
.valueColumn(F1, BIGINT)
.build();

// When:
Expand All @@ -665,7 +671,7 @@ public void shouldGetKeyConnectSchema() {
public void shouldGetValueConnectSchema() {
// Given:
final LogicalSchema schema = LogicalSchema.builder()
.keyColumn(F0, STRING)
.keyColumn(K0, STRING)
.valueColumn(F0, BIGINT)
.valueColumn(F1, STRING)
.build();
Expand Down Expand Up @@ -744,6 +750,23 @@ public void shouldOnlyMatchValueSchema() {
assertThat(schema.valueContainsAny(ImmutableSet.of(K0, V0, ROWTIME_NAME)), is(false));
}

@Test(expected = IllegalStateException.class)
public void shouldThrowOnDuplicateColumnName() {
  // When a key and a value column share a name, build() must reject the schema.
  LogicalSchema.builder()
      .keyColumn(K0, BIGINT)
      .valueColumn(K0, STRING)
      .build();
}

@Test
public void shouldNotThrowOnDuplicateColumnName() {
  // With duplicates explicitly allowed, clashing key/value names must build cleanly.
  // (allowDuplicates() only sets a flag read at build(), so call order is irrelevant.)
  LogicalSchema.builder()
      .allowDuplicates()
      .valueColumn(K0, STRING)
      .keyColumn(K0, BIGINT)
      .build();
}

private static org.apache.kafka.connect.data.Field connectField(
final String fieldName,
final int index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,8 @@ private LogicalSchema buildProjectionSchema(
);

final Builder builder = LogicalSchema.builder()
.withRowTime();
.withRowTime()
.allowDuplicates();

final List<Column> keyColumns = schema.key();

Expand Down Expand Up @@ -444,6 +445,7 @@ private LogicalSchema buildRepartitionedSchema(
final SqlType keyType = typeManager.getExpressionSqlType(partitionBy);

return LogicalSchema.builder()
.allowDuplicates()
.withRowTime()
.keyColumn(SchemaUtil.ROWKEY_NAME, keyType)
.valueColumns(sourceSchema.value())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ private static LogicalSchema buildSchema(
final SelectValueMapper<?> mapper
) {
final LogicalSchema.Builder schemaBuilder = LogicalSchema.builder()
.allowDuplicates()
.withRowTime();

final List<Column> keyCols = sourceSchema.key();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public void shouldThrowIfSchemaContainsRowTime() {
public void shouldThrowIfSchemaContainsRowKey() {
// Given:
final LogicalSchema schema = LogicalSchema.builder()
.allowDuplicates()
.withRowTime()
.keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING)
.valueColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ private static LogicalSchema buildSchemaWithKeyType(
final SqlType rowKeyType
) {
return LogicalSchema.builder()
.allowDuplicates()
.withRowTime()
.keyColumn(SchemaUtil.ROWKEY_NAME, rowKeyType)
.valueColumns(sourceSchema.value())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ private LogicalSchema handleSelectKey(
.getExpressionSqlType(step.getKeyExpression());

return LogicalSchema.builder()
.allowDuplicates()
.withRowTime()
.keyColumn(SchemaUtil.ROWKEY_NAME, keyType)
.valueColumns(sourceSchema.value())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public static LogicalSchema buildSchema(
final FunctionRegistry functionRegistry
) {
final LogicalSchema.Builder schemaBuilder = LogicalSchema.builder()
.allowDuplicates()
.withRowTime();

final List<Column> cols = inputSchema.value();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class StreamGroupByBuilderTest {
.withMetaAndKeyColsInValue(false);

private static final LogicalSchema REKEYED_SCHEMA = LogicalSchema.builder()
.allowDuplicates()
.withRowTime()
.keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING)
.valueColumns(SCHEMA.value())
Expand Down Expand Up @@ -216,6 +217,7 @@ public void shouldReturnCorrectSchemaForGroupBy() {

// Then:
assertThat(result.getSchema(), is(LogicalSchema.builder()
.allowDuplicates()
.withRowTime()
.keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING)
.valueColumns(SCHEMA.value())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class StreamSelectKeyBuilderTest {
new UnqualifiedColumnReferenceExp(ColumnName.of("BOI"));

private static final LogicalSchema RESULT_SCHEMA = LogicalSchema.builder()
.allowDuplicates()
.withRowTime()
.keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.BIGINT)
.valueColumn(ColumnName.of("BIG"), SqlTypes.BIGINT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class TableGroupByBuilderTest {
.withMetaAndKeyColsInValue(false);

private static final LogicalSchema REKEYED_SCHEMA = LogicalSchema.builder()
.allowDuplicates()
.withRowTime()
.keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING)
.valueColumns(SCHEMA.value())
Expand Down Expand Up @@ -190,6 +191,7 @@ public void shouldReturnCorrectSchema() {

// Then:
assertThat(result.getSchema(), is(is(LogicalSchema.builder()
.allowDuplicates()
.withRowTime()
.keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING)
.valueColumns(SCHEMA.value())
Expand Down