Skip to content

Commit

Permalink
feat: support WINDOWEND in WHERE of pull queries (#5680)
Browse files Browse the repository at this point in the history
* feat: support WINDOWEND in WHERE of pull queries

fixes: #5666

Co-authored-by: Andy Coates <big-andy-coates@users.noreply.github.com>
  • Loading branch information
big-andy-coates and big-andy-coates authored Jun 25, 2020
1 parent 7d0079a commit 40f2f13
Show file tree
Hide file tree
Showing 12 changed files with 931 additions and 175 deletions.
4 changes: 2 additions & 2 deletions docs/concepts/queries/pull.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Pull query features and limitations
by key.
- WHERE clauses can only have constraints on the key column for non-windowed tables.

- In addition, windowed tables support bounds on WINDOWSTART using operators
- In addition, windowed tables support bounds on `WINDOWSTART` and `WINDOWEND` using operators
`<=`, `<`, `=`, `>`, `>=`.
- JOIN, PARTITION BY, GROUP BY and WINDOW clauses aren't supported.
- SELECT statements can contain column arithmetic and function calls.
Expand All @@ -53,7 +53,7 @@ timestamp within the specified time window.
```sql
SELECT * FROM user_location
WHERE userId = 'user19r7t33'
AND '2019-10-02T21:31:16' <= WINDOWSTART AND WINDOWSTART <= '2019-10-03T21:31:16';
AND '2019-10-02T21:31:16' <= WINDOWSTART AND WINDOWEND <= '2019-10-03T21:31:16';
```

API Reference
Expand Down
10 changes: 5 additions & 5 deletions docs/developer-guide/ksqldb-reference/select-pull-query.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,31 +36,31 @@ Execute a pull query by sending an HTTP request to the ksqlDB REST API, and
the API responds with a single response.

The WHERE clause must contain a single primary-key to retrieve and may
optionally include bounds on WINDOWSTART if the materialized table is windowed.
optionally include bounds on `WINDOWSTART` and `WINDOWEND` if the materialized table is windowed.

Example
-------

```sql
SELECT * FROM pageviews_by_region
WHERE regionId = 'Region_1'
AND 1570051876000 <= WINDOWSTART AND WINDOWSTART <= 1570138276000;
AND 1570051876000 <= WINDOWSTART AND WINDOWEND <= 1570138276000;
```

When writing logical expressions using `WINDOWSTART`, you can use ISO-8601
When writing logical expressions using `WINDOWSTART` or `WINDOWEND`, you can use ISO-8601
formatted datestrings to represent date times. For example, the previous
query is equivalent to the following:

```sql
SELECT * FROM pageviews_by_region
WHERE regionId = 'Region_1'
AND '2019-10-02T21:31:16' <= WINDOWSTART AND WINDOWSTART <= '2019-10-03T21:31:16';
AND '2019-10-02T21:31:16' <= WINDOWSTART AND WINDOWEND <= '2019-10-03T21:31:16';
```

You can specify time zones within the datestring. For example,
`2017-11-17T04:53:45-0330` is in the Newfoundland time zone. If no time zone is
specified within the datestring, then timestamps are interpreted in the UTC
time zone.

If no bounds are placed on `WINDOWSTART`, rows are returned for all windows
If no bounds are placed on `WINDOWSTART` or `WINDOWEND`, rows are returned for all windows
in the windowed table.
Original file line number Diff line number Diff line change
Expand Up @@ -320,21 +320,25 @@ public void shouldQueryMaterializedTableForTumblingWindowed() {
final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema());

final List<WindowedRow> resultAtWindowStart =
withRetry(() -> table.get(key, Range.singleton(w.start())));
withRetry(() -> table.get(key, Range.singleton(w.start()), Range.all()));

assertThat("at exact window start", resultAtWindowStart, hasSize(1));
assertThat(resultAtWindowStart.get(0).schema(), is(schema));
assertThat(resultAtWindowStart.get(0).window(), is(Optional.of(w)));
assertThat(resultAtWindowStart.get(0).key(), is(key));
assertThat(resultAtWindowStart.get(0).value(), is(v));

final List<WindowedRow> resultAtWindowEnd =
withRetry(() -> table.get(key, Range.all(), Range.singleton(w.end())));
assertThat("at exact window end", resultAtWindowEnd, hasSize(1));

final List<WindowedRow> resultFromRange = withRetry(() -> withRetry(() -> table
.get(key, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)))));
.get(key, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)), Range.all())));

