-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
docs: klip-10 add suppress to KSQL #3754
Conversation
## Value/Return | ||
|
||
This feature enables functionality in KSQL that is not possible today, and provides stricter | ||
semantic guarantees on KSQL aggregation queries. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
provides stricter semantic guarantees
Why that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose I meant "stricter control on the semantics of aggregations" - you could rely on certain assumptions that previously you could not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was one of the features/observations from the original KIP: when the application is taking actions in response to emitted events, then the emission behavior itself also has semantics, and the app developer needs to know what they are.
However, it's probably worth keeping our heads straight about whether we're talking about SQL/relational semantics vs. emit semantics, since they're different dimensions, and the discussion could become confusing.
design-proposals/klip-10-suppress.md
Outdated
And behaves in the following ways: | ||
- If `CHANGES` is specified, then all intermediate changes will be materialized | ||
- If `FINAL` is specified, then output will be suppressed depending on the `suppression`: | ||
- if `EAGERLY` is specified, the suppression is a best effort attempt to reduce duplicate output |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the difference between CHANGES
and FINAL EAGERLY
? To me, they appear to be the same.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I was under the assumption that CHANGES
would be "one-to-one" (e.g. something like no buffering) - I've had some people in the community ask for that feature. Is that possible with the suppress API?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After discussing with @vpapavas a little more - I think I've clarified my own thoughts about the difference:
EMIT CHANGES
uses just the buffer sizeEMIT FINAL EAGERLY
would use either the buffer size or the window size, whatever comes first
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are we set on EMIT FINAL
? I goes back to #3242 (comment)
I am concerned about introducing a new FINAL
mode here. Could we do something like adding a new SUPRESS UNTIL <supression_mechanism>
clause ? (`SUPRESS
- IMO having supress in the syntax would also make it easier for users to reason across streams and ksql. Its anyway doing a Streams supress
- Can EAGERLY be a supression scheme instead of its own clause.. For with
EAGERLY WHEN WINDOW_CLOSED()
, user would be 1 record per window as long as it fits in buffer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Look at the comments below that John posted - I think it helps clarify a lot of the streams behaviors that will make me need to revamp a lot of this.
Are we set on
EMIT FINAL
?
I think we can change FINAL
to something else if we want, but I think we should stick with EMIT
and not introduce SUPPRESS
- I don't agree that mirroring Streams terminology is necessarily a design goal. One candidate is maybe EMIT WHEN [supression_condition]
.
Can EAGERLY be a suppression scheme instead of its own clause
I think I'm going to remove the concept of EAGERLY
- it looks like I misunderstood how that works. Again, I will defer to John's comments below on the semantics that I misunderstood.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EMIT FINAL
also fits in with the direction other streaming SQL impls are going... i.e. Beam/Flink.
design-proposals/klip-10-suppress.md
Outdated
|
||
We will model the suppression as method calls in order to allow flexibility in the syntax if | ||
different types of suppressions are added in the future. We could also extend this to allow user | ||
implemented suppressions in the same way that we support UDFs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting idea -- this is not supported in Kafka Streams atm, and it's unclear to me who this would work? It's not part of this proposal, but as you outline the idea I am wondering how this could look like?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I assumed that anyone who implemented the Suppressed
interface could plug into the interface. If that's not the intention then maybe I can simplify the syntax - but I wanted to leave it open in case that was the idea.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. For clarity, people don't implement Suppressed
. In fact, this is expressly forbidden. Instead, they use the static methods on Suppressed
to configure the operation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vvcephei is there any intent to open this up in the future, or does that just not work?
Hey, thanks @agavra for taking on this design task! I think that a lot of people will get utility from this feature, and I like the way you've posed it. Reading the design, it seems like the expression space looks like this:
With all of the above in mind, and also with some of the battle scars of having people actually use Suppress in Streams, I'm wondering if we should consider an alternative arrangement:
There's one extra semantic difficulty you might want to consider: it does make sense to issue a "point in time" query, but request only final results. I wonder if you want to account for that in the query syntax, or instead just build on the nature of SQL. I.e., if there's a way to define the "final results" relation (as a streaming table) and then collect it into a "point in time" query, like Also, while bikeshedding the strawman above, I'm wondering if you can let us know what kind of syntax other streaming SQL dialects use for this concept. I'm not saying we should use the same syntax, but I think it would help to know what they are. Last comment: You said that grace period would be out of scope. What would be the semantics of |
Thanks for the thoughts @vvcephei! Really helps my understanding of KStreams.
Good to know that this hasn't been useful or well understood in practice. I think trying to support this is what introduced most of the complexity was in this KLIP.
I just saw that
The way I imagined this is that the I think overall given this understanding of KS, it makes a lot of sense to just support With regards to "point in time" queries, we've ditched that terminology in favor of a clearer taxonomy of queries. There are two types of transient user queries - those that stream results back and those that "batch" results back. I don't think
https://arxiv.org/pdf/1905.12133.pdf (they use
The way I see it is that grace period is a property of the window. Today (I think) we use whatever the default is for the grace period and don't supply any syntax to help set it. We will use that grace period to determine what "FINAL" means. Later we could introduce syntax in the |
Hi @agavra , thanks for the response. Regarding eager+final, you're not the only person to consider these semantics and reach the same conclusion, but IMO, something like "try your best to emit only the final, but I prefer that you emit intermediate than hit OOM" is the kind of thing that users ask for, but don't really want. I.e., the semantics would be very esoteric and almost certainly wrong for most use cases. One guiding principle of the feature is that either the downstream can handle duplicates and therefore doesn't need "final results only", or it cannot handle duplicates and therefore really needs "final results only". This seems to come down to fundamental properties of the downstream system, with no grey area. Certainly, even downstreams that can handle duplicates would still be interested in caching/rate-limiting/etc to control the workload, but that's not the same thing as asking for final results only. Regarding the paragraph that starts, "With regards to "point in time" queries...", I think maybe I wasn't clear. Using the taxonomy you clarified, I was talking about "batch results back"-type queries (I issue the query and get a single tabular result, and that's it, just like traditional RDBMS). It still makes sense to issue such a query, and only want to see "final" results for a windowed aggregation. Consider: as a human being, I want to issue an ad-hoc query to find out the answer to the example that you led with. I'm not writing an application, I just need to get an answer to this specific question right now. Still, I don't want to get false positives from incomplete data, so I wish to only query over closed windows. Further, note that I might be querying a persistent object that I didn't create (a table created by another query), so I might not have a-priori knowledge of the exact structure of the window definition and therefore am not in a good position to scope the query itself to only cover closed windows. Thanks for the link to https://arxiv.org/pdf/1905.12133.pdf . What I actually meant was how they define the concepts analogous to Finally, regarding grace period: This is correct, grace period is a property of the window. If KSQL is using the default from Streams, note that the default is 24 hours. This means that if you run an |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Love this @agavra and I think our users will to!
Some quick thoughts from reading through:
- As @vvcephei mentioned, the default grace period is 24 hours, so we'll need a way of users defining the grace period for a windowed table. This could be another KLIP, but will need doing before / at the same time, as
EMIT FINAL
is pretty useless without it. BTW, there's also a bug we'd be fixing: if you create a window larger than 24 hours it currently blows up. (There's a git hub issue for that somewhere). - I'd with @vvcephei on keeping it simple initially. I think just looking to add
EMIT FINAL
initially would give most benefit. We can then look at a follow up KLIP to add rate limiting viaEMIT
if there is call/need e.g.EMIT AFTER [duration] [time_type]
. As @vvcephei mentioned, what type of time is important and it may not make sense to implement this until we have wall-clock time. - At the moment pull queries are implicitly
EMIT FINAL
. You'll see this in the code once you start working on this. When we add support forEMIT FINAL
to our SQL syntax we should allow, (and ignore),EMIT FINAL
on pull queries, i.e. this should be in scope. SoSELECT * FROM X WHERE ROWKEY=x;
is equivalent toSELECT * FROM X WHERE ROWKEY=x EMIT FINAL
. Why? Because the user only wants the final result for the row.
Let me know once you've updated the KLIP and I'll take another look!
Should this not be invalid syntax? |
From the discussion in KLIP-8, the proposition was that for a given query, e.g. SELECT * FROM FOO WHERE ROWKEY=x; (Where FOO is a table thats computing an aggregate, e.g. You can either request the changes of how the final result was built, using
So in this sense both Of course, that's not taking into account KLIP-11, which proposing using However, this klip currently stands alone, or rather sits on top of klip-8. As such the |
Oh, I see. There was some misunderstanding from my side what was meat by What I am really wondering though is, how current pull queries implement "emit final". I am also wondering about this statement
"current" and "final" seem to contradict each other. My understanding was (maybe it's incorrect), that KSQL implements pull queries via IQ and thus, if a user does a lookup for a key (ie, a windowed-key in our example), the pull query would go against the current table state and there is no notion of "final" for this case. Kafka Streams does not provide any means to know from outside if the retention time passed: hence, how do you know that you can or cannot return the result of such a pull query, if you don't know if the result is final or not? The only way to implement "final pull queries" (and AFAIK KSQL does not do it this way atm) would be to use Furthermore, this example actually illustrates a bigger issue of the KLIP-8 design (that KLIP-10 inherits) and also shows the misunderstanding that I mentioned above. If I understand you correctly, there are two types of queries atm: SELECT * FROM FOO WHERE ROWKEY=x; // pull query; implicitly EMIT FINAL
SELECT * FROM FOO EMIT CHANGE; // push query
// emits a stream with all intermediate updates
// (minus record caching de-duplication) as result However, how can one get a result STREAM, that does only contain the final result per window. One cannot say SELECT * FROM FOO EMIT FINAL; because this would be pull query... Note that the main purpose do Given my current understanding how KSQL works, CREATE TABLE FOO AS SELECT COUNT() FROM BAR GROUP BY ROWKEY WINDOW BY... EMIT FINAL; Only if This is what I meant by "emit final" is a property of the operator, but not the query. A windowed-aggregation can still be a CQ, but only emits a single result per window -- however, "emit final" does not make the CQ a "lookup query". However, KLIP-8/KLIP-10 propose that "emit final" is for pull queries while "emit change" is for push queries---this approach seems to fall short. |
I think you are not right here. |
I guess that is the current open question... What does
Or do I misunderstand what @big-andy-coates says? I am confused atm who claims what... If you have an windowed aggregation persistent push query, you can emit the state store update to the result topic either continuously (ie, for every update) or once (ie, "final" via suppress()). Similarly, you can query the result table state store and want to query any intermediate window result or only final window result. Ie, it's basically 4 types of queries... So far, KSQL only supported continuously pushing all intermediate result into the result table. For the new pull queries, it seems that they would query any intermediate window result, too (correct me if I am wrong). It's unclear to me what |
I just thought of this again while reading over some other streaming SQL papers. Maybe something like the following matrix would clear it up?
... or have I gotten myself confused? |
I think we've dug ourselves into a little bit of a hole with KLIP-8, and now that we have much more experience with what we want we can define things much better. That being said, I think we should hold off this KLIP until #3799 (KLIP-11) is in a stable state (perhaps not necessarily implemented). And now I'm going to contradict myself and offer some more thoughts 😂 I wrote this KLIP with the intention that @vvcephei since streams doesn't really implement a "suppress IQ", I didn't think about the case that you have listed for This indeed does differ from what KLIP-8 suggests, and perhaps I could have prevented lots of confusion by pointing that out. |
Linking issue for reference: #1030 |
Thanks for the input @vvcephei and @agavra -- that clarifies a lot! While also see a dependency with regard to KLIP-11, I am still wondering if both KLIPs could make progress in parallel? What is the importance of KLIP-8 with regard to timeline? Even if we might change some syntax later, the internal implementation would most like not change? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
KLIP 10 - Add Suppress To KSQL
Author: agavra |
Release Target: 5.5 |
Status: In Discussion |
Discussion: link to the design discussion PR
tl;dr: There have been many requests from the community to be able to the control continuous
refinement policy of underlying KStreams applications.
KTable#suppress
allows suchcontrol and should be given corresponding KSQL syntax.