diff --git a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs index cc83b15265e9e..da4c00e7b61e6 100644 --- a/src/stream/src/executor/backfill/cdc/cdc_backfill.rs +++ b/src/stream/src/executor/backfill/cdc/cdc_backfill.rs @@ -198,13 +198,6 @@ impl CdcBackfillExecutor { .await?; reader }; - - // emit one barrier if any - if let Some(barrier) = upstream_barriers.next_pending_barrier().await? { - // commit state to bump the epoch of state table - state_impl.commit_state(barrier.epoch).await?; - yield Message::Barrier(barrier); - } match create_result { Ok(reader) => { table_reader = Some(reader); @@ -222,7 +215,14 @@ impl CdcBackfillExecutor { } } } + // emit one barrier if any + if let Some(barrier) = upstream_barriers.next_pending_barrier().await? { + // commit state to bump the epoch of state table + state_impl.commit_state(barrier.epoch).await?; + yield Message::Barrier(barrier); + } } + let upstream_table_reader = UpstreamTableReader::new( self.external_table.clone(), table_reader.expect("table reader must created"),