Skip to content

Commit

Permalink
move join function resolving to module
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Sep 6, 2024
1 parent 23bc80a commit c1da0c6
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 90 deletions.
103 changes: 13 additions & 90 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ macro_rules! failed_here {
format!("'{}' failed", stringify!($($t)*)).into()
}
}
pub(super) use failed_input;
pub(super) use failed_here;
pub(super) use failed_input_args;

pub fn to_alp(
lp: DslPlan,
Expand All @@ -65,7 +68,7 @@ pub fn to_alp(
opt_flags.contains(OptFlags::TYPE_COERCION),
);

let mut ctxt = ConversionContext {
let mut ctxt = DslConversionContext {
expr_arena,
lp_arena,
conversion_optimizer,
Expand All @@ -75,21 +78,21 @@ pub fn to_alp(
to_alp_impl(lp, &mut ctxt)
}

struct ConversionContext<'a> {
expr_arena: &'a mut Arena<AExpr>,
lp_arena: &'a mut Arena<IR>,
conversion_optimizer: ConversionOptimizer,
opt_flags: &'a mut OptFlags,
/// Mutable state shared by the DSL -> IR conversion routines.
///
/// Built once in `to_alp` and then passed by `&mut` through `to_alp_impl`
/// and the helpers it calls, so that every step writes into the same arenas.
pub(super) struct DslConversionContext<'a> {
    /// Arena holding the `AExpr` expression nodes used during conversion.
    pub(super) expr_arena: &'a mut Arena<AExpr>,
    /// Arena holding the `IR` plan nodes; converted plans are added here.
    pub(super) lp_arena: &'a mut Arena<IR>,
    /// Optimizer invoked during conversion (e.g. `coerce_types`, `fill_scratch`).
    pub(super) conversion_optimizer: ConversionOptimizer,
    /// Optimization flags supplied by the caller of `to_alp`.
    pub(super) opt_flags: &'a mut OptFlags,
}

/// Converts a `DslPlan` to IR.
/// It adds expressions and logical plans to their respective arenas as it traverses the plan,
/// and finally returns the top node of the converted plan.
#[recursive]
pub fn to_alp_impl(lp: DslPlan, ctxt: &mut ConversionContext) -> PolarsResult<Node> {
pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult<Node> {
let owned = Arc::unwrap_or_clone;

fn run_conversion(lp: IR, ctxt: &mut ConversionContext, name: &str) -> PolarsResult<Node> {
fn run_conversion(lp: IR, ctxt: &mut DslConversionContext, name: &str) -> PolarsResult<Node> {
let lp_node = ctxt.lp_arena.add(lp);
ctxt.conversion_optimizer
.coerce_types(ctxt.expr_arena, ctxt.lp_arena, lp_node)
Expand Down Expand Up @@ -540,88 +543,8 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut ConversionContext) -> PolarsResult<No
predicates,
mut options,
} => {
if matches!(options.args.how, JoinType::Cross) {
polars_ensure!(left_on.len() + right_on.len() == 0, InvalidOperation: "a 'cross' join doesn't expect any join keys");
} else {
let mut turn_off_coalesce = false;
for e in left_on.iter().chain(right_on.iter()) {
if has_expr(e, |e| matches!(e, Expr::Alias(_, _))) {
polars_bail!(
ComputeError:
"'alias' is not allowed in a join key, use 'with_columns' first",
)
}
// Any expression that is not a simple column expression will turn of coalescing.
turn_off_coalesce |= has_expr(e, |e| !matches!(e, Expr::Column(_)));
}
if turn_off_coalesce {
let options = Arc::make_mut(&mut options);
if matches!(options.args.coalesce, JoinCoalesce::CoalesceColumns) {
polars_warn!("coalescing join requested but not all join keys are column references, turning off key coalescing");
}
options.args.coalesce = JoinCoalesce::KeepColumns;
}

options.args.validation.is_valid_join(&options.args.how)?;

polars_ensure!(
left_on.len() == right_on.len(),
ComputeError:
format!(
"the number of columns given as join key (left: {}, right:{}) should be equal",
left_on.len(),
right_on.len()
)
);
}

let input_left = to_alp_impl(owned(input_left), ctxt)
.map_err(|e| e.context(failed_input!(join left)))?;
let input_right = to_alp_impl(owned(input_right), ctxt)
.map_err(|e| e.context(failed_input!(join, right)))?;

let schema_left = ctxt.lp_arena.get(input_left).schema(ctxt.lp_arena);
let schema_right = ctxt.lp_arena.get(input_right).schema(ctxt.lp_arena);

let schema =
det_join_schema(&schema_left, &schema_right, &left_on, &right_on, &options)
.map_err(|e| e.context(failed_here!(join schema resolving)))?;

let left_on = to_expr_irs_ignore_alias(left_on, ctxt.expr_arena)?;
let right_on = to_expr_irs_ignore_alias(right_on, ctxt.expr_arena)?;
let mut joined_on = PlHashSet::new();
for (l, r) in left_on.iter().zip(right_on.iter()) {
polars_ensure!(
joined_on.insert((l.output_name(), r.output_name())),
InvalidOperation: "joining with repeated key names; already joined on {} and {}",
l.output_name(),
r.output_name()
)
}
drop(joined_on);

ctxt.conversion_optimizer
.fill_scratch(&left_on, ctxt.expr_arena);
ctxt.conversion_optimizer
.fill_scratch(&right_on, ctxt.expr_arena);

// Every expression must be elementwise so that we are
// guaranteed the keys for a join are all the same length.
let all_elementwise =
|aexprs: &[ExprIR]| all_streamable(aexprs, &*ctxt.expr_arena, Context::Default);
polars_ensure!(
all_elementwise(&left_on) && all_elementwise(&right_on),
InvalidOperation: "All join key expressions must be elementwise."
);
let lp = IR::Join {
input_left,
input_right,
schema,
left_on,
right_on,
options,
};
return run_conversion(lp, ctxt, "join");
let ir = join::resolve_join(input_left, input_right, left_on, right_on, predicates, options, ctxt)?;
return run_conversion(ir, ctxt, "join");
},
DslPlan::HStack {
input,
Expand Down
101 changes: 101 additions & 0 deletions crates/polars-plan/src/plans/conversion/join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use arrow::legacy::error::PolarsResult;
use polars_utils::arena::Arena;
use crate::dsl::{Expr, FunctionExpr};
use crate::plans::AExpr;
use crate::prelude::FunctionOptions;
use super::*;

/// Resolves a DSL join into an [`IR::Join`] value.
///
/// The work happens in this order:
/// 1. Validate the join keys (see `# Errors`). For non-cross joins, key
///    coalescing is switched to `JoinCoalesce::KeepColumns` as soon as any key
///    is more than a plain column reference.
/// 2. Convert both inputs with `to_alp_impl`.
/// 3. Determine the output schema with `det_join_schema`.
/// 4. Lower the key expressions with `to_expr_irs_ignore_alias` and reject
///    repeated `(left, right)` output-name pairs.
/// 5. Pass the keys to the conversion optimizer via `fill_scratch` and require
///    every key expression to be elementwise.
///
/// The returned `IR` is *not* added to `ctxt.lp_arena` here; the caller is
/// expected to do that.
///
/// `predicates` is accepted but is not read by this function body.
///
/// # Errors
/// * `InvalidOperation` if a cross join is given any join keys.
/// * `ComputeError` if a join key contains an alias.
/// * Whatever `options.args.validation.is_valid_join` reports for the join type.
/// * `ComputeError` if the number of left and right keys differs.
/// * Any error from converting either input or from resolving the join schema
///   (wrapped with additional context).
/// * `InvalidOperation` if the same `(left, right)` key-name pair occurs twice.
/// * `InvalidOperation` if a key expression is not elementwise.
pub fn resolve_join(
    input_left: Arc<DslPlan>,
    input_right: Arc<DslPlan>,
    left_on: Vec<Expr>,
    right_on: Vec<Expr>,
    predicates: Vec<Expr>,
    mut options: Arc<JoinOptions>,
    ctxt: &mut DslConversionContext,
) -> PolarsResult<IR> {
    let owned = Arc::unwrap_or_clone;
    if matches!(options.args.how, JoinType::Cross) {
        polars_ensure!(left_on.len() + right_on.len() == 0, InvalidOperation: "a 'cross' join doesn't expect any join keys");
    } else {
        let mut turn_off_coalesce = false;
        for e in left_on.iter().chain(right_on.iter()) {
            if has_expr(e, |e| matches!(e, Expr::Alias(_, _))) {
                polars_bail!(
                    ComputeError:
                    "'alias' is not allowed in a join key, use 'with_columns' first",
                )
            }
            // Any expression that is not a simple column expression will turn off coalescing.
            turn_off_coalesce |= has_expr(e, |e| !matches!(e, Expr::Column(_)));
        }
        if turn_off_coalesce {
            // `make_mut` only clones the options if the `Arc` is shared.
            let options = Arc::make_mut(&mut options);
            if matches!(options.args.coalesce, JoinCoalesce::CoalesceColumns) {
                polars_warn!("coalescing join requested but not all join keys are column references, turning off key coalescing");
            }
            options.args.coalesce = JoinCoalesce::KeepColumns;
        }

        options.args.validation.is_valid_join(&options.args.how)?;

        polars_ensure!(
            left_on.len() == right_on.len(),
            ComputeError:
                format!(
                    "the number of columns given as join key (left: {}, right:{}) should be equal",
                    left_on.len(),
                    right_on.len()
                )
        );
    }

    // Both invocations use the same token shape so the two context messages
    // are formatted consistently.
    let input_left = to_alp_impl(owned(input_left), ctxt)
        .map_err(|e| e.context(failed_input!(join left)))?;
    let input_right = to_alp_impl(owned(input_right), ctxt)
        .map_err(|e| e.context(failed_input!(join right)))?;

    let schema_left = ctxt.lp_arena.get(input_left).schema(ctxt.lp_arena);
    let schema_right = ctxt.lp_arena.get(input_right).schema(ctxt.lp_arena);

    let schema = det_join_schema(&schema_left, &schema_right, &left_on, &right_on, &options)
        .map_err(|e| e.context(failed_here!(join schema resolving)))?;

    let left_on = to_expr_irs_ignore_alias(left_on, ctxt.expr_arena)?;
    let right_on = to_expr_irs_ignore_alias(right_on, ctxt.expr_arena)?;

    // Each (left, right) output-name pair may be joined on only once.
    let mut joined_on = PlHashSet::new();
    for (l, r) in left_on.iter().zip(right_on.iter()) {
        polars_ensure!(
            joined_on.insert((l.output_name(), r.output_name())),
            InvalidOperation: "joining with repeated key names; already joined on {} and {}",
            l.output_name(),
            r.output_name()
        )
    }
    // Release the set (and its borrows of the key names) before going on.
    drop(joined_on);

    ctxt.conversion_optimizer
        .fill_scratch(&left_on, ctxt.expr_arena);
    ctxt.conversion_optimizer
        .fill_scratch(&right_on, ctxt.expr_arena);

    // Every expression must be elementwise so that we are
    // guaranteed the keys for a join are all the same length.
    let all_elementwise =
        |aexprs: &[ExprIR]| all_streamable(aexprs, &*ctxt.expr_arena, Context::Default);
    polars_ensure!(
        all_elementwise(&left_on) && all_elementwise(&right_on),
        InvalidOperation: "All join key expressions must be elementwise."
    );

    Ok(IR::Join {
        input_left,
        input_right,
        schema,
        left_on,
        right_on,
        options,
    })
}
1 change: 1 addition & 0 deletions crates/polars-plan/src/plans/conversion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use polars_utils::vec::ConvertVec;
use recursive::recursive;
mod functions;
pub(crate) mod type_coercion;
mod join;

pub(crate) use expr_expansion::{expand_selectors, is_regex_projection, prepare_projection};

Expand Down

0 comments on commit c1da0c6

Please sign in to comment.