diff --git a/src/storage/src/source/persist_source.rs b/src/storage/src/source/persist_source.rs index d20617d5e96bf..b5b84a5cca6e5 100644 --- a/src/storage/src/source/persist_source.rs +++ b/src/storage/src/source/persist_source.rs @@ -266,9 +266,6 @@ where let (mut update_output, update_output_stream) = fetcher_builder.new_output(); let (mut consumed_part_output, consumed_part_output_stream) = fetcher_builder.new_output(); - let update_output_port = update_output_stream.name().port; - let consumed_part_port = consumed_part_output_stream.name().port; - // Re-used state for processing and building rows. let mut datum_vec = mz_repr::DatumVec::new(); let mut row_builder = Row::default(); @@ -305,12 +302,8 @@ where // panicking, so swap them to an owned version. data.swap(&mut buffer); - let update_cap = cap.delayed_for_output(cap.time(), update_output_port); - let mut update_session = output_handle.session(&update_cap); - - let consumed_part_cap = cap.delayed_for_output(cap.time(), consumed_part_port); - let mut consumed_part_session = - consumed_part_output_handle.session(&consumed_part_cap); + let mut update_session = output_handle.session(&cap); + let mut consumed_part_session = consumed_part_output_handle.session(&cap); for (_idx, part) in buffer.drain(..) { let (consumed_part, fetched_part) =