feat(snapshot-backfill): introduce state to snapshot backfill
wenym1 committed Jan 3, 2025
1 parent 972d570 commit 8b58adc
Showing 9 changed files with 621 additions and 21 deletions.
7 changes: 6 additions & 1 deletion ci/scripts/run-backfill-tests.sh
@@ -34,7 +34,7 @@ else
RUNTIME_CLUSTER_PROFILE='ci-backfill-3cn-1fe-with-monitoring'
MINIO_RATE_LIMIT_CLUSTER_PROFILE='ci-backfill-3cn-1fe-with-monitoring-and-minio-rate-limit'
fi
export RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \
export RUST_LOG="info,risingwave_stream=info,risingwave_stream::executor::backfill=debug,risingwave_batch=info,risingwave_storage=info,risingwave_meta::barrier=debug" \

run_sql_file() {
psql -h localhost -p 4566 -d dev -U root -f "$@"
@@ -304,6 +304,11 @@ test_snapshot_backfill() {

wait

TEST_NAME=nexmark_q3 sqllogictest -p 4566 -d dev 'e2e_test/backfill/snapshot_backfill/scale.slt' &
TEST_NAME=nexmark_q7 sqllogictest -p 4566 -d dev 'e2e_test/backfill/snapshot_backfill/scale.slt' &

wait

TEST_NAME=nexmark_q3 sqllogictest -p 4566 -d dev 'e2e_test/backfill/snapshot_backfill/drop_mv.slt' &
TEST_NAME=nexmark_q7 sqllogictest -p 4566 -d dev 'e2e_test/backfill/snapshot_backfill/drop_mv.slt' &

15 changes: 15 additions & 0 deletions e2e_test/backfill/snapshot_backfill/scale.slt
@@ -0,0 +1,15 @@
control substitution on

statement ok
alter materialized view ${TEST_NAME}_mv set parallelism to 1;

sleep 3s

include ./check_data_equal.slt.part

statement ok
alter materialized view ${TEST_NAME}_mv set parallelism to 4;

sleep 3s

include ./check_data_equal.slt.part
64 changes: 52 additions & 12 deletions src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -123,6 +123,8 @@ impl StreamTableScan {

/// Build catalog for backfill state
///
/// When `is_snapshot_backfill` is `false`:
///
/// Schema: | vnode | pk ... | `backfill_finished` | `row_count` |
///
/// key: | vnode |
@@ -153,9 +155,18 @@ impl StreamTableScan {
/// the corresponding `no_shuffle_backfill`.
/// However this is not high priority, since we are working on supporting arrangement backfill,
/// which already has this capability.
///
///
/// When `is_snapshot_backfill` is `true`:
///
/// Schema: | vnode | `epoch` | `row_count` | `is_epoch_finished` | pk ...
///
/// key: | vnode |
/// value: | `epoch` | `row_count` | `is_epoch_finished` | pk ...
pub fn build_backfill_state_catalog(
&self,
state: &mut BuildFragmentGraphState,
is_snapshot_backfill: bool,
) -> TableCatalog {
let mut catalog_builder = TableCatalogBuilder::default();
let upstream_schema = &self.core.get_table_columns();
@@ -165,17 +176,34 @@ impl StreamTableScan {
catalog_builder.add_column(&Field::with_name(VirtualNode::RW_TYPE, "vnode"));
catalog_builder.add_order_column(0, OrderType::ascending());

// pk columns
for col_order in self.core.primary_key() {
let col = &upstream_schema[col_order.column_index];
catalog_builder.add_column(&Field::from(col));
}
if !is_snapshot_backfill {
// pk columns
for col_order in self.core.primary_key() {
let col = &upstream_schema[col_order.column_index];
catalog_builder.add_column(&Field::from(col));
}

// `backfill_finished` column
catalog_builder.add_column(&Field::with_name(DataType::Boolean, "backfill_finished"));
// `backfill_finished` column
catalog_builder.add_column(&Field::with_name(DataType::Boolean, "backfill_finished"));

// `row_count` column
catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));
// `row_count` column
catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));
} else {
// `epoch` column
catalog_builder.add_column(&Field::with_name(DataType::Int64, "epoch"));

// `row_count` column
catalog_builder.add_column(&Field::with_name(DataType::Int64, "row_count"));

// `is_epoch_finished` column
catalog_builder.add_column(&Field::with_name(DataType::Boolean, "is_epoch_finished"));

// pk columns
for col_order in self.core.primary_key() {
let col = &upstream_schema[col_order.column_index];
catalog_builder.add_column(&Field::from(col));
}
}

// Reuse the state store pk (vnode) as the vnode as well.
catalog_builder.set_vnode_col_idx(0);
@@ -284,9 +312,21 @@ impl StreamTableScan {
column_ids: upstream_column_ids.clone(),
};

let catalog = self
.build_backfill_state_catalog(state)
.to_internal_table_prost();
let catalog = match self.stream_scan_type {
StreamScanType::SnapshotBackfill => self
.build_backfill_state_catalog(state, true)
.to_internal_table_prost(),
StreamScanType::Chain
| StreamScanType::Rearrange
| StreamScanType::Backfill
| StreamScanType::UpstreamOnly
| StreamScanType::ArrangementBackfill => self
.build_backfill_state_catalog(state, false)
.to_internal_table_prost(),
StreamScanType::Unspecified => {
unreachable!()
}
};

// For backfill, we first read pk + output_indices from upstream.
// On this, we need to further project `output_indices` to the downstream.
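The doc comment above defines the new snapshot-backfill state layout: per vnode, the state row stores `epoch`, `row_count`, `is_epoch_finished`, and then the upstream pk as the current position. Below is a minimal, self-contained sketch of such a progress row and one plausible way to interpret it on recovery; the struct, field types, and `resume_after` helper are illustrative assumptions, not RisingWave's actual catalog or row types.

// Hypothetical sketch of one per-vnode row in the snapshot-backfill state
// table described above: | vnode | epoch | row_count | is_epoch_finished | pk ... |
// Plain Rust types stand in for RisingWave's catalog/row abstractions.
#[derive(Debug)]
struct SnapshotBackfillProgress {
    vnode: u16,                       // key: the vnode this row tracks
    epoch: i64,                       // epoch currently being backfilled
    row_count: i64,                   // rows processed so far for this vnode
    is_epoch_finished: bool,          // whether this vnode finished the epoch
    current_pos: Vec<Option<String>>, // upstream pk of the last processed row
}

impl SnapshotBackfillProgress {
    // One plausible recovery rule: an unfinished epoch resumes after the
    // recorded pk; a finished epoch starts the next one from the beginning.
    fn resume_after(&self) -> Option<&[Option<String>]> {
        if self.is_epoch_finished {
            None
        } else {
            Some(self.current_pos.as_slice())
        }
    }
}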
15 changes: 14 additions & 1 deletion src/stream/src/common/table/state_table.rs
@@ -43,8 +43,9 @@ use risingwave_hummock_sdk::key::{
TableKey, TableKeyRange,
};
use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::catalog::Table;
use risingwave_storage::error::{ErrorKind, StorageError};
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
@@ -55,6 +56,7 @@ use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::{
InitOptions, LocalStateStore, NewLocalOptions, OpConsistencyLevel, PrefetchOptions,
ReadLogOptions, ReadOptions, SealCurrentEpochOptions, StateStoreIter, StateStoreIterExt,
TryWaitEpochOptions,
};
use risingwave_storage::table::merge_sort::merge_sort;
use risingwave_storage::table::{
@@ -183,6 +185,17 @@ where
Ok(())
}

pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
self.store
.try_wait_epoch(
HummockReadEpoch::Committed(prev_epoch),
TryWaitEpochOptions {
table_id: self.table_id,
},
)
.await
}

pub fn state_store(&self) -> &S {
&self.store
}
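The new `try_wait_committed_epoch` helper lets an executor block until a given epoch is committed in the state store before issuing reads against it. A hedged usage sketch follows, assumed to live inside the `risingwave_stream` crate; the surrounding function, its name, and the import paths are illustrative assumptions, and only the helper itself comes from this change.

use risingwave_storage::error::StorageResult;
use risingwave_storage::StateStore;

use crate::common::table::state_table::StateTable;

// Illustrative only: wait for `prev_epoch` to be committed before reading
// the data written at that epoch.
async fn wait_before_reading_snapshot<S: StateStore>(
    state_table: &StateTable<S>,
    prev_epoch: u64,
) -> StorageResult<()> {
    state_table.try_wait_committed_epoch(prev_epoch).await?;
    // ... proceed to read the snapshot / change log at `prev_epoch` ...
    Ok(())
}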