Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Dec 25, 2024
1 parent a087368 commit 353a5c9
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 246 deletions.
203 changes: 48 additions & 155 deletions src/stream/src/executor/backfill/snapshot_backfill/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::sync::Arc;
use anyhow::anyhow;
use futures::future::{try_join_all, Either};
use futures::{pin_mut, Stream, TryFutureExt, TryStreamExt};
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::hash::VnodeBitmapExt;
use risingwave_common::metrics::LabelGuardedIntCounter;
Expand Down Expand Up @@ -90,7 +89,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
barrier_rx: UnboundedReceiver<Barrier>,
metrics: Arc<StreamingMetrics>,
snapshot_epoch: Option<u64>,
) -> StreamExecutorResult<Self> {
) -> Self {
assert_eq!(&upstream.info.schema, upstream_table.schema());
let Some(pk_in_output_indices) = upstream_table.pk_in_output_indices() else {
panic!(
Expand All @@ -106,11 +105,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
"create snapshot backfill executor with rate limit"
);
}
VnodeBackfillProgress::validate_progress_table_schema(
progress_state_table.get_data_types(),
upstream_table.pk_serializer().get_data_types(),
)?;
Ok(Self {
Self {
upstream_table,
pk_in_output_indices,
progress_state_table,
Expand All @@ -123,12 +118,11 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
actor_ctx,
metrics,
snapshot_epoch,
})
}
}

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(mut self) {
let table_id = self.upstream_table.table_id();
debug!("snapshot backfill executor start");
let first_upstream_barrier = expect_first_barrier(&mut self.upstream).await?;
debug!(epoch = ?first_upstream_barrier.epoch, "get first upstream barrier");
Expand Down Expand Up @@ -156,15 +150,18 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
};
let first_recv_barrier_epoch = first_recv_barrier.epoch;
yield Message::Barrier(first_recv_barrier);
self.progress_state_table
.init_epoch(first_recv_barrier_epoch)
.await?;
let mut backfill_state = BackfillState::new(
self.progress_state_table,
first_recv_barrier_epoch,
self.upstream_table.pk_serializer().clone(),
)
.await?;

let (mut barrier_epoch, mut need_report_finish, mut backfill_state) = {
let (mut barrier_epoch, mut need_report_finish) = {
if should_backfill {
let mut backfill_state =
BackfillState::new([], self.upstream_table.pk_serializer().clone());
assert!(backfill_state.latest_progress().next().is_none());
assert!(backfill_state
.latest_progress()
.all(|(_, progress)| progress.is_none()));
let table_id_str = format!("{}", self.upstream_table.table_id().table_id);
let actor_id_str = format!("{}", self.actor_ctx.id);

Expand Down Expand Up @@ -198,7 +195,6 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
&mut self.barrier_rx,
&mut self.progress,
&mut backfill_state,
&mut self.progress_state_table,
first_recv_barrier_epoch,
);

Expand All @@ -217,7 +213,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {

let recv_barrier = self.barrier_rx.recv().await.expect("should exist");
assert_eq!(first_upstream_barrier.epoch, recv_barrier.epoch);
self.progress_state_table.commit(recv_barrier.epoch).await?;
backfill_state.commit(recv_barrier.epoch).await?;
yield Message::Barrier(recv_barrier);
}

Expand Down Expand Up @@ -287,16 +283,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
self.upstream_table.vnodes().iter_vnodes(),
barrier.epoch.prev,
);
for (_vnode, old_row, new_row) in backfill_state.uncommitted_state() {
let record = if let Some(old_row) = old_row {
Record::Update { old_row, new_row }
} else {
Record::Insert { new_row }
};
self.progress_state_table.write_record(record);
}
self.progress_state_table.commit(barrier.epoch).await?;
backfill_state.mark_committed();
backfill_state.commit(barrier.epoch).await?;
let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
yield Message::Barrier(barrier);
if update_vnode_bitmap.is_some() {
Expand All @@ -307,44 +294,28 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
}
}
}

info!(
?barrier_epoch,
table_id = self.upstream_table.table_id().table_id,
"finish consuming log store"
);
(barrier_epoch, true, backfill_state)
(barrier_epoch, true)
} else {
let pk_serializer = self.upstream_table.pk_serializer();
let progress = try_join_all(self.progress_state_table.vnodes().iter_vnodes().map(
|vnode| {
self.progress_state_table
.get_row([vnode.to_datum()])
.map_ok(move |progress_row| {
let progress = VnodeBackfillProgress::from_row(
&progress_row.expect(
"should exist because all vnode have same epoch progress",
),
pk_serializer,
);
let expected_progress = VnodeBackfillProgress {
epoch: first_upstream_barrier.epoch.prev,
progress: EpochBackfillProgress::Consumed,
};
assert_eq!(progress, expected_progress, "vnode: {:?}", vnode);
(vnode, progress)
})
},
))
.await?;
let backfill_state =
BackfillState::new(progress, self.upstream_table.pk_serializer().clone());
backfill_state
.latest_progress()
.for_each(|(vnode, progress)| {
let expected_progress = VnodeBackfillProgress {
epoch: first_upstream_barrier.epoch.prev,
progress: EpochBackfillProgress::Consumed,
};
assert_eq!(progress, Some(&expected_progress), "vnode: {:?}", vnode);
});
info!(
table_id = self.upstream_table.table_id().table_id,
"skip backfill"
);
assert_eq!(first_upstream_barrier.epoch, first_recv_barrier_epoch);
(first_upstream_barrier.epoch, false, backfill_state)
(first_upstream_barrier.epoch, false)
}
};
let mut upstream = self.upstream.into_executor(self.barrier_rx).execute();
Expand All @@ -358,93 +329,34 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
barrier.epoch.prev,
);
let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id);
let is_scale =
update_vnode_bitmap.is_some() || barrier.is_stop(self.actor_ctx.id);
if is_scale {
use itertools::Itertools;
info!(?table_id, vnodes = ?self.upstream_table.vnodes().iter_vnodes().collect_vec(), prev_epoch = barrier.epoch.prev, "write finish epoch");
}
barrier_epoch = barrier.epoch;
if need_report_finish {
need_report_finish = false;
self.progress.finish_consuming_log_store(barrier_epoch);
}
for (vnode, old_row, new_row) in backfill_state.uncommitted_state() {
let record = if let Some(old_row) = old_row {
Record::Update { old_row, new_row }
} else {
Record::Insert { new_row }
};
if is_scale {
info!(
?table_id,
?vnode,
?record,
"write record before update vnode bitmap"
);
}
self.progress_state_table.write_record(record);
}
self.progress_state_table.commit(barrier.epoch).await?;
backfill_state.mark_committed();
backfill_state.commit(barrier.epoch).await?;
yield Message::Barrier(barrier);
if let Some(new_vnode_bitmap) = update_vnode_bitmap {
yield Message::Barrier(barrier);
self.progress_state_table
.try_wait_committed_epoch(barrier_epoch.prev)
.await?;
let _prev_vnode_bitmap = self
.upstream_table
.update_vnode_bitmap(new_vnode_bitmap.clone());
let (_, _) = self
.progress_state_table
.update_vnode_bitmap(new_vnode_bitmap);
let pk_serializer = self.upstream_table.pk_serializer();
// try_join_all((0..256).map(
// |vnode| {
// let vnode = VirtualNode::from_index(vnode);
// // TODO: remove the clone
// let pk_serializer = pk_serializer.clone();
// self.progress_state_table
// .get_row([vnode.to_datum()])
// .map_ok(move |progress_row| {
// let progress = VnodeBackfillProgress::from_row(
// &progress_row.expect("should exist because all vnode have same epoch progress"),
// &pk_serializer,
// );
// let expected_progress = VnodeBackfillProgress {
// epoch: barrier_epoch.prev,
// progress: EpochBackfillProgress::Consumed,
// };
// assert_eq!(progress, expected_progress, "{:?}", vnode);
// })
// },
// ))
// .await?;
let progress =
try_join_all(self.progress_state_table.vnodes().iter_vnodes().map(
|vnode| {
// TODO: remove the clone
let pk_serializer = pk_serializer.clone();
self.progress_state_table
.get_row([vnode.to_datum()])
.map_ok(move |progress_row| {
let progress = VnodeBackfillProgress::from_row(
&progress_row.expect("should exist because all vnode have same epoch progress"),
&pk_serializer,
);
let expected_progress = VnodeBackfillProgress {
epoch: barrier_epoch.prev,
progress: EpochBackfillProgress::Consumed,
};
assert_eq!(progress, expected_progress, "{:?}", vnode);
(vnode, progress)
})
},
))
backfill_state
.update_vnode_bitmap(new_vnode_bitmap, barrier_epoch)
.await?;
backfill_state.update_vnode_bitmap(progress);
} else {
yield Message::Barrier(barrier);
let expected_progress = VnodeBackfillProgress {
epoch: barrier_epoch.prev,
progress: EpochBackfillProgress::Consumed,
};
backfill_state
.latest_progress()
.for_each(|(vnode, progress)| {
assert_eq!(
progress,
Some(&expected_progress),
"vnode {:?} has unexpected progress",
vnode
);
});
}
}
msg => {
Expand Down Expand Up @@ -712,8 +624,7 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>(
rate_limit: Option<usize>,
barrier_rx: &'a mut UnboundedReceiver<Barrier>,
progress: &'a mut CreateMviewProgressReporter,
backfill_state: &'a mut BackfillState,
progress_state_table: &'a mut StateTable<S>,
backfill_state: &'a mut BackfillState<S>,
first_recv_barrier_epoch: EpochPair,
) {
let mut barrier_epoch = first_recv_barrier_epoch;
Expand Down Expand Up @@ -778,16 +689,7 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>(
}
})
.await?;
for (_vnode, old_row, new_row) in backfill_state.uncommitted_state() {
let record = if let Some(old_row) = old_row {
Record::Update { old_row, new_row }
} else {
Record::Insert { new_row }
};
progress_state_table.write_record(record);
}
progress_state_table.commit(barrier.epoch).await?;
backfill_state.mark_committed();
backfill_state.commit(barrier.epoch).await?;
debug!(?barrier_epoch, count, epoch_row_count, "update progress");
progress.update(barrier_epoch, barrier_epoch.prev, count as _);
epoch_row_count = 0;
Expand Down Expand Up @@ -816,16 +718,7 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>(
backfill_state.finish_epoch([vnode], snapshot_epoch);
})
.await?;
for (_vnode, old_row, new_row) in backfill_state.uncommitted_state() {
let record = if let Some(old_row) = old_row {
Record::Update { old_row, new_row }
} else {
Record::Insert { new_row }
};
progress_state_table.write_record(record);
}
progress_state_table.commit(barrier_epoch).await?;
backfill_state.mark_committed();
backfill_state.commit(barrier_epoch).await?;
progress.finish(barrier_epoch, count as _);
yield Message::Barrier(barrier_to_report_finish);

Expand All @@ -834,7 +727,7 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>(
let barrier = receive_next_barrier(barrier_rx).await?;
assert_eq!(barrier.epoch.prev, barrier_epoch.curr);
barrier_epoch = barrier.epoch;
progress_state_table.commit(barrier.epoch).await?;
backfill_state.commit(barrier.epoch).await?;
yield Message::Barrier(barrier);
if barrier_epoch.curr == snapshot_epoch {
break;
Expand Down
Loading

0 comments on commit 353a5c9

Please sign in to comment.