-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: call KStreams API grace() when GRACE PERIOD is used in joins #7678
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you so much for the great PR description!
I'm half way there and would try to complete by eow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a major question about the added test: with the fix in place, the output seems still spurious? Otherwise, just minor questions.
@@ -196,6 +197,7 @@ static Expression rewriteTimeComparisonForFilter(final Expression expression) { | |||
final SchemaKStream<K> otherSchemaKStream, | |||
final ColumnName keyColName, | |||
final JoinWindows joinWindows, | |||
final Optional<WindowTimeClause> gracePeriod, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why can't we just retrieve the grace period from the JoinWindows? I.e. we just set the grace time in JoinWindows if GRACE
specified, and then pass around that object only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EDIT: I knew this is only used for WITH
and the GRACE PERIOD
is out of the scope; I guess my meta question is: why can't we have the GRACE PERIOD
captured as optional in WithinExpression
as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The JoinWindows
has a default grace (24h) if it is not passed one. I cannot know if the grace was optional in the ksql syntax with that default. Say a user set GRACE PERIOD 24 HOURS, then JoinWindows
will be set to 24h, which it could be the default (no fix enabled) or manually set (fix enabled).
The grace is optional in the WithinExpression
. I could pass the WithinExpression
all down to the StreamStreamJoinBuilder
but I would need to add the ksqldb-parser
dependency into the ksqldb-streams
module.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's say in the future if we do not need to rely on GRACE to enable the fix or not, then would this code be changed again to only pass in the JoinWindows
object still?
If yes, I'd suggest we just keep doing that right away: we could, e.g., a wild idea is to have an extended JoinWindows that adds a flag indicating if user specified grace, and then pass around that object from WithinExpression -> ExecutionStepFactory -> StreamStreamJoin, and let it to return a graceSpecified
rather than join.getGraceMillis()
, used by the StreamStreamJoinBuilder
. In the future we can just remove the extended class and the graceSpecified
from StreamStreamJoin. Does that work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could do that. But it would only be used to pass the window+graceSpecified from SchemaKStream#leftJoin()
-> ExecutionStepFactory#streamStreamJoin()
. After that, the rest should be passed as millis to the StreamStreamJoin
because of compatibility. Look at the StreamStreamJoin
class. It has a constructor that is used by a JSON mapper when reads a plan. That already accepts before/after millis. I cannot change that constructor to pass the JoinWindows
.
I'm not sure if it is worth creating a class around the JoinWindows for one single change and then remove it. I think we can remove the gracePeriod later easier than removing a class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I think I'm convinced. Let's just keep it as is then.
{"topic": "LEFT_JOIN", "key": 0, "value": {"L1": "A", "L2": "a"}, "timestamp": 60000}, | ||
{"topic": "LEFT_JOIN", "key": 1, "value": {"L1": "B", "L2": null}, "timestamp": 330000}, | ||
{"topic": "LEFT_JOIN", "key": 2, "value": {"L1": "C", "L2": null}, "timestamp": 90000}, | ||
{"topic": "LEFT_JOIN", "key": 2, "value": {"L1": "C", "L2": "c"}, "timestamp": 90000}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm.. is the output expected? With our fix shouldn't we only output one record for key 2, with C-c, and one record for key 3, with D-d?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fix is not enabled yet. These results are working with the spurious results bug.
{"topic": "OUTER_JOIN", "key": 1, "value": {"T_ID": null, "TT_ID": 1, "L1": null, "L2": "b"}, "timestamp": 330000}, | ||
{"topic": "OUTER_JOIN", "key": 2, "value": {"T_ID": null, "TT_ID": 2, "L1": null, "L2": "c"}, "timestamp": 90000}, | ||
{"topic": "OUTER_JOIN", "key": 2, "value": {"T_ID": 2, "TT_ID": 2, "L1": "C", "L2": "c"}, "timestamp": 90000}, | ||
{"topic": "OUTER_JOIN", "key": 3, "value": {"T_ID": null, "TT_ID": 3, "L1": null, "L2": "d"}, "timestamp": 60000}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fix is not enabled yet. These results are working with the spurious results bug.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. You mean that we need to wait until the dependency ccs-kafka has @mjsax 's merged code, and then update this PR here and merge?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to wait for the fix in kafka. But it should not block this PR 'cause enabling the fix will involve other changes in QTT. A follow-up PR for making changes in the QTT framework is here (which is required to enable the fix and update the tests) - #7695
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good.
Thanks @guozhangwang. I just merged the PR. |
// join window or not. If used, then we'll call the new KStreams API ofSizeAndGrace() which | ||
// enables the bugfix with left/outer joins (see KAFKA-10847). | ||
if (join.getGraceMillis().isPresent()) { | ||
joinWindows = joinWindows.grace(join.getGraceMillis().get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is calling the old/exiting grace()
and thus it does not enable this fix. I think we need to change this (starting in L88):
final JoinWindows joinWindows;
if (join.getGraceMillis().isPresent()) {
joinWindows = JoinsWindow.ofTimeDifferenceAndGrace(
join.getBeforeMillis(),
join.getGraceMillis().get()
).after(join.getAfterMillis();
} else {
joinWindows = JoinWindows.of(join.getBeforeMillis()).after(join.getAfterMillis())
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think #7695 would update this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. I need #7695 merged before I make the changes for the new ofTimeDifferenceAndGrace
.
Description
Follow up on #7662. This PR makes the connection from the
GRACE PERIOD
syntax in stream-stream joins and the call to the KStreams grace API, which enables the use of grace in join windows.This is the workflow that happens internally to help review the code:
When a join happens. The parsing creates a
WithinExpression
which has the windows and optional grace (updated in #7662).This window + optional grace are obtained from the
JoinNode
and passed to theSchemaKStream
class, which acceptsan optional grace. The
SchemaKStream
then passes the window + optional grace to theExecutionStepFactory
whichgets the before/after and grace values in milliseconds, and then passes them to the
StreamStreamJoin
.The
StreamStreamJoin
accepts graceMillis as optional because seems it is needed for compatibility with an old plan writtenin JSON?. Anyway, the grace must be optional to continue with the call to the
KSPlanBuilder.visitStreamStreamJoin()
,which later ends up calling the
StreamStreamJoinBuilder.build()
.The
StreamStreamJoinBuilder
is the final class that builds the windows + grace and calls the Kafka streams API. Thegrace is optional in this class in order to know whether to call the new KStreams API ofSizeAndGrace() or just call
the normal of/after calls.
The
ofSizeAndGrace()
is called is not done in this PR. A follow-up PR will be done after the Kafka PR that contains this new method is merged.Testing done
Describe the testing strategy. Unit and integration tests are expected for any behavior changes.
Updated unit tests
Add QTT tests
Reviewer checklist