Skip to content

Commit

Permalink
storage: use CapabilityRefs directly
Browse files Browse the repository at this point in the history
After TimelyDataflow/timely-dataflow#429 holding
onto CapabilityRefs across await points is safe

Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
  • Loading branch information
petrosagg committed Sep 12, 2022
1 parent fcaf3c0 commit 66af74c
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 12 deletions.
11 changes: 2 additions & 9 deletions src/storage/src/source/persist_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,6 @@ where
let (mut update_output, update_output_stream) = fetcher_builder.new_output();
let (mut consumed_part_output, consumed_part_output_stream) = fetcher_builder.new_output();

let update_output_port = update_output_stream.name().port;
let consumed_part_port = consumed_part_output_stream.name().port;

// Re-used state for processing and building rows.
let mut datum_vec = mz_repr::DatumVec::new();
let mut row_builder = Row::default();
Expand Down Expand Up @@ -305,12 +302,8 @@ where
// panicking, so swap them to an owned version.
data.swap(&mut buffer);

let update_cap = cap.delayed_for_output(cap.time(), update_output_port);
let mut update_session = output_handle.session(&update_cap);

let consumed_part_cap = cap.delayed_for_output(cap.time(), consumed_part_port);
let mut consumed_part_session =
consumed_part_output_handle.session(&consumed_part_cap);
let mut update_session = output_handle.session(&cap);
let mut consumed_part_session = consumed_part_output_handle.session(&cap);

for (_idx, part) in buffer.drain(..) {
let (consumed_part, fetched_part) =
Expand Down
4 changes: 1 addition & 3 deletions src/storage/src/source/source_reader_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,6 @@ where
let (mut batch_output, batch_stream) = demux_op.new_output();
let (mut summary_output, summary_stream) = demux_op.new_output();
let (_feedback_output, feedback_stream) = demux_op.new_output();
let summary_output_port = summary_stream.name().port;

demux_op.build(move |_caps| {
let mut buffer = Vec::new();
Expand All @@ -551,8 +550,7 @@ where
let mut session = batch_output.session(&cap);
session.give(message_batch);

let summary_cap = cap.delayed_for_output(cap.time(), summary_output_port);
let mut session = summary_output.session(&summary_cap);
let mut session = summary_output.session(&cap);
session.give(source_upper);
}
});
Expand Down

0 comments on commit 66af74c

Please sign in to comment.