Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): create cdc table reader and source data stream with retry (#19467) #19676

Merged
merged 4 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion e2e_test/source_inline/kafka/issue_19563.slt.serial
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ cat <<EOF | rpk topic produce test-topic-19563
{"v1": "0001-01-01 21:00:00"}
EOF

sleep 3s

sleep 6s

statement ok
flush;

# Below lower bound and above upper bound are not shown
query I
Expand Down
22 changes: 5 additions & 17 deletions src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Field,
use risingwave_common::types::{Datum, JsonbVal};
use risingwave_common::util::epoch::{test_epoch, EpochExt};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::cdc::external::mock_external_table::MockExternalTableReader;
use risingwave_connector::source::cdc::external::mysql::MySqlOffset;
use risingwave_connector::source::cdc::external::{
DebeziumOffset, DebeziumSourceOffset, ExternalTableReaderImpl, SchemaTableName,
CdcTableType, DebeziumOffset, DebeziumSourceOffset, ExternalTableConfig, SchemaTableName,
};
use risingwave_connector::source::cdc::DebeziumCdcSplit;
use risingwave_connector::source::SplitImpl;
Expand Down Expand Up @@ -160,19 +158,6 @@ async fn test_cdc_backfill() -> StreamResult<()> {
MockOffsetGenExecutor::new(source).boxed(),
);

let binlog_file = String::from("1.binlog");
// mock binlog watermarks for backfill
// initial low watermark: 1.binlog, pos=2 and expected behaviors:
// - ignore events before (1.binlog, pos=2);
// - apply events in the range of (1.binlog, pos=2, 1.binlog, pos=4) to the snapshot
let binlog_watermarks = vec![
MySqlOffset::new(binlog_file.clone(), 2), // binlog low watermark
MySqlOffset::new(binlog_file.clone(), 4),
MySqlOffset::new(binlog_file.clone(), 6),
MySqlOffset::new(binlog_file.clone(), 8),
MySqlOffset::new(binlog_file.clone(), 10),
];

let table_name = SchemaTableName {
schema_name: "public".to_string(),
table_name: "mock_table".to_string(),
Expand All @@ -183,11 +168,14 @@ async fn test_cdc_backfill() -> StreamResult<()> {
]);
let table_pk_indices = vec![0];
let table_pk_order_types = vec![OrderType::ascending()];
let config = ExternalTableConfig::default();

let external_table = ExternalStorageTable::new(
TableId::new(1234),
table_name,
"mydb".to_string(),
ExternalTableReaderImpl::Mock(MockExternalTableReader::new(binlog_watermarks)),
config,
CdcTableType::Mock,
table_schema.clone(),
table_pk_order_types,
table_pk_indices.clone(),
Expand Down
14 changes: 13 additions & 1 deletion src/connector/src/source/cdc/external/mock_external_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,19 @@ pub struct MockExternalTableReader {
}

impl MockExternalTableReader {
pub fn new(binlog_watermarks: Vec<MySqlOffset>) -> Self {
pub fn new() -> Self {
let binlog_file = String::from("1.binlog");
// mock binlog watermarks for backfill
// initial low watermark: 1.binlog, pos=2 and expected behaviors:
// - ignore events before (1.binlog, pos=2);
// - apply events in the range of (1.binlog, pos=2, 1.binlog, pos=4) to the snapshot
let binlog_watermarks = vec![
MySqlOffset::new(binlog_file.clone(), 2), // binlog low watermark
MySqlOffset::new(binlog_file.clone(), 4),
MySqlOffset::new(binlog_file.clone(), 6),
MySqlOffset::new(binlog_file.clone(), 8),
MySqlOffset::new(binlog_file.clone(), 10),
];
Self {
binlog_watermarks,
snapshot_cnt: AtomicUsize::new(0),
Expand Down
6 changes: 4 additions & 2 deletions src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ use crate::source::cdc::external::sql_server::{
use crate::source::cdc::CdcSourceType;
use crate::WithPropertiesExt;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum CdcTableType {
Undefined,
Mock,
MySql,
Postgres,
SqlServer,
Expand Down Expand Up @@ -101,6 +102,7 @@ impl CdcTableType {
Self::SqlServer => Ok(ExternalTableReaderImpl::SqlServer(
SqlServerExternalTableReader::new(config, schema, pk_indices).await?,
)),
Self::Mock => Ok(ExternalTableReaderImpl::Mock(MockExternalTableReader::new())),
_ => bail!("invalid external table type: {:?}", *self),
}
}
Expand Down Expand Up @@ -218,7 +220,7 @@ pub enum ExternalTableReaderImpl {
Mock(MockExternalTableReader),
}

#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Default, Clone, Deserialize)]
pub struct ExternalTableConfig {
pub connector: String,

Expand Down
111 changes: 95 additions & 16 deletions src/stream/src/executor/backfill/cdc/cdc_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::pin::Pin;

use either::Either;
Expand All @@ -27,9 +28,11 @@ use risingwave_connector::parser::{
ByteStreamSourceParser, DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties,
ProtocolProperties, SourceStreamChunkBuilder, SpecificParserConfig,
};
use risingwave_connector::source::cdc::external::CdcOffset;
use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReaderImpl};
use risingwave_connector::source::{SourceColumnDesc, SourceContext};
use rw_futures_util::pausable;
use thiserror_ext::AsReport;
use tracing::Instrument;

use crate::executor::backfill::cdc::state::CdcBackfillState;
use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
Expand All @@ -42,6 +45,7 @@ use crate::executor::backfill::utils::{
use crate::executor::backfill::CdcScanOptions;
use crate::executor::monitor::CdcBackfillMetrics;
use crate::executor::prelude::*;
use crate::executor::source::get_infinite_backoff_strategy;
use crate::executor::UpdateMutation;
use crate::task::CreateMviewProgressReporter;

Expand Down Expand Up @@ -140,7 +144,6 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
let upstream_table_name = self.external_table.qualified_table_name();
let schema_table_name = self.external_table.schema_table_name().clone();
let external_database_name = self.external_table.database_name().to_owned();
let upstream_table_reader = UpstreamTableReader::new(self.external_table);

let additional_columns = self
.output_columns
Expand All @@ -159,38 +162,94 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
let first_barrier = expect_first_barrier(&mut upstream).await?;

let mut is_snapshot_paused = first_barrier.is_pause_on_startup();
let first_barrier_epoch = first_barrier.epoch;
// The first barrier message should be propagated.
yield Message::Barrier(first_barrier);
let mut rate_limit_to_zero = self.rate_limit_rps.is_some_and(|val| val == 0);

// Check whether this parallelism has been assigned splits,
// if not, we should bypass the backfill directly.
let mut state_impl = self.state_impl;

let mut upstream = transform_upstream(upstream, &self.output_columns)
.boxed()
.peekable();

state_impl.init_epoch(first_barrier.epoch);
state_impl.init_epoch(first_barrier_epoch);

// restore backfill state
let state = state_impl.restore_state().await?;
current_pk_pos = state.current_pk_pos.clone();

let to_backfill = !self.options.disable_backfill && !state.is_finished;

// The first barrier message should be propagated.
yield Message::Barrier(first_barrier);
let need_backfill = !self.options.disable_backfill && !state.is_finished;

// Keep track of rows from the snapshot.
let mut total_snapshot_row_count = state.row_count as u64;

// After init the state table and forward the initial barrier to downstream,
// we now try to create the table reader with retry.
// If backfill hasn't finished, we can ignore upstream cdc events before we create the table reader;
// If backfill is finished, we should forward the upstream cdc events to downstream.
let mut table_reader: Option<ExternalTableReaderImpl> = None;
let external_table = self.external_table.clone();
let mut future = Box::pin(async move {
let backoff = get_infinite_backoff_strategy();
tokio_retry::Retry::spawn(backoff, || async {
match external_table.create_table_reader().await {
Ok(reader) => Ok(reader),
Err(e) => {
tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
Err(e)
}
}
})
.instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
.await
.expect("Retry create cdc table reader until success.")
});
loop {
if let Some(msg) =
build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
.await?
{
match msg {
Message::Barrier(barrier) => {
// commit state to bump the epoch of state table
state_impl.commit_state(barrier.epoch).await?;
yield Message::Barrier(barrier);
}
Message::Chunk(chunk) => {
if need_backfill {
// ignore chunk if we need backfill, since we can read the data from the snapshot
} else {
// forward the chunk to downstream
yield Message::Chunk(chunk);
}
}
Message::Watermark(_) => {
// ignore watermark
}
}
} else {
assert!(table_reader.is_some(), "table reader must created");
tracing::info!(
table_id,
upstream_table_name,
"table reader created successfully"
);
break;
}
}

let upstream_table_reader = UpstreamTableReader::new(
self.external_table.clone(),
table_reader.expect("table reader must created"),
);

let mut upstream = transform_upstream(upstream, &self.output_columns)
.boxed()
.peekable();
let mut last_binlog_offset: Option<CdcOffset> = state
.last_cdc_offset
.map_or(upstream_table_reader.current_cdc_offset().await?, Some);

let offset_parse_func = upstream_table_reader
.inner()
.table_reader()
.get_cdc_offset_parser();
let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
let mut consumed_binlog_offset: Option<CdcOffset> = None;

tracing::info!(
Expand Down Expand Up @@ -227,7 +286,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
// finished.
//
// Once the backfill loop ends, we forward the upstream directly to the downstream.
if to_backfill {
if need_backfill {
// drive the upstream changelog first to ensure we can receive timely changelog event,
// otherwise the upstream changelog may be blocked by the snapshot read stream
let _ = Pin::new(&mut upstream).peek().await;
Expand Down Expand Up @@ -702,6 +761,26 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
}
}

/// Drive the table-reader creation future while still consuming the upstream
/// message stream, so that upstream progress (barriers in particular) is not
/// blocked during a potentially long, retried reader creation.
///
/// Returns `Ok(Some(msg))` when an upstream message arrived first, and
/// `Ok(None)` either when the creation future completed (the reader is stored
/// into `table_reader`) or when the reader already exists.
async fn build_reader_and_poll_upstream(
    upstream: &mut BoxedMessageStream,
    // Out-param: filled in exactly once, when the creation future resolves.
    table_reader: &mut Option<ExternalTableReaderImpl>,
    // The retrying creation future; pinned by the caller so it can be polled
    // repeatedly across successive calls to this function.
    future: &mut Pin<Box<impl Future<Output = ExternalTableReaderImpl>>>,
) -> StreamExecutorResult<Option<Message>> {
    // Fast path: the reader was already created on a previous call.
    if table_reader.is_some() {
        return Ok(None);
    }
    tokio::select! {
        // `biased` makes select poll the branches in written order, so a
        // completed creation future is observed before draining more upstream
        // messages.
        biased;
        reader = &mut *future => {
            *table_reader = Some(reader);
            Ok(None)
        }
        msg = upstream.next() => {
            // Option<Result<Message, _>> -> Result<Option<Message>, _>
            msg.transpose()
        }
    }
}

#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn transform_upstream(upstream: BoxedMessageStream, output_columns: &[ColumnDesc]) {
let props = SpecificParserConfig {
Expand Down
36 changes: 30 additions & 6 deletions src/stream/src/executor/backfill/cdc/upstream_table/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@

use risingwave_common::catalog::{Schema, TableId};
use risingwave_common::util::sort_util::OrderType;
use risingwave_connector::source::cdc::external::{ExternalTableReaderImpl, SchemaTableName};
use risingwave_connector::error::ConnectorResult;
use risingwave_connector::source::cdc::external::{
CdcOffset, CdcTableType, ExternalTableConfig, ExternalTableReader, ExternalTableReaderImpl,
SchemaTableName,
};

/// This struct represents an external table to be read during backfill
#[derive(Debug, Clone)]
pub struct ExternalStorageTable {
/// Id for this table.
table_id: TableId,
Expand All @@ -28,7 +33,9 @@ pub struct ExternalStorageTable {

database_name: String,

table_reader: ExternalTableReaderImpl,
config: ExternalTableConfig,

table_type: CdcTableType,

/// The schema of the output columns, i.e., this table VIEWED BY some executor like
/// `RowSeqScanExecutor`.
Expand All @@ -43,14 +50,16 @@ pub struct ExternalStorageTable {
}

impl ExternalStorageTable {
#[allow(clippy::too_many_arguments)]
pub fn new(
table_id: TableId,
SchemaTableName {
table_name,
schema_name,
}: SchemaTableName,
database_name: String,
table_reader: ExternalTableReaderImpl,
config: ExternalTableConfig,
table_type: CdcTableType,
schema: Schema,
pk_order_types: Vec<OrderType>,
pk_indices: Vec<usize>,
Expand All @@ -60,7 +69,8 @@ impl ExternalStorageTable {
table_name,
schema_name,
database_name,
table_reader,
config,
table_type,
schema,
pk_order_types,
pk_indices,
Expand Down Expand Up @@ -90,8 +100,14 @@ impl ExternalStorageTable {
}
}

pub fn table_reader(&self) -> &ExternalTableReaderImpl {
&self.table_reader
/// Construct a new external table reader from this table's connector
/// configuration, schema, and primary-key indices.
///
/// The creation is delegated to the table's [`CdcTableType`]; cloning the
/// inputs keeps `self` borrowed immutably.
pub async fn create_table_reader(&self) -> ConnectorResult<ExternalTableReaderImpl> {
    let config = self.config.clone();
    let schema = self.schema.clone();
    let pk_indices = self.pk_indices.clone();
    self.table_type
        .create_table_reader(config, schema, pk_indices)
        .await
}

pub fn qualified_table_name(&self) -> String {
Expand All @@ -101,4 +117,12 @@ impl ExternalStorageTable {
/// Name of the upstream database this external table belongs to.
pub fn database_name(&self) -> &str {
    // &String coerces to &str in return position.
    &self.database_name
}

/// Fetch the current CDC offset of the upstream table through the given
/// reader.
///
/// On success the offset is always wrapped in `Some`; the `Option` in the
/// return type exists for the caller's convenience.
pub async fn current_cdc_offset(
    &self,
    table_reader: &ExternalTableReaderImpl,
) -> ConnectorResult<Option<CdcOffset>> {
    table_reader.current_cdc_offset().await.map(Some)
}
}
Loading
Loading