Correctly validate joins between windowed and non-windowed sources (#4390)

* chore: fail on joins with incompatible windowed/non-windowed keys

Previously, KSQL did not validate that the two sources of a join had keys with compatible windowing.

* it did not check that the sources were either both windowed or both non-windowed. Joining a non-windowed source to a windowed source will never produce any output, as the binary keys can never match.
* it did not check that both sources had compatible window types. Session windows include both the window start and end times in the serialized key, whereas hopping and tumbling windows serialize only the window start. For this reason, session-windowed sources can only be joined with other session-windowed sources, and hopping and tumbling windowed sources can only be joined with other hopping and tumbling windowed sources.

This commit adds these checks and suitable QTT tests to prove them.
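
For reference, the rule the new check enforces can be sketched as a small predicate: two sources are join-compatible only if both are non-windowed, both are session windowed, or both are hopping/tumbling windowed. The sketch below is illustrative only and uses hypothetical names (WindowJoinCompatibility, compatible, and a local WindowType enum); the actual check is Analyzer.throwOnIncompatibleSourceWindowing in the diff that follows.

import java.util.Optional;

// Illustrative sketch only: these names are hypothetical, not KSQL's API.
public final class WindowJoinCompatibility {

  enum WindowType { SESSION, HOPPING, TUMBLING }

  // Returns true if the two sources' key windowing allows their serialized keys to ever match.
  static boolean compatible(final Optional<WindowType> left, final Optional<WindowType> right) {
    if (left.isPresent() != right.isPresent()) {
      return false; // windowed vs non-windowed: binary keys can never match
    }
    if (!left.isPresent()) {
      return true; // both non-windowed: plain keys, nothing more to check here
    }
    // Session keys serialize window start AND end; hopping/tumbling keys serialize only the start,
    // so the two families are only binary-compatible within themselves.
    final boolean leftSession = left.get() == WindowType.SESSION;
    final boolean rightSession = right.get() == WindowType.SESSION;
    return leftSession == rightSession;
  }

  public static void main(final String[] args) {
    System.out.println(compatible(Optional.of(WindowType.SESSION), Optional.of(WindowType.TUMBLING))); // false
    System.out.println(compatible(Optional.of(WindowType.HOPPING), Optional.of(WindowType.TUMBLING))); // true
    System.out.println(compatible(Optional.of(WindowType.SESSION), Optional.empty()));                 // false
  }
}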
big-andy-coates authored Jan 29, 2020
1 parent c8ad596 commit c1faef9
Showing 8 changed files with 216 additions and 26 deletions.
111 changes: 91 additions & 20 deletions ksql-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java
@@ -35,6 +35,7 @@
import io.confluent.ksql.execution.windows.KsqlWindowExpression;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.model.WindowType;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.FunctionName;
import io.confluent.ksql.name.SourceName;
@@ -66,6 +67,7 @@
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.serde.SerdeOptions;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.serde.WindowInfo;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.SchemaUtil;
import java.util.HashSet;
@@ -354,14 +356,6 @@ protected AstNode visitJoin(final Join node, final Void context) {
throw new KsqlException("Only equality join criteria is supported.");
}

if (left.getDataSource().getName().equals(right.getDataSource().getName())) {
throw new KsqlException(
"Can not join '" + left.getDataSource().getName().toString(FormatOptions.noEscape())
+ "' to '" + right.getDataSource().getName().toString(FormatOptions.noEscape())
+ "': self joins are not yet supported."
);
}

final Set<SourceName> srcsUsedInLeft =
new ExpressionAnalyzer(analysis.getFromSourceSchemas()).analyzeExpression(
comparisonExpression.getLeft(),
@@ -378,13 +372,9 @@ protected AstNode visitJoin(final Join node, final Void context) {
final SourceName rightSourceName = getOnlySourceForJoin(
comparisonExpression.getRight(), comparisonExpression, srcsUsedInRight);

if (!validJoin(left.getAlias(), right.getAlias(), leftSourceName, rightSourceName)) {
throw new KsqlException(
"Each side of the join must reference exactly one source and not the same source. "
+ "Left side references " + leftSourceName
+ " and right references " + rightSourceName
);
}
throwOnSelfJoin(left, right);
throwOnIncompleteJoinCriteria(left, right, leftSourceName, rightSourceName);
throwOnIncompatibleSourceWindowing(left, right);

final boolean flipped = leftSourceName.equals(right.getAlias());
analysis.setJoin(new JoinInfo(
@@ -397,14 +387,95 @@ protected AstNode visitJoin(final Join node, final Void context) {
return null;
}

private boolean validJoin(
final SourceName leftName,
final SourceName rightName,
private void throwOnSelfJoin(final AliasedDataSource left, final AliasedDataSource right) {
if (left.getDataSource().getName().equals(right.getDataSource().getName())) {
throw new KsqlException(
"Can not join '" + left.getDataSource().getName().toString(FormatOptions.noEscape())
+ "' to '" + right.getDataSource().getName().toString(FormatOptions.noEscape())
+ "': self joins are not yet supported."
);
}
}

private void throwOnIncompleteJoinCriteria(
final AliasedDataSource left,
final AliasedDataSource right,
final SourceName leftExpressionSource,
final SourceName rightExpressionSource
) {
return ImmutableSet.of(leftExpressionSource, rightExpressionSource)
.containsAll(ImmutableList.of(leftName, rightName));
final boolean valid = ImmutableSet.of(leftExpressionSource, rightExpressionSource)
.containsAll(ImmutableList.of(left.getAlias(), right.getAlias()));

if (!valid) {
throw new KsqlException(
"Each side of the join must reference exactly one source and not the same source. "
+ "Left side references " + leftExpressionSource
+ " and right references " + rightExpressionSource
);
}
}

private void throwOnIncompatibleSourceWindowing(
final AliasedDataSource left,
final AliasedDataSource right
) {
final Optional<WindowType> leftWindowType = left.getDataSource()
.getKsqlTopic()
.getKeyFormat()
.getWindowInfo()
.map(WindowInfo::getType);

final Optional<WindowType> rightWindowType = right.getDataSource()
.getKsqlTopic()
.getKeyFormat()
.getWindowInfo()
.map(WindowInfo::getType);

if (leftWindowType.isPresent() != rightWindowType.isPresent()) {
throw windowedNonWindowedJoinException(left, right, leftWindowType, rightWindowType);
}

if (!leftWindowType.isPresent()) {
return;
}

final WindowType leftWt = leftWindowType.get();
final WindowType rightWt = rightWindowType.get();
final boolean compatible = leftWt == WindowType.SESSION
? rightWt == WindowType.SESSION
: rightWt == WindowType.HOPPING || rightWt == WindowType.TUMBLING;

if (!compatible) {
throw new KsqlException("Incompatible windowed sources."
+ System.lineSeparator()
+ "Left source: " + leftWt
+ System.lineSeparator()
+ "Right source: " + rightWt
+ System.lineSeparator()
+ "Session windowed sources can only be joined to other session windowed sources, "
+ "and may still not result in expected behaviour as session bounds must be an exact "
+ "match for the join to work"
+ System.lineSeparator()
+ "Hopping and tumbling windowed sources can only be joined to other hopping and "
+ "tumbling windowed sources"
);
}
}

private KsqlException windowedNonWindowedJoinException(
final AliasedDataSource left,
final AliasedDataSource right,
final Optional<WindowType> leftWindowType,
final Optional<WindowType> rightWindowType
) {
final String leftMsg = leftWindowType.map(Object::toString).orElse("not");
final String rightMsg = rightWindowType.map(Object::toString).orElse("not");
return new KsqlException("Can not join windowed source to non-windowed source."
+ System.lineSeparator()
+ left.getAlias() + " is " + leftMsg + " windowed"
+ System.lineSeparator()
+ right.getAlias() + " is " + rightMsg + " windowed"
);
}

private SourceName getOnlySourceForJoin(
@@ -36,7 +36,7 @@ private static void failPersistentQueryOnWindowedTable(final Analysis analysis)
return;
}
if (analysis.getFromDataSources().stream().anyMatch(PushQueryValidator::isWindowedTable)) {
throw new KsqlException("KSQL does not support persistent push queries on windowed tables.");
throw new KsqlException("KSQL does not support persistent queries on windowed tables.");
}
}

@@ -437,7 +437,7 @@ private LogicalSchema buildRepartitionedSchema(
final LogicalSchema sourceSchema = sourceNode.getSchema();

final ExpressionTypeManager typeManager =
new ExpressionTypeManager(sourceSchema, functionRegistry, true);
new ExpressionTypeManager(sourceSchema, functionRegistry, false);

final SqlType keyType = typeManager.getExpressionSqlType(partitionBy);

@@ -331,7 +331,8 @@ public SchemaKStream<Struct> selectKey(
}

if (keyFormat.isWindowed()) {
throw new UnsupportedOperationException("Can not selectKey of windowed stream");
throw new KsqlException("Implicit repartitioning of windowed sources is not supported. "
+ "See https://github.com/confluentinc/ksql/issues/4385.");
}

final StreamSelectKey step = ExecutionStepFactory.streamSelectKey(
@@ -84,7 +84,7 @@ public void shouldThrowOnPersistentPushQueryOnWindowedTable() {
// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage(
"KSQL does not support persistent push queries on windowed tables.");
"KSQL does not support persistent queries on windowed tables.");

// When:
validator.validate(analysis);
@@ -67,7 +67,9 @@ private void verifyMetaStore(final MetaStore metaStore) {
final String text = values.stream()
.map(s -> s.getDataSourceType() + ":" + s.getName().name()
+ ", key:" + s.getKeyField().ref()
+ ", value:" + s.getSchema())
+ ", value:" + s.getSchema()
+ ", keyFormat:" + s.getKsqlTopic().getKeyFormat()
)
.collect(Collectors.joining(System.lineSeparator()));

assertThat("metastore sources after the statements have run:"
@@ -1870,6 +1870,122 @@
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Can not join 'INPUT' to 'INPUT': self joins are not yet supported."
}
},
{
"name": "matching session-windowed",
"comments": [
"Note: the first record on the right topic intersects with the session on the right side, but no row is output as keys must",
"be an EXACT BINARY match"
],
"statements": [
"CREATE STREAM S1 (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='left_topic', value_format='JSON', WINDOW_TYPE='SESSION');",
"CREATE STREAM S2 (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='right_topic', value_format='JSON', WINDOW_TYPE='SESSION');",
"CREATE STREAM OUTPUT as SELECT S1.ID, S2.ID FROM S1 JOIN S2 WITHIN 1 MINUTE ON S1.ROWKEY = S2.ROWKEY;"
],
"inputs": [
{"topic": "left_topic", "key": 1, "value": {"ID": 1}, "timestamp": 765, "window": {"start": 234, "end": 765, "type": "session"}},
{"topic": "right_topic", "key": 1, "value": {"ID": 2}, "timestamp": 567, "window": {"start": 234, "end": 567, "type": "session"}},
{"topic": "right_topic", "key": 1, "value": {"ID": 3}, "timestamp": 765, "window": {"start": 234, "end": 765, "type": "session"}}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"S1_ID": 1, "S2_ID": 3}, "timestamp": 765, "window": {"start": 234, "end": 765, "type": "session"}}
],
"post": {
"sources": [
{
"name": "OUTPUT",
"type": "stream",
"keyFormat": {"format": "KAFKA", "windowType": "SESSION"},
"schema": "ROWKEY INT KEY, S1_ID BIGINT, S2_ID BIGINT"
}
]
}
},
{
"name": "matching time-windowed",
"comments": [
"Note: the two streams use a different window size. However, only the start of the window is serialized, so its possible to get a matching binary key",
"This may meet users requirements, hence KSQL allows such joins",
"Note: the key format is currently taken from the left source."
],
"statements": [
"CREATE STREAM S1 (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='left_topic', value_format='JSON', WINDOW_TYPE='Hopping', WINDOW_SIZE='5 SECONDS');",
"CREATE STREAM S2 (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='right_topic', value_format='JSON', WINDOW_TYPE='Tumbling', WINDOW_SIZE='2 SECOND');",
"CREATE STREAM OUTPUT as SELECT S1.ID, S2.ID FROM S1 JOIN S2 WITHIN 1 MINUTE ON S1.ROWKEY = S2.ROWKEY;"
],
"inputs": [
{"topic": "left_topic", "key": 1, "value": {"ID": 1}, "timestamp": 0, "window": {"start": 0, "end": 5000, "type": "time"}},
{"topic": "left_topic", "key": 1, "value": {"ID": 2}, "timestamp": 1000, "window": {"start": 1000, "end": 6000, "type": "time"}},
{"topic": "left_topic", "key": 1, "value": {"ID": 3}, "timestamp": 2000, "window": {"start": 2000, "end": 7000, "type": "time"}},
{"topic": "right_topic", "key": 1, "value": {"ID": 4}, "timestamp": 0, "window": {"start": 0, "end": 2000, "type": "time"}},
{"topic": "right_topic", "key": 1, "value": {"ID": 5}, "timestamp": 2000, "window": {"start": 2000, "end": 4000, "type": "time"}}
],
"outputs": [
{"topic": "OUTPUT", "key": 1, "value": {"S1_ID": 1, "S2_ID": 4}, "timestamp": 0, "window": {"start": 0, "end":5000, "type": "time"}},
{"topic": "OUTPUT", "key": 1, "value": {"S1_ID": 3, "S2_ID": 5}, "timestamp": 2000, "window": {"start": 2000, "end":7000, "type": "time"}}
],
"post": {
"sources": [
{
"name": "OUTPUT",
"type": "stream",
"keyFormat": {"format": "KAFKA", "windowType": "HOPPING", "windowSize": 5000},
"schema": "ROWKEY INT KEY, S1_ID BIGINT, S2_ID BIGINT"
}
]
}
},
{
"name": "session - timed windowed",
"comments": [
"Session windows serialize both start and end window bounds, where as tumbling/hopping only serialize the start time.",
"Keys will never be binary compatible, and hence KSQL should disallow such joins"
],
"statements": [
"CREATE STREAM S1 (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='left_topic', value_format='JSON', WINDOW_TYPE='Session');",
"CREATE STREAM S2 (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='right_topic', value_format='JSON', WINDOW_TYPE='TUMBLING', WINDOW_SIZE='1 SECOND');",
"CREATE STREAM OUTPUT as SELECT * FROM S1 JOIN S2 WITHIN 1 MINUTE ON S1.ROWKEY = S2.ROWKEY;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Incompatible windowed sources.\nLeft source: SESSION\nRight source: TUMBLING\nSession windowed sources can only be joined to other session windowed sources, and may still not result in expected behaviour as session bounds must be an exact match for the join to work\nHopping and tumbling windowed sources can only be joined to other hopping and tumbling windowed sources"
}
},
{
"name": "windowed - non-windowed - INT",
"statements": [
"CREATE STREAM S1 (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='left_topic', value_format='JSON', WINDOW_TYPE='SESSION');",
"CREATE STREAM S2 (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='right_topic', value_format='JSON');",
"CREATE STREAM OUTPUT as SELECT * FROM S1 JOIN S2 WITHIN 1 MINUTE ON S1.ROWKEY = S2.ROWKEY;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Can not join windowed source to non-windowed source.\n`S1` is SESSION windowed\n`S2` is not windowed"
}
},
{
"name": "windowed - non-windowed - STRING",
"statements": [
"CREATE STREAM S1 (ROWKEY STRING KEY, ID bigint) WITH (kafka_topic='left_topic', value_format='JSON', WINDOW_TYPE='SESSION');",
"CREATE STREAM S2 (ROWKEY STRING KEY, ID bigint) WITH (kafka_topic='right_topic', value_format='JSON');",
"CREATE STREAM OUTPUT as SELECT * FROM S1 JOIN S2 WITHIN 1 MINUTE ON S1.ROWKEY = S2.ROWKEY;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Can not join windowed source to non-windowed source.\n`S1` is SESSION windowed\n`S2` is not windowed"
}
},
{
"name": "join requiring repartition of windowed source",
"statements": [
"CREATE STREAM S1 (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='left_topic', value_format='JSON', WINDOW_TYPE='SESSION');",
"CREATE STREAM S2 (ROWKEY INT KEY, ID bigint) WITH (kafka_topic='right_topic', value_format='JSON', WINDOW_TYPE='SESSION');",
"CREATE STREAM OUTPUT as SELECT * FROM S1 JOIN S2 WITHIN 1 MINUTE ON S1.ID = S2.ID;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Implicit repartitioning of windowed sources is not supported. See https://github.com/confluentinc/ksql/issues/4385."
}
}
]
}
@@ -11,7 +11,7 @@
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlException",
"message": "KSQL does not support persistent push queries on windowed tables."
"message": "KSQL does not support persistent queries on windowed tables."
}
}
]
