feat: Allows pull queries with generic WHERE clauses #6939
A few meta questions:
- Is "ksql.query.pull.table.scan.enabled" going to be a permanent config, or just temporary before GA? I'm wondering whether we would drop "PullPlannerOptions/PullQueryConfigPlannerOptions" later, or whether it would stay for other possible uses.
- I could not find where we relax the constraints for "Key range queries" and "Multiple constraints on the key". Could you point me to the lines with these changes?
@@ -274,8 +274,8 @@ static void executeOrRouteQuery(
        .ifPresent(queryExecutorMetrics -> queryExecutorMetrics.recordLocalRequests(1));
    pullPhysicalPlan.execute(locations, pullQueryQueue, rowFactory);
  } catch (Exception e) {
    LOG.error("Error executing query {} locally at node {} with exception {}",
        statement.getStatementText(), node, e.getCause());
    LOG.error(String.format("Error executing query %s locally at node %s with exception",
I think just calling `node.toString()` and removing the third `{}` should be sufficient? https://stackoverflow.com/a/51781806/6131463
Ah, ok. I think you're right. I was trying to get the separate stacktrace.
while (result.equals(Optional.empty())) {
  row = (TableRow)child.next();
  if (row == null) {
    return null;
  }

  final GenericRow intermediate = PullPhysicalOperatorUtil.getIntermediateRow(
Is there any logical difference in this file diff? It seems that the select operator's `next()` would still return at most one transformed row from the underlying operator, both before and after this change?
Yes, there was a bug before. If `result.equals(Optional.empty())` was true, it would get a new row and call `result = transformRow.apply(row);`. The subtle issue is that `transformRow` references `intermediate`, which depends on `row`, so when it fetches a new row, `transformRow` gives the wrong answer because `intermediate` still reflects the previous row.
I rewrote it so that this whole process, including creating `intermediate`, now happens inside the while loop.
Got it, thanks!
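To make the stale-`intermediate` bug from this thread concrete, here is a minimal, self-contained Java sketch. It is not the ksqlDB code: `SelectOperatorSketch`, `intermediateOf`, and `transform` are hypothetical stand-ins (rows are plain integers), and only the shape of the loop follows the description above.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a select operator; rows are plain Integers here.
final class SelectOperatorSketch {
  private final Iterator<Integer> child;

  SelectOperatorSketch(final List<Integer> rows) {
    this.child = rows.iterator();
  }

  // Assumed stand-in for building the intermediate row from the current row.
  private static int intermediateOf(final int row) {
    return row * 10;
  }

  // Assumed filter: keep the row only if its intermediate value exceeds 15.
  private static Optional<Integer> transform(final int row, final int intermediate) {
    return intermediate > 15 ? Optional.of(row) : Optional.empty();
  }

  // Buggy shape: intermediate is computed once and reused for later rows.
  Integer nextBuggy() {
    if (!child.hasNext()) {
      return null;
    }
    int row = child.next();
    final int intermediate = intermediateOf(row);
    Optional<Integer> result = transform(row, intermediate);
    while (!result.isPresent()) {
      if (!child.hasNext()) {
        return null;
      }
      row = child.next();
      result = transform(row, intermediate); // stale: still derived from the first row
    }
    return result.get();
  }

  // Fixed shape: fetch the row and rebuild intermediate on every iteration.
  Integer nextFixed() {
    Optional<Integer> result = Optional.empty();
    while (!result.isPresent()) {
      if (!child.hasNext()) {
        return null;
      }
      final int row = child.next();
      final int intermediate = intermediateOf(row);
      result = transform(row, intermediate);
    }
    return result.get();
  }

  public static void main(final String[] args) {
    final List<Integer> rows = Arrays.asList(1, 2, 3);
    System.out.println(new SelectOperatorSketch(rows).nextBuggy()); // null
    System.out.println(new SelectOperatorSketch(rows).nextFixed()); // 2
  }
}
```

With rows 1, 2, 3 the buggy variant keeps filtering against the intermediate value of row 1 and never matches, while the fixed variant returns row 2.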
);

// Then:
expectTableScan(expression, false);
I'm a bit confused, shouldn't table scan be expected in this case?
This reads a little funny: the `false` parameter is `isWindowed`.
when(plannerOptions.getTableScansEnabled()).thenReturn(true);
final Expression expression1 = new ComparisonExpression(
    Type.EQUAL,
    new UnqualifiedColumnReferenceExp(ColumnName.of("K")),
Shouldn't this be K1 and K2 respectively?
The name of the test is slightly misleading. The test says that if you make multiple references in the same disjunct to a given key column, it must do a table scan. This is a little strict and future optimizations will likely make this unnecessary. An example:
SELECT * FROM T1 WHERE K > 2 AND K = 5;
This could just be simplified as
SELECT * FROM T1 WHERE K = 5;
but we don't yet do this simplification and will instead consider it too difficult to extract, resulting in a table scan.
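The decision described above can be sketched in a few lines of Java. This is an illustration under stated assumptions, not the planner's actual code: `KeyConstraintSketch`, `Comparison`, `Plan`, and `choosePlan` are hypothetical names, and the exception message is invented. The rule encoded is only what this PR describes: a single equality on the key allows a key lookup, any other disjunct falls back to a table scan when scans are enabled, and is rejected otherwise.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of one disjunct of a WHERE clause as a list of comparisons.
final class KeyConstraintSketch {
  enum Op { EQUAL, GREATER_THAN }

  enum Plan { KEY_LOOKUP, TABLE_SCAN }

  static final class Comparison {
    final String column;
    final Op op;
    final int value;

    Comparison(final String column, final Op op, final int value) {
      this.column = column;
      this.op = op;
      this.value = value;
    }
  }

  static Plan choosePlan(
      final List<Comparison> disjunct,
      final String keyColumn,
      final boolean tableScansEnabled
  ) {
    // Count every reference to the key column within this disjunct.
    final long keyRefs = disjunct.stream()
        .filter(c -> c.column.equals(keyColumn))
        .count();
    final boolean hasKeyEquality = disjunct.stream()
        .anyMatch(c -> c.column.equals(keyColumn) && c.op == Op.EQUAL);

    // Exactly one reference to the key, and it is an equality: key lookup.
    if (keyRefs == 1 && hasKeyEquality) {
      return Plan.KEY_LOOKUP;
    }
    // Missing key, range on key, or multiple key references: scan if allowed.
    if (tableScansEnabled) {
      return Plan.TABLE_SCAN;
    }
    // Assumed message, for illustration only.
    throw new IllegalArgumentException("No single key equality and table scans are disabled");
  }

  public static void main(final String[] args) {
    // WHERE K > 2 AND K = 5
    final List<Comparison> multipleKeyRefs = Arrays.asList(
        new Comparison("K", Op.GREATER_THAN, 2),
        new Comparison("K", Op.EQUAL, 5));
    System.out.println(choosePlan(multipleKeyRefs, "K", true)); // TABLE_SCAN

    // WHERE K = 5 AND V > 2, where V is a non-key column
    final List<Comparison> keyPlusNonKey = Arrays.asList(
        new Comparison("K", Op.EQUAL, 5),
        new Comparison("V", Op.GREATER_THAN, 2));
    System.out.println(choosePlan(keyPlusNonKey, "K", false)); // KEY_LOOKUP
  }
}
```

A future optimization that simplifies `K > 2 AND K = 5` to `K = 5` would, in this sketch, amount to normalizing the disjunct before counting key references.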
The test name makes it sound like a multi-column key. I changed it to shouldExtractConstraintWithMultipleKeyExpressions_tableScan
Ack, thanks!
Force-pushed from aecd76d to ecdeaec.
That's a good question. I think it depends on how well table scans perform. In a shared environment, you might want to prevent people from scanning everything. With the recent async changes though, maybe it's not a huge deal. The nice thing about having it as a config is that you can set it on the CLI with Supporting
In both scenarios, if table scans are enabled, we don't throw an exception, but instead mark that a scan is needed.
LGTM! One small step for Alan, one large step for pull queries 🌔
EDIT: also I think I agree that having the pull query table scan config is a good idea (at least in the medium term)
@@ -487,6 +487,20 @@ the pull query REST endpoint (by including it in the request e.g: `"streamsPrope
By default, any amount of lag is allowed. For using this functionality, the server must be configured with `ksql.heartbeat.enable=true` and
`ksql.lag.reporting.enable=true`, so the servers can exchange lag information between themselves ahead of time, to validate pull queries against the allowed lag.

## `ksql.query.pull.table.scan.enabled`

**Per query:** yes
is there a possibility to set this at the server level? some deployments might want to prevent table scans altogether
EDIT: i notice in the code that you can (but you can't override the server default to enable them). probably worth calling out in the docs
Yeah, I agree. I wanted to be sure that you couldn't turn it on using the CLI when the main server config is off. This mainly allows you to just disable them, if you choose. I'll add that to the doc.
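The precedence agreed on here can be sketched as a single boolean expression. This is a hypothetical illustration, not the actual config-resolution code: `TableScanConfigSketch` and `effectiveTableScans` are invented names, and the per-request override is modeled as an `Optional<Boolean>`.

```java
import java.util.Optional;

// Hypothetical sketch: a request may disable table scans, but can never
// enable them when the server-level config has them turned off.
final class TableScanConfigSketch {
  static boolean effectiveTableScans(
      final boolean serverEnabled,
      final Optional<Boolean> requestOverride
  ) {
    // Without an override, the server setting applies unchanged.
    return serverEnabled && requestOverride.orElse(true);
  }

  public static void main(final String[] args) {
    System.out.println(effectiveTableScans(false, Optional.of(true)));  // false
    System.out.println(effectiveTableScans(true, Optional.of(false)));  // false
    System.out.println(effectiveTableScans(true, Optional.empty()));    // true
  }
}
```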
LOG.error("Error executing query {} locally at node {} with exception {}",
    statement.getStatementText(), node, e.getCause());
LOG.error("Error executing query {} locally at node {} with exception",
    statement.getStatementText(), node.toString(), e.getCause());
out of interest, any reason why we needed to add `toString` here?
No, I don't think so. Removed it.
final GenericRow intermediate = PullPhysicalOperatorUtil.getIntermediateRow(
    row, logicalNode.getAddAdditionalColumnsToIntermediateSchema());
final Function<TableRow, Optional<TableRow>> transformRow = row -> {
nit: I think it makes sense to just create this function outside of the while loop (or make it not a lambda) to avoid unnecessary function object creation (my preference, I don't see why this needs to be a lambda at all).
in fact, I think encapsulating it in a lambda is probably what caused the bug you described above in the first place (the `intermediate` was curried along with the function)!
Yeah, I agree. Normally lambdas can only reference final local variables, which minimizes the occurrence of this kind of bug, but when they reference mutable member fields, that's a recipe for bugs like this one.
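The Java rule mentioned here can be shown in isolation. In this hypothetical sketch (`CaptureSketch` and its members are illustrative names only, unrelated to the operator code), a lambda that reads a member field goes through the captured `this` and observes later mutations, while a lambda that captures a local variable sees a fixed snapshot.

```java
import java.util.function.Supplier;

// Hypothetical class contrasting field capture with local-variable capture.
final class CaptureSketch {
  private int intermediate = 10;

  void setIntermediate(final int value) {
    this.intermediate = value;
  }

  // Reads the field through the captured `this`, so it sees later writes.
  Supplier<Integer> readsField() {
    return () -> intermediate;
  }

  // Captures an effectively final local copy, so the value is frozen.
  Supplier<Integer> readsSnapshot() {
    final int snapshot = intermediate;
    return () -> snapshot;
  }

  public static void main(final String[] args) {
    final CaptureSketch sketch = new CaptureSketch();
    final Supplier<Integer> field = sketch.readsField();
    final Supplier<Integer> snapshot = sketch.readsSnapshot();
    sketch.setIntermediate(20);
    System.out.println(field.get());    // 20
    System.out.println(snapshot.get()); // 10
  }
}
```

Either behavior can be wrong depending on intent, which is one reason to prefer passing the value explicitly as a parameter over capturing it.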
if (pullPlannerOptions.getTableScansEnabled()) {
  requiresTableScan = true;
} else {
  throw invalidWhereClauseException("WHERE clause missing key column for disjunct: "
should we include in this error message that you can enable table scans using the config? (same for the below)
Sure, that makes sense. Added that to the general error message on pull queries.
    ));

    // Then:
    assertThat(e.getMessage(), containsString("Bound on key columns '[`K` INTEGER KEY]' must currently be '='."));
  }

  @SuppressWarnings("unchecked")
  @Test
  public void shouldExtractConstraintWithLiteralRange_tableScan() {
when we support key range queries this should no longer require table scan, right?
Correct. I would then expect it to return a different `LookupConstraint` for range queries.
    }
  },
  {
    "name": "Generalized WHERE: key range",
I think it might make sense to start generating an `EXPLAIN` for pull queries so that we know the physical plan that we decided to take (something similar to what we do for historical plans for persistent queries). That way we can make sure that the code changes we make don't accidentally result in regressions that change the execution in a negative way (e.g. if we had implemented primary key scans and this PR turned it into a table scan).
Obviously a bigger change so requires a follow-up, I'll create a ticket from this comment for context and we can plan that work as part of the normal planning process.
Hehe I just read your response to Guozhang's comments. I think `EXPLAIN` is useful for both testing and usability.
I totally agree. Disabling the behavior tells us whether table scans are allowed or disallowed, but once we have more complex combinations, you would have to disable everything you don't want to happen to be confident you know the physical plan. Verifying the `EXPLAIN` physical plan steps would be the best verification.
LGTM, with one suggestion.
Co-authored-by: Jim Galasyn <jim.galasyn@confluent.io>
Description
Now that we have both codegen for the filter condition and table scan functionality, this PR removes some restrictions that were formerly in place including disallowing:
These restrictions are only lifted when the config `ksql.query.pull.table.scan.enabled` is set to true. Otherwise, the former restrictions are still in place. One exception to this is that now, if you specify both a key equality and a non-key constraint, it's done with a key lookup and no table scan is required.
Testing done
Many RQTT and unit tests.
Reviewer checklist