Skip to content

Commit

Permalink
Replace HashSet with HashMap
Browse files Browse the repository at this point in the history
  • Loading branch information
mihailotim-db committed Oct 9, 2024
1 parent 92e79e3 commit 8219901
Showing 1 changed file with 20 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,12 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern._

/**
* A helper class used to detect duplicate relations fast in `DeduplicateRelations`. Two relations
* are duplicated if:
* 1. they are the same class.
* 2. they have the same output attribute IDs.
*
* The first condition is necessary because the CTE relation definition node and reference node have
* the same output attribute IDs but they are not duplicated.
*/
case class RelationWrapper(cls: Class[_], outputAttrIds: Seq[Long])

object DeduplicateRelations extends Rule[LogicalPlan] {

type ExprIdMap = mutable.HashMap[Class[_], mutable.HashSet[Long]]

override def apply(plan: LogicalPlan): LogicalPlan = {
val newPlan = renewDuplicatedRelations(mutable.HashSet.empty, plan)._1
val newPlan = renewDuplicatedRelations(mutable.HashMap.empty, plan)._1

// Wait for `ResolveMissingReferences` to resolve missing attributes first
def noMissingInput(p: LogicalPlan) = !p.exists(_.missingInput.nonEmpty)
Expand Down Expand Up @@ -86,10 +78,10 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
}

private def existDuplicatedExprId(
existingRelations: mutable.HashSet[RelationWrapper],
plan: RelationWrapper): Boolean = {
existingRelations.filter(_.cls == plan.cls)
.exists(_.outputAttrIds.intersect(plan.outputAttrIds).nonEmpty)
existingRelations: ExprIdMap,
planClass: Class[_], exprIds: Seq[Long]): Boolean = {
val attrSet = existingRelations.getOrElse(planClass, mutable.HashSet.empty)
exprIds.exists(attrSet.contains)
}

/**
Expand All @@ -100,20 +92,16 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
* whether the plan is changed or not)
*/
private def renewDuplicatedRelations(
existingRelations: mutable.HashSet[RelationWrapper],
existingRelations: ExprIdMap,
plan: LogicalPlan): (LogicalPlan, Boolean) = plan match {
case p: LogicalPlan if p.isStreaming => (plan, false)

case m: MultiInstanceRelation =>
val planWrapper = RelationWrapper(m.getClass, m.output.map(_.exprId.id))
if (existingRelations.contains(planWrapper)) {
val newNode = m.newInstance()
newNode.copyTagsFrom(m)
(newNode, true)
} else {
existingRelations.add(planWrapper)
(m, false)
}
deduplicateAndRenew[LogicalPlan with MultiInstanceRelation](
existingRelations,
m,
_.output.map(_.exprId.id),
node => node.newInstance().asInstanceOf[LogicalPlan with MultiInstanceRelation])

case p: Project =>
deduplicateAndRenew[Project](
Expand Down Expand Up @@ -207,7 +195,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
}

private def deduplicate(
existingRelations: mutable.HashSet[RelationWrapper],
existingRelations: ExprIdMap,
plan: LogicalPlan): (LogicalPlan, Boolean) = {
var planChanged = false
val newPlan = if (plan.children.nonEmpty) {
Expand Down Expand Up @@ -291,20 +279,21 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
}

private def deduplicateAndRenew[T <: LogicalPlan](
existingRelations: mutable.HashSet[RelationWrapper], plan: T,
existingRelations: ExprIdMap, plan: T,
getExprIds: T => Seq[Long],
copyNewPlan: T => T): (LogicalPlan, Boolean) = {
var (newPlan, planChanged) = deduplicate(existingRelations, plan)
if (newPlan.resolved) {
val exprIds = getExprIds(newPlan.asInstanceOf[T])
if (exprIds.nonEmpty) {
val planWrapper = RelationWrapper(newPlan.getClass, exprIds)
if (existDuplicatedExprId(existingRelations, planWrapper)) {
if (existDuplicatedExprId(existingRelations, newPlan.getClass, exprIds)) {
newPlan = copyNewPlan(newPlan.asInstanceOf[T])
newPlan.copyTagsFrom(plan)
(newPlan, true)
} else {
existingRelations.add(planWrapper)
val attrSet = existingRelations.getOrElseUpdate(newPlan.getClass, mutable.HashSet.empty)
exprIds.foreach(attrSet.add)
existingRelations.put(newPlan.getClass, attrSet)
(newPlan, planChanged)
}
} else {
Expand Down

0 comments on commit 8219901

Please sign in to comment.