Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyum committed Feb 13, 2021
1 parent 9e9a633 commit ba71ead
Show file tree
Hide file tree
Showing 102 changed files with 3,535 additions and 3,901 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -921,8 +921,18 @@ object InferFiltersFromGenerate extends Rule[LogicalPlan] {
}
}

abstract class InferFiltersRule extends Rule[LogicalPlan]
with PredicateHelper with ConstraintHelper {
/**
* Generate a list of additional filters from an operator's existing constraints, but remove those
* that are either already part of the operator's condition or part of the operator's child
* constraints. These filters are currently added to the existing conditions of Filter
* operators and to either side of Join operators.
*
* Note: While this optimization is applicable to a lot of types of join, it primarily benefits
* Inner and LeftSemi joins.
*/
object InferFiltersFromConstraints extends Rule[LogicalPlan]
with PredicateHelper with ConstraintHelper {

def apply(plan: LogicalPlan): LogicalPlan = {
if (SQLConf.get.constraintPropagationEnabled) {
inferFilters(plan)
Expand All @@ -931,14 +941,6 @@ abstract class InferFiltersRule extends Rule[LogicalPlan]
}
}

protected def getBaseConstraints(
left: LogicalPlan,
right: LogicalPlan,
conditionOpt: Option[Expression]): ExpressionSet = {
left.constraints.union(right.constraints)
.union(ExpressionSet(conditionOpt.map(splitConjunctivePredicates).getOrElse(Nil)))
}

private def inferFilters(plan: LogicalPlan): LogicalPlan = plan transform {
case filter @ Filter(condition, child) =>
val newFilters = filter.constraints --
Expand All @@ -949,33 +951,40 @@ abstract class InferFiltersRule extends Rule[LogicalPlan]
filter
}

case join @ Join(left, right, joinType, _, _) =>
case join @ Join(left, right, joinType, conditionOpt, _) =>
joinType match {
// For inner join, we can infer additional filters for both sides. LeftSemi is kind of an
// inner join, it just drops the right side in the final output.
case _: InnerLike | LeftSemi =>
val allConstraints = getAllConstraints(join)
val allConstraints = getAllConstraints(left, right, conditionOpt)
val newLeft = inferNewFilter(left, allConstraints)
val newRight = inferNewFilter(right, allConstraints)
join.copy(left = newLeft, right = newRight)

// For right outer join, we can only infer additional filters for left side.
case RightOuter =>
val allConstraints = getAllConstraints(join)
val allConstraints = getAllConstraints(left, right, conditionOpt)
val newLeft = inferNewFilter(left, allConstraints)
join.copy(left = newLeft)

// For left join, we can only infer additional filters for right side.
case LeftOuter | LeftAnti =>
val allConstraints = getAllConstraints(join)
val allConstraints = getAllConstraints(left, right, conditionOpt)
val newRight = inferNewFilter(right, allConstraints)
join.copy(right = newRight)

case _ => join
}
}

protected def getAllConstraints(join: Join): ExpressionSet
/**
 * Builds the full constraint set used to infer new filters around a join.
 *
 * The base set is the union of both children's constraints and the conjuncts of the join
 * condition (none when the join has no condition). The result is that base set together with
 * whatever `inferAdditionalConstraints` derives from it.
 *
 * @param left the left child of the join
 * @param right the right child of the join
 * @param conditionOpt the optional join condition
 * @return the base constraints united with the additionally inferred ones
 */
private def getAllConstraints(
    left: LogicalPlan,
    right: LogicalPlan,
    conditionOpt: Option[Expression]): ExpressionSet = {
  // A missing condition contributes no predicates at all.
  val conditionPredicates = conditionOpt.toSeq.flatMap(splitConjunctivePredicates)
  val childConstraints = left.constraints.union(right.constraints)
  val baseConstraints = childConstraints.union(ExpressionSet(conditionPredicates))
  val inferred = inferAdditionalConstraints(baseConstraints)
  baseConstraints.union(inferred)
}

private def inferNewFilter(plan: LogicalPlan, constraints: ExpressionSet): LogicalPlan = {
val newPredicates = constraints
Expand All @@ -991,22 +1000,6 @@ abstract class InferFiltersRule extends Rule[LogicalPlan]
}
}

/**
* Generate a list of additional filters from an operator's existing constraint but remove those
* that are either already part of the operator's condition or are part of the operator's child
* constraints. These filters are currently inserted to the existing conditions in the Filter
* operators and on either side of Join operators.
*
* Note: While this optimization is applicable to a lot of types of join, it primarily benefits
* Inner and LeftSemi joins.
*/
object InferFiltersFromConstraints extends InferFiltersRule {
override def getAllConstraints(join: Join): ExpressionSet = {
val baseConstraints = getBaseConstraints(join.left, join.right, join.condition)
baseConstraints.union(inferAdditionalConstraints(baseConstraints))
}
}

/**
* Combines all adjacent [[Union]] operators into a single [[Union]].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,79 @@

package org.apache.spark.sql.execution.dynamicpruning

import org.apache.spark.sql.catalyst.expressions.{DynamicPruningSubquery, ExpressionSet}
import org.apache.spark.sql.catalyst.optimizer.InferFiltersRule
import org.apache.spark.sql.catalyst.plans.logical.Join

object InferDynamicPruningFilters extends InferFiltersRule {
override def getAllConstraints(join: Join): ExpressionSet = {
val baseConstraints = getBaseConstraints(join.left, join.right, join.condition)
val filtered = inferAdditionalConstraints(baseConstraints, true).filter {
import org.apache.spark.sql.catalyst.expressions.{And, DynamicPruningSubquery, ExpressionSet, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.{InnerLike, LeftAnti, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.{ConstraintHelper, Filter, Join, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.dynamicpruning.PartitionPruning._
import org.apache.spark.sql.internal.SQLConf

/**
* Similar to [[InferFiltersFromConstraints]], but this rule only infers dynamic pruning filters.
*/
object InferDynamicPruningFilters extends Rule[LogicalPlan]
with PredicateHelper with ConstraintHelper {

/**
 * Entry point of the rule.
 *
 * @param plan the logical plan to process
 * @return the plan with filters inferred by `inferFilters`, or the unchanged plan when
 *         `SQLConf.get.constraintPropagationEnabled` is false
 */
def apply(plan: LogicalPlan): LogicalPlan = {
  // Guard clause: with constraint propagation switched off the plan is returned as-is.
  if (!SQLConf.get.constraintPropagationEnabled) {
    plan
  } else {
    inferFilters(plan)
  }
}

/**
 * Rewrites every [[Join]] in the plan, wrapping the eligible children with the filters
 * produced by `inferDynamicPrunings` / `inferNewFilter`.
 *
 * Which side may receive new filters depends on the join type:
 *  - inner-like and left-semi joins: both sides (left-semi is kind of an inner join, it just
 *    drops the right side in the final output);
 *  - right outer joins: the left side only;
 *  - left outer and left-anti joins: the right side only;
 *  - every other join type: the join is returned untouched.
 */
private def inferFilters(plan: LogicalPlan): LogicalPlan = plan transform {
  case join @ Join(left, right, joinType, _, _) =>
    // Decide first which children are eligible for additional filters.
    val (filterLeft, filterRight) = joinType match {
      case _: InnerLike | LeftSemi => (true, true)
      case RightOuter => (true, false)
      case LeftOuter | LeftAnti => (false, true)
      case _ => (false, false)
    }

    if (!filterLeft && !filterRight) {
      join
    } else {
      // Inference only runs for join types that can actually use its result.
      val prunings = inferDynamicPrunings(join)
      val newLeft = if (filterLeft) inferNewFilter(left, prunings) else left
      val newRight = if (filterRight) inferNewFilter(right, prunings) else right
      join.copy(left = newLeft, right = newRight)
    }
}

/**
 * Infers dynamic pruning predicates for the given join.
 *
 * The starting point is the union of the constraints of both join children and the conjuncts
 * of the join condition (if any). Additional constraints are inferred from that set and then
 * narrowed down to [[DynamicPruningSubquery]] expressions that pass the checks below.
 *
 * NOTE(review): the meaning of the `true` flag passed to `inferAdditionalConstraints` is not
 * visible here -- presumably it enables dynamic pruning inference; confirm against
 * ConstraintHelper.
 *
 * @param join the join whose children and condition supply the base constraints
 */
def inferDynamicPrunings(join: Join): ExpressionSet = {
// Base constraints: both children's constraints plus the conjuncts of the join condition.
val baseConstraints = join.left.constraints.union(join.right.constraints)
.union(ExpressionSet(join.condition.map(splitConjunctivePredicates).getOrElse(Nil)))
inferAdditionalConstraints(baseConstraints, true).filter {
// Only dynamic pruning subqueries are candidates; anything else is rejected by the final case.
case DynamicPruningSubquery(
pruningKey, buildQuery, buildKeys, broadcastKeyIndex, _, _) =>
// The pruning key must resolve to a partition table scan within this join.
PartitionPruning.getPartitionTableScan(pruningKey, join) match {
getPartitionTableScan(pruningKey, join) match {
case Some(partScan) =>
// Keep the candidate only if pruningHasBenefit approves it for the selected build key.
val otherExpr = buildKeys(broadcastKeyIndex)
PartitionPruning.pruningHasBenefit(pruningKey, partScan, otherExpr, buildQuery)
pruningHasBenefit(pruningKey, partScan, buildKeys(broadcastKeyIndex), buildQuery)
case _ =>
// No partition table scan found for the pruning key: drop the candidate.
false
}
case _ => false
}
baseConstraints.union(filtered)
}

/**
 * Wraps `plan` in a [[Filter]] holding the dynamic pruning predicates that apply to it.
 *
 * A predicate applies when it references at least one attribute, all of its references are
 * produced by `plan`, and it is deterministic. Predicates already present in the plan's
 * constraints are excluded.
 *
 * @param plan the join child that may receive a new filter
 * @param dynamicPrunings the candidate dynamic pruning predicates
 * @return `plan` itself when nothing new applies, otherwise a [[Filter]] over `plan` whose
 *         condition is the conjunction of the new predicates
 */
private def inferNewFilter(plan: LogicalPlan, dynamicPrunings: ExpressionSet): LogicalPlan = {
  val applicable = dynamicPrunings.filter { candidate =>
    val refs = candidate.references
    refs.nonEmpty && refs.subsetOf(plan.outputSet) && candidate.deterministic
  }
  val newPredicates = applicable -- plan.constraints
  // An empty set yields None, which leaves the plan unchanged.
  newPredicates.reduceOption(And).map(Filter(_, plan)).getOrElse(plan)
}
}
Loading

0 comments on commit ba71ead

Please sign in to comment.