[SPARK-27393][SQL] Show ReusedSubquery in the plan when the subquery is reused #24258

Closed
wants to merge 3 commits

Conversation

gatorsmile
Member

What changes were proposed in this pull request?

With this change, we can easily identify the plan difference when a subquery is reused.

When reuse is enabled, the plan looks like:

```
== Physical Plan ==
CollectLimit 1
+- *(1) Project [(Subquery subquery240 + ReusedSubquery Subquery subquery240) AS (scalarsubquery() + scalarsubquery())#253]
   :  :- Subquery subquery240
   :  :  +- *(2) HashAggregate(keys=[], functions=[avg(cast(key#13 as bigint))], output=[avg(key)#250])
   :  :     +- Exchange SinglePartition
   :  :        +- *(1) HashAggregate(keys=[], functions=[partial_avg(cast(key#13 as bigint))], output=[sum#256, count#257L])
   :  :           +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13]
   :  :              +- Scan[obj#12]
   :  +- ReusedSubquery Subquery subquery240
   +- *(1) SerializeFromObject
      +- Scan[obj#12]
```

When reuse is disabled, the plan looks like:

```
== Physical Plan ==
CollectLimit 1
+- *(1) Project [(Subquery subquery286 + Subquery subquery287) AS (scalarsubquery() + scalarsubquery())#299]
   :  :- Subquery subquery286
   :  :  +- *(2) HashAggregate(keys=[], functions=[avg(cast(key#13 as bigint))], output=[avg(key)#296])
   :  :     +- Exchange SinglePartition
   :  :        +- *(1) HashAggregate(keys=[], functions=[partial_avg(cast(key#13 as bigint))], output=[sum#302, count#303L])
   :  :           +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13]
   :  :              +- Scan[obj#12]
   :  +- Subquery subquery287
   :     +- *(2) HashAggregate(keys=[], functions=[avg(cast(key#13 as bigint))], output=[avg(key)#298])
   :        +- Exchange SinglePartition
   :           +- *(1) HashAggregate(keys=[], functions=[partial_avg(cast(key#13 as bigint))], output=[sum#306, count#307L])
   :              +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13]
   :                 +- Scan[obj#12]
   +- *(1) SerializeFromObject
      +- Scan[obj#12]
```
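
For reference, a query of roughly this shape produces the plans above (a minimal sketch, assuming the suite's testData table with an integer key column):

```scala
// Two identical scalar subqueries: with reuse enabled, the second one shows
// up as ReusedSubquery in the physical plan instead of being planned twice.
val df = spark.sql(
  "SELECT (SELECT avg(key) FROM testData) + (SELECT avg(key) FROM testData)")
df.explain()
```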

How was this patch tested?

Modified the existing test.

@gatorsmile
Member Author

gatorsmile commented Mar 31, 2019

@@ -113,33 +113,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}

test("Reuse Subquery") {
Member Author

moved to SubquerySuite.scala

@SparkQA

SparkQA commented Mar 31, 2019

Test build #104128 has finished for PR 24258 at commit 14f439b.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 31, 2019

Test build #104127 has finished for PR 24258 at commit 2ebdcab.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class BaseSubqueryExec extends SparkPlan
  • case class SubqueryExec(name: String, child: SparkPlan)
  • case class ReusedSubqueryExec(child: BaseSubqueryExec)
  • abstract class ExecSubqueryExpression extends PlanExpression[BaseSubqueryExec]

@adrian-wang
Contributor

retest this please.

@SparkQA

SparkQA commented Mar 31, 2019

Test build #104138 has finished for PR 24258 at commit 14f439b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// Normalize the outer references in the subquery plan.
val normalizedPlan = plan.transformAllExpressions {
  case OuterReference(r) => OuterReference(QueryPlan.normalizeExprId(r, attrs))
}
}.canonicalized
Member

Do we need the canonicalized plan here? It seems all the canonicalization should happen at line 69?

Contributor

+1
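
For context, a sketch of what this normalization buys (a hypothetical illustration, not code from the patch; it assumes QueryPlan.normalizeExprId rewrites an attribute's ExprId to its ordinal position in the given attribute list):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.types.IntegerType

// Two references to the "same" column, created with fresh ExprIds.
val a1 = AttributeReference("key", IntegerType)()
val a2 = a1.newInstance() // same name and type, different ExprId

// Normalizing against each plan's attribute list maps both to the same
// positional ExprId, so plans that differ only in fresh IDs canonicalize
// to equal plans and can be detected as reusable.
val n1 = QueryPlan.normalizeExprId(a1, Seq(a1))
val n2 = QueryPlan.normalizeExprId(a2, Seq(a2))
assert(n1 == n2)
```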

@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import org.apache.spark.{AccumulatorSuite, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.util.StringUtils
-import org.apache.spark.sql.execution.{aggregate, ScalarSubquery, SubqueryExec}
+import org.apache.spark.sql.execution.aggregate
Member

nit: we can remove this import by changing aggregate.HashAggregateExec -> HashAggregateExec in line 263?


override def executeCollect(): Array[InternalRow] = {
  child.executeCollect()
}
Member

super nit:

  protected override def doPrepare(): Unit = child.prepare()
  protected override def doExecute(): RDD[InternalRow] = child.execute()
  override def executeCollect(): Array[InternalRow] = child.executeCollect()

@maropu
Member

maropu commented Apr 1, 2019

btw, is it not worth filing a new JIRA for this refactoring?

"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"collectTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to collect"))
abstract class BaseSubqueryExec extends SparkPlan {
def name: String
Member

Do we need name in this base class for subquery exec?

Member Author

Both ReusedSubqueryExec and SubqueryExec have the name field.
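
For readers following along, a simplified sketch of the hierarchy this patch introduces (class signatures from the SparkQA summary above; the bodies are abbreviated and assumed, not the exact patch code — the real SubqueryExec, for instance, computes its result asynchronously and tracks metrics):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.SparkPlan

// Common parent: every subquery node, original or reused, exposes a name.
abstract class BaseSubqueryExec extends SparkPlan {
  def name: String
  def child: SparkPlan
  override def children: Seq[SparkPlan] = child :: Nil
  override def output: Seq[Attribute] = child.output
}

// The physical subquery itself.
case class SubqueryExec(name: String, child: SparkPlan) extends BaseSubqueryExec {
  protected override def doExecute(): RDD[InternalRow] = child.execute()
}

// A marker node wrapping an already-planned subquery; it delegates
// everything, including name, to the node it reuses.
case class ReusedSubqueryExec(child: BaseSubqueryExec) extends BaseSubqueryExec {
  override def name: String = child.name
  protected override def doPrepare(): Unit = child.prepare()
  protected override def doExecute(): RDD[InternalRow] = child.execute()
  override def executeCollect(): Array[InternalRow] = child.executeCollect()
}
```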

 plan transformAllExpressions {
   case sub: ExecSubqueryExpression =>
-    val sameSchema = subqueries.getOrElseUpdate(sub.plan.schema, ArrayBuffer[SubqueryExec]())
+    val sameSchema =
+      subqueries.getOrElseUpdate(sub.plan.schema, ArrayBuffer[BaseSubqueryExec]())
Contributor

unnecessary change?

Member Author

Changed it to BaseSubqueryExec.
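
For context, the surrounding rule looks roughly like this after the change (a sketch reconstructed from the diff above; details such as the config check and import locations are assumptions, not verbatim patch code):

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{BaseSubqueryExec, ExecSubqueryExpression, ReusedSubqueryExec, SparkPlan}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
  def apply(plan: SparkPlan): SparkPlan = {
    if (!conf.subqueryReuseEnabled) {
      return plan
    }
    // Group candidate subqueries by schema, then reuse the first plan that
    // produces the same result instead of executing it a second time.
    val subqueries = mutable.HashMap[StructType, ArrayBuffer[BaseSubqueryExec]]()
    plan transformAllExpressions {
      case sub: ExecSubqueryExpression =>
        val sameSchema =
          subqueries.getOrElseUpdate(sub.plan.schema, ArrayBuffer[BaseSubqueryExec]())
        val sameResult = sameSchema.find(_.sameResult(sub.plan))
        if (sameResult.isDefined) {
          // This is what surfaces as ReusedSubquery in the plan output.
          sub.withNewPlan(ReusedSubqueryExec(sameResult.get))
        } else {
          sameSchema += sub.plan
          sub
        }
    }
  }
}
```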

@cloud-fan
Contributor

Shall we create a new JIRA? This improves the SQL web UI.

@gatorsmile gatorsmile changed the title [SPARK-27279][SQL][FOLLOW-UP] Reuse subquery should compare child plan of SubqueryExec [SPARK-27393][SQL] Show ReusedSubquery in the plan when the subquery is reused Apr 4, 2019
@SparkQA

SparkQA commented Apr 5, 2019

Test build #104306 has finished for PR 24258 at commit 7036cfa.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 5, 2019

Test build #4691 has finished for PR 24258 at commit 7036cfa.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Apr 5, 2019

retest this please

@SparkQA

SparkQA commented Apr 5, 2019

Test build #104314 has finished for PR 24258 at commit 7036cfa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member Author

Thanks! Merged to master.

@gatorsmile gatorsmile closed this in 5678e68 Apr 5, 2019
j-baker pushed a commit to palantir/spark that referenced this pull request Jan 25, 2020
[SPARK-27393][SQL] Show ReusedSubquery in the plan when the subquery is reused

Closes apache#24258 from gatorsmile/followupSPARK-27279.

Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>