feat(streaming): watermark on append only table #8207
…hao/watermark_append_only_table
Codecov Report
@@ Coverage Diff @@
## main #8207 +/- ##
==========================================
- Coverage 71.70% 71.70% -0.01%
==========================================
Files 1131 1131
Lines 182341 182390 +49
==========================================
+ Hits 130748 130775 +27
- Misses 51593 51615 +22
LGTM! We could add some e2e tests to check that outdated data is actually filtered out (also fine in a follow-up PR).
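As a rough illustration of what such an e2e test would be checking, here is a minimal sketch of watermark-based filtering for a definition like `v1 - INTERVAL '1' SECOND`. It assumes the usual semantics (the watermark only moves forward, and rows whose event time falls behind it are dropped). `WatermarkFilter`, `accept`, and `keep_in_order` are hypothetical names for illustration, not RisingWave's executor API, and timestamps are simplified to plain milliseconds.

```rust
/// Hypothetical stand-in for a watermark filter over one event-time column.
/// Timestamps are plain milliseconds; `delay_ms` mirrors `v1 - INTERVAL '1' SECOND`.
struct WatermarkFilter {
    delay_ms: i64,
    /// Highest watermark derived so far; `None` until the first row arrives.
    watermark: Option<i64>,
}

impl WatermarkFilter {
    fn new(delay_ms: i64) -> Self {
        Self { delay_ms, watermark: None }
    }

    /// Returns `true` if the row is kept, `false` if it is dropped as outdated.
    fn accept(&mut self, event_time_ms: i64) -> bool {
        // Assumption: rows strictly behind the current watermark are dropped.
        if let Some(wm) = self.watermark {
            if event_time_ms < wm {
                return false;
            }
        }
        // Advance the watermark monotonically: max(old, event_time - delay).
        let candidate = event_time_ms - self.delay_ms;
        self.watermark = Some(self.watermark.map_or(candidate, |wm| wm.max(candidate)));
        true
    }
}

/// Feeds event times through a fresh filter in arrival order and returns the kept ones.
fn keep_in_order(delay_ms: i64, event_times_ms: &[i64]) -> Vec<i64> {
    let mut filter = WatermarkFilter::new(delay_ms);
    event_times_ms
        .iter()
        .copied()
        .filter(|&t| filter.accept(t))
        .collect()
}
```

With a 1-second delay, `keep_in_order(1_000, &[5_000, 4_500, 3_000, 6_000])` keeps `5_000`, `4_500`, and `6_000`: the first row moves the watermark to `4_000`, so the row at `3_000` arrives too late and is dropped.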
@@ -13,3 +13,13 @@
└─StreamRowIdGen { row_id_index: 1 }
  └─StreamWatermarkFilter { watermark_descs: [idx: 0, expr: (v1 - '00:00:01':Interval)] }
    └─StreamSource { source: "t", columns: ["v1", "_row_id"] }
- name: watermark on append only table
  sql: |
    explain create table t (v1 timestamp with time zone, watermark for v1 as v1 - INTERVAL '1' SECOND) with (connector = 'kafka', kafka.topic = 'kafka_3_partition_topic', kafka.brokers = '127.0.0.1:1234', kafka.scan.startup.mode='earliest', appendonly=true) ROW FORMAT JSON;
Hmm, our usual usage of tables is without an external connector, so maybe it would be better to remove the connector here?
I think we can keep both behaviors for testing purposes and leave them undocumented. I'll add a new planner test.
Generally LGTM. BTW where is the "append only" part of this PR? 👀
if !append_only && !watermark_descs.is_empty() {
    return Err(ErrorCode::NotSupported(
        "Defining watermarks on table requires the table to be append only.".to_owned(),
        "Set the option `appendonly=true`".to_owned(),
    )
    .into());
}
@xx01cyx This is the append-only part.
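For readers without the surrounding code, the check in the hunk above can be sketched in isolation roughly as follows. `WatermarkDesc`, `NotSupported`, and `check_watermark_on_table` are hypothetical stand-ins written for this example; only the condition and the two message strings are taken from the diff.

```rust
/// Hypothetical stand-in for a watermark definition on a table column.
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct WatermarkDesc {
    column_idx: usize,
}

/// Hypothetical error type mirroring the two-part "not supported" error in the diff:
/// what is unsupported, plus a hint on how to fix it.
#[derive(Debug, PartialEq)]
struct NotSupported {
    reason: String,
    hint: String,
}

/// Rejects watermark definitions on tables that are not append-only.
/// Tables without any watermark are accepted regardless of `append_only`.
fn check_watermark_on_table(
    append_only: bool,
    watermark_descs: &[WatermarkDesc],
) -> Result<(), NotSupported> {
    if !append_only && !watermark_descs.is_empty() {
        return Err(NotSupported {
            reason: "Defining watermarks on table requires the table to be append only."
                .to_owned(),
            hint: "Set the option `appendonly=true`".to_owned(),
        });
    }
    Ok(())
}
```

Only one of the four combinations fails: a table that has at least one watermark and is not append-only.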
if let Some(catalog) = self.source_catalog() && !catalog.watermark_descs.is_empty() && !self.core.for_table {
    plan = StreamWatermarkFilter::new(plan, catalog.watermark_descs.clone()).into();
}

assert!(!(self.core.gen_row_id && self.core.for_table));
Maybe add some comments here.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.