-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support PARTITION BY on multiple expressions #6803
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! I went commit-by-commit so some of my comments may already be out-of-date, in which case you can just ignore them :)
ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java
Outdated
Show resolved
Hide resolved
@@ -405,7 +407,7 @@ private PreJoinRepartitionNode buildInternalRepartitionNode( | |||
ExpressionTreeRewriter.rewriteWith(plugin, joinExpression); | |||
|
|||
final LogicalSchema schema = | |||
buildRepartitionedSchema(source, rewrittenPartitionBy); | |||
buildRepartitionedSchema(source, Collections.singletonList(rewrittenPartitionBy)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: since we already import ImmutableList
let's just use ImmutableList.of
here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean we import ImmutableList in this file or in the module? I don't see it in the file but I can add it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh huh, I must've been looking at a separate file. - in PreJoinRepartitionNode
for example it was already there before this PR. It's nice to just standardize on one (and most of hte code uses ImmutableList#of
, though there isn't really any good reason to use that over Collections.singletonList
except for mabye that ImmutableList
has more general usages)
@@ -111,7 +112,7 @@ public void setKeyFormat(final KeyFormat format) { | |||
return getSource().buildStream(builder) | |||
.selectKey( | |||
valueFormat.getFormatInfo(), | |||
partitionBy, | |||
Collections.singletonList(partitionBy), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: same thing about ImmutableList
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. For my edification, why is ImmutableList preferred?
ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/UserRepartitionNode.java
Outdated
Show resolved
Hide resolved
ksqldb-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java
Outdated
Show resolved
Hide resolved
ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json
Show resolved
Hide resolved
final HashSet<Object> partitionBys = new HashSet<>(partitionByExpressions.size()); | ||
partitionByExpressions.forEach(exp -> { | ||
if (!partitionBys.add(exp)) { | ||
throw new KsqlException("Duplicate PARTITION BY expression: " + exp); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's wrong with a duplicate partition by expression? I don't see any reason why a user might want it, but I don't see why not either (e.g. maybe their output data expects userId, userSpecialId
in the key and this stream always has the same value for both)
I get that the key name conflicts might be a little weird, so we can do this in a follow-up PR, but I don't think we should prohibit it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is consistent with what we do for multi-column GROUP BY (which has been around for a long time):
ksql/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/GroupBy.java
Lines 49 to 53 in e7f1f47
groupingExpressions.forEach(exp -> { | |
if (!groupBys.add(exp)) { | |
throw new KsqlException("Duplicate GROUP BY expression: " + exp); | |
} | |
}); |
I believe the reason indeed has to do with naming.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good, we can keep it like that for now
ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/PartitionBy.java
Outdated
Show resolved
Hide resolved
|
||
if (row != null) { | ||
for (int i = 0; i < partitionByCol.size(); i++) { | ||
if (partitionByCol.get(i).shouldAppend) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of relying on the implicit ordering of the partition by, it might make sense to lookup the partitionByCol.name
in the resultSchema
at the cost of a bit of performance. thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean replacing shouldAppend
with a check to see whether the partitionByCol.name
is present in the resultSchema
as a value column, or do you mean iterating through resultSchema
rather than partitionByCols
? The former doesn't seem like an improvement to me, if we're still relying on the ordering of partitionByCols
in the iteration. To remove reliance on ordering, we can do the latter and replace the lists (of columns and evaluators) with maps keyed on column name instead, but it's not clear to me that's better. It feels slightly harder to reason about code-wise but I wouldn't mind making the change.
What's your concern regarding relying the ordering? Are you worried it's brittle, or something else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I was reviewing the code I thought we could just set it at the index (e.g. row.set(resultSchema.get(partitionCol.name).index())
). So we still keep the iteration on partitionByCol
but we set it in the row based on it's index in the schema.
What's your concern regarding relying the ordering? Are you worried it's brittle, or something else?
yeah - I'm worried that it's brittle.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, interesting. (We'd have to first re-size the row before calling .set()
but that's an implementation detail.) The advantage of your proposal is that we can change the details of the result schema without needing to update the logic here. If we extended that further, really we should also be setting the existing value fields in the new row based on the result schema, rather than leaving those intact and appending key columns.
I guess I'm not convinced this change is necessary since our test coverage in this area is quite good -- lots of tests would break if someone modified the result schema without corresponding updates here. OTOH, it's very possible I'm biased towards thinking this code is understandable as is since I've been working on it for a while. If your assessment differs as someone who hasn't worked with this code as much, I'm inclined to go with your judgment.
As for performance, LogicalSchema stores a list of columns so finding a particular column might be slow. If we implemented this we'd want to build the index mapping outside the creation of the actual mapper, and have the mapper use the index mapping directly. I'm not opposed to this. If you think it's preferable I can open a follow-up PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm happy to leave it as is, though I suspect we'll run into this discussion again if/when we stop copying things from the key into the value
"example. To fix this, we'd have to add special handling to detect when a key expression", | ||
"depends only on ROWTIME, similar to how today we have special handling to detect when a key", | ||
"expression depends only on key columns." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or... we can finally swap over to using PAPI :)
Description
Fixes #5754
This PR adds support for PARTITION BY with multiple expressions, resulting in multiple key columns. There are no backwards compatibility concerns as the ksqlDB syntax did not support this prior to this PR.
Docs will come in a separate PR.
Testing done
QTT.
Reviewer checklist