Skip to content

Commit

Permalink
[SPARK-43838][SQL][FOLLOWUP] Replace HashSet with HashMap to impr…
Browse files Browse the repository at this point in the history
…ove performance of `DeduplicateRelations`

### What changes were proposed in this pull request?
This PR replaces the `HashSet` currently used by `DeduplicateRelations` with a `HashMap` (keyed by plan class, with the set of output expression IDs as the value) to improve the rule's performance.
Additionally, this PR reverts #48053, as that change is no longer needed.

### Why are the changes needed?
The current implementation doesn't use the `HashSet` properly: instead of hash lookups, it performs repeated linear scans over the set (filtering by class and intersecting attribute IDs), resulting in O(n^2) complexity.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?
Existing tests

### Was this patch authored or co-authored using generative AI tooling?

Closes #48392 from mihailotim-db/mihailotim-db/master.

Authored-by: Mihailo Timotic <mihailo.timotic@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
mihailotim-db authored and HyukjinKwon committed Oct 9, 2024
1 parent fed9a8d commit f69d03e
Show file tree
Hide file tree
Showing 9 changed files with 504 additions and 515 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,12 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern._

/**
* A helper class used to detect duplicate relations fast in `DeduplicateRelations`. Two relations
* are duplicated if:
* 1. they are the same class.
* 2. they have the same output attribute IDs.
*
* The first condition is necessary because the CTE relation definition node and reference node have
* the same output attribute IDs but they are not duplicated.
*/
case class RelationWrapper(cls: Class[_], outputAttrIds: Seq[Long])

object DeduplicateRelations extends Rule[LogicalPlan] {

type ExprIdMap = mutable.HashMap[Class[_], mutable.HashSet[Long]]

override def apply(plan: LogicalPlan): LogicalPlan = {
val newPlan = renewDuplicatedRelations(mutable.HashSet.empty, plan)._1
val newPlan = renewDuplicatedRelations(mutable.HashMap.empty, plan)._1

// Wait for `ResolveMissingReferences` to resolve missing attributes first
def noMissingInput(p: LogicalPlan) = !p.exists(_.missingInput.nonEmpty)
Expand Down Expand Up @@ -86,10 +78,10 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
}

private def existDuplicatedExprId(
existingRelations: mutable.HashSet[RelationWrapper],
plan: RelationWrapper): Boolean = {
existingRelations.filter(_.cls == plan.cls)
.exists(_.outputAttrIds.intersect(plan.outputAttrIds).nonEmpty)
existingRelations: ExprIdMap,
planClass: Class[_], exprIds: Seq[Long]): Boolean = {
val attrSet = existingRelations.getOrElse(planClass, mutable.HashSet.empty)
exprIds.exists(attrSet.contains)
}

/**
Expand All @@ -100,20 +92,16 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
* whether the plan is changed or not)
*/
private def renewDuplicatedRelations(
existingRelations: mutable.HashSet[RelationWrapper],
existingRelations: ExprIdMap,
plan: LogicalPlan): (LogicalPlan, Boolean) = plan match {
case p: LogicalPlan if p.isStreaming => (plan, false)

case m: MultiInstanceRelation =>
val planWrapper = RelationWrapper(m.getClass, m.output.map(_.exprId.id))
if (existingRelations.contains(planWrapper)) {
val newNode = m.newInstance()
newNode.copyTagsFrom(m)
(newNode, true)
} else {
existingRelations.add(planWrapper)
(m, false)
}
deduplicateAndRenew[LogicalPlan with MultiInstanceRelation](
existingRelations,
m,
_.output.map(_.exprId.id),
node => node.newInstance().asInstanceOf[LogicalPlan with MultiInstanceRelation])

case p: Project =>
deduplicateAndRenew[Project](
Expand Down Expand Up @@ -207,7 +195,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
}

private def deduplicate(
existingRelations: mutable.HashSet[RelationWrapper],
existingRelations: ExprIdMap,
plan: LogicalPlan): (LogicalPlan, Boolean) = {
var planChanged = false
val newPlan = if (plan.children.nonEmpty) {
Expand Down Expand Up @@ -291,20 +279,21 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
}

private def deduplicateAndRenew[T <: LogicalPlan](
existingRelations: mutable.HashSet[RelationWrapper], plan: T,
existingRelations: ExprIdMap, plan: T,
getExprIds: T => Seq[Long],
copyNewPlan: T => T): (LogicalPlan, Boolean) = {
var (newPlan, planChanged) = deduplicate(existingRelations, plan)
if (newPlan.resolved) {
val exprIds = getExprIds(newPlan.asInstanceOf[T])
if (exprIds.nonEmpty) {
val planWrapper = RelationWrapper(newPlan.getClass, exprIds)
if (existDuplicatedExprId(existingRelations, planWrapper)) {
if (existDuplicatedExprId(existingRelations, newPlan.getClass, exprIds)) {
newPlan = copyNewPlan(newPlan.asInstanceOf[T])
newPlan.copyTagsFrom(plan)
(newPlan, true)
} else {
existingRelations.add(planWrapper)
val attrSet = existingRelations.getOrElseUpdate(newPlan.getClass, mutable.HashSet.empty)
exprIds.foreach(attrSet.add)
existingRelations.put(newPlan.getClass, attrSet)
(newPlan, planChanged)
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,125 +175,125 @@ Input [6]: [i_product_name#12, i_brand#9, i_class#10, i_category#11, sum#21, cou
Keys [4]: [i_product_name#12, i_brand#9, i_class#10, i_category#11]
Functions [1]: [avg(qoh#18)]
Aggregate Attributes [1]: [avg(qoh#18)#23]
Results [5]: [i_product_name#12, i_brand#9, i_class#10, i_category#11, avg(qoh#18)#23 AS qoh#24]
Results [5]: [i_product_name#12 AS i_product_name#24, i_brand#9 AS i_brand#25, i_class#10 AS i_class#26, i_category#11 AS i_category#27, avg(qoh#18)#23 AS qoh#28]

(27) ReusedExchange [Reuses operator id: 23]
Output [6]: [i_product_name#25, i_brand#26, i_class#27, i_category#28, sum#29, count#30]
Output [6]: [i_product_name#29, i_brand#30, i_class#31, i_category#32, sum#33, count#34]

(28) HashAggregate [codegen id : 16]
Input [6]: [i_product_name#25, i_brand#26, i_class#27, i_category#28, sum#29, count#30]
Keys [4]: [i_product_name#25, i_brand#26, i_class#27, i_category#28]
Functions [1]: [avg(inv_quantity_on_hand#31)]
Aggregate Attributes [1]: [avg(inv_quantity_on_hand#31)#17]
Results [4]: [i_product_name#25, i_brand#26, i_class#27, avg(inv_quantity_on_hand#31)#17 AS qoh#32]
Input [6]: [i_product_name#29, i_brand#30, i_class#31, i_category#32, sum#33, count#34]
Keys [4]: [i_product_name#29, i_brand#30, i_class#31, i_category#32]
Functions [1]: [avg(inv_quantity_on_hand#35)]
Aggregate Attributes [1]: [avg(inv_quantity_on_hand#35)#17]
Results [4]: [i_product_name#29, i_brand#30, i_class#31, avg(inv_quantity_on_hand#35)#17 AS qoh#36]

(29) HashAggregate [codegen id : 16]
Input [4]: [i_product_name#25, i_brand#26, i_class#27, qoh#32]
Keys [3]: [i_product_name#25, i_brand#26, i_class#27]
Functions [1]: [partial_avg(qoh#32)]
Aggregate Attributes [2]: [sum#33, count#34]
Results [5]: [i_product_name#25, i_brand#26, i_class#27, sum#35, count#36]
Input [4]: [i_product_name#29, i_brand#30, i_class#31, qoh#36]
Keys [3]: [i_product_name#29, i_brand#30, i_class#31]
Functions [1]: [partial_avg(qoh#36)]
Aggregate Attributes [2]: [sum#37, count#38]
Results [5]: [i_product_name#29, i_brand#30, i_class#31, sum#39, count#40]

(30) Exchange
Input [5]: [i_product_name#25, i_brand#26, i_class#27, sum#35, count#36]
Arguments: hashpartitioning(i_product_name#25, i_brand#26, i_class#27, 5), ENSURE_REQUIREMENTS, [plan_id=5]
Input [5]: [i_product_name#29, i_brand#30, i_class#31, sum#39, count#40]
Arguments: hashpartitioning(i_product_name#29, i_brand#30, i_class#31, 5), ENSURE_REQUIREMENTS, [plan_id=5]

(31) HashAggregate [codegen id : 17]
Input [5]: [i_product_name#25, i_brand#26, i_class#27, sum#35, count#36]
Keys [3]: [i_product_name#25, i_brand#26, i_class#27]
Functions [1]: [avg(qoh#32)]
Aggregate Attributes [1]: [avg(qoh#32)#37]
Results [5]: [i_product_name#25, i_brand#26, i_class#27, null AS i_category#38, avg(qoh#32)#37 AS qoh#39]
Input [5]: [i_product_name#29, i_brand#30, i_class#31, sum#39, count#40]
Keys [3]: [i_product_name#29, i_brand#30, i_class#31]
Functions [1]: [avg(qoh#36)]
Aggregate Attributes [1]: [avg(qoh#36)#41]
Results [5]: [i_product_name#29, i_brand#30, i_class#31, null AS i_category#42, avg(qoh#36)#41 AS qoh#43]

(32) ReusedExchange [Reuses operator id: 23]
Output [6]: [i_product_name#40, i_brand#41, i_class#42, i_category#43, sum#44, count#45]
Output [6]: [i_product_name#44, i_brand#45, i_class#46, i_category#47, sum#48, count#49]

(33) HashAggregate [codegen id : 25]
Input [6]: [i_product_name#40, i_brand#41, i_class#42, i_category#43, sum#44, count#45]
Keys [4]: [i_product_name#40, i_brand#41, i_class#42, i_category#43]
Functions [1]: [avg(inv_quantity_on_hand#46)]
Aggregate Attributes [1]: [avg(inv_quantity_on_hand#46)#17]
Results [3]: [i_product_name#40, i_brand#41, avg(inv_quantity_on_hand#46)#17 AS qoh#47]
Input [6]: [i_product_name#44, i_brand#45, i_class#46, i_category#47, sum#48, count#49]
Keys [4]: [i_product_name#44, i_brand#45, i_class#46, i_category#47]
Functions [1]: [avg(inv_quantity_on_hand#50)]
Aggregate Attributes [1]: [avg(inv_quantity_on_hand#50)#17]
Results [3]: [i_product_name#44, i_brand#45, avg(inv_quantity_on_hand#50)#17 AS qoh#51]

(34) HashAggregate [codegen id : 25]
Input [3]: [i_product_name#40, i_brand#41, qoh#47]
Keys [2]: [i_product_name#40, i_brand#41]
Functions [1]: [partial_avg(qoh#47)]
Aggregate Attributes [2]: [sum#48, count#49]
Results [4]: [i_product_name#40, i_brand#41, sum#50, count#51]
Input [3]: [i_product_name#44, i_brand#45, qoh#51]
Keys [2]: [i_product_name#44, i_brand#45]
Functions [1]: [partial_avg(qoh#51)]
Aggregate Attributes [2]: [sum#52, count#53]
Results [4]: [i_product_name#44, i_brand#45, sum#54, count#55]

(35) Exchange
Input [4]: [i_product_name#40, i_brand#41, sum#50, count#51]
Arguments: hashpartitioning(i_product_name#40, i_brand#41, 5), ENSURE_REQUIREMENTS, [plan_id=6]
Input [4]: [i_product_name#44, i_brand#45, sum#54, count#55]
Arguments: hashpartitioning(i_product_name#44, i_brand#45, 5), ENSURE_REQUIREMENTS, [plan_id=6]

(36) HashAggregate [codegen id : 26]
Input [4]: [i_product_name#40, i_brand#41, sum#50, count#51]
Keys [2]: [i_product_name#40, i_brand#41]
Functions [1]: [avg(qoh#47)]
Aggregate Attributes [1]: [avg(qoh#47)#52]
Results [5]: [i_product_name#40, i_brand#41, null AS i_class#53, null AS i_category#54, avg(qoh#47)#52 AS qoh#55]
Input [4]: [i_product_name#44, i_brand#45, sum#54, count#55]
Keys [2]: [i_product_name#44, i_brand#45]
Functions [1]: [avg(qoh#51)]
Aggregate Attributes [1]: [avg(qoh#51)#56]
Results [5]: [i_product_name#44, i_brand#45, null AS i_class#57, null AS i_category#58, avg(qoh#51)#56 AS qoh#59]

(37) ReusedExchange [Reuses operator id: 23]
Output [6]: [i_product_name#56, i_brand#57, i_class#58, i_category#59, sum#60, count#61]
Output [6]: [i_product_name#60, i_brand#61, i_class#62, i_category#63, sum#64, count#65]

(38) HashAggregate [codegen id : 34]
Input [6]: [i_product_name#56, i_brand#57, i_class#58, i_category#59, sum#60, count#61]
Keys [4]: [i_product_name#56, i_brand#57, i_class#58, i_category#59]
Functions [1]: [avg(inv_quantity_on_hand#62)]
Aggregate Attributes [1]: [avg(inv_quantity_on_hand#62)#17]
Results [2]: [i_product_name#56, avg(inv_quantity_on_hand#62)#17 AS qoh#63]
Input [6]: [i_product_name#60, i_brand#61, i_class#62, i_category#63, sum#64, count#65]
Keys [4]: [i_product_name#60, i_brand#61, i_class#62, i_category#63]
Functions [1]: [avg(inv_quantity_on_hand#66)]
Aggregate Attributes [1]: [avg(inv_quantity_on_hand#66)#17]
Results [2]: [i_product_name#60, avg(inv_quantity_on_hand#66)#17 AS qoh#67]

(39) HashAggregate [codegen id : 34]
Input [2]: [i_product_name#56, qoh#63]
Keys [1]: [i_product_name#56]
Functions [1]: [partial_avg(qoh#63)]
Aggregate Attributes [2]: [sum#64, count#65]
Results [3]: [i_product_name#56, sum#66, count#67]
Input [2]: [i_product_name#60, qoh#67]
Keys [1]: [i_product_name#60]
Functions [1]: [partial_avg(qoh#67)]
Aggregate Attributes [2]: [sum#68, count#69]
Results [3]: [i_product_name#60, sum#70, count#71]

(40) Exchange
Input [3]: [i_product_name#56, sum#66, count#67]
Arguments: hashpartitioning(i_product_name#56, 5), ENSURE_REQUIREMENTS, [plan_id=7]
Input [3]: [i_product_name#60, sum#70, count#71]
Arguments: hashpartitioning(i_product_name#60, 5), ENSURE_REQUIREMENTS, [plan_id=7]

(41) HashAggregate [codegen id : 35]
Input [3]: [i_product_name#56, sum#66, count#67]
Keys [1]: [i_product_name#56]
Functions [1]: [avg(qoh#63)]
Aggregate Attributes [1]: [avg(qoh#63)#68]
Results [5]: [i_product_name#56, null AS i_brand#69, null AS i_class#70, null AS i_category#71, avg(qoh#63)#68 AS qoh#72]
Input [3]: [i_product_name#60, sum#70, count#71]
Keys [1]: [i_product_name#60]
Functions [1]: [avg(qoh#67)]
Aggregate Attributes [1]: [avg(qoh#67)#72]
Results [5]: [i_product_name#60, null AS i_brand#73, null AS i_class#74, null AS i_category#75, avg(qoh#67)#72 AS qoh#76]

(42) ReusedExchange [Reuses operator id: 23]
Output [6]: [i_product_name#73, i_brand#74, i_class#75, i_category#76, sum#77, count#78]
Output [6]: [i_product_name#77, i_brand#78, i_class#79, i_category#80, sum#81, count#82]

(43) HashAggregate [codegen id : 43]
Input [6]: [i_product_name#73, i_brand#74, i_class#75, i_category#76, sum#77, count#78]
Keys [4]: [i_product_name#73, i_brand#74, i_class#75, i_category#76]
Functions [1]: [avg(inv_quantity_on_hand#79)]
Aggregate Attributes [1]: [avg(inv_quantity_on_hand#79)#17]
Results [1]: [avg(inv_quantity_on_hand#79)#17 AS qoh#80]
Input [6]: [i_product_name#77, i_brand#78, i_class#79, i_category#80, sum#81, count#82]
Keys [4]: [i_product_name#77, i_brand#78, i_class#79, i_category#80]
Functions [1]: [avg(inv_quantity_on_hand#83)]
Aggregate Attributes [1]: [avg(inv_quantity_on_hand#83)#17]
Results [1]: [avg(inv_quantity_on_hand#83)#17 AS qoh#84]

(44) HashAggregate [codegen id : 43]
Input [1]: [qoh#80]
Input [1]: [qoh#84]
Keys: []
Functions [1]: [partial_avg(qoh#80)]
Aggregate Attributes [2]: [sum#81, count#82]
Results [2]: [sum#83, count#84]
Functions [1]: [partial_avg(qoh#84)]
Aggregate Attributes [2]: [sum#85, count#86]
Results [2]: [sum#87, count#88]

(45) Exchange
Input [2]: [sum#83, count#84]
Input [2]: [sum#87, count#88]
Arguments: SinglePartition, ENSURE_REQUIREMENTS, [plan_id=8]

(46) HashAggregate [codegen id : 44]
Input [2]: [sum#83, count#84]
Input [2]: [sum#87, count#88]
Keys: []
Functions [1]: [avg(qoh#80)]
Aggregate Attributes [1]: [avg(qoh#80)#85]
Results [5]: [null AS i_product_name#86, null AS i_brand#87, null AS i_class#88, null AS i_category#89, avg(qoh#80)#85 AS qoh#90]
Functions [1]: [avg(qoh#84)]
Aggregate Attributes [1]: [avg(qoh#84)#89]
Results [5]: [null AS i_product_name#90, null AS i_brand#91, null AS i_class#92, null AS i_category#93, avg(qoh#84)#89 AS qoh#94]

(47) Union

(48) TakeOrderedAndProject
Input [5]: [i_product_name#12, i_brand#9, i_class#10, i_category#11, qoh#24]
Arguments: 100, [qoh#24 ASC NULLS FIRST, i_product_name#12 ASC NULLS FIRST, i_brand#9 ASC NULLS FIRST, i_class#10 ASC NULLS FIRST, i_category#11 ASC NULLS FIRST], [i_product_name#12, i_brand#9, i_class#10, i_category#11, qoh#24]
Input [5]: [i_product_name#24, i_brand#25, i_class#26, i_category#27, qoh#28]
Arguments: 100, [qoh#28 ASC NULLS FIRST, i_product_name#24 ASC NULLS FIRST, i_brand#25 ASC NULLS FIRST, i_class#26 ASC NULLS FIRST, i_category#27 ASC NULLS FIRST], [i_product_name#24, i_brand#25, i_class#26, i_category#27, qoh#28]

===== Subqueries =====

Expand All @@ -306,22 +306,22 @@ BroadcastExchange (53)


(49) Scan parquet spark_catalog.default.date_dim
Output [2]: [d_date_sk#7, d_month_seq#91]
Output [2]: [d_date_sk#7, d_month_seq#95]
Batched: true
Location [not included in comparison]/{warehouse_dir}/date_dim]
PushedFilters: [IsNotNull(d_month_seq), GreaterThanOrEqual(d_month_seq,1212), LessThanOrEqual(d_month_seq,1223), IsNotNull(d_date_sk)]
ReadSchema: struct<d_date_sk:int,d_month_seq:int>

(50) ColumnarToRow [codegen id : 1]
Input [2]: [d_date_sk#7, d_month_seq#91]
Input [2]: [d_date_sk#7, d_month_seq#95]

(51) Filter [codegen id : 1]
Input [2]: [d_date_sk#7, d_month_seq#91]
Condition : (((isnotnull(d_month_seq#91) AND (d_month_seq#91 >= 1212)) AND (d_month_seq#91 <= 1223)) AND isnotnull(d_date_sk#7))
Input [2]: [d_date_sk#7, d_month_seq#95]
Condition : (((isnotnull(d_month_seq#95) AND (d_month_seq#95 >= 1212)) AND (d_month_seq#95 <= 1223)) AND isnotnull(d_date_sk#7))

(52) Project [codegen id : 1]
Output [1]: [d_date_sk#7]
Input [2]: [d_date_sk#7, d_month_seq#91]
Input [2]: [d_date_sk#7, d_month_seq#95]

(53) BroadcastExchange
Input [1]: [d_date_sk#7]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
TakeOrderedAndProject [qoh,i_product_name,i_brand,i_class,i_category]
Union
WholeStageCodegen (8)
HashAggregate [i_product_name,i_brand,i_class,i_category,sum,count] [avg(qoh),qoh,sum,count]
HashAggregate [i_product_name,i_brand,i_class,i_category,sum,count] [avg(qoh),i_product_name,i_brand,i_class,i_category,qoh,sum,count]
HashAggregate [i_product_name,i_brand,i_class,i_category,qoh] [sum,count,sum,count]
HashAggregate [i_product_name,i_brand,i_class,i_category,sum,count] [avg(inv_quantity_on_hand),qoh,sum,count]
InputAdapter
Expand Down
Loading

0 comments on commit f69d03e

Please sign in to comment.