Commit

refactor(rust): Spawn threads on our rayon pool in new-streaming (#21012)
orlp authored Jan 30, 2025
1 parent 4fc5929 commit ea1ea5a
Showing 2 changed files with 68 additions and 63 deletions.
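
The change routes the new-streaming join's rayon work onto Polars' own thread pool (polars_core::POOL) instead of rayon's implicit global pool: work submitted inside ThreadPool::install runs on that pool's workers. A minimal standalone sketch of that behaviour, assuming only rayon's public API — the locally built pool is a stand-in for POOL, not Polars code:

use rayon::prelude::*;
use rayon::ThreadPoolBuilder;

fn main() {
    // Stand-in for a dedicated pool such as polars_core::POOL.
    let pool = ThreadPoolBuilder::new()
        .num_threads(4)
        .thread_name(|i| format!("my-pool-{i}"))
        .build()
        .unwrap();

    // Everything this parallel iterator spawns runs on `pool`'s workers.
    let squares: Vec<i32> = pool.install(|| (0..16).into_par_iter().map(|x| x * x).collect());

    // Outside `install`, the same iterator would run on rayon's global pool.
    let on_global: Vec<i32> = (0..16).into_par_iter().map(|x| x * x).collect();
    assert_eq!(squares, on_global);
    println!("{squares:?}");
}

install blocks the caller until the closure finishes and passes its result through, so only the threads doing the work change, not the surrounding control flow.
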
129 changes: 67 additions & 62 deletions crates/polars-stream/src/nodes/joins/equi_join.rs
@@ -4,6 +4,7 @@ use polars_core::prelude::*;
use polars_core::schema::{Schema, SchemaExt};
use polars_core::series::IsSorted;
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
use polars_core::POOL;
use polars_expr::chunked_idx_table::{new_chunked_idx_table, ChunkedIdxTable};
use polars_expr::hash_keys::HashKeys;
use polars_ops::frame::{JoinArgs, JoinType, MaintainOrderJoin};
@@ -214,51 +215,37 @@ impl BuildState {
}
}

let track_unmatchable = params.emit_unmatched_build();
let table_per_partition: Vec<_> = results_per_partition
.into_par_iter()
.with_max_len(1)
.map(|results| {
// Estimate sizes and cardinality.
let mut sketch = CardinalitySketch::new();
let mut num_frames = 0;
for result in &results {
sketch.combine(result.sketch.as_ref().unwrap());
num_frames += result.frames.len();
}

// Build table for this partition.
let mut combined_frames = Vec::with_capacity(num_frames);
let mut chunk_seq_ids = Vec::with_capacity(num_frames);
let mut table = table.new_empty();
table.reserve(sketch.estimate() * 5 / 4);
if params.preserve_order_build {
let mut combined = Vec::with_capacity(num_frames);
for result in results {
for (hash_keys, (seq, frame)) in
result.hash_keys.into_iter().zip(result.frames)
{
combined.push((seq, hash_keys, frame));
}
POOL.install(|| {
let track_unmatchable = params.emit_unmatched_build();
let table_per_partition: Vec<_> = results_per_partition
.into_par_iter()
.with_max_len(1)
.map(|results| {
// Estimate sizes and cardinality.
let mut sketch = CardinalitySketch::new();
let mut num_frames = 0;
for result in &results {
sketch.combine(result.sketch.as_ref().unwrap());
num_frames += result.frames.len();
}

combined.sort_unstable_by_key(|c| c.0);
for (seq, hash_keys, frame) in combined {
// Zero-sized chunks can get deleted, so skip entirely to avoid messing
// up the chunk counter.
if frame.height() == 0 {
continue;
// Build table for this partition.
let mut combined_frames = Vec::with_capacity(num_frames);
let mut chunk_seq_ids = Vec::with_capacity(num_frames);
let mut table = table.new_empty();
table.reserve(sketch.estimate() * 5 / 4);
if params.preserve_order_build {
let mut combined = Vec::with_capacity(num_frames);
for result in results {
for (hash_keys, (seq, frame)) in
result.hash_keys.into_iter().zip(result.frames)
{
combined.push((seq, hash_keys, frame));
}
}

table.insert_key_chunk(hash_keys, track_unmatchable);
combined_frames.push(frame);
chunk_seq_ids.push(seq);
}
} else {
for result in results {
for (hash_keys, (_, frame)) in
result.hash_keys.into_iter().zip(result.frames)
{
combined.sort_unstable_by_key(|c| c.0);
for (seq, hash_keys, frame) in combined {
// Zero-sized chunks can get deleted, so skip entirely to avoid messing
// up the chunk counter.
if frame.height() == 0 {
@@ -267,31 +254,47 @@ impl BuildState {

table.insert_key_chunk(hash_keys, track_unmatchable);
combined_frames.push(frame);
chunk_seq_ids.push(seq);
}
} else {
for result in results {
for (hash_keys, (_, frame)) in
result.hash_keys.into_iter().zip(result.frames)
{
// Zero-sized chunks can get deleted, so skip entirely to avoid messing
// up the chunk counter.
if frame.height() == 0 {
continue;
}

table.insert_key_chunk(hash_keys, track_unmatchable);
combined_frames.push(frame);
}
}
}
}

let df = if combined_frames.is_empty() {
if params.left_is_build {
DataFrame::empty_with_schema(&params.left_payload_schema)
let df = if combined_frames.is_empty() {
if params.left_is_build {
DataFrame::empty_with_schema(&params.left_payload_schema)
} else {
DataFrame::empty_with_schema(&params.right_payload_schema)
}
} else {
DataFrame::empty_with_schema(&params.right_payload_schema)
accumulate_dataframes_vertical_unchecked(combined_frames)
};
ProbeTable {
table,
df,
chunk_seq_ids,
}
} else {
accumulate_dataframes_vertical_unchecked(combined_frames)
};
ProbeTable {
table,
df,
chunk_seq_ids,
}
})
.collect();
})
.collect();

ProbeState {
table_per_partition,
max_seq_sent: MorselSeq::default(),
}
ProbeState {
table_per_partition,
max_seq_sent: MorselSeq::default(),
}
})
}
}

@@ -542,8 +545,10 @@ impl ProbeState {

impl Drop for ProbeState {
fn drop(&mut self) {
// Parallel drop as the state might be quite big.
self.table_per_partition.par_drain(..).for_each(drop);
POOL.install(|| {
// Parallel drop as the state might be quite big.
self.table_per_partition.par_drain(..).for_each(drop);
})
}
}

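The Drop change above applies the same idea to teardown: par_drain(..) frees the per-partition tables in parallel, and wrapping it in POOL.install keeps that deallocation work on Polars' pool as well. A rough sketch of the pattern under assumed, simplified types — BigPartition and ProbeStateSketch are stand-ins, and the real code uses the global POOL static rather than storing a pool handle:

use std::sync::Arc;

use rayon::prelude::*;
use rayon::{ThreadPool, ThreadPoolBuilder};

struct BigPartition {
    rows: Vec<Vec<u8>>, // placeholder for a large hash table plus payload frames
}

struct ProbeStateSketch {
    partitions: Vec<BigPartition>,
    pool: Arc<ThreadPool>, // stand-in for the global polars_core::POOL
}

impl Drop for ProbeStateSketch {
    fn drop(&mut self) {
        // Freeing many large allocations can dominate teardown, so drop the
        // partitions in parallel, and do it on our own pool.
        let pool = Arc::clone(&self.pool);
        pool.install(|| self.partitions.par_drain(..).for_each(drop));
    }
}

fn main() {
    let pool = Arc::new(ThreadPoolBuilder::new().num_threads(4).build().unwrap());
    let state = ProbeStateSketch {
        partitions: (0..8)
            .map(|_| BigPartition { rows: vec![vec![0u8; 1024]; 64] })
            .collect(),
        pool,
    };
    println!(
        "dropping {} buffers in parallel",
        state.partitions.iter().map(|p| p.rows.len()).sum::<usize>()
    );
    drop(state); // runs the parallel drop above on `pool`
}
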
2 changes: 1 addition & 1 deletion crates/polars-stream/src/utils/in_memory_linearize.rs
@@ -35,7 +35,7 @@ pub fn linearize(mut morsels_per_pipe: Vec<Vec<Morsel>>) -> Vec<DataFrame> {
let morsels_per_p = &morsels_per_pipe;
let mut dataframes: Vec<DataFrame> = Vec::with_capacity(num_morsels);
let dataframes_ptr = unsafe { SyncPtr::new(dataframes.as_mut_ptr()) };
rayon::scope(|s| {
POOL.scope(|s| {
let mut out_offset = 0;
let mut stop_idx_per_pipe = vec![0; morsels_per_p.len()];
for t in 0..n_threads {
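
The second file makes the matching switch for scoped tasks: ThreadPool::scope behaves like rayon::scope, except that the spawned closures run on that pool's workers. A small sketch assuming only rayon's public API — the per-slot summing stands in for the real linearize logic, which hands each task a disjoint output range:

use rayon::ThreadPoolBuilder;

fn main() {
    let pool = ThreadPoolBuilder::new()
        .num_threads(4)
        .thread_name(|i| format!("linearize-{i}"))
        .build()
        .unwrap();

    // As in the linearize code, each spawned task gets a disjoint &mut view of
    // the output, so the tasks need no synchronization between them.
    let mut partial_sums = vec![0u64; 4];
    pool.scope(|s| {
        for (t, slot) in partial_sums.iter_mut().enumerate() {
            s.spawn(move |_| {
                *slot = (1..=1_000u64).sum::<u64>() * (t as u64 + 1);
                // Prints a "linearize-*" worker name: the task ran on this
                // pool, not on rayon's global pool.
                println!("task {t} on {:?}", std::thread::current().name());
            });
        }
    });
    println!("{partial_sums:?}");
}

Like install, scope blocks until every spawned task has finished before returning.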