assertThat("range including window start", resultFromRange, is(resultAtWindowStart));

final List<WindowedRow> resultPast = withRetry(() -> table
.get(key, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1))));
.get(key, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)), Range.all()));
assertThat("past start", resultPast, is(empty())
);
});
Expand Down Expand Up @@ -369,21 +373,25 @@ public void shouldQueryMaterializedTableForHoppingWindowed() {
final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema());

final List<WindowedRow> resultAtWindowStart =
withRetry(() -> table.get(key, Range.singleton(w.start())));
withRetry(() -> table.get(key, Range.singleton(w.start()), Range.all()));

assertThat("at exact window start", resultAtWindowStart, hasSize(1));
assertThat(resultAtWindowStart.get(0).schema(), is(schema));
assertThat(resultAtWindowStart.get(0).window(), is(Optional.of(w)));
assertThat(resultAtWindowStart.get(0).key(), is(key));
assertThat(resultAtWindowStart.get(0).value(), is(v));

final List<WindowedRow> resultAtWindowEnd =
withRetry(() -> table.get(key, Range.all(), Range.singleton(w.end())));
assertThat("at exact window end", resultAtWindowEnd, hasSize(1));

final List<WindowedRow> resultFromRange = withRetry(() -> table
.get(key, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1))));
.get(key, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)), Range.all()));

assertThat("range including window start", resultFromRange, is(resultAtWindowStart));

final List<WindowedRow> resultPast = withRetry(() -> table
.get(key, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1))));
.get(key, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)), Range.all()));

assertThat("past start", resultPast, is(empty()));
});
Expand Down Expand Up @@ -417,20 +425,24 @@ public void shouldQueryMaterializedTableForSessionWindowed() {
final Struct key = asKeyStruct(k.key(), query.getPhysicalSchema());

final List<WindowedRow> resultAtWindowStart =
withRetry(() -> table.get(key, Range.singleton(w.start())));
withRetry(() -> table.get(key, Range.singleton(w.start()), Range.all()));

assertThat("at exact window start", resultAtWindowStart, hasSize(1));
assertThat(resultAtWindowStart.get(0).schema(), is(schema));
assertThat(resultAtWindowStart.get(0).window(), is(Optional.of(w)));
assertThat(resultAtWindowStart.get(0).key(), is(key));
assertThat(resultAtWindowStart.get(0).value(), is(v));

final List<WindowedRow> resultAtWindowEnd =
withRetry(() -> table.get(key, Range.all(), Range.singleton(w.end())));
assertThat("at exact window end", resultAtWindowEnd, hasSize(1));

final List<WindowedRow> resultFromRange = withRetry(() -> table
.get(key, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1))));
.get(key, Range.closed(w.start().minusMillis(1), w.start().plusMillis(1)), Range.all()));
assertThat("range including window start", resultFromRange, is(resultAtWindowStart));

final List<WindowedRow> resultPast = withRetry(() -> table
.get(key, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1))));
.get(key, Range.closed(w.start().plusMillis(1), w.start().plusMillis(1)), Range.all()));
assertThat("past start", resultPast, is(empty()));
});
}
Expand Down Expand Up @@ -644,7 +656,9 @@ private static void verifyRetainedWindows(
) {
rows.forEach(record -> {
final Struct key = asKeyStruct(record.key().key(), query.getPhysicalSchema());
final List<WindowedRow> resultAtWindowStart = withRetry(() -> table.get(key, Range.all()));
final List<WindowedRow> resultAtWindowStart =
withRetry(() -> table.get(key, Range.all(), Range.all()));

assertThat("Should have fewer windows retained",
resultAtWindowStart,
hasSize(expectedWindows.size()));
Expand Down
Loading

0 comments on commit 40f2f13

Please sign in to comment.