Skip to content

Commit

Permalink
remove boxed trait
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Sep 4, 2024
1 parent f917cc0 commit bccaba6
Showing 1 changed file with 104 additions and 81 deletions.
185 changes: 104 additions & 81 deletions crates/polars-ops/src/frame/join/iejoin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,82 +38,17 @@ pub struct IEJoinOptions {
pub operator2: InequalityOperator,
}

/// Inequality join. Matches rows between two DataFrames using two inequality operators
/// (one of [<, <=, >, >=]).
/// Based on Khayyat et al. 2015, "Lightning Fast and Space Efficient Inequality Joins"
/// and extended to work with duplicate values.
pub fn iejoin(
left: &DataFrame,
right: &DataFrame,
selected_left: Vec<Series>,
selected_right: Vec<Series>,
options: &IEJoinOptions,
suffix: Option<PlSmallStr>,
#[allow(clippy::too_many_arguments)]
fn ie_join_impl_t<T: PolarsNumericType>(
slice: Option<(i64, usize)>,
) -> PolarsResult<DataFrame> {
if selected_left.len() != 2 {
return Err(
polars_err!(ComputeError: "IEJoin requires exactly two expressions from the left DataFrame"),
);
};
if selected_right.len() != 2 {
return Err(
polars_err!(ComputeError: "IEJoin requires exactly two expressions from the right DataFrame"),
);
};

let op1 = options.operator1;
let op2 = options.operator2;

let mut x = selected_left[0].to_physical_repr().into_owned();
x.extend(&selected_right[0].to_physical_repr())?;
// Rechunk because we will gather.
let x = x.rechunk();

let mut y = selected_left[1].to_physical_repr().into_owned();
y.extend(&selected_right[1].to_physical_repr())?;
// Rechunk because we will gather.
let y = y.rechunk();

// Determine the sort order based on the comparison operators used.
// We want to sort L1 so that "x[i] op1 x[j]" is true for j > i,
// and L2 so that "y[i] op2 y[j]" is true for j < i
// (except in the case of duplicates and strict inequalities).
// Note that the algorithms published in Khayyat et al. have incorrect logic for
// determining whether to sort descending.
let l1_descending = matches!(op1, InequalityOperator::Gt | InequalityOperator::GtEq);
let l2_descending = matches!(op2, InequalityOperator::Lt | InequalityOperator::LtEq);

let l1_sort_options = SortOptions::default()
.with_maintain_order(true)
.with_nulls_last(false)
.with_order_descending(l1_descending);
// Get ordering of x, skipping any null entries as these cannot be matches
let l1_order = x
.arg_sort(l1_sort_options)
.slice(x.null_count() as i64, x.len() - x.null_count());

let l1_array = with_match_physical_numeric_polars_type!(x.dtype(), |$T| {
let ca: &ChunkedArray<$T> = x.as_ref().as_ref().as_ref();
build_l1_array(ca, &l1_order, left.height() as IdxSize)
})?;

let y_ordered = unsafe { y.take_unchecked(&l1_order) };
let l2_sort_options = SortOptions::default()
.with_maintain_order(true)
.with_nulls_last(false)
.with_order_descending(l2_descending);
// Get the indexes into l1, ordered by y values.
// l2_order is the same as "p" from Khayyat et al.
let l2_order = y_ordered
.arg_sort(l2_sort_options)
.slice(
y_ordered.null_count() as i64,
y_ordered.len() - y_ordered.null_count(),
)
.rechunk();
let l2_order = l2_order.downcast_get(0).unwrap().values().as_slice();

l1_order: IdxCa,
l2_order: &[IdxSize],
op1: InequalityOperator,
op2: InequalityOperator,
x: Series,
y_ordered: Series,
left_height: usize,
) -> PolarsResult<(Vec<IdxSize>, Vec<IdxSize>)> {
// Create a bit array with order corresponding to L1,
// denoting which entries have been visited while traversing L2.
let mut bit_array = FilteredBitArray::from_len_zeroed(l1_order.len());
Expand All @@ -127,6 +62,9 @@ pub fn iejoin(
};
let mut match_count = 0;

let ca: &ChunkedArray<T> = x.as_ref().as_ref();
let l1_array = build_l1_array(ca, &l1_order, left_height as IdxSize)?;

if op2.is_strict() {
// For strict inequalities, we rely on using a stable sort of l2 so that
// p values only increase as we traverse a run of equal y values.
Expand All @@ -149,10 +87,9 @@ pub fn iejoin(
}
}
} else {
let l2_array = with_match_physical_numeric_polars_type!(y.dtype(), |$T| {
let ca: &ChunkedArray<$T> = y_ordered.as_ref().as_ref().as_ref();
build_l2_array(ca, &l2_order)
})?;
let ca: &ChunkedArray<T> = y_ordered.as_ref().as_ref();
let l2_array = build_l2_array(ca, l2_order)?;

// For non-strict inequalities in l2, we need to track runs of equal y values and only
// check for matches after we reach the end of the run and have marked all rhs entries
// in the run as visited.
Expand Down Expand Up @@ -186,6 +123,92 @@ pub fn iejoin(
}
}
}
Ok((left_row_idx, right_row_idx))
}

/// Inequality join. Matches rows between two DataFrames using two inequality operators
/// (one of [<, <=, >, >=]).
/// Based on Khayyat et al. 2015, "Lightning Fast and Space Efficient Inequality Joins"
/// and extended to work with duplicate values.
pub fn iejoin(
left: &DataFrame,
right: &DataFrame,
selected_left: Vec<Series>,
selected_right: Vec<Series>,
options: &IEJoinOptions,
suffix: Option<PlSmallStr>,
slice: Option<(i64, usize)>,
) -> PolarsResult<DataFrame> {
if selected_left.len() != 2 {
return Err(
polars_err!(ComputeError: "IEJoin requires exactly two expressions from the left DataFrame"),
);
};
if selected_right.len() != 2 {
return Err(
polars_err!(ComputeError: "IEJoin requires exactly two expressions from the right DataFrame"),
);
};

let op1 = options.operator1;
let op2 = options.operator2;

// Determine the sort order based on the comparison operators used.
// We want to sort L1 so that "x[i] op1 x[j]" is true for j > i,
// and L2 so that "y[i] op2 y[j]" is true for j < i
// (except in the case of duplicates and strict inequalities).
// Note that the algorithms published in Khayyat et al. have incorrect logic for
// determining whether to sort descending.
let l1_descending = matches!(op1, InequalityOperator::Gt | InequalityOperator::GtEq);
let l2_descending = matches!(op2, InequalityOperator::Lt | InequalityOperator::LtEq);

let mut x = selected_left[0].to_physical_repr().into_owned();
x.extend(&selected_right[0].to_physical_repr())?;
// Rechunk because we will gather.
let x = x.rechunk();

let mut y = selected_left[1].to_physical_repr().into_owned();
y.extend(&selected_right[1].to_physical_repr())?;
// Rechunk because we will gather.
let y = y.rechunk();

let l1_sort_options = SortOptions::default()
.with_maintain_order(true)
.with_nulls_last(false)
.with_order_descending(l1_descending);
// Get ordering of x, skipping any null entries as these cannot be matches
let l1_order = x
.arg_sort(l1_sort_options)
.slice(x.null_count() as i64, x.len() - x.null_count());

let y_ordered = unsafe { y.take_unchecked(&l1_order) };
let l2_sort_options = SortOptions::default()
.with_maintain_order(true)
.with_nulls_last(false)
.with_order_descending(l2_descending);
// Get the indexes into l1, ordered by y values.
// l2_order is the same as "p" from Khayyat et al.
let l2_order = y_ordered
.arg_sort(l2_sort_options)
.slice(
y_ordered.null_count() as i64,
y_ordered.len() - y_ordered.null_count(),
)
.rechunk();
let l2_order = l2_order.downcast_get(0).unwrap().values().as_slice();

let (left_row_idx, right_row_idx) = with_match_physical_numeric_polars_type!(x.dtype(), |$T| {
ie_join_impl_t::<$T>(
slice,
l1_order,
l2_order,
op1,
op2,
x,
y_ordered,
left.height()
)
})?;

debug_assert_eq!(left_row_idx.len(), right_row_idx.len());
let left_row_idx = IdxCa::from_vec("".into(), left_row_idx);
Expand Down Expand Up @@ -386,7 +409,7 @@ fn build_l1_array<T>(
ca: &ChunkedArray<T>,
order: &IdxCa,
right_df_offset: IdxSize,
) -> PolarsResult<Box<dyn L1Array>>
) -> PolarsResult<Vec<L1Item<T::Native>>>
where
T: PolarsNumericType,
{
Expand All @@ -413,7 +436,7 @@ where
}
}

Ok(Box::new(array))
Ok(array)
}

/// Create a vector of L2 items from the array of y values ordered according to the L1 order,
Expand Down

0 comments on commit bccaba6

Please sign in to comment.