Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partial support for arbitrary key column names. #4701

Merged
merged 3 commits into from
Mar 5, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,28 @@ public class SourceSchemasTest {

private static final SourceName ALIAS_1 = SourceName.of("S1");
private static final SourceName ALIAS_2 = SourceName.of("S2");
private static final ColumnName COMMON_FIELD_NAME = ColumnName.of("F0");

private static final ColumnName K0 = ColumnName.of("K0");
private static final ColumnName K1 = ColumnName.of("K1");

private static final ColumnName COMMON_VALUE_FIELD_NAME = ColumnName.of("V0");
private static final ColumnName V1 = ColumnName.of("V1");
private static final ColumnName V2 = ColumnName.of("V2");

private static final LogicalSchema SCHEMA_1 = LogicalSchema.builder()
.withRowTime()
.keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING)
.valueColumn(COMMON_FIELD_NAME, SqlTypes.STRING)
.valueColumn(ColumnName.of("F1"), SqlTypes.STRING)
.keyColumn(K0, SqlTypes.INTEGER)
.valueColumn(COMMON_VALUE_FIELD_NAME, SqlTypes.STRING)
.valueColumn(V1, SqlTypes.STRING)
.build();

private static final LogicalSchema SCHEMA_2 = LogicalSchema.builder()
.withRowTime()
.keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.STRING)
.valueColumn(COMMON_FIELD_NAME, SqlTypes.STRING)
.valueColumn(ColumnName.of("F2"), SqlTypes.STRING)
.keyColumn(K1, SqlTypes.STRING)
.valueColumn(COMMON_VALUE_FIELD_NAME, SqlTypes.STRING)
.valueColumn(V2, SqlTypes.STRING)
.build();

private SourceSchemas sourceSchemas;
Expand Down Expand Up @@ -88,28 +96,28 @@ public void shouldFindNoField() {

@Test
public void shouldFindNoQualifiedField() {
assertThat(sourceSchemas.sourcesWithField(Optional.of(ALIAS_1), ColumnName.of("F2")), is(empty()));
assertThat(sourceSchemas.sourcesWithField(Optional.of(ALIAS_1), V2), is(empty()));
}

@Test
public void shouldFindUnqualifiedUniqueField() {
assertThat(sourceSchemas.sourcesWithField(Optional.empty(), ColumnName.of("F1")), contains(ALIAS_1));
assertThat(sourceSchemas.sourcesWithField(Optional.empty(), V1), contains(ALIAS_1));
}

@Test
public void shouldFindQualifiedUniqueField() {
assertThat(sourceSchemas.sourcesWithField(Optional.of(ALIAS_2), ColumnName.of("F2")), contains(ALIAS_2));
assertThat(sourceSchemas.sourcesWithField(Optional.of(ALIAS_2), V2), contains(ALIAS_2));
}

@Test
public void shouldFindUnqualifiedCommonField() {
assertThat(sourceSchemas.sourcesWithField(Optional.empty(), COMMON_FIELD_NAME),
assertThat(sourceSchemas.sourcesWithField(Optional.empty(), COMMON_VALUE_FIELD_NAME),
containsInAnyOrder(ALIAS_1, ALIAS_2));
}

@Test
public void shouldFindQualifiedFieldOnlyInThatSource() {
assertThat(sourceSchemas.sourcesWithField(Optional.of(ALIAS_1), COMMON_FIELD_NAME),
assertThat(sourceSchemas.sourcesWithField(Optional.of(ALIAS_1), COMMON_VALUE_FIELD_NAME),
contains(ALIAS_1));
}

Expand All @@ -125,22 +133,28 @@ public void shouldMatchNonValueFieldNameIfAliaasedMetaField() {

@Test
public void shouldMatchNonValueFieldNameIfKeyField() {
assertThat(sourceSchemas.matchesNonValueField(Optional.empty(), SchemaUtil.ROWKEY_NAME), is(true));
assertThat(sourceSchemas.matchesNonValueField(Optional.empty(), K0), is(true));
assertThat(sourceSchemas.matchesNonValueField(Optional.empty(), K1), is(true));
}

@Test
public void shouldNotMatchNonKeyFieldOnWrongSource() {
assertThat(sourceSchemas.matchesNonValueField(Optional.of(ALIAS_2), K0), is(false));
}

@Test
public void shouldMatchNonValueFieldNameIfAliasedKeyField() {
assertThat(sourceSchemas.matchesNonValueField(Optional.of(ALIAS_2), SchemaUtil.ROWKEY_NAME), is(true));
assertThat(sourceSchemas.matchesNonValueField(Optional.of(ALIAS_2), K1), is(true));
}

@Test(expected = IllegalArgumentException.class)
public void shouldThrowOnUnknownSourceWhenMatchingNonValueFields() {
sourceSchemas.matchesNonValueField(Optional.of(SourceName.of("unknown")), SchemaUtil.ROWKEY_NAME);
sourceSchemas.matchesNonValueField(Optional.of(SourceName.of("unknown")), K0);
}

@Test
public void shouldNotMatchOtherFields() {
assertThat(sourceSchemas.matchesNonValueField(Optional.of(ALIAS_2), ColumnName.of("F2")), is(false));
assertThat(sourceSchemas.matchesNonValueField(Optional.of(ALIAS_2), V2), is(false));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.MetaStoreFixture;
import io.confluent.ksql.util.SchemaUtil;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -88,7 +87,7 @@ public class CodeGenRunnerTest {
private static final String COL_INVALID_JAVA = "col!Invalid:(";

private static final LogicalSchema META_STORE_SCHEMA = LogicalSchema.builder()
.keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.BIGINT)
.keyColumn(ColumnName.of("K0"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of("COL0"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of("COL1"), SqlTypes.STRING)
.valueColumn(ColumnName.of("COL2"), SqlTypes.STRING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -863,22 +863,27 @@ public void shouldAllowNonStringKeyColumn() {
}

@Test
public void shouldThrowOnKeyColumnThatIsNotCalledRowKey() {
public void shouldNotThrowOnKeyColumnThatIsNotCalledRowKey() {
// Given:
ksqlConfig = new KsqlConfig(ImmutableMap.of(
KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED, true
));

final CreateStream statement = new CreateStream(
SOME_NAME,
TableElements.of(tableElement(KEY, "someKey", new Type(SqlTypes.STRING))),
true,
withProperties
);

// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("'someKey' is an invalid KEY column name. "
+ "KSQL currently only supports KEY columns named ROWKEY.");

// When:
createSourceFactory.createStreamCommand(statement, ksqlConfig);
final CreateStreamCommand result = createSourceFactory
.createStreamCommand(statement, ksqlConfig);

// Then:
assertThat(result.getSchema().key(), contains(
keyColumn(ColumnName.of("someKey"), SqlTypes.STRING)
));
}

private void givenProperty(final String name, final Literal value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.serde.WindowInfo;
import io.confluent.ksql.util.MetaStoreFixture;
import io.confluent.ksql.util.SchemaUtil;
import java.util.Optional;
import java.util.Set;
import org.hamcrest.MatcherAssert;
Expand All @@ -46,7 +45,7 @@ public class DdlCommandExecTest {
private static final SourceName TABLE_NAME = SourceName.of("t1");
private static final String TOPIC_NAME = "topic";
private static final LogicalSchema SCHEMA = new LogicalSchema.Builder()
.keyColumn(SchemaUtil.ROWKEY_NAME, SqlTypes.BIGINT)
.keyColumn(ColumnName.of("K0"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of("F1"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of("F2"), SqlTypes.STRING)
.build();
Expand Down
Loading