diff --git a/docs/concepts/queries/pull.md b/docs/concepts/queries/pull.md index b62765d0e69a..551a0aefbc9e 100644 --- a/docs/concepts/queries/pull.md +++ b/docs/concepts/queries/pull.md @@ -38,7 +38,7 @@ Pull query features and limitations by key. - WHERE clauses can only have constraints on the key column for non-windowed tables. -- In addition, windowed tables support bounds on WINDOWSTART using operators +- In addition, windowed tables support bounds on `WINDOWSTART` and `WINDOWEND` using operators `<=`, `<`, `=`, `>`, `>=`. - JOIN, PARTITION BY, GROUP BY and WINDOW clauses aren't supported. - SELECT statements can contain column arithmetic and function calls. @@ -53,7 +53,7 @@ timestamp within the specified time window. ```sql SELECT * FROM user_location WHERE userId = 'user19r7t33' - AND '2019-10-02T21:31:16' <= WINDOWSTART AND WINDOWSTART <= '2019-10-03T21:31:16'; + AND '2019-10-02T21:31:16' <= WINDOWSTART AND WINDOWEND <= '2019-10-03T21:31:16'; ``` API Reference diff --git a/docs/developer-guide/ksqldb-reference/select-pull-query.md b/docs/developer-guide/ksqldb-reference/select-pull-query.md index c44e64f65396..edc1ca681766 100644 --- a/docs/developer-guide/ksqldb-reference/select-pull-query.md +++ b/docs/developer-guide/ksqldb-reference/select-pull-query.md @@ -36,7 +36,7 @@ Execute a pull query by sending an HTTP request to the ksqlDB REST API, and the API responds with a single response. The WHERE clause must contain a single primary-key to retrieve and may -optionally include bounds on WINDOWSTART if the materialized table is windowed. +optionally include bounds on `WINDOWSTART` and `WINDOWEND` if the materialized table is windowed. Example ------- @@ -44,17 +44,17 @@ Example ```sql SELECT * FROM pageviews_by_region WHERE regionId = 'Region_1' - AND 1570051876000 <= WINDOWSTART AND WINDOWSTART <= 1570138276000; + AND 1570051876000 <= WINDOWSTART AND WINDOWEND <= 1570138276000; ``` -When writing logical expressions using `WINDOWSTART`, you can use ISO-8601 +When writing logical expressions using `WINDOWSTART` or `WINDOWEND`, you can use ISO-8601 formatted datestrings to represent date times. For example, the previous query is equivalent to the following: ```sql SELECT * FROM pageviews_by_region WHERE regionId = 'Region_1' - AND '2019-10-02T21:31:16' <= WINDOWSTART AND WINDOWSTART <= '2019-10-03T21:31:16'; + AND '2019-10-02T21:31:16' <= WINDOWSTART AND WINDOWEND <= '2019-10-03T21:31:16'; ``` You can specify time zones within the datestring. For example, @@ -62,5 +62,5 @@ You can specify time zones within the datestring. For example, specified within the datestring, then timestamps are interpreted in the UTC time zone. -If no bounds are placed on `WINDOWSTART`, rows are returned for all windows +If no bounds are placed on `WINDOWSTART` or `WINDOWEND`, rows are returned for all windows in the windowed table. diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/materialization/ks/KsMaterializationFunctionalTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/materialization/ks/KsMaterializationFunctionalTest.java index e04690527204..482b114fe4da 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/materialization/ks/KsMaterializationFunctionalTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/materialization/ks/KsMaterializationFunctionalTest.java @@ -320,7 +320,7 @@ public void shouldQueryMaterializedTableForTumblingWindowed() { final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema()); final List resultAtWindowStart = - withRetry(() -> table.get(key, Range.singleton(w.start()))); + withRetry(() -> table.get(key, Range.singleton(w.start()), Range.all())); assertThat("at exact window start", resultAtWindowStart, hasSize(1)); assertThat(resultAtWindowStart.get(0).schema(), is(schema)); @@ -328,13 +328,17 @@ public void shouldQueryMaterializedTableForTumblingWindowed() { assertThat(resultAtWindowStart.get(0).key(), is(key)); assertThat(resultAtWindowStart.get(0).value(), is(v)); + final List resultAtWindowEnd = + withRetry(() -> table.get(key, Range.all(), Range.singleton(w.end()))); + assertThat("at exact window end", resultAtWindowEnd, hasSize(1)); + final List resultFromRange = withRetry(() -> withRetry(() -> table - .get(key, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1))))); + .get(key, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)), Range.all()))); assertThat("range including window start", resultFromRange, is(resultAtWindowStart)); final List resultPast = withRetry(() -> table - .get(key, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)))); + .get(key, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)), Range.all())); assertThat("past start", resultPast, is(empty()) ); }); @@ -369,7 +373,7 @@ public void shouldQueryMaterializedTableForHoppingWindowed() { final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema()); final List resultAtWindowStart = - withRetry(() -> table.get(key, Range.singleton(w.start()))); + withRetry(() -> table.get(key, Range.singleton(w.start()), Range.all())); assertThat("at exact window start", resultAtWindowStart, hasSize(1)); assertThat(resultAtWindowStart.get(0).schema(), is(schema)); @@ -377,13 +381,17 @@ public void shouldQueryMaterializedTableForHoppingWindowed() { assertThat(resultAtWindowStart.get(0).key(), is(key)); assertThat(resultAtWindowStart.get(0).value(), is(v)); + final List resultAtWindowEnd = + withRetry(() -> table.get(key, Range.all(), Range.singleton(w.end()))); + assertThat("at exact window end", resultAtWindowEnd, hasSize(1)); + final List resultFromRange = withRetry(() -> table - .get(key, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)))); + .get(key, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)), Range.all())); assertThat("range including window start", resultFromRange, is(resultAtWindowStart)); final List resultPast = withRetry(() -> table - .get(key, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)))); + .get(key, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)), Range.all())); assertThat("past start", resultPast, is(empty())); }); @@ -417,7 +425,7 @@ public void shouldQueryMaterializedTableForSessionWindowed() { final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema()); final List resultAtWindowStart = - withRetry(() -> table.get(key, Range.singleton(w.start()))); + withRetry(() -> table.get(key, Range.singleton(w.start()), Range.all())); assertThat("at exact window start", resultAtWindowStart, hasSize(1)); assertThat(resultAtWindowStart.get(0).schema(), is(schema)); @@ -425,12 +433,16 @@ public void shouldQueryMaterializedTableForSessionWindowed() { assertThat(resultAtWindowStart.get(0).key(), is(key)); assertThat(resultAtWindowStart.get(0).value(), is(v)); + final List resultAtWindowEnd = + withRetry(() -> table.get(key, Range.all(), Range.singleton(w.end()))); + assertThat("at exact window end", resultAtWindowEnd, hasSize(1)); + final List resultFromRange = withRetry(() -> table - .get(key, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)))); + .get(key, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)), Range.all())); assertThat("range including window start", resultFromRange, is(resultAtWindowStart)); final List resultPast = withRetry(() -> table - .get(key, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)))); + .get(key, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)), Range.all())); assertThat("past start", resultPast, is(empty())); }); } @@ -644,7 +656,9 @@ private static void verifyRetainedWindows( ) { rows.forEach(record -> { final Struct key = asKeyStruct(record.key().key(), query.getPhysicalSchema()); - final List resultAtWindowStart = withRetry(() -> table.get(key, Range.all())); + final List resultAtWindowStart = + withRetry(() -> table.get(key, Range.all(), Range.all())); + assertThat("Should have fewer windows retained", resultAtWindowStart, hasSize(expectedWindows.size())); diff --git a/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json b/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json index 79ec86b42560..f1f07d6f720a 100644 --- a/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json +++ b/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json @@ -413,6 +413,45 @@ ]} ] }, + { + "name": "tumbling windowed single key lookup with window start-end range", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;", + "SELECT ID, WINDOWSTART, WINDOWEND, ROWTIME, COUNT FROM AGGREGATE WHERE 100 <= WindowStart AND WindowEnd < 200000 AND ID='10';", + "SELECT ID, WINDOWSTART, WINDOWEND, ROWTIME, COUNT FROM AGGREGATE WHERE 12000 <= WindowStart AND WindowEnd < 14000 AND ID='10';", + "SELECT ID, WINDOWSTART, WINDOWEND, ROWTIME, COUNT FROM AGGREGATE WHERE 12000 < WindowStart AND WindowEnd <= 15000 AND ID='10';", + "SELECT ID, WINDOWSTART, WINDOWEND, ROWTIME, COUNT FROM AGGREGATE WHERE WindowStart > 17000 AND 11234756356 > WindowEnd AND ID='10';" + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 12001, "key": "11", "value": {}}, + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 12211, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 14253, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 15364, "key": "10", "value": {}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 12000, 13000, 12345, 2]}}, + {"row":{"columns":["10", 14000, 15000, 14253, 1]}}, + {"row":{"columns":["10", 15000, 16000, 15364, 1]}} + ]}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 12000, 13000, 12345, 2]}} + ]}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 14000, 15000, 14253, 1]}} + ]}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}} + ]} + ] + }, { "name": "hopping windowed single key lookup with window start range", "statements": [ @@ -449,6 +488,42 @@ ]} ] }, + { + "name": "hopping windowed single key lookup with window start-end range", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW HOPPING(SIZE 5 SECOND, ADVANCE BY 1 SECONDS) GROUP BY ID;", + "SELECT ID, WINDOWSTART, WINDOWEND, ROWTIME, COUNT FROM AGGREGATE WHERE ID='10' AND 7000 <= WindowStart AND WindowEnd < 16000;", + "SELECT ID, WINDOWSTART, WINDOWEND, ROWTIME, COUNT FROM AGGREGATE WHERE ID='10' AND 7000 < WindowStart AND WindowEnd <= 16000;", + "SELECT ID, WINDOWSTART, WINDOWEND, ROWTIME, COUNT FROM AGGREGATE WHERE ID='10' AND 13001 <= WindowStart AND WindowEnd < 11234756356;" + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 10021, "key": "11", "value": {}}, + {"topic": "test_topic", "timestamp": 10345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 13251, "key": "10", "value": {}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 7000, 12000, 10345, 1]}}, + {"row":{"columns":["10", 8000, 13000, 10345, 1]}}, + {"row":{"columns":["10", 9000, 14000, 13251, 2]}}, + {"row":{"columns":["10", 10000, 15000, 13251, 2]}} + ]}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 8000, 13000, 10345, 1]}}, + {"row":{"columns":["10", 9000, 14000, 13251, 2]}}, + {"row":{"columns":["10", 10000, 15000, 13251, 2]}}, + {"row":{"columns":["10", 11000, 16000, 13251, 1]}} + ]}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}} + ]} + ] + }, { "name": "session windowed single key lookup with window start range", "statements": [ @@ -483,7 +558,40 @@ ] }, { - "name": "tumbling windowed single key lookup with unbounded window start", + "name": "session windowed single key lookup with window start-end range", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW SESSION(5 SECOND) GROUP BY ID;", + "SELECT ID, WINDOWSTART, WINDOWEND, ROWTIME, COUNT FROM AGGREGATE WHERE ID='10' AND 10 <= WindowStart AND WindowEnd < 200000;", + "SELECT ID, WINDOWSTART, WINDOWEND, ROWTIME, COUNT FROM AGGREGATE WHERE ID='10' AND 8001 <= WindowStart AND WindowEnd < 19444;", + "SELECT ID, WINDOWSTART, WINDOWEND, ROWTIME, COUNT FROM AGGREGATE WHERE ID='10' AND 8001 < WindowStart AND WindowEnd <= 19444;" + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 12345, "key": "11", "value": {}}, + {"topic": "test_topic", "timestamp": 8001, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 10456, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 19444, "key": "10", "value": {}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 8001, 10456, 10456, 2]}}, + {"row":{"columns":["10", 19444, 19444, 19444, 1]}} + ]}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 8001, 10456, 10456, 2]}} + ]}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `ROWTIME` BIGINT, `COUNT` BIGINT"}}, + {"row":{"columns":["10", 19444, 19444, 19444, 1]}} + ]} + ] + }, + { + "name": "tumbling windowed single key lookup without window bounds", "statements": [ "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;", @@ -505,7 +613,7 @@ ] }, { - "name": "hopping windowed single key lookup with unbounded window start", + "name": "hopping windowed single key lookup without window bounds", "statements": [ "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW HOPPING(SIZE 4 SECOND, ADVANCE BY 1 SECONDS) GROUP BY ID;", @@ -532,7 +640,7 @@ ] }, { - "name": "session windowed single key lookup with unbounded window start", + "name": "session windowed single key lookup without window bounds", "statements": [ "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW SESSION(10 SECOND) GROUP BY ID;", @@ -721,10 +829,11 @@ "statements": [ "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;", - "SELECT * FROM AGGREGATE WHERE ID='10' AND WindowStart='2020-02-23T23:45:12.000';" + "SELECT * FROM AGGREGATE WHERE ID='10' AND WindowStart>='2020-02-23T23:45:12.000' AND WindowEnd<'2020-02-23T23:55:12.000';" ], "inputs": [ {"topic": "test_topic", "timestamp": 1582501512456, "key": "11", "value": {}}, + {"topic": "test_topic", "timestamp": 1582501512456, "key": "10", "value": {}}, {"topic": "test_topic", "timestamp": 1582501512456, "key": "10", "value": {}} ], "responses": [ @@ -1000,6 +1109,19 @@ "status": 400 } }, + { + "name": "fail if WINDOWEND used in non-windowed pull query", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT GROUP BY ID;", + "SELECT * FROM AGGREGATE WHERE ID='10' AND WINDOWEND=10;" + ], + "expectedError": { + "type": "io.confluent.ksql.rest.entity.KsqlStatementErrorMessage", + "message": "WHERE column 'WINDOWEND' cannot be resolved", + "status": 400 + } + }, { "name": "window bounds in projection UDF", "statements": [ @@ -1118,6 +1240,260 @@ "message": "Pull queries are not supported on streams.", "status": 400 } + }, + { + "name": "window start lower bound", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;", + "SELECT * FROM AGGREGATE WHERE ID='10' AND 13000 <= WindowStart;" + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 11345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 13345, "key": "10", "value": {}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10",13000,14000,1]}} + ]} + ] + }, + { + "name": "window end lower bound", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;", + "SELECT * FROM AGGREGATE WHERE ID='10' AND 14000 <= WindowEnd;" + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 11345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 13345, "key": "10", "value": {}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10",13000,14000,1]}} + ]} + ] + }, + { + "name": "window start upper bound", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;", + "SELECT * FROM AGGREGATE WHERE ID='10' AND WindowStart <= 12000;" + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 11345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 13345, "key": "10", "value": {}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10",11000,12000,1]}}, + {"row":{"columns":["10",12000,13000,1]}} + ]} + ] + }, + { + "name": "window end upper bound", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;", + "SELECT * FROM AGGREGATE WHERE ID='10' AND WindowEnd <= 13000;" + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 11345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 13345, "key": "10", "value": {}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10",11000,12000,1]}}, + {"row":{"columns":["10",12000,13000,1]}} + ]} + ] + }, + { + "name": "window range - start-start", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;", + "SELECT * FROM AGGREGATE WHERE ID='10' AND 12000 <= WindowStart AND WindowStart < 13000;" + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 11345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 13345, "key": "10", "value": {}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10",12000,13000,1]}} + ]} + ] + }, + { + "name": "window range - end-end", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;", + "SELECT * FROM AGGREGATE WHERE ID='10' AND 13000 <= WindowEnd AND WindowEnd < 14000;" + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 11345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 13345, "key": "10", "value": {}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10",12000,13000,1]}} + ]} + ] + }, + { + "name": "window range - start-end", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;", + "SELECT * FROM AGGREGATE WHERE ID='10' AND 12000 <= WindowStart AND WindowEnd < 14000;" + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 11345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 13345, "key": "10", "value": {}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10",12000,13000,1]}} + ]} + ] + }, + { + "name": "window range - end-start", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;", + "SELECT * FROM AGGREGATE WHERE ID='10' AND 13000 <= WindowEnd AND WindowStart < 13000;" + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 11345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 13345, "key": "10", "value": {}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10",12000,13000,1]}} + ]} + ] + }, + { + "name": "window range - double lower bound - start highest", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;", + "SELECT * FROM AGGREGATE WHERE ID='10' AND 13000 <=WindowStart AND 13000 <= WindowEnd;" + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 11345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 13345, "key": "10", "value": {}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10",13000,14000,1]}} + ]} + ] + }, + { + "name": "window range - double lower bound - end highest", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;", + "SELECT * FROM AGGREGATE WHERE ID='10' AND 12000 <=WindowStart AND 14000 <= WindowEnd;" + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 11345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 13345, "key": "10", "value": {}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10",13000,14000,1]}} + ]} + ] + }, + { + "name": "window range - double upper bound - start lowest", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;", + "SELECT * FROM AGGREGATE WHERE ID='10' AND WindowStart < 12000 AND WindowEnd < 14000;" + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 11345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 13345, "key": "10", "value": {}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10",11000,12000,1]}} + ]} + ] + }, + { + "name": "window range - double upper bound - end lowest", + "statements": [ + "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;", + "SELECT * FROM AGGREGATE WHERE ID='10' AND WindowStart <= 12000 AND WindowEnd < 13000;" + ], + "inputs": [ + {"topic": "test_topic", "timestamp": 11345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {}}, + {"topic": "test_topic", "timestamp": 13345, "key": "10", "value": {}} + ], + "responses": [ + {"admin": {"@type": "currentStatus"}}, + {"admin": {"@type": "currentStatus"}}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["10",11000,12000,1]}} + ]} + ] } ] } \ No newline at end of file diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java index 734671bec2bd..03e35a227456 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/PullQueryExecutor.java @@ -92,8 +92,8 @@ import io.confluent.ksql.serde.SerdeOption; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.statement.ConfiguredStatement; +import io.confluent.ksql.util.GrammaticalJoiner; import io.confluent.ksql.util.KsqlConfig; -import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlRequestConfig; import io.confluent.ksql.util.KsqlServerException; @@ -130,8 +130,11 @@ public final class PullQueryExecutor { Type.LESS_THAN_OR_EQUAL ); + private static final String VALID_WINDOW_BOUNDS_COLUMNS = + GrammaticalJoiner.and().join(SystemColumns.windowBoundsColumnNames()); + private static final String VALID_WINDOW_BOUNDS_TYPES_STRING = - VALID_WINDOW_BOUNDS_TYPES.toString(); + GrammaticalJoiner.and().join(VALID_WINDOW_BOUNDS_TYPES); private final KsqlExecutionContext executionContext; private final RoutingFilterFactory routingFilterFactory; @@ -313,11 +316,11 @@ private static TableRows queryRowsLocally( final PullQueryContext pullQueryContext ) { final Result result; - if (pullQueryContext.whereInfo.windowStartBounds.isPresent()) { - final Range windowStart = pullQueryContext.whereInfo.windowStartBounds.get(); + if (pullQueryContext.whereInfo.windowBounds.isPresent()) { + final WindowBounds windowBounds = pullQueryContext.whereInfo.windowBounds.get(); final List rows = pullQueryContext.mat.windowed() - .get(pullQueryContext.key, windowStart); + .get(pullQueryContext.key, windowBounds.start, windowBounds.end); result = new Result(pullQueryContext.mat.schema(), rows); } else { @@ -466,43 +469,33 @@ private PullQueryContext( this.pullQueryMetrics = Objects.requireNonNull( pullQueryMetrics, "pullQueryExecutorMetrics"); } + } - public Struct getKey() { - return key; - } - - public Materialization getMat() { - return mat; - } - - public ImmutableAnalysis getAnalysis() { - return analysis; - } - - public WhereInfo getWhereInfo() { - return whereInfo; - } + private static final class WindowBounds { - public QueryId getQueryId() { - return queryId; - } + private final Range start; + private final Range end; - public QueryContext.Stacker getContextStacker() { - return contextStacker; + private WindowBounds( + final Range start, + final Range end + ) { + this.start = Objects.requireNonNull(start, "startBounds"); + this.end = Objects.requireNonNull(end, "endBounds"); } } private static final class WhereInfo { private final Object keyBound; - private final Optional> windowStartBounds; + private final Optional windowBounds; private WhereInfo( final Object keyBound, - final Optional> windowStartBounds + final Optional windowBounds ) { this.keyBound = keyBound; - this.windowStartBounds = windowStartBounds; + this.windowBounds = Objects.requireNonNull(windowBounds); } } @@ -551,12 +544,10 @@ private static WhereInfo extractWhereInfo( return new WhereInfo(key, Optional.empty()); } - final Optional> windowBoundsComparison = - Optional.ofNullable(comparisons.get(ComparisonTarget.WINDOWSTART)); - - final Range windowStart = extractWhereClauseWindowBounds(windowBoundsComparison); + final WindowBounds windowBounds = + extractWhereClauseWindowBounds(comparisons); - return new WhereInfo(key, Optional.of(windowStart)); + return new WhereInfo(key, Optional.of(windowBounds)); } private static Object extractKeyWhereClause( @@ -605,24 +596,34 @@ private static Object coerceKey( .orElse(null); } + private static WindowBounds extractWhereClauseWindowBounds( + final Map> allComparisons + ) { + return new WindowBounds( + extractWhereClauseWindowBounds(ComparisonTarget.WINDOWSTART, allComparisons), + extractWhereClauseWindowBounds(ComparisonTarget.WINDOWEND, allComparisons) + ); + } + private static Range extractWhereClauseWindowBounds( - final Optional> maybeComparisons + final ComparisonTarget windowType, + final Map> allComparisons ) { - if (!maybeComparisons.isPresent()) { + final List comparisons = + Optional.ofNullable(allComparisons.get(windowType)) + .orElseGet(ImmutableList::of); + + if (comparisons.isEmpty()) { return Range.all(); } - final List comparisons = maybeComparisons.get(); - final Map> byType = comparisons.stream() .collect(Collectors.groupingBy(PullQueryExecutor::getSimplifiedBoundType)); final SetView unsupported = Sets.difference(byType.keySet(), VALID_WINDOW_BOUNDS_TYPES); if (!unsupported.isEmpty()) { throw invalidWhereClauseException( - "Unsupported " + ComparisonTarget.WINDOWSTART + " bounds: " + unsupported, - true - ); + "Unsupported " + windowType + " bounds: " + unsupported, true); } final String duplicates = byType.entrySet().stream() @@ -632,9 +633,7 @@ private static Range extractWhereClauseWindowBounds( if (!duplicates.isEmpty()) { throw invalidWhereClauseException( - "Duplicate bounds on " + ComparisonTarget.WINDOWSTART + ": " + duplicates, - true - ); + "Duplicate " + windowType + " bounds on: " + duplicates, true); } final Map singles = byType.entrySet().stream() @@ -644,8 +643,7 @@ private static Range extractWhereClauseWindowBounds( if (equals != null) { if (byType.size() > 1) { throw invalidWhereClauseException( - "`" + equals + "` cannot be combined with other bounds on " - + ComparisonTarget.WINDOWSTART, + "`" + equals + "` cannot be combined with other " + windowType + " bounds", true ); } @@ -744,14 +742,15 @@ private static Instant asInstant(final Expression other) { } throw invalidWhereClauseException( - ComparisonTarget.WINDOWSTART + " bounds must be BIGINT", + "Window bounds must be an INT, BIGINT or STRING containing a datetime.", true ); } private enum ComparisonTarget { KEYCOL, - WINDOWSTART + WINDOWSTART, + WINDOWEND } private static Map> extractComparisons( @@ -804,6 +803,10 @@ private static ComparisonTarget extractWhereClauseTarget( return ComparisonTarget.WINDOWSTART; } + if (columnName.equals(SystemColumns.WINDOWEND_NAME)) { + return ComparisonTarget.WINDOWEND; + } + final ColumnName keyColumn = Iterables.getOnlyElement(query.getLogicalSchema().key()).name(); if (columnName.equals(keyColumn)) { return ComparisonTarget.KEYCOL; @@ -1013,17 +1016,11 @@ private static KsqlException invalidWhereClauseException( final String additional = !windowed ? "" : System.lineSeparator() - + " - (optionally) limits the time bounds of the windowed table. This can be: " - + System.lineSeparator() - + " + a single window lower bound, e.g. `WHERE WINDOWSTART = z`, or" - + System.lineSeparator() - + " + a range, e.g. `WHERE a <= WINDOWSTART AND WINDOWSTART < b" + + " - (optionally) limits the time bounds of the windowed table." + System.lineSeparator() - + "WINDOWSTART currently supports operators: " + VALID_WINDOW_BOUNDS_TYPES_STRING + + "\t Bounds on " + VALID_WINDOW_BOUNDS_COLUMNS + " are supported" + System.lineSeparator() - + "WINDOWSTART currently comparison with epoch milliseconds " - + "or a datetime string in the form: " + KsqlConstants.DATE_TIME_PATTERN - + " with an optional numeric 4-digit timezone, e.g. '+0100'"; + + "\t Supported operators are " + VALID_WINDOW_BOUNDS_TYPES_STRING; return new KsqlException(msg + ". " + PullQueryValidator.PULL_QUERY_SYNTAX_HELP diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterialization.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterialization.java index 55d7b8e3a1ce..f5a5a0a401eb 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterialization.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterialization.java @@ -151,8 +151,12 @@ final class KsqlMaterializedWindowedTable implements MaterializedWindowedTable { } @Override - public List get(final Struct key, final Range windowStart) { - final List result = table.get(key, windowStart); + public List get( + final Struct key, + final Range windowStart, + final Range windowEnd + ) { + final List result = table.get(key, windowStart, windowEnd); final Builder builder = ImmutableList.builder(); diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/MaterializedWindowedTable.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/MaterializedWindowedTable.java index 57712ab779ef..3411c8ccfa52 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/MaterializedWindowedTable.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/MaterializedWindowedTable.java @@ -31,7 +31,8 @@ public interface MaterializedWindowedTable { * * @param key the key to look up. * @param windowStart the bounds on the window's start time. + * @param windowEnd the bounds on the window's end time. * @return the rows for the key that exist within the range. */ - List get(Struct key, Range windowStart); + List get(Struct key, Range windowStart, Range windowEnd); } diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java index 265273eb57e2..e45fe0ea5db9 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java @@ -27,6 +27,7 @@ import java.util.Objects; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.QueryableStoreTypes; @@ -46,13 +47,14 @@ class KsMaterializedSessionTable implements MaterializedWindowedTable { @Override public List get( final Struct key, - final Range windowStart + final Range windowStart, + final Range windowEnd ) { try { final ReadOnlySessionStore store = stateStore .store(QueryableStoreTypes.sessionStore()); - return findSession(store, key, windowStart); + return findSession(store, key, windowStart, windowEnd); } catch (final Exception e) { throw new MaterializationException("Failed to get value from materialized table", e); } @@ -61,7 +63,8 @@ public List get( private List findSession( final ReadOnlySessionStore store, final Struct key, - final Range windowStart + final Range windowStart, + final Range windowEnd ) { try (KeyValueIterator, GenericRow> it = store.fetch(key)) { @@ -69,20 +72,26 @@ private List findSession( while (it.hasNext()) { final KeyValue, GenericRow> next = it.next(); + final Window wnd = next.key.window(); - if (windowStart.contains(next.key.window().startTime())) { + if (!windowStart.contains(wnd.startTime())) { + continue; + } + + if (!windowEnd.contains(wnd.endTime())) { + continue; + } - final long rowTime = next.key.window().end(); + final long rowTime = wnd.end(); - final WindowedRow row = WindowedRow.of( - stateStore.schema(), - next.key, - next.value, - rowTime - ); + final WindowedRow row = WindowedRow.of( + stateStore.schema(), + next.key, + next.value, + rowTime + ); - builder.add(row); - } + builder.add(row); } return builder.build(); diff --git a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java index c2c224605d5b..34a51c0730f6 100644 --- a/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java +++ b/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java @@ -51,19 +51,16 @@ class KsMaterializedWindowTable implements MaterializedWindowedTable { @Override public List get( final Struct key, - final Range windowStartBounds + final Range windowStartBounds, + final Range windowEndBounds ) { try { final ReadOnlyWindowStore> store = stateStore .store(QueryableStoreTypes.timestampedWindowStore()); - final Instant lower = windowStartBounds.hasLowerBound() - ? windowStartBounds.lowerEndpoint() - : Instant.ofEpochMilli(0); + final Instant lower = calculateLowerBound(windowStartBounds, windowEndBounds); - final Instant upper = windowStartBounds.hasUpperBound() - ? windowStartBounds.upperEndpoint() - : Instant.ofEpochMilli(Long.MAX_VALUE); + final Instant upper = calculateUpperBound(windowStartBounds, windowEndBounds); try (WindowStoreIterator> it = store.fetch(key, lower, upper)) { @@ -71,24 +68,28 @@ public List get( while (it.hasNext()) { final KeyValue> next = it.next(); - final Instant windowStart = Instant.ofEpochMilli(next.key); - if (windowStartBounds.contains(windowStart)) { + final Instant windowStart = Instant.ofEpochMilli(next.key); + if (!windowStartBounds.contains(windowStart)) { + continue; + } - final Instant windowEnd = windowStart.plus(windowSize); + final Instant windowEnd = windowStart.plus(windowSize); + if (!windowEndBounds.contains(windowEnd)) { + continue; + } - final TimeWindow window = - new TimeWindow(windowStart.toEpochMilli(), windowEnd.toEpochMilli()); + final TimeWindow window = + new TimeWindow(windowStart.toEpochMilli(), windowEnd.toEpochMilli()); - final WindowedRow row = WindowedRow.of( - stateStore.schema(), - new Windowed<>(key, window), - next.value.value(), - next.value.timestamp() - ); + final WindowedRow row = WindowedRow.of( + stateStore.schema(), + new Windowed<>(key, window), + next.value.value(), + next.value.timestamp() + ); - builder.add(row); - } + builder.add(row); } return builder.build(); @@ -97,4 +98,34 @@ public List get( throw new MaterializationException("Failed to get value from materialized table", e); } } + + private Instant calculateUpperBound( + final Range windowStartBounds, + final Range windowEndBounds + ) { + final Instant start = windowStartBounds.hasUpperBound() + ? windowStartBounds.upperEndpoint() + : Instant.ofEpochMilli(Long.MAX_VALUE); + + final Instant end = windowEndBounds.hasUpperBound() + ? windowEndBounds.upperEndpoint().minus(windowSize) + : Instant.ofEpochMilli(Long.MAX_VALUE); + + return start.compareTo(end) < 0 ? start : end; + } + + private Instant calculateLowerBound( + final Range windowStartBounds, + final Range windowEndBounds + ) { + final Instant start = windowStartBounds.hasLowerBound() + ? windowStartBounds.lowerEndpoint() + : Instant.ofEpochMilli(0); + + final Instant end = windowEndBounds.hasLowerBound() + ? windowEndBounds.lowerEndpoint().minus(windowSize) + : Instant.ofEpochMilli(0); + + return start.compareTo(end) < 0 ? end : start; + } } diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterializationTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterializationTest.java index ad61104f4930..3307e41faa7f 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterializationTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/KsqlMaterializationTest.java @@ -67,10 +67,17 @@ public class KsqlMaterializationTest { private static final Struct A_KEY = StructKeyUtil .keyBuilder(ColumnName.of("k0"), SqlTypes.STRING).build("k"); private static final long A_ROWTIME = 12335L; + private static final Range WINDOW_START_BOUNDS = Range.closed( Instant.now(), Instant.now().plusSeconds(10) ); + + private static final Range WINDOW_END_BOUNDS = Range.closed( + Instant.now().plusSeconds(1), + Instant.now().plusSeconds(11) + ); + private static final GenericRow A_VALUE = GenericRow.genericRow("a", "b"); private static final GenericRow TRANSFORMED = GenericRow.genericRow("x", "y"); private static final Window A_WINDOW = Window.of(Instant.now(), Instant.now().plusMillis(10)); @@ -118,7 +125,7 @@ public void setUp() { when(inner.windowed()).thenReturn(innerWindowed); when(innerNonWindowed.get(any())).thenReturn(Optional.of(ROW)); - when(innerWindowed.get(any(), any())).thenReturn(ImmutableList.of(WINDOWED_ROW)); + when(innerWindowed.get(any(), any(), any())).thenReturn(ImmutableList.of(WINDOWED_ROW)); } @SuppressWarnings("UnstableApiUsage") @@ -201,10 +208,10 @@ public void shouldCallInnerWindowedWithCorrectParamsOnGet() { givenNoopTransforms(); // When: - table.get(A_KEY, WINDOW_START_BOUNDS); + table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS); // Then: - verify(innerWindowed).get(A_KEY, WINDOW_START_BOUNDS); + verify(innerWindowed).get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS); } @Test @@ -227,7 +234,7 @@ public void shouldCallFilterWithCorrectValuesOnWindowedGet() { givenNoopTransforms(); // When: - table.get(A_KEY, WINDOW_START_BOUNDS); + table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS); // Then: verify(filter).apply( @@ -255,11 +262,11 @@ public void shouldReturnEmptyIfInnerNonWindowedReturnsEmpty() { public void shouldReturnEmptyIfInnerWindowedReturnsEmpty() { // Given: final MaterializedWindowedTable table = materialization.windowed(); - when(innerWindowed.get(any(), any())).thenReturn(ImmutableList.of()); + when(innerWindowed.get(any(), any(), any())).thenReturn(ImmutableList.of()); givenNoopTransforms(); // When: - final List result = table.get(A_KEY, WINDOW_START_BOUNDS); + final List result = table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS); // Then: assertThat(result, is(empty())); @@ -287,7 +294,7 @@ public void shouldFilterWindowed() { when(filter.apply(any(), any(), any())).thenReturn(Optional.empty()); // When: - final List result = table.get(A_KEY, WINDOW_START_BOUNDS); + final List result = table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS); // Then: assertThat(result, is(empty())); @@ -315,7 +322,7 @@ public void shouldCallTransformsInOrderForWindowed() { givenNoopTransforms(); // When: - table.get(A_KEY, WINDOW_START_BOUNDS); + table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS); // Then: final InOrder inOrder = inOrder(filter, project); @@ -345,7 +352,7 @@ public void shouldPipeTransformsWindowed() { when(project.apply(any(), any(), any())).thenReturn(Optional.of(TRANSFORMED)); // When: - table.get(A_KEY, WINDOW_START_BOUNDS); + table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS); // Then: verify(filter).apply( @@ -381,7 +388,7 @@ public void shouldReturnSelectTransformedFromWindowed() { when(project.apply(any(), any(), any())).thenReturn(Optional.of(TRANSFORMED)); // When: - final List result = table.get(A_KEY, WINDOW_START_BOUNDS); + final List result = table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS); // Then: assertThat(result, hasSize(1)); @@ -412,10 +419,10 @@ public void shouldMaintainResultOrdering() { WindowedRow.of(SCHEMA, new Windowed<>(A_KEY, window3), A_VALUE, A_ROWTIME) ); - when(innerWindowed.get(any(), any())).thenReturn(rows); + when(innerWindowed.get(any(), any(), any())).thenReturn(rows); // When: - final List result = table.get(A_KEY, WINDOW_START_BOUNDS); + final List result = table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS); // Then: assertThat(result, hasSize(rows.size())); diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTableTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTableTest.java index cccb0b653d26..4c2a40d2de85 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTableTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTableTest.java @@ -68,11 +68,17 @@ public class KsMaterializedSessionTableTest { private static final Instant LOWER_INSTANT = Instant.now(); private static final Instant UPPER_INSTANT = LOWER_INSTANT.plusSeconds(10); + private static final Range WINDOW_START_BOUNDS = Range.closed( LOWER_INSTANT, UPPER_INSTANT ); + private static final Range WINDOW_END_BOUNDS = Range.closed( + LOWER_INSTANT, + UPPER_INSTANT + ); + @Mock private KsStateStore stateStore; @Mock @@ -115,7 +121,7 @@ public void shouldThrowIfGettingStateStoreFails() { // When: final Exception e = assertThrows( MaterializationException.class, - () -> table.get(A_KEY, WINDOW_START_BOUNDS) + () -> table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS) ); // Then: @@ -132,7 +138,7 @@ public void shouldThrowIfStoreFetchFails() { // When: final Exception e = assertThrows( MaterializationException.class, - () -> table.get(A_KEY, WINDOW_START_BOUNDS) + () -> table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS) ); // Then: @@ -145,7 +151,7 @@ public void shouldThrowIfStoreFetchFails() { @Test public void shouldGetStoreWithCorrectParams() { // When: - table.get(A_KEY, WINDOW_START_BOUNDS); + table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS); // Then: verify(stateStore).store(any(SessionStoreType.class)); @@ -154,7 +160,7 @@ public void shouldGetStoreWithCorrectParams() { @Test public void shouldFetchWithCorrectParams() { // When: - table.get(A_KEY, WINDOW_START_BOUNDS); + table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS); // Then: verify(sessionStore).fetch(A_KEY); @@ -163,7 +169,7 @@ public void shouldFetchWithCorrectParams() { @Test public void shouldCloseIterator() { // When: - table.get(A_KEY, WINDOW_START_BOUNDS); + table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS); // Then: verify(fetchIterator).close(); @@ -172,7 +178,7 @@ public void shouldCloseIterator() { @Test public void shouldReturnEmptyIfKeyNotPresent() { // When: - final List result = table.get(A_KEY, WINDOW_START_BOUNDS); + final List result = table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS); // Then: assertThat(result, is(empty())); @@ -184,7 +190,7 @@ public void shouldIgnoreSessionsThatFinishBeforeLowerBound() { givenSingleSession(LOWER_INSTANT.minusMillis(1), LOWER_INSTANT.minusMillis(1)); // When: - final List result = table.get(A_KEY, WINDOW_START_BOUNDS); + final List result = table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS); // Then: assertThat(result, is(empty())); @@ -196,14 +202,14 @@ public void shouldIgnoreSessionsThatStartAfterUpperBound() { givenSingleSession(UPPER_INSTANT.plusMillis(1), UPPER_INSTANT.plusMillis(1)); // When: - final List result = table.get(A_KEY, WINDOW_START_BOUNDS); + final List result = table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS); // Then: assertThat(result, is(empty())); } @Test - public void shouldReturnValueIfSessionStartsAtLowerBoundIfLowerBoundClosed() { + public void shouldReturnValueIfSessionStartsAtLowerBoundIfLowerStartBoundClosed() { // Given: final Range startBounds = Range.closed( LOWER_INSTANT, @@ -214,7 +220,7 @@ public void shouldReturnValueIfSessionStartsAtLowerBoundIfLowerBoundClosed() { givenSingleSession(LOWER_INSTANT, wend); // When: - final List result = table.get(A_KEY, startBounds); + final List result = table.get(A_KEY, startBounds, Range.all()); // Then: assertThat(result, contains(WindowedRow.of( @@ -236,7 +242,7 @@ public void shouldIgnoreSessionsThatStartAtLowerBoundIfLowerBoundOpen() { givenSingleSession(LOWER_INSTANT, LOWER_INSTANT.plusMillis(1)); // When: - final List result = table.get(A_KEY, startBounds); + final List result = table.get(A_KEY, startBounds, Range.all()); // Then: assertThat(result, is(empty())); @@ -254,7 +260,7 @@ public void shouldReturnValueIfSessionStartsAtUpperBoundIfUpperBoundClosed() { givenSingleSession(UPPER_INSTANT, wend); // When: - final List result = table.get(A_KEY, startBounds); + final List result = table.get(A_KEY, startBounds, Range.all()); // Then: assertThat(result, contains(WindowedRow.of( @@ -276,7 +282,7 @@ public void shouldIgnoreSessionsThatStartAtUpperBoundIfUpperBoundOpen() { givenSingleSession(UPPER_INSTANT, UPPER_INSTANT.plusMillis(1)); // When: - final List result = table.get(A_KEY, startBounds); + final List result = table.get(A_KEY, startBounds, Range.all()); // Then: assertThat(result, is(empty())); @@ -289,7 +295,7 @@ public void shouldReturnValueIfSessionStartsBetweenBounds() { givenSingleSession(LOWER_INSTANT.plusMillis(1), wend); // When: - final List result = table.get(A_KEY, WINDOW_START_BOUNDS); + final List result = table.get(A_KEY, WINDOW_START_BOUNDS, Range.all()); // Then: assertThat(result, contains(WindowedRow.of( @@ -300,6 +306,105 @@ public void shouldReturnValueIfSessionStartsBetweenBounds() { ))); } + @Test + public void shouldReturnValueIfSessionEndsAtLowerBoundIfLowerStartBoundClosed() { + // Given: + final Range endBounds = Range.closed( + LOWER_INSTANT, + UPPER_INSTANT + ); + + final Instant wstart = LOWER_INSTANT.minusMillis(1); + givenSingleSession(wstart, LOWER_INSTANT); + + // When: + final List result = table.get(A_KEY, Range.all(), endBounds); + + // Then: + assertThat(result, contains(WindowedRow.of( + SCHEMA, + sessionKey(wstart, LOWER_INSTANT), + A_VALUE, + LOWER_INSTANT.toEpochMilli() + ))); + } + + @Test + public void shouldIgnoreSessionsThatEndAtLowerBoundIfLowerBoundOpen() { + // Given: + final Range endBounds = Range.openClosed( + LOWER_INSTANT, + UPPER_INSTANT + ); + + givenSingleSession(LOWER_INSTANT.minusMillis(1), LOWER_INSTANT); + + // When: + final List result = table.get(A_KEY, Range.all(), endBounds); + + // Then: + assertThat(result, is(empty())); + } + + @Test + public void shouldReturnValueIfSessionEndsAtUpperBoundIfUpperBoundClosed() { + // Given: + final Range endBounds = Range.closed( + LOWER_INSTANT, + UPPER_INSTANT + ); + + final Instant wstart = UPPER_INSTANT.minusMillis(1); + givenSingleSession(wstart, UPPER_INSTANT); + + // When: + final List result = table.get(A_KEY, Range.all(), endBounds); + + // Then: + assertThat(result, contains(WindowedRow.of( + SCHEMA, + sessionKey(wstart, UPPER_INSTANT), + A_VALUE, + UPPER_INSTANT.toEpochMilli() + ))); + } + + @Test + public void shouldIgnoreSessionsThatEndAtUpperBoundIfUpperBoundOpen() { + // Given: + final Range endBounds = Range.closedOpen( + LOWER_INSTANT, + UPPER_INSTANT + ); + + givenSingleSession(UPPER_INSTANT.minusMillis(1), UPPER_INSTANT); + + // When: + final List result = table.get(A_KEY, Range.all(), endBounds); + + // Then: + assertThat(result, is(empty())); + } + + @Test + public void shouldReturnValueIfSessionEndsBetweenBounds() { + // Given: + final Instant wstart = LOWER_INSTANT.minusMillis(5); + final Instant wend = UPPER_INSTANT.minusMillis(1); + givenSingleSession(wstart, wend); + + // When: + final List result = table.get(A_KEY, Range.all(), WINDOW_END_BOUNDS); + + // Then: + assertThat(result, contains(WindowedRow.of( + SCHEMA, + sessionKey(wstart, wend), + A_VALUE, + wend.toEpochMilli() + ))); + } + @Test public void shouldReturnMultipleSessions() { // Given: @@ -311,7 +416,7 @@ public void shouldReturnMultipleSessions() { givenSingleSession(UPPER_INSTANT.plusMillis(1), UPPER_INSTANT.plusSeconds(1)); // When: - final List result = table.get(A_KEY, WINDOW_START_BOUNDS); + final List result = table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS); // Then: assertThat(result, contains( @@ -331,13 +436,13 @@ public void shouldReturnMultipleSessions() { } @Test - public void shouldReturnAllSessionsForRangeall() { + public void shouldReturnAllSessionsForRangeAll() { // Given: givenSingleSession(Instant.now().minusSeconds(1000), Instant.now().plusSeconds(1000)); givenSingleSession(Instant.now().minusSeconds(1000), Instant.now().plusSeconds(1000)); // When: - final List result = table.get(A_KEY, Range.all()); + final List result = table.get(A_KEY, Range.all(), Range.all()); // Then: assertThat(result, hasSize(2)); diff --git a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java index b4453dda1787..2e3a5021cbac 100644 --- a/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java +++ b/ksqldb-streams/src/test/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableTest.java @@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -69,9 +70,16 @@ public class KsMaterializedWindowTableTest { private static final Struct A_KEY = StructKeyUtil .keyBuilder(ColumnName.of("K0"), SqlTypes.STRING).build("x"); + protected static final Instant NOW = Instant.now(); + private static final Range WINDOW_START_BOUNDS = Range.closed( - Instant.now(), - Instant.now().plusSeconds(10) + NOW, + NOW.plusSeconds(10) + ); + + private static final Range WINDOW_END_BOUNDS = Range.closed( + NOW.plusSeconds(5).plus(WINDOW_SIZE), + NOW.plusSeconds(15).plus(WINDOW_SIZE) ); private static final ValueAndTimestamp VALUE_1 = ValueAndTimestamp @@ -117,7 +125,7 @@ public void shouldThrowIfGettingStateStoreFails() { // When: final Exception e = assertThrows( MaterializationException.class, - () -> table.get(A_KEY, WINDOW_START_BOUNDS) + () -> table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS) ); // Then: @@ -135,7 +143,7 @@ public void shouldThrowIfStoreFetchFails() { // When: final Exception e = assertThrows( MaterializationException.class, - () -> table.get(A_KEY, WINDOW_START_BOUNDS) + () -> table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS) ); // Then: @@ -147,30 +155,147 @@ public void shouldThrowIfStoreFetchFails() { @Test public void shouldGetStoreWithCorrectParams() { // When: - table.get(A_KEY, WINDOW_START_BOUNDS); + table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS); // Then: verify(stateStore).store(storeTypeCaptor.capture()); - assertThat(storeTypeCaptor.getValue().getClass().getSimpleName(), is("TimestampedWindowStoreType")); + assertThat(storeTypeCaptor.getValue().getClass().getSimpleName(), + is("TimestampedWindowStoreType")); } @Test - public void shouldFetchWithCorrectParams() { + public void shouldFetchWithCorrectKey() { + // Given: // When: - table.get(A_KEY, WINDOW_START_BOUNDS); + table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS); + + // Then: + verify(tableStore).fetch(eq(A_KEY), any(), any()); + } + + @Test + public void shouldFetchWithNoBounds() { + // When: + table.get(A_KEY, Range.all(), Range.all()); // Then: verify(tableStore).fetch( - A_KEY, - WINDOW_START_BOUNDS.lowerEndpoint(), - WINDOW_START_BOUNDS.upperEndpoint() + any(), + eq(Instant.ofEpochMilli(0)), + eq(Instant.ofEpochMilli(Long.MAX_VALUE)) + ); + } + + @Test + public void shouldFetchWithOnlyStartBounds() { + // When: + table.get(A_KEY, WINDOW_START_BOUNDS, Range.all()); + + // Then: + verify(tableStore).fetch( + any(), + eq(WINDOW_START_BOUNDS.lowerEndpoint()), + eq(WINDOW_START_BOUNDS.upperEndpoint()) ); } + @Test + public void shouldFetchWithOnlyEndBounds() { + // When: + table.get(A_KEY, Range.all(), WINDOW_END_BOUNDS); + + // Then: + verify(tableStore).fetch( + any(), + eq(WINDOW_END_BOUNDS.lowerEndpoint().minus(WINDOW_SIZE)), + eq(WINDOW_END_BOUNDS.upperEndpoint().minus(WINDOW_SIZE)) + ); + } + + @Test + public void shouldFetchWithStartLowerBoundIfHighest() { + // Given: + final Range startBounds = Range.closed( + NOW.plusSeconds(5), + NOW.plusSeconds(10) + ); + + final Range endBounds = Range.closed( + NOW, + NOW.plusSeconds(15).plus(WINDOW_SIZE) + ); + + // When: + table.get(A_KEY, startBounds, endBounds); + + // Then: + verify(tableStore).fetch(any(), eq(startBounds.lowerEndpoint()), any()); + } + + @Test + public void shouldFetchWithEndLowerBoundIfHighest() { + // Given: + final Range startBounds = Range.closed( + NOW, + NOW.plusSeconds(10) + ); + + final Range endBounds = Range.closed( + NOW.plusSeconds(5).plus(WINDOW_SIZE), + NOW.plusSeconds(15).plus(WINDOW_SIZE) + ); + + // When: + table.get(A_KEY, startBounds, endBounds); + + // Then: + verify(tableStore).fetch(any(), eq(endBounds.lowerEndpoint().minus(WINDOW_SIZE)), any()); + } + + @Test + public void shouldFetchWithStartUpperBoundIfLowest() { + // Given: + final Range startBounds = Range.closed( + NOW, + NOW.plusSeconds(10) + ); + + final Range endBounds = Range.closed( + NOW.plusSeconds(5).plus(WINDOW_SIZE), + NOW.plusSeconds(15).plus(WINDOW_SIZE) + ); + + // When: + table.get(A_KEY, startBounds, endBounds); + + // Then: + verify(tableStore).fetch(any(), any(), eq(startBounds.upperEndpoint())); + } + + @Test + public void shouldFetchWithEndUpperBoundIfLowest() { + // Given: + final Range startBounds = Range.closed( + NOW, + NOW.plusSeconds(20) + ); + + final Range endBounds = Range.closed( + NOW.plusSeconds(5), + NOW.plusSeconds(10) + ); + + // When: + table.get(A_KEY, startBounds, endBounds); + + // Then: + verify(tableStore).fetch(any(), any(), eq(endBounds.upperEndpoint().minus(WINDOW_SIZE))); + } + @Test public void shouldCloseIterator() { // When: - table.get(A_KEY, WINDOW_START_BOUNDS); + table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS); // Then: verify(fetchIterator).close(); @@ -179,18 +304,63 @@ public void shouldCloseIterator() { @Test public void shouldReturnEmptyIfKeyNotPresent() { // When: - final List result = table.get(A_KEY, WINDOW_START_BOUNDS); + final List result = table.get(A_KEY, WINDOW_START_BOUNDS, WINDOW_END_BOUNDS); // Then: assertThat(result, is(empty())); } @Test - public void shouldReturnValuesForClosedBounds() { + public void shouldReturnValuesForClosedStartBounds() { + // Given: + final Range start = Range.closed( + NOW, + NOW.plusSeconds(10) + ); + + when(fetchIterator.hasNext()) + .thenReturn(true) + .thenReturn(true) + .thenReturn(false); + + when(fetchIterator.next()) + .thenReturn(new KeyValue<>(start.lowerEndpoint().toEpochMilli(), VALUE_1)) + .thenReturn(new KeyValue<>(start.upperEndpoint().toEpochMilli(), VALUE_2)) + .thenThrow(new AssertionError()); + + when(tableStore.fetch(any(), any(), any())).thenReturn(fetchIterator); + + // When: + final List result = table.get(A_KEY, start, Range.all()); + + // Then: + assertThat(result, contains( + WindowedRow.of( + SCHEMA, + windowedKey(start.lowerEndpoint()), + VALUE_1.value(), + VALUE_1.timestamp() + ), + WindowedRow.of( + SCHEMA, + windowedKey(start.upperEndpoint()), + VALUE_2.value(), + VALUE_2.timestamp() + ) + )); + } + + @Test + public void shouldReturnValuesForClosedEndBounds() { // Given: - final Range bounds = Range.closed( - Instant.now(), - Instant.now().plusSeconds(10) + final Range end = Range.closed( + NOW, + NOW.plusSeconds(10) + ); + + final Range startEqiv = Range.closed( + end.lowerEndpoint().minus(WINDOW_SIZE), + end.lowerEndpoint().minus(WINDOW_SIZE) ); when(fetchIterator.hasNext()) @@ -199,26 +369,26 @@ public void shouldReturnValuesForClosedBounds() { .thenReturn(false); when(fetchIterator.next()) - .thenReturn(new KeyValue<>(bounds.lowerEndpoint().toEpochMilli(), VALUE_1)) - .thenReturn(new KeyValue<>(bounds.upperEndpoint().toEpochMilli(), VALUE_2)) + .thenReturn(new KeyValue<>(startEqiv.lowerEndpoint().toEpochMilli(), VALUE_1)) + .thenReturn(new KeyValue<>(startEqiv.upperEndpoint().toEpochMilli(), VALUE_2)) .thenThrow(new AssertionError()); when(tableStore.fetch(any(), any(), any())).thenReturn(fetchIterator); // When: - final List result = table.get(A_KEY, bounds); + final List result = table.get(A_KEY, Range.all(), end); // Then: assertThat(result, contains( WindowedRow.of( SCHEMA, - windowedKey(bounds.lowerEndpoint()), + windowedKey(startEqiv.lowerEndpoint()), VALUE_1.value(), VALUE_1.timestamp() ), WindowedRow.of( SCHEMA, - windowedKey(bounds.upperEndpoint()), + windowedKey(startEqiv.upperEndpoint()), VALUE_2.value(), VALUE_2.timestamp() ) @@ -226,11 +396,52 @@ public void shouldReturnValuesForClosedBounds() { } @Test - public void shouldReturnValuesForOpenBounds() { + public void shouldReturnValuesForOpenStartBounds() { // Given: - final Range bounds = Range.open( - Instant.now(), - Instant.now().plusSeconds(10) + final Range start = Range.open( + NOW, + NOW.plusSeconds(10) + ); + + when(fetchIterator.hasNext()) + .thenReturn(true) + .thenReturn(true) + .thenReturn(true) + .thenReturn(false); + + when(fetchIterator.next()) + .thenReturn(new KeyValue<>(start.lowerEndpoint().toEpochMilli(), VALUE_1)) + .thenReturn(new KeyValue<>(start.lowerEndpoint().plusMillis(1).toEpochMilli(), VALUE_2)) + .thenReturn(new KeyValue<>(start.upperEndpoint().toEpochMilli(), VALUE_3)) + .thenThrow(new AssertionError()); + + when(tableStore.fetch(any(), any(), any())).thenReturn(fetchIterator); + + // When: + final List result = table.get(A_KEY, start, Range.all()); + + // Then: + assertThat(result, contains( + WindowedRow.of( + SCHEMA, + windowedKey(start.lowerEndpoint().plusMillis(1)), + VALUE_2.value(), + VALUE_2.timestamp() + ) + )); + } + + @Test + public void shouldReturnValuesForOpenEndBounds() { + // Given: + final Range end = Range.open( + NOW, + NOW.plusSeconds(10) + ); + + final Range startEquiv = Range.open( + end.lowerEndpoint().minus(WINDOW_SIZE), + end.upperEndpoint().minus(WINDOW_SIZE) ); when(fetchIterator.hasNext()) @@ -240,21 +451,22 @@ public void shouldReturnValuesForOpenBounds() { .thenReturn(false); when(fetchIterator.next()) - .thenReturn(new KeyValue<>(bounds.lowerEndpoint().toEpochMilli(), VALUE_1)) - .thenReturn(new KeyValue<>(bounds.lowerEndpoint().plusMillis(1).toEpochMilli(), VALUE_2)) - .thenReturn(new KeyValue<>(bounds.upperEndpoint().toEpochMilli(), VALUE_3)) + .thenReturn(new KeyValue<>(startEquiv.lowerEndpoint().toEpochMilli(), VALUE_1)) + .thenReturn( + new KeyValue<>(startEquiv.lowerEndpoint().plusMillis(1).toEpochMilli(), VALUE_2)) + .thenReturn(new KeyValue<>(startEquiv.upperEndpoint().toEpochMilli(), VALUE_3)) .thenThrow(new AssertionError()); when(tableStore.fetch(any(), any(), any())).thenReturn(fetchIterator); // When: - final List result = table.get(A_KEY, bounds); + final List result = table.get(A_KEY, Range.all(), end); // Then: assertThat(result, contains( WindowedRow.of( SCHEMA, - windowedKey(bounds.lowerEndpoint().plusMillis(1)), + windowedKey(startEquiv.lowerEndpoint().plusMillis(1)), VALUE_2.value(), VALUE_2.timestamp() ) @@ -281,7 +493,7 @@ public void shouldMaintainResultOrder() { when(tableStore.fetch(any(), any(), any())).thenReturn(fetchIterator); // When: - final List result = table.get(A_KEY, WINDOW_START_BOUNDS); + final List result = table.get(A_KEY, Range.all(), Range.all()); // Then: assertThat(result, contains( @@ -309,7 +521,7 @@ public void shouldMaintainResultOrder() { @Test public void shouldSupportRangeAll() { // When: - table.get(A_KEY, Range.all()); + table.get(A_KEY, Range.all(), Range.all()); // Then: verify(tableStore).fetch(