Skip to content

Commit

Permalink
init group join implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
maruschin committed Jan 27, 2025
1 parent 09a0844 commit c8b6b5a
Show file tree
Hide file tree
Showing 16 changed files with 250 additions and 23 deletions.
7 changes: 7 additions & 0 deletions datafusion/common/src/functional_dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,13 @@ impl FunctionalDependencies {
left_func_dependencies.extend(right_func_dependencies);
left_func_dependencies
}
JoinType::LeftGroup => {
right_func_dependencies.add_offset(left_cols_len);
left_func_dependencies =
left_func_dependencies.with_dependency(Dependency::Single);
right_func_dependencies.downgrade_dependencies();
left_func_dependencies
}
JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
// These joins preserve functional dependencies of the left side:
left_func_dependencies
Expand Down
9 changes: 9 additions & 0 deletions datafusion/common/src/join_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ pub enum JoinType {
///
/// [1]: http://btw2017.informatik.uni-stuttgart.de/slidesandpapers/F1-10-37/paper_web.pdf
LeftMark,
/// Left Group join (groupjoin). See [2] for details.
///
/// [2]: https://www.vldb.org/pvldb/vol4/p843-moerkotte.pdf
LeftGroup,
}

impl JoinType {
Expand All @@ -90,6 +94,9 @@ impl JoinType {
JoinType::LeftMark => {
unreachable!("LeftMark join type does not support swapping")
}
JoinType::LeftGroup => {
unreachable!("LeftGroup join type does not support swapping")
}
}
}

Expand Down Expand Up @@ -121,6 +128,7 @@ impl Display for JoinType {
JoinType::LeftAnti => "LeftAnti",
JoinType::RightAnti => "RightAnti",
JoinType::LeftMark => "LeftMark",
JoinType::LeftGroup => "LeftGroup",
};
write!(f, "{join_type}")
}
Expand All @@ -141,6 +149,7 @@ impl FromStr for JoinType {
"LEFTANTI" => Ok(JoinType::LeftAnti),
"RIGHTANTI" => Ok(JoinType::RightAnti),
"LEFTMARK" => Ok(JoinType::LeftMark),
"LEFTGROUP" => Ok(JoinType::LeftGroup),
_ => _not_impl_err!("The join type {s} does not exist or is not implemented"),
}
}
Expand Down
14 changes: 13 additions & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,10 @@ impl LogicalPlanBuilder {
return plan_err!("left_keys and right_keys were not the same length");
}

if join_type == JoinType::LeftGroup {
return plan_err!("Cannot create LeftGroup Join manually.");
}

let filter = if let Some(expr) = filter {
let filter = normalize_col_with_schemas_and_ambiguity_check(
expr,
Expand Down Expand Up @@ -977,6 +981,8 @@ impl LogicalPlanBuilder {
join_constraint: JoinConstraint::On,
schema: DFSchemaRef::new(join_schema),
null_equals_null,
group_expr: None,
aggr_expr: None,
})))
}

Expand Down Expand Up @@ -1041,6 +1047,8 @@ impl LogicalPlanBuilder {
join_constraint: JoinConstraint::Using,
schema: DFSchemaRef::new(join_schema),
null_equals_null: false,
group_expr: None,
aggr_expr: None,
})))
}
}
Expand All @@ -1058,6 +1066,8 @@ impl LogicalPlanBuilder {
join_constraint: JoinConstraint::On,
null_equals_null: false,
schema: DFSchemaRef::new(join_schema),
group_expr: None,
aggr_expr: None,
})))
}

Expand Down Expand Up @@ -1276,6 +1286,8 @@ impl LogicalPlanBuilder {
join_constraint: JoinConstraint::On,
schema: DFSchemaRef::new(join_schema),
null_equals_null: false,
group_expr: None,
aggr_expr: None,
})))
}

Expand Down Expand Up @@ -1389,7 +1401,7 @@ pub fn build_join_schema(
.collect::<Vec<_>>();
left_fields.into_iter().chain(right_fields).collect()
}
JoinType::Left => {
JoinType::Left | JoinType::LeftGroup => {
// left then right, right set to nullable in case of not matched scenario
let left_fields = left_fields
.map(|(q, f)| (q.cloned(), Arc::clone(f)))
Expand Down
3 changes: 2 additions & 1 deletion datafusion/expr/src/logical_plan/invariants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ fn check_inner_plan(inner_plan: &LogicalPlan, can_contain_outer_ref: bool) -> Re
JoinType::Left
| JoinType::LeftSemi
| JoinType::LeftAnti
| JoinType::LeftMark => {
| JoinType::LeftMark
| JoinType::LeftGroup => {
check_inner_plan(left, can_contain_outer_ref)?;
check_inner_plan(right, false)
}
Expand Down
24 changes: 20 additions & 4 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,11 @@ impl LogicalPlan {
join_type,
..
}) => match join_type {
JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
JoinType::Inner
| JoinType::Left
| JoinType::Right
| JoinType::Full
| JoinType::LeftGroup => {
if left.schema().fields().is_empty() {
right.head_output_expr()
} else {
Expand Down Expand Up @@ -658,6 +662,8 @@ impl LogicalPlan {
on,
schema: _,
null_equals_null,
group_expr,
aggr_expr,
}) => {
let schema =
build_join_schema(left.schema(), right.schema(), &join_type)?;
Expand All @@ -679,6 +685,8 @@ impl LogicalPlan {
filter,
schema: DFSchemaRef::new(schema),
null_equals_null,
group_expr,
aggr_expr,
}))
}
LogicalPlan::Subquery(_) => Ok(self),
Expand Down Expand Up @@ -932,6 +940,8 @@ impl LogicalPlan {
filter: filter_expr,
schema: DFSchemaRef::new(schema),
null_equals_null: *null_equals_null,
group_expr: None,
aggr_expr: None,
}))
}
LogicalPlan::Subquery(Subquery {
Expand Down Expand Up @@ -1330,9 +1340,10 @@ impl LogicalPlan {
(left_max, right_max, _) => Some(left_max * right_max),
}
}
JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
left.max_rows()
}
JoinType::LeftSemi
| JoinType::LeftAnti
| JoinType::LeftMark
| JoinType::LeftGroup => left.max_rows(),
JoinType::RightSemi | JoinType::RightAnti => right.max_rows(),
},
LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
Expand Down Expand Up @@ -3295,6 +3306,9 @@ pub struct Join {
pub schema: DFSchemaRef,
/// If null_equals_null is true, null == null else null != null
pub null_equals_null: bool,
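/// Grouping expressions; only used by group joins (`JoinType::LeftGroup`)
pub group_expr: Option<Vec<Expr>>,
/// Aggregate expressions; only used by group joins (`JoinType::LeftGroup`)
pub aggr_expr: Option<Vec<Expr>>,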
}

impl Join {
Expand Down Expand Up @@ -3328,6 +3342,8 @@ impl Join {
join_constraint: original_join.join_constraint,
schema: Arc::new(join_schema),
null_equals_null: original_join.null_equals_null,
group_expr: original_join.group_expr.clone(),
aggr_expr: original_join.aggr_expr.clone(),
})
}
}
Expand Down
8 changes: 8 additions & 0 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ impl TreeNode for LogicalPlan {
join_constraint,
schema,
null_equals_null,
group_expr,
aggr_expr,
}) => (left, right).map_elements(f)?.update_data(|(left, right)| {
LogicalPlan::Join(Join {
left,
Expand All @@ -151,6 +153,8 @@ impl TreeNode for LogicalPlan {
join_constraint,
schema,
null_equals_null,
group_expr,
aggr_expr,
})
}),
LogicalPlan::Limit(Limit { skip, fetch, input }) => input
Expand Down Expand Up @@ -573,6 +577,8 @@ impl LogicalPlan {
join_constraint,
schema,
null_equals_null,
group_expr,
aggr_expr,
}) => (on, filter).map_elements(f)?.update_data(|(on, filter)| {
LogicalPlan::Join(Join {
left,
Expand All @@ -583,6 +589,8 @@ impl LogicalPlan {
join_constraint,
schema,
null_equals_null,
group_expr,
aggr_expr,
})
}),
LogicalPlan::Sort(Sort { expr, input, fetch }) => expr
Expand Down
4 changes: 4 additions & 0 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,8 @@ fn find_inner_join(
filter: None,
schema: join_schema,
null_equals_null: false,
group_expr: None,
aggr_expr: None,
}));
}
}
Expand All @@ -351,6 +353,8 @@ fn find_inner_join(
join_type: JoinType::Inner,
join_constraint: JoinConstraint::On,
null_equals_null: false,
aggr_expr: None,
group_expr: None,
}))
}

Expand Down
8 changes: 1 addition & 7 deletions datafusion/optimizer/src/eliminate_outer_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,8 @@ impl OptimizerRule for EliminateOuterJoin {
};

let new_join = Arc::new(LogicalPlan::Join(Join {
left: join.left,
right: join.right,
join_type: new_join_type,
join_constraint: join.join_constraint,
on: join.on.clone(),
filter: join.filter.clone(),
schema: Arc::clone(&join.schema),
null_equals_null: join.null_equals_null,
..join
}));
Filter::try_new(filter.predicate, new_join)
.map(|f| Transformed::yes(LogicalPlan::Filter(f)))
Expand Down
6 changes: 6 additions & 0 deletions datafusion/optimizer/src/extract_equijoin_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ impl OptimizerRule for ExtractEquijoinPredicate {
join_constraint,
schema,
null_equals_null,
group_expr,
aggr_expr,
}) => {
let left_schema = left.schema();
let right_schema = right.schema();
Expand All @@ -93,6 +95,8 @@ impl OptimizerRule for ExtractEquijoinPredicate {
join_constraint,
schema,
null_equals_null,
group_expr,
aggr_expr,
})))
} else {
Ok(Transformed::no(LogicalPlan::Join(Join {
Expand All @@ -104,6 +108,8 @@ impl OptimizerRule for ExtractEquijoinPredicate {
join_constraint,
schema,
null_equals_null,
group_expr,
aggr_expr,
})))
}
}
Expand Down
Loading

0 comments on commit c8b6b5a

Please sign in to comment.