Skip to content

Commit

Permalink
perf: Recombine into larger morsels in new-streaming join (#21008)
Browse files (browse the repository at this point in the history)
  • Loading branch information
orlp authored Jan 30, 2025
1 parent 3c7c2c5 commit 4fc5929
Showing 1 changed file with 28 additions and 4 deletions.
32 changes: 28 additions & 4 deletions crates/polars-stream/src/nodes/joins/equi_join.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -407,6 +407,8 @@ impl ProbeState {
break;
}
} else {
let mut out_frames = Vec::new();
let mut out_len = 0;
for (p, idxs_in_p) in partitions.iter().zip(&partition_idxs) {
let mut offset = 0;
while offset < idxs_in_p.len() {
Expand All @@ -417,9 +419,13 @@ impl ProbeState {
&mut probe_match,
mark_matches,
emit_unmatched,
probe_limit,
probe_limit - out_len,
) as usize;

if table_match.is_empty() {
continue;
}

// Gather output and send.
let mut build_df = if emit_unmatched {
p.df.take_opt_chunked_unchecked(&table_match)
Expand All @@ -438,12 +444,30 @@ impl ProbeState {
};
let out_df = postprocess_join(out_df, params);

let out_morsel = Morsel::new(out_df, seq, src_token.clone());
if send.send(out_morsel).await.is_err() {
break;
out_len = out_len
.checked_add(out_df.height().try_into().unwrap())
.unwrap();
out_frames.push(out_df);

if out_len >= probe_limit {
out_len = 0;
let df =
accumulate_dataframes_vertical_unchecked(out_frames.drain(..));
let out_morsel = Morsel::new(df, seq, src_token.clone());
if send.send(out_morsel).await.is_err() {
break;
}
}
}
}

if out_len > 0 {
let df = accumulate_dataframes_vertical_unchecked(out_frames.drain(..));
let out_morsel = Morsel::new(df, seq, src_token.clone());
if send.send(out_morsel).await.is_err() {
break;
}
}
}
}

Expand Down

0 comments on commit 4fc5929

Please sign in to comment.