-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: expression support in JOINs #4278
Conversation
ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java
Outdated
Show resolved
Hide resolved
// a no-op if a repartition is not required. if the source is a table, and | ||
// a repartition is needed, then an exception will be thrown | ||
buildRepartitionNode(leftSourceNode, joinInfo.get().getLeftJoinExpression()), | ||
buildRepartitionNode(rightSourceNode, joinInfo.get().getRightJoinExpression()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure adding repartition nodes in the logical plan is the right way to go here. IMO the transformation performed by the nodes in the logical plan should correspond closely to the semantics of the corresponding transformation in the query, e.g.
RepartitionNode
implements PARTITION BY
ProjectNode
implements SELECT <ITEMS>
FilterNode
implements WHERE <expr>
I see two problems in this particular case:
-
The semantics of the repartition for the join might be different from the semantics of the repartition needed to perform a
PARTITION BY
. APARTITION BY
transforms the schema in a very specific way that may not make sense here. It's kind of hard to give a concrete example since we're still deciding on PARTITION BY semantics. But one example might be adding default column names for the new key columns (depending on the semantics we choose for it). IfPARTITION BY
semantics support adding columns in thePARTITION BY
, those columns should get some generated name. But it wouldn't make sense to add those columns for the repartition done here. -
The repartition here is really an implementation detail of the join. One place that comes to mind where this might come up down the line: we've discussed changing the join semantics to preserve the original key of the left side. The current behavior is really exposing implementation details in our query semantics. If we did that it would be awkward to have the original rekey in these repartition nodes and the post-join rekey in JoinNode
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO the transformation performed by the nodes in the logical plan should correspond closely to the semantics of the corresponding transformation in the query
My knowledge on SQL theory and relational algebra is pretty minimal, but I feel that the fact that the nodes mirror the SQL statements is coincidental (a reflection of how new our codebase is) not characteristic. Most databases have a step that optimizes the logical plan independent of the physical plan so the output of the logical planner may only loosely resemble the original SQL query. Otherwise, why have the extra step to translate between the query nodes and the logical plan nodes?
Instead, I think the logical nodes should define steps that describe the logical transformation and they should constitute the smallest reusable unit of logical work - otherwise reusing (composing) them and optimizing complex queries becomes unmanageable. The LogicalPlanner
's job is to turn a SQL query into a logical plan.
The semantics of the repartition for the join might be different from the semantics of the repartition needed to perform a
PARTITION BY
... IfPARTITION BY
semantics support adding columns in thePARTITION BY
, those columns should get some generated name. But it wouldn't make sense to add those columns for the repartition done here.
This falls pretty well into my view of logical plan. In my view, if PARTITION BY
supports adding columns then a single PARTITION BY
should map to two logical nodes - a projection node (or a rename) and a partition node.
The repartition here is really an implementation detail of the join.
I think things get a little tricky with PARTITION BY
because it's the only step that spans the logical and the physical world. Really, we don't have a "RekeyNode" (which is perhaps a better name for the RepartitionNode
) - but we have defined our Joins as only working on streams with the same key (and this a logical, not a physical, requirement).
If we did that it would be awkward to have the original rekey in these repartition nodes and the post-join rekey in JoinNode
It wouldn't be in the JoinNode - it would be yet another RepartitionNode
(which, again, should probably be named RekeyNode
).
ksql-engine/src/main/java/io/confluent/ksql/structured/SchemaKStream.java
Outdated
Show resolved
Hide resolved
ksql-engine/src/main/java/io/confluent/ksql/analyzer/ExpressionAnalyzer.java
Show resolved
Hide resolved
ksql-engine/src/main/java/io/confluent/ksql/analyzer/ExpressionAnalyzer.java
Outdated
Show resolved
Hide resolved
try { | ||
return columnRefs.stream() | ||
.map(ColumnRef::source) | ||
.filter(Optional::isPresent) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the column is not qualified, don't you need to check that it belongs to exactly one source, and include that source in the stream? If we are assuming all the columns are qualified then we should throw if the source is empty.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is checked earlier that it belongs to only one source (in ExpressionAnalyzer#throwOnUnknownOrAmbiguousColumn
) - I'll add a comment explaining this
EDIT: actually, there's a possibility that they're all unambiguous but also not qualified. I'll make the ExpressionAnalyzer
add the qualifier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
fixes #4130
Description
This PR introduces the ability to do arbitrary expressions in join conditions as long as on each side of the equality exactly one join source is used, and both join sources are represented. As an added bonus, this feature actually removes more code than it adds!
To implement this, I simply removed all repartition logic from the
JoinNode
and instead added a repartition step (reusing the recently introducedRepartitionNode
) to each of the source nodes. This path is always hit, if no repartition is required then the repartition node simply returns the same source as was passed into it. Now, theJoinNode
always joins on the keys of its input.Review Guide
To review, I recommend first looking at the
Analyzer
which has been simplified just to check the condition outlined above. Then, look at theLogicalPlanner
and finally at theJoinNode
.Testing done
Reviewer checklist