
[SPARK-21726][SQL] Check for structural integrity of the plan in Optimizer in test mode. #18956

Closed · wants to merge 6 commits

Conversation

viirya
Member

@viirya viirya commented Aug 16, 2017

What changes were proposed in this pull request?

We now have many optimization rules in the Optimizer, but there is no check that the plan keeps its structural integrity (e.g. stays resolved) as the rules run. When debugging, it is difficult to identify which rule returns an invalid plan.

It would be great if, in test mode, we could check whether a plan is still resolved after the execution of each rule, so we can catch rules that return invalid plans.

How was this patch tested?

Added tests.
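The proposed hook can be sketched, outside of Spark, as a RuleExecutor-style loop with an overridable integrity check. This is a minimal illustrative sketch: the class and method names (SimpleRuleExecutor, isPlanIntegral) are assumptions for the example, not necessarily the final Catalyst API.

```scala
// Minimal self-contained sketch of the idea (illustrative names, not the
// actual Catalyst classes):
abstract class SimpleRuleExecutor[T] {
  // Structural-integrity hook: passes everything by default; an
  // Optimizer-like subclass overrides it to check, for example, that the
  // plan is still resolved after each rule.
  protected def isPlanIntegral(plan: T): Boolean = true

  protected def rules: Seq[T => T]

  def execute(plan: T): T = rules.foldLeft(plan) { (current, rule) =>
    val result = rule(current)
    if (!isPlanIntegral(result)) {
      throw new IllegalStateException(
        "Structural integrity of the plan is broken after applying a rule.")
    }
    result
  }
}
```

The key point is that the check runs after every individual rule, so the first rule to break the plan is the one that triggers the failure.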

@@ -37,6 +37,12 @@ import org.apache.spark.sql.types._
abstract class Optimizer(sessionCatalog: SessionCatalog)
extends RuleExecutor[LogicalPlan] {

// Check for structural integrity of the plan in test mode. Currently we only check if a plan is
// still resolved after the execution of each rule.
override protected def planChecker: Option[LogicalPlan => Boolean] = Some(
Contributor

@rxin rxin Aug 16, 2017

Can we move the check for whether this is a test run in here? Then this method simply returns a boolean, and by default it returns true.

Member Author

Thanks. I will update it.

@SparkQA

SparkQA commented Aug 16, 2017

Test build #80715 has finished for PR 18956 at commit 21d86ba.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 16, 2017

Test build #80717 has finished for PR 18956 at commit 9170ceb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 16, 2017

Test build #80718 has finished for PR 18956 at commit c99011d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Aug 16, 2017

Interesting: the existing PullupCorrelatedPredicates rule produces an unresolved plan. I'll figure out the reason.

@viirya
Member Author

viirya commented Aug 16, 2017

The reason PullupCorrelatedPredicates produces an unresolved plan:

The query causing the problem in subquery_in_having.q looks like:

select b.key, min(b.value)
from src b
group by b.key
having b.key in ( select a.key
                from src a
                where a.value > 'val_9' and a.value = min(b.value)
                )
order by b.key
;

The optimized plan looks like:

'Sort [key#201 ASC NULLS FIRST], true
+- 'Project [key#201, min(value)#204]
   +- 'Filter key#201 IN (list#200 [(value#207 = min(value)#204)])
      :  +- Project [key#206, value#207]
      :     +- Filter (value#207 > val_9)
      :        +- InMemoryRelation [key#206, value#207], true, 5, StorageLevel(disk, memory, deserialized, 1 replicas), src
      :              +- HiveTableScan [key#0, value#1], HiveTableRelation `default`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#0, value#1]
      +- Aggregate [key#201], [key#201, min(value#202) AS min(value)#204, min(value#202) AS min(value#202)#209]
         +- InMemoryRelation [key#201, value#202], true, 5, StorageLevel(disk, memory, deserialized, 1 replicas), src
               +- HiveTableScan [key#0, value#1], HiveTableRelation `default`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#0, value#1]

Before PullupCorrelatedPredicates rule, the subquery in ListQuery looks like:

Project [key#206]
+- Filter ((value#207 > val_9) && (value#207 = outer(min(value)#204)))
   +- InMemoryRelation [key#206, value#207], true, 5, StorageLevel(disk, memory, deserialized, 1 replicas), src
         +- HiveTableScan [key#0, value#1], HiveTableRelation `default`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#0, value#1]

At this point, the In predicate in the top Filter is resolved.

After the rule, the subquery looks like:

Project [key#206, value#207]
+- Filter (value#207 > val_9)
   +- InMemoryRelation [key#206, value#207], true, 5, StorageLevel(disk, memory, deserialized, 1 replicas), src
         +- HiveTableScan [key#0, value#1], HiveTableRelation `default`.`src`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#0, value#1]

Notice that value#207 has been added into the Project, and the condition value#207 = outer(min(value)#204) has been pulled out into the top Filter:

 'Filter key#201 IN (list#200 [(value#207 = min(value)#204)])

Because In.checkInputDataTypes checks whether the number of columns on the left side (key#201) matches the number of subquery output columns (key#206, value#207), the check fails and In.resolved returns false.

The unresolved Filter didn't cause a problem before, because it is later converted to a Join by the RewritePredicateSubquery rule. But it is now detected by this structural integrity check.

By modifying In.checkInputDataTypes, we can solve this issue. I'll submit another PR for it.
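The arity check described above can be reduced to a toy model. The classes here are hypothetical, written only to illustrate why In.resolved flips to false when the rule widens the subquery's Project; they are not Spark's actual In expression.

```scala
// Toy model of the check, not Spark's In expression:
case class InSubquery(leftColumns: Seq[String], subqueryOutput: Seq[String]) {
  // Mirrors the described In.checkInputDataTypes behavior: the number of
  // columns on the left must match the number of subquery output columns.
  def checkInputDataTypes: Boolean = leftColumns.size == subqueryOutput.size
  def resolved: Boolean = checkInputDataTypes
}

// Before PullupCorrelatedPredicates: Project [key#206], sizes match.
InSubquery(Seq("key#201"), Seq("key#206")).resolved
// After the rule: Project [key#206, value#207], sizes differ, so the
// predicate is no longer resolved.
InSubquery(Seq("key#201"), Seq("key#206", "value#207")).resolved
```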

@viirya
Member Author

viirya commented Aug 17, 2017

The PR that fixes the issue described in #18956 (comment) has been submitted as #18968.

@viirya
Member Author

viirya commented Aug 24, 2017

retest this please.

@viirya
Member Author

viirya commented Aug 24, 2017

#18968 is merged. This should pass the tests now.

@SparkQA

SparkQA commented Aug 24, 2017

Test build #81079 has finished for PR 18956 at commit c99011d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Aug 25, 2017

It seems there are other issues caused by the RewritePredicateSubquery rule. I'll investigate and fix them.

@viirya
Member Author

viirya commented Aug 25, 2017

RewritePredicateSubquery fails the structural integrity check because it can produce a Join with conflicting attributes in its left and right plans.

I submitted #19050 to fix it.
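The failure mode can be illustrated with a toy attribute-conflict check. The Attr type and the helper below are hypothetical stand-ins; in Catalyst, attributes carry unique expression ids, and a Join whose children expose the same id is structurally invalid.

```scala
// Toy check for the conflict described above, not Spark's actual code:
case class Attr(name: String, exprId: Int)

// A Join is structurally invalid if its left and right children expose
// attributes with the same expression id.
def hasConflictingAttributes(leftOutput: Seq[Attr],
                             rightOutput: Seq[Attr]): Boolean =
  leftOutput.map(_.exprId).toSet
    .intersect(rightOutput.map(_.exprId).toSet)
    .nonEmpty
```

An integrity check along these lines is what catches the invalid Join that RewritePredicateSubquery could produce before #19050.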

@viirya
Member Author

viirya commented Sep 6, 2017

#19050 is merged now. Let's see if any other rule still fails this structural integrity check.

@viirya
Member Author

viirya commented Sep 6, 2017

retest this please.

@SparkQA

SparkQA commented Sep 6, 2017

Test build #81460 has finished for PR 18956 at commit c99011d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 7, 2017

Test build #81496 has finished for PR 18956 at commit e1e4aa1.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class NettyMemoryMetrics implements MetricSet
  • class ByteArrayConstructor extends net.razorvine.pickle.objects.ByteArrayConstructor
  • * (4) the main class for the child
  • public class JavaFeatureHasherExample
  • sealed trait LogisticRegressionTrainingSummary extends LogisticRegressionSummary
  • sealed trait BinaryLogisticRegressionSummary extends LogisticRegressionSummary

@viirya
Member Author

viirya commented Sep 7, 2017

retest this please.

@SparkQA

SparkQA commented Sep 7, 2017

Test build #81504 has finished for PR 18956 at commit e1e4aa1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class NettyMemoryMetrics implements MetricSet
  • class ByteArrayConstructor extends net.razorvine.pickle.objects.ByteArrayConstructor
  • * (4) the main class for the child
  • public class JavaFeatureHasherExample
  • sealed trait LogisticRegressionTrainingSummary extends LogisticRegressionSummary
  • sealed trait BinaryLogisticRegressionSummary extends LogisticRegressionSummary

@SparkQA

SparkQA commented Sep 7, 2017

Test build #81518 has finished for PR 18956 at commit 959e315.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

if (!planChecker(result)) {
  val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
    "the structural integrity of the plan is broken."
  throw new TreeNodeException(result, message, null)
Member

Move the exception-throwing logic into planChecker?

Member

Never mind. The message also includes the rule and batch names.

import org.apache.spark.sql.internal.SQLConf


class OptimizerSICheckerkSuite extends PlanTest {
Member

-> OptimizerStructuralIntegrityCheckerSuite

Member Author

Ok.

* `Optimizer`, so we can catch rules that return invalid plans. The check function will returns
* `false` if the given plan doesn't pass the structural integrity check.
*/
protected def planChecker(plan: TreeType): Boolean = true
Member

planChecker -> isPlanIntegral?

Member Author

Looks good.

@@ -64,6 +64,14 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
protected def batches: Seq[Batch]

/**
* Defines a check function which checks for structural integrity of the plan after the execution
* of each rule. For example, we can check whether a plan is still resolved after each rule in
* `Optimizer`, so we can catch rules that return invalid plans. The check function will returns
Member

will returns -> returns

@@ -64,6 +64,14 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
protected def batches: Seq[Batch]

/**
* Defines a check function which checks for structural integrity of the plan after the execution
Member

which -> that

@gatorsmile
Member

LGTM except two minor comments

@SparkQA

SparkQA commented Sep 8, 2017

Test build #81529 has finished for PR 18956 at commit d1db7cf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 8, 2017

Test build #81531 has finished for PR 18956 at commit ecdfb7d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

Thanks! Merged to master.

@viirya
Member Author

viirya commented Sep 8, 2017

Thanks @rxin @gatorsmile

@asfgit asfgit closed this in 6e37524 Sep 8, 2017
@viirya viirya deleted the SPARK-21726 branch December 27, 2023 18:34