-
Notifications
You must be signed in to change notification settings - Fork 608
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(stream): create cdc table reader and source data stream with retry #19467
Conversation
59a7b72
to
3d94aa5
Compare
1886913
to
bb5f52b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM!
let mut need_resume_after_build = false; | ||
// loop to create source stream until success | ||
loop { | ||
if let Some(barrier) = build_source_stream_and_poll_barrier( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI. Link a Slack conversation here.
@tabVersion: It also affects the process of creating table, auto retry here means the create mv / create table command will always succeed, even if meeting some runtime error, such as Kafka group auth issue. It prevents rw from consuming data and requires the user setting up the ACL. We can only find this issue when building an actor.
So does this issue get noticed here? What happen if I create a source with some bad settings (such the permission issue mentioned above)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It prevents rw from consuming data and requires the user setting up the ACL. We can only find this issue when building an actor.
What happen if I create a source with some bad settings (such the permission issue mentioned above)?
I think MQ source enumerator should employ a validation logic to check these prerequisites, so that prerequisite for connectivity should be no problem when we are creating executors. But I am not sure whether we have the check for MQ sources. cc @tabVersion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for relying on the validation logic.
(Just saying, if someone is too lazy to write a full set of validation, they may just build a real Consumer
and see if anything goes well.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SourceExecutor generally LGTM. It would be better to have some e2e tests though.
loop { | ||
if let Some(barrier) = build_source_stream_and_poll_barrier( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: while let
might be better than loop { if let
@@ -736,6 +828,29 @@ impl<S: StateStore> SourceExecutor<S> { | |||
} | |||
} | |||
|
|||
async fn build_source_stream_and_poll_barrier( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe this should be called
async fn build_source_stream_and_poll_barrier( | |
async fn build_source_stream_or_receive_barrier( |
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Previously we try to create the table reader in
src/stream/src/from_proto/stream_cdc_scan.rs
, if upstream database is down, the executor cannot be built so that the cluster will enter recovery loop. This pr try to create the table reader after we forward the initial barrier, so that it won't block recovery if cannot connect to upstream db.We also add retry logic to the source executor when we create the data stream at first time.
close #17807
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.