From 353a5c9721a13c578bec0cb55060cdeaadc66599 Mon Sep 17 00:00:00 2001 From: William Wen Date: Mon, 16 Dec 2024 23:36:40 +0800 Subject: [PATCH] refine --- .../backfill/snapshot_backfill/executor.rs | 203 +++++------------- .../backfill/snapshot_backfill/state.rs | 203 ++++++++++-------- src/stream/src/task/stream_manager.rs | 2 +- 3 files changed, 162 insertions(+), 246 deletions(-) diff --git a/src/stream/src/executor/backfill/snapshot_backfill/executor.rs b/src/stream/src/executor/backfill/snapshot_backfill/executor.rs index 1f3c41389e922..5db8e864bb216 100644 --- a/src/stream/src/executor/backfill/snapshot_backfill/executor.rs +++ b/src/stream/src/executor/backfill/snapshot_backfill/executor.rs @@ -21,7 +21,6 @@ use std::sync::Arc; use anyhow::anyhow; use futures::future::{try_join_all, Either}; use futures::{pin_mut, Stream, TryFutureExt, TryStreamExt}; -use risingwave_common::array::stream_record::Record; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::metrics::LabelGuardedIntCounter; @@ -90,7 +89,7 @@ impl SnapshotBackfillExecutor { barrier_rx: UnboundedReceiver, metrics: Arc, snapshot_epoch: Option, - ) -> StreamExecutorResult { + ) -> Self { assert_eq!(&upstream.info.schema, upstream_table.schema()); let Some(pk_in_output_indices) = upstream_table.pk_in_output_indices() else { panic!( @@ -106,11 +105,7 @@ impl SnapshotBackfillExecutor { "create snapshot backfill executor with rate limit" ); } - VnodeBackfillProgress::validate_progress_table_schema( - progress_state_table.get_data_types(), - upstream_table.pk_serializer().get_data_types(), - )?; - Ok(Self { + Self { upstream_table, pk_in_output_indices, progress_state_table, @@ -123,12 +118,11 @@ impl SnapshotBackfillExecutor { actor_ctx, metrics, snapshot_epoch, - }) + } } #[try_stream(ok = Message, error = StreamExecutorError)] async fn execute_inner(mut self) { - let table_id = self.upstream_table.table_id(); debug!("snapshot backfill executor start"); let first_upstream_barrier = expect_first_barrier(&mut self.upstream).await?; debug!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier"); @@ -156,15 +150,18 @@ impl SnapshotBackfillExecutor { }; let first_recv_barrier_epoch = first_recv_barrier.epoch; yield Message::Barrier(first_recv_barrier); - self.progress_state_table - .init_epoch(first_recv_barrier_epoch) - .await?; + let mut backfill_state = BackfillState::new( + self.progress_state_table, + first_recv_barrier_epoch, + self.upstream_table.pk_serializer().clone(), + ) + .await?; - let (mut barrier_epoch, mut need_report_finish, mut backfill_state) = { + let (mut barrier_epoch, mut need_report_finish) = { if should_backfill { - let mut backfill_state = - BackfillState::new([], self.upstream_table.pk_serializer().clone()); - assert!(backfill_state.latest_progress().next().is_none()); + assert!(backfill_state + .latest_progress() + .all(|(_, progress)| progress.is_none())); let table_id_str = format!("{}", self.upstream_table.table_id().table_id); let actor_id_str = format!("{}", self.actor_ctx.id); @@ -198,7 +195,6 @@ impl SnapshotBackfillExecutor { &mut self.barrier_rx, &mut self.progress, &mut backfill_state, - &mut self.progress_state_table, first_recv_barrier_epoch, ); @@ -217,7 +213,7 @@ impl SnapshotBackfillExecutor { let recv_barrier = self.barrier_rx.recv().await.expect("should exist"); assert_eq!(first_upstream_barrier.epoch, recv_barrier.epoch); - self.progress_state_table.commit(recv_barrier.epoch).await?; + backfill_state.commit(recv_barrier.epoch).await?; yield Message::Barrier(recv_barrier); } @@ -287,16 +283,7 @@ impl SnapshotBackfillExecutor { self.upstream_table.vnodes().iter_vnodes(), barrier.epoch.prev, ); - for (_vnode, old_row, new_row) in backfill_state.uncommitted_state() { - let record = if let Some(old_row) = old_row { - Record::Update { old_row, new_row } - } else { - Record::Insert { new_row } - }; - self.progress_state_table.write_record(record); - } - self.progress_state_table.commit(barrier.epoch).await?; - backfill_state.mark_committed(); + backfill_state.commit(barrier.epoch).await?; let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id); yield Message::Barrier(barrier); if update_vnode_bitmap.is_some() { @@ -307,44 +294,28 @@ impl SnapshotBackfillExecutor { } } } - info!( ?barrier_epoch, table_id = self.upstream_table.table_id().table_id, "finish consuming log store" ); - (barrier_epoch, true, backfill_state) + (barrier_epoch, true) } else { - let pk_serializer = self.upstream_table.pk_serializer(); - let progress = try_join_all(self.progress_state_table.vnodes().iter_vnodes().map( - |vnode| { - self.progress_state_table - .get_row([vnode.to_datum()]) - .map_ok(move |progress_row| { - let progress = VnodeBackfillProgress::from_row( - &progress_row.expect( - "should exist because all vnode have same epoch progress", - ), - pk_serializer, - ); - let expected_progress = VnodeBackfillProgress { - epoch: first_upstream_barrier.epoch.prev, - progress: EpochBackfillProgress::Consumed, - }; - assert_eq!(progress, expected_progress, "vnode: {:?}", vnode); - (vnode, progress) - }) - }, - )) - .await?; - let backfill_state = - BackfillState::new(progress, self.upstream_table.pk_serializer().clone()); + backfill_state + .latest_progress() + .for_each(|(vnode, progress)| { + let expected_progress = VnodeBackfillProgress { + epoch: first_upstream_barrier.epoch.prev, + progress: EpochBackfillProgress::Consumed, + }; + assert_eq!(progress, Some(&expected_progress), "vnode: {:?}", vnode); + }); info!( table_id = self.upstream_table.table_id().table_id, "skip backfill" ); assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch); - (first_upstream_barrier.epoch, false, backfill_state) + (first_upstream_barrier.epoch, false) } }; let mut upstream = self.upstream.into_executor(self.barrier_rx).execute(); @@ -358,93 +329,34 @@ impl SnapshotBackfillExecutor { barrier.epoch.prev, ); let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id); - let is_scale = - update_vnode_bitmap.is_some() || barrier.is_stop(self.actor_ctx.id); - if is_scale { - use itertools::Itertools; - info!(?table_id, vnodes = ?self.upstream_table.vnodes().iter_vnodes().collect_vec(), prev_epoch = barrier.epoch.prev, "write finish epoch"); - } barrier_epoch = barrier.epoch; if need_report_finish { need_report_finish = false; self.progress.finish_consuming_log_store(barrier_epoch); } - for (vnode, old_row, new_row) in backfill_state.uncommitted_state() { - let record = if let Some(old_row) = old_row { - Record::Update { old_row, new_row } - } else { - Record::Insert { new_row } - }; - if is_scale { - info!( - ?table_id, - ?vnode, - ?record, - "write record before update vnode bitmap" - ); - } - self.progress_state_table.write_record(record); - } - self.progress_state_table.commit(barrier.epoch).await?; - backfill_state.mark_committed(); + backfill_state.commit(barrier.epoch).await?; + yield Message::Barrier(barrier); if let Some(new_vnode_bitmap) = update_vnode_bitmap { - yield Message::Barrier(barrier); - self.progress_state_table - .try_wait_committed_epoch(barrier_epoch.prev) - .await?; let _prev_vnode_bitmap = self .upstream_table .update_vnode_bitmap(new_vnode_bitmap.clone()); - let (_, _) = self - .progress_state_table - .update_vnode_bitmap(new_vnode_bitmap); - let pk_serializer = self.upstream_table.pk_serializer(); - // try_join_all((0..256).map( - // |vnode| { - // let vnode = VirtualNode::from_index(vnode); - // // TODO: remove the clone - // let pk_serializer = pk_serializer.clone(); - // self.progress_state_table - // .get_row([vnode.to_datum()]) - // .map_ok(move |progress_row| { - // let progress = VnodeBackfillProgress::from_row( - // &progress_row.expect("should exist because all vnode have same epoch progress"), - // &pk_serializer, - // ); - // let expected_progress = VnodeBackfillProgress { - // epoch: barrier_epoch.prev, - // progress: EpochBackfillProgress::Consumed, - // }; - // assert_eq!(progress, expected_progress, "{:?}", vnode); - // }) - // }, - // )) - // .await?; - let progress = - try_join_all(self.progress_state_table.vnodes().iter_vnodes().map( - |vnode| { - // TODO: remove the clone - let pk_serializer = pk_serializer.clone(); - self.progress_state_table - .get_row([vnode.to_datum()]) - .map_ok(move |progress_row| { - let progress = VnodeBackfillProgress::from_row( - &progress_row.expect("should exist because all vnode have same epoch progress"), - &pk_serializer, - ); - let expected_progress = VnodeBackfillProgress { - epoch: barrier_epoch.prev, - progress: EpochBackfillProgress::Consumed, - }; - assert_eq!(progress, expected_progress, "{:?}", vnode); - (vnode, progress) - }) - }, - )) + backfill_state + .update_vnode_bitmap(new_vnode_bitmap, barrier_epoch) .await?; - backfill_state.update_vnode_bitmap(progress); - } else { - yield Message::Barrier(barrier); + let expected_progress = VnodeBackfillProgress { + epoch: barrier_epoch.prev, + progress: EpochBackfillProgress::Consumed, + }; + backfill_state + .latest_progress() + .for_each(|(vnode, progress)| { + assert_eq!( + progress, + Some(&expected_progress), + "vnode {:?} has unexpected progress", + vnode + ); + }); } } msg => { @@ -712,8 +624,7 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>( rate_limit: Option, barrier_rx: &'a mut UnboundedReceiver, progress: &'a mut CreateMviewProgressReporter, - backfill_state: &'a mut BackfillState, - progress_state_table: &'a mut StateTable, + backfill_state: &'a mut BackfillState, first_recv_barrier_epoch: EpochPair, ) { let mut barrier_epoch = first_recv_barrier_epoch; @@ -778,16 +689,7 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>( } }) .await?; - for (_vnode, old_row, new_row) in backfill_state.uncommitted_state() { - let record = if let Some(old_row) = old_row { - Record::Update { old_row, new_row } - } else { - Record::Insert { new_row } - }; - progress_state_table.write_record(record); - } - progress_state_table.commit(barrier.epoch).await?; - backfill_state.mark_committed(); + backfill_state.commit(barrier.epoch).await?; debug!(?barrier_epoch, count, epoch_row_count, "update progress"); progress.update(barrier_epoch, barrier_epoch.prev, count as _); epoch_row_count = 0; @@ -816,16 +718,7 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>( backfill_state.finish_epoch([vnode], snapshot_epoch); }) .await?; - for (_vnode, old_row, new_row) in backfill_state.uncommitted_state() { - let record = if let Some(old_row) = old_row { - Record::Update { old_row, new_row } - } else { - Record::Insert { new_row } - }; - progress_state_table.write_record(record); - } - progress_state_table.commit(barrier_epoch).await?; - backfill_state.mark_committed(); + backfill_state.commit(barrier_epoch).await?; progress.finish(barrier_epoch, count as _); yield Message::Barrier(barrier_to_report_finish); @@ -834,7 +727,7 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>( let barrier = receive_next_barrier(barrier_rx).await?; assert_eq!(barrier.epoch.prev, barrier_epoch.curr); barrier_epoch = barrier.epoch; - progress_state_table.commit(barrier.epoch).await?; + backfill_state.commit(barrier.epoch).await?; yield Message::Barrier(barrier); if barrier_epoch.curr == snapshot_epoch { break; diff --git a/src/stream/src/executor/backfill/snapshot_backfill/state.rs b/src/stream/src/executor/backfill/snapshot_backfill/state.rs index e03bc7878edac..dc8a633099b05 100644 --- a/src/stream/src/executor/backfill/snapshot_backfill/state.rs +++ b/src/stream/src/executor/backfill/snapshot_backfill/state.rs @@ -15,9 +15,13 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::mem::replace; +use std::sync::Arc; use anyhow::anyhow; -use risingwave_common::hash::VirtualNode; +use futures::future::try_join_all; +use futures::TryFutureExt; +use risingwave_common::bitmap::Bitmap; +use risingwave_common::hash::{VirtualNode, VnodeBitmapExt}; use risingwave_common::must_match; use risingwave_common::row::{OwnedRow, Row, RowExt}; use risingwave_common::types::{DataType, ScalarImpl}; @@ -29,7 +33,7 @@ pub(super) enum EpochBackfillProgress { Consumed, } -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Debug, Eq, PartialEq)] pub(super) struct VnodeBackfillProgress { pub(super) epoch: u64, pub(super) progress: EpochBackfillProgress, @@ -39,7 +43,7 @@ pub(super) struct VnodeBackfillProgress { const EXTRA_COLUMN_TYPES: [DataType; 3] = [DataType::Int16, DataType::Int64, DataType::Boolean]; impl VnodeBackfillProgress { - pub(super) fn validate_progress_table_schema( + fn validate_progress_table_schema( progress_table_column_types: &[DataType], upstream_pk_column_types: &[DataType], ) -> StreamExecutorResult<()> { @@ -108,8 +112,29 @@ impl VnodeBackfillProgress { }, } } + + fn build_row<'a>( + &'a self, + vnode: VirtualNode, + consumed_pk_rows: &'a OwnedRow, + ) -> impl Row + 'a { + let (is_finished, pk) = match &self.progress { + EpochBackfillProgress::Consuming { latest_pk } => { + assert_eq!(latest_pk.len(), consumed_pk_rows.len()); + (false, latest_pk) + } + EpochBackfillProgress::Consumed => (true, consumed_pk_rows), + }; + [ + Some(ScalarImpl::Int16(vnode.to_scalar())), + Some(ScalarImpl::Int64(self.epoch as _)), + Some(ScalarImpl::Bool(is_finished)), + ] + .chain(pk) + } } +#[derive(Debug, Eq, PartialEq)] enum VnodeBackfillState { New(VnodeBackfillProgress), Update { @@ -160,68 +185,61 @@ impl VnodeBackfillState { } } -mod progress_row { - use risingwave_common::hash::VirtualNode; - use risingwave_common::row::{OwnedRow, Row, RowExt}; - use risingwave_common::types::ScalarImpl; - - use crate::executor::backfill::snapshot_backfill::state::{ - EpochBackfillProgress, VnodeBackfillProgress, - }; - - pub(in super::super) type BackfillProgressRow<'a> = impl Row + 'a; - - impl VnodeBackfillProgress { - pub(super) fn build_row<'a>( - &'a self, - vnode: VirtualNode, - consumed_pk_rows: &'a OwnedRow, - ) -> BackfillProgressRow<'a> { - let (is_finished, pk) = match &self.progress { - EpochBackfillProgress::Consuming { latest_pk } => { - assert_eq!(latest_pk.len(), consumed_pk_rows.len()); - (false, latest_pk) - } - EpochBackfillProgress::Consumed => (true, consumed_pk_rows), - }; - [ - Some(ScalarImpl::Int16(vnode.to_scalar())), - Some(ScalarImpl::Int64(self.epoch as _)), - Some(ScalarImpl::Bool(is_finished)), - ] - .chain(pk) - } - } -} - -pub(super) use progress_row::*; +use risingwave_common::util::epoch::EpochPair; use risingwave_common::util::iter_util::ZipEqDebug; +use risingwave_storage::StateStore; +use crate::executor::prelude::StateTable; use crate::executor::StreamExecutorResult; -pub(super) struct BackfillState { +pub(super) struct BackfillState { vnode_state: HashMap, pk_serde: OrderedRowSerde, consumed_pk_rows: OwnedRow, + state_table: StateTable, } -impl BackfillState { - pub(super) fn new( - committed_progress: impl IntoIterator, +impl BackfillState { + pub(super) async fn new( + mut state_table: StateTable, + init_epoch: EpochPair, pk_serde: OrderedRowSerde, - ) -> Self { + ) -> StreamExecutorResult { + VnodeBackfillProgress::validate_progress_table_schema( + state_table.get_data_types(), + pk_serde.get_data_types(), + )?; + state_table.init_epoch(init_epoch).await?; let mut vnode_state = HashMap::new(); - for (vnode, progress) in committed_progress { + let committed_progress_row = Self::load_vnode_progress_row(&state_table).await?; + for (vnode, progress_row) in committed_progress_row { + let Some(progress_row) = progress_row else { + continue; + }; + let progress = VnodeBackfillProgress::from_row(&progress_row, &pk_serde); assert!(vnode_state .insert(vnode, VnodeBackfillState::Committed(progress)) .is_none()); } let consumed_pk_rows = OwnedRow::new(vec![None; pk_serde.get_data_types().len()]); - Self { + Ok(Self { vnode_state, pk_serde, consumed_pk_rows, - } + state_table, + }) + } + + async fn load_vnode_progress_row( + state_table: &StateTable, + ) -> StreamExecutorResult)>> { + let rows = try_join_all(state_table.vnodes().iter_vnodes().map(|vnode| { + state_table + .get_row([vnode.to_datum()]) + .map_ok(move |progress_row| (vnode, progress_row)) + })) + .await?; + Ok(rows) } fn update_progress(&mut self, vnode: VirtualNode, progress: VnodeBackfillProgress) { @@ -298,60 +316,65 @@ impl BackfillState { pub(super) fn latest_progress( &self, - ) -> impl Iterator { - self.vnode_state - .iter() - .map(|(vnode, state)| (*vnode, VnodeBackfillState::latest_progress(state))) - } - - pub(super) fn uncommitted_state( - &self, - ) -> impl Iterator< - Item = ( - VirtualNode, - Option>, - BackfillProgressRow<'_>, - ), - > + '_ { - self.vnode_state - .iter() - .filter_map(|(vnode, state)| match state { - VnodeBackfillState::New(progress) => Some(( - *vnode, - None, - progress.build_row(*vnode, &self.consumed_pk_rows), - )), - VnodeBackfillState::Update { latest, committed } => Some(( - *vnode, - Some(committed.build_row(*vnode, &self.consumed_pk_rows)), - latest.build_row(*vnode, &self.consumed_pk_rows), - )), - VnodeBackfillState::Committed(_) => None, - }) + ) -> impl Iterator)> { + self.state_table.vnodes().iter_vnodes().map(|vnode| { + ( + vnode, + self.vnode_state + .get(&vnode) + .map(VnodeBackfillState::latest_progress), + ) + }) } - pub(super) fn mark_committed(&mut self) { + pub(super) async fn commit(&mut self, barrier_epoch: EpochPair) -> StreamExecutorResult<()> { + for (vnode, state) in &self.vnode_state { + match state { + VnodeBackfillState::New(progress) => { + self.state_table + .insert(progress.build_row(*vnode, &self.consumed_pk_rows)); + } + VnodeBackfillState::Update { latest, committed } => { + self.state_table.update( + committed.build_row(*vnode, &self.consumed_pk_rows), + latest.build_row(*vnode, &self.consumed_pk_rows), + ); + } + VnodeBackfillState::Committed(_) => {} + } + } + self.state_table.commit(barrier_epoch).await?; self.vnode_state .values_mut() - .for_each(VnodeBackfillState::mark_committed) + .for_each(VnodeBackfillState::mark_committed); + Ok(()) } - pub(super) fn update_vnode_bitmap( + pub(super) async fn update_vnode_bitmap( &mut self, - new_vnodes: impl IntoIterator, - ) { + new_vnode_bitmap: Arc, + barrier_epoch: EpochPair, + ) -> StreamExecutorResult<()> { + self.state_table + .try_wait_committed_epoch(barrier_epoch.prev) + .await?; + let (prev_vnode_bitmap, _) = self.state_table.update_vnode_bitmap(new_vnode_bitmap); + let committed_progress_rows = Self::load_vnode_progress_row(&self.state_table).await?; let mut new_state = HashMap::new(); - for (vnode, progress) in new_vnodes { - if let Some(prev_progress) = self.vnode_state.get(&vnode) { - let prev_progress = must_match!(prev_progress, VnodeBackfillState::Committed(prev_progress) => { - prev_progress - }); - assert_eq!(prev_progress, &progress); + for (vnode, progress_row) in committed_progress_rows { + if let Some(progress_row) = progress_row { + let progress = VnodeBackfillProgress::from_row(&progress_row, &self.pk_serde); + assert!(new_state + .insert(vnode, VnodeBackfillState::Committed(progress)) + .is_none()); + } + + if prev_vnode_bitmap.is_set(vnode.to_index()) { + // if the vnode exist previously, the new state should be the same as the previous one + assert_eq!(self.vnode_state.get(&vnode), new_state.get(&vnode)); } - assert!(new_state - .insert(vnode, VnodeBackfillState::Committed(progress)) - .is_none()); } self.vnode_state = new_state; + Ok(()) } } diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 818fa7c53fe99..22b5f1f8b3a6e 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -415,7 +415,7 @@ impl StreamActorManager { barrier_rx, self.streaming_metrics.clone(), node.snapshot_backfill_epoch, - )? + ) .boxed(); let info = Self::get_executor_info(