Skip to content

Commit

Permalink
wrap-up and add test
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Sep 6, 2024
1 parent dbc0460 commit 773e61d
Show file tree
Hide file tree
Showing 10 changed files with 250 additions and 146 deletions.
13 changes: 5 additions & 8 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use polars_expr::{create_physical_expr, ExpressionConversionState};
use polars_io::RowIndex;
use polars_mem_engine::{create_physical_plan, Executor};
use polars_ops::frame::JoinCoalesce;
use polars_ops::prelude::InequalityOperator;
pub use polars_plan::frame::{AllowedOptimizations, OptFlags};
use polars_plan::global::FETCH_ROWS;
use polars_utils::pl_str::PlSmallStr;
Expand Down Expand Up @@ -2106,7 +2105,6 @@ impl JoinBuilder {
LazyFrame::from_logical_plan(lp, opt_state)
}


// Finish with join predicates
pub fn join_where(self, predicates: Vec<Expr>) -> LazyFrame {
let mut opt_state = self.lf.opt_state;
Expand All @@ -2126,11 +2124,11 @@ impl JoinBuilder {
coalesce: self.coalesce,
};
let options = JoinOptions {
allow_parallel: self.allow_parallel,
force_parallel: self.force_parallel,
args,
..Default::default()
};
allow_parallel: self.allow_parallel,
force_parallel: self.force_parallel,
args,
..Default::default()
};

let lp = DslPlan::Join {
input_left: Arc::new(self.lf.logical_plan),
Expand All @@ -2142,6 +2140,5 @@ impl JoinBuilder {
};

LazyFrame::from_logical_plan(lp, opt_state)

}
}
103 changes: 59 additions & 44 deletions crates/polars-ops/src/frame/join/iejoin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@ use polars_core::chunked_array::ChunkedArray;
use polars_core::datatypes::{IdxCa, NumericNative, PolarsNumericType};
use polars_core::frame::DataFrame;
use polars_core::prelude::*;
use polars_core::utils::{_set_partition_size, split};
use polars_core::{with_match_physical_numeric_polars_type, POOL};
use polars_error::{polars_err, PolarsResult};
use polars_utils::binary_search::ExponentialSearch;
use polars_utils::itertools::Itertools;
use polars_utils::slice::GetSaferUnchecked;
use polars_utils::total_ord::{TotalEq};
use polars_utils::total_ord::TotalEq;
use polars_utils::IdxSize;
use rayon::prelude::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use polars_core::utils::{_set_partition_size, split};

use crate::frame::_finish_join;
use rayon::prelude::*;
use polars_utils::itertools::Itertools;

#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
Expand Down Expand Up @@ -138,42 +139,46 @@ pub(super) fn iejoin_par(
suffix: Option<PlSmallStr>,
slice: Option<(i64, usize)>,
) -> PolarsResult<DataFrame> {
let l1_descending = matches!(options.operator1, InequalityOperator::Gt | InequalityOperator::GtEq);
let l1_descending = matches!(
options.operator1,
InequalityOperator::Gt | InequalityOperator::GtEq
);

let l1_sort_options = SortOptions::default()
.with_maintain_order(true)
.with_nulls_last(false)
.with_order_descending(l1_descending);

let sl = &selected_left[0];
let l1_s_l = sl.arg_sort(l1_sort_options)
.slice(
sl.null_count() as i64,
sl.len() - sl.null_count(),
);
let l1_s_l = sl
.arg_sort(l1_sort_options)
.slice(sl.null_count() as i64, sl.len() - sl.null_count());

let sr = &selected_right[0];
let l1_s_r = sr.arg_sort(l1_sort_options)
.slice(
sr.null_count() as i64,
sr.len() - sr.null_count(),
);
let l1_s_r = sr
.arg_sort(l1_sort_options)
.slice(sr.null_count() as i64, sr.len() - sr.null_count());

// Because we take the cartesian product of the left and right partitions, the number of
// blocks is the square of the per-side partition count, so we start from the square root of
// the partition size. Not every block produces results and the work can be imbalanced, so we
// double the per-side count, which yields 2^2 = 4 times as many blocks to schedule.
let n_partitions = (_set_partition_size() as f32).sqrt() as usize * 2;
let n_partitions = (_set_partition_size() as f32).sqrt() as usize * 2;
let splitted_a = split(&l1_s_l, n_partitions);
let splitted_b = split(&l1_s_r, n_partitions);

let cartesian_prod = splitted_a.iter()
.flat_map(|l| splitted_b.iter().map(move |r| (l, r))).collect::<Vec<_>>();
let cartesian_prod = splitted_a
.iter()
.flat_map(|l| splitted_b.iter().map(move |r| (l, r)))
.collect::<Vec<_>>();

let iter = cartesian_prod.par_iter().map(|(l_l1_idx, r_l1_idx)| {
if l_l1_idx.is_empty() || r_l1_idx.is_empty() {
return Ok(None)
return Ok(None);
}
fn get_extrema<'a>(l1_idx: &'a IdxCa, s: &'a Series) -> Option<(AnyValue<'a>, AnyValue<'a>)> {
fn get_extrema<'a>(
l1_idx: &'a IdxCa,
s: &'a Series,
) -> Option<(AnyValue<'a>, AnyValue<'a>)> {
let first = l1_idx.first()?;
let last = l1_idx.last()?;

Expand All @@ -186,62 +191,68 @@ pub(super) fn iejoin_par(
(end, start)
})
}
let Some((min_l, max_l)) = get_extrema(l_l1_idx, sl) else {return Ok(None)};
let Some((min_r, max_r)) = get_extrema(r_l1_idx, sr) else {return Ok(None)};
let Some((min_l, max_l)) = get_extrema(l_l1_idx, sl) else {
return Ok(None);
};
let Some((min_r, max_r)) = get_extrema(r_l1_idx, sr) else {
return Ok(None);
};

let include_block = match options.operator1 {
InequalityOperator::Lt => min_l < max_r,
InequalityOperator::LtEq => min_l <= max_r,
InequalityOperator::Gt => max_l > min_r,
InequalityOperator::GtEq => max_l >= min_r
InequalityOperator::GtEq => max_l >= min_r,
};

if include_block {
let (l, r) = unsafe {
(selected_left.iter().map(|s| s.take_unchecked(l_l1_idx)).collect_vec(),
selected_right.iter().map(|s| s.take_unchecked(r_l1_idx)).collect_vec())
(
selected_left
.iter()
.map(|s| s.take_unchecked(l_l1_idx))
.collect_vec(),
selected_right
.iter()
.map(|s| s.take_unchecked(r_l1_idx))
.collect_vec(),
)
};


// Compute the row indexes
let (idx_l, idx_r) = iejoin_tuples(l, r, options, None)?;

if idx_l.is_empty() {
return Ok(None)
return Ok(None);
}

// These are row indexes in the slices we have given, so we use those to gather in the
// original l1 offset arrays. This gives us indexes in the original tables.
unsafe {
Ok(Some((
l_l1_idx.take_unchecked(&idx_l),
r_l1_idx.take_unchecked(&idx_r)
r_l1_idx.take_unchecked(&idx_r),
)))
}
} else {
Ok(None)
}
});

let row_indices = POOL.install(|| {
iter.collect::<PolarsResult<Vec<_>>>()
})?;
let row_indices = POOL.install(|| iter.collect::<PolarsResult<Vec<_>>>())?;

let mut left_idx = IdxCa::default();
let mut right_idx = IdxCa::default();
for opt in row_indices {
if let Some((l, r)) = opt {
left_idx.append(&l)?;
right_idx.append(&r)?;
}
for (l, r) in row_indices.into_iter().flatten() {
left_idx.append(&l)?;
right_idx.append(&r)?;
}
if let Some((offset, end)) = slice {
left_idx = left_idx.slice(offset, end);
right_idx = right_idx.slice(offset, end);
}

unsafe { materialize_join(left, right, &left_idx, &right_idx, suffix) }

}

pub(super) fn iejoin(
Expand All @@ -253,24 +264,28 @@ pub(super) fn iejoin(
suffix: Option<PlSmallStr>,
slice: Option<(i64, usize)>,
) -> PolarsResult<DataFrame> {

let (left_row_idx, right_row_idx)= iejoin_tuples(selected_left, selected_right, options, slice)?;
let (left_row_idx, right_row_idx) =
iejoin_tuples(selected_left, selected_right, options, slice)?;
unsafe { materialize_join(left, right, &left_row_idx, &right_row_idx, suffix) }
}

/// Builds the joined `DataFrame` from precomputed row indexes.
///
/// Gathers the rows of `left` selected by `left_row_idx` and the rows of `right` selected by
/// `right_row_idx` as two tasks handed to `POOL.join`, then passes both gathered frames and
/// `suffix` to `_finish_join`, whose result is returned.
///
/// # Safety
/// Both gathers go through `take_unchecked`. Presumably every index in `left_row_idx` /
/// `right_row_idx` must be in bounds for its frame — TODO confirm against
/// `DataFrame::take_unchecked`.
unsafe fn materialize_join(left: &DataFrame, right: &DataFrame, left_row_idx: &IdxCa, right_row_idx: &IdxCa, suffix: Option<PlSmallStr>) -> PolarsResult<DataFrame> {
unsafe fn materialize_join(
left: &DataFrame,
right: &DataFrame,
left_row_idx: &IdxCa,
right_row_idx: &IdxCa,
suffix: Option<PlSmallStr>,
) -> PolarsResult<DataFrame> {
let (join_left, join_right) = {
POOL.join(
|| left.take_unchecked(&left_row_idx),
|| right.take_unchecked(&right_row_idx),
|| left.take_unchecked(left_row_idx),
|| right.take_unchecked(right_row_idx),
)
};

_finish_join(join_left, join_right, suffix)

}


/// Inequality join. Matches rows between two DataFrames using two inequality operators
/// (one of [<, <=, >, >=]).
/// Based on Khayyat et al. 2015, "Lightning Fast and Space Efficient Inequality Joins"
Expand Down
34 changes: 2 additions & 32 deletions crates/polars-ops/src/frame/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ pub trait DataFrameJoinOps: IntoDf {
}

if let JoinType::IEJoin(options) = args.how {
let func = if POOL.current_num_threads() > 1 && !left_df.is_empty() && !other.is_empty() {
let func = if POOL.current_num_threads() > 1 && !left_df.is_empty() && !other.is_empty()
{
iejoin::iejoin_par
} else {
iejoin::iejoin
Expand Down Expand Up @@ -535,34 +536,3 @@ pub fn private_left_join_multiple_keys(
let b = prepare_keys_multiple(b.get_columns(), join_nulls)?.into_series();
sort_or_hash_left(&a, &b, false, JoinValidation::ManyToMany, join_nulls)
}

/// Self-joins the `west` table with an inequality join and checks the number of matches.
///
/// The join keeps pairs of rows where `time > time_right` and `cost <= cost_right`
/// (`operator1` on the first selected column, `operator2` on the second).
#[test]
fn test_foo() {
    let west = df![
        "t_id" => [0, 1, 2, 3, 4, 5],
        "time" => [100, 140, 100, 80, 90, 90],
        "cost" => [6, 11, 11, 10, 5, 5],
    ]
    .unwrap();

    let time = west.column("time").unwrap();
    let cost = west.column("cost").unwrap();

    // The same two columns are used as join keys on both sides of the self-join.
    let selected = vec![time.clone(), cost.clone()];

    let out = west
        ._join_impl(
            &west,
            selected.clone(),
            selected,
            JoinArgs::new(JoinType::IEJoin(IEJoinOptions {
                operator1: InequalityOperator::Gt,
                operator2: InequalityOperator::LtEq,
            })),
            false,
            false,
        )
        .unwrap();

    // Matching (left t_id, right t_id) pairs, worked out by hand:
    // (0, 3), (1, 2), (4, 3) and (5, 3).
    // Only the row count is asserted because the parallel path does not promise an output order.
    assert_eq!(out.height(), 4);
}
21 changes: 15 additions & 6 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ macro_rules! failed_here {
format!("'{}' failed", stringify!($($t)*)).into()
}
}
pub(super) use failed_input;
pub(super) use failed_here;
pub(super) use failed_input_args;
pub(super) use {failed_here, failed_input, failed_input_args};

pub fn to_alp(
lp: DslPlan,
Expand Down Expand Up @@ -85,7 +83,11 @@ pub(super) struct DslConversionContext<'a> {
pub(super) opt_flags: &'a mut OptFlags,
}

pub(super) fn run_conversion(lp: IR, ctxt: &mut DslConversionContext, name: &str) -> PolarsResult<Node> {
pub(super) fn run_conversion(
lp: IR,
ctxt: &mut DslConversionContext,
name: &str,
) -> PolarsResult<Node> {
let lp_node = ctxt.lp_arena.add(lp);
ctxt.conversion_optimizer
.coerce_types(ctxt.expr_arena, ctxt.lp_arena, lp_node)
Expand All @@ -101,7 +103,6 @@ pub(super) fn run_conversion(lp: IR, ctxt: &mut DslConversionContext, name: &str
pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult<Node> {
let owned = Arc::unwrap_or_clone;


let v = match lp {
DslPlan::Scan {
paths,
Expand Down Expand Up @@ -544,7 +545,15 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
predicates,
options,
} => {
return join::resolve_join(input_left, input_right, left_on, right_on, predicates, options, ctxt)
return join::resolve_join(
input_left,
input_right,
left_on,
right_on,
predicates,
options,
ctxt,
)
},
DslPlan::HStack {
input,
Expand Down
Loading

0 comments on commit 773e61d

Please sign in to comment.