Skip to content

Commit

Permalink
minor
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Nov 20, 2024
1 parent e499aef commit da225d5
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions src/stream/src/executor/backfill/cdc/cdc_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,6 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
.await?;
reader
};

// emit one barrier if any
if let Some(barrier) = upstream_barriers.next_pending_barrier().await? {
// commit state to bump the epoch of state table
state_impl.commit_state(barrier.epoch).await?;
yield Message::Barrier(barrier);
}
match create_result {
Ok(reader) => {
table_reader = Some(reader);
Expand All @@ -222,7 +215,14 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
}
}
}
// emit one barrier if any
if let Some(barrier) = upstream_barriers.next_pending_barrier().await? {
// commit state to bump the epoch of state table
state_impl.commit_state(barrier.epoch).await?;
yield Message::Barrier(barrier);
}
}

let upstream_table_reader = UpstreamTableReader::new(
self.external_table.clone(),
table_reader.expect("table reader must created"),
Expand Down

0 comments on commit da225d5

Please sign in to comment.