From 11fce17a3426b6e563e90e6564b4269d11a90ae2 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 23 May 2017 18:44:49 +0200 Subject: [PATCH] [SPARK-20857][SQL] Generic resolved hint node ## What changes were proposed in this pull request? This patch renames BroadcastHint to ResolvedHint (and Hint to UnresolvedHint) so the hint framework is more generic and would allow us to introduce other hint types in the future without introducing new hint nodes. ## How was this patch tested? Updated test cases. Author: Reynold Xin Closes #18072 from rxin/SPARK-20857. --- .../sql/catalyst/analysis/Analyzer.scala | 2 +- .../sql/catalyst/analysis/CheckAnalysis.scala | 2 +- .../sql/catalyst/analysis/ResolveHints.scala | 12 ++--- .../sql/catalyst/optimizer/Optimizer.scala | 2 +- .../sql/catalyst/optimizer/expressions.scala | 2 +- .../sql/catalyst/parser/AstBuilder.scala | 4 +- .../sql/catalyst/planning/patterns.scala | 4 +- .../catalyst/plans/logical/Statistics.scala | 5 ++ .../plans/logical/basicLogicalOperators.scala | 22 +-------- .../sql/catalyst/plans/logical/hints.scala | 49 +++++++++++++++++++ .../catalyst/analysis/ResolveHintsSuite.scala | 41 +++++++++------- .../optimizer/ColumnPruningSuite.scala | 5 +- .../optimizer/FilterPushdownSuite.scala | 4 +- .../optimizer/JoinOptimizationSuite.scala | 4 +- .../sql/catalyst/parser/PlanParserSuite.scala | 15 +++--- .../BasicStatsEstimationSuite.scala | 2 +- .../scala/org/apache/spark/sql/Dataset.scala | 2 +- .../spark/sql/execution/SparkStrategies.scala | 2 +- .../org/apache/spark/sql/functions.scala | 5 +- .../execution/joins/BroadcastJoinSuite.scala | 14 +++--- 20 files changed, 118 insertions(+), 80 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d58b8acefdade..d130962c63918 
100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1336,7 +1336,7 @@ class Analyzer( // Category 1: // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias - case _: BroadcastHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias => + case _: ResolvedHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias => // Category 2: // These operators can be anywhere in a correlated subquery. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index ea4560aac7259..2e3ac3e474866 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -399,7 +399,7 @@ trait CheckAnalysis extends PredicateHelper { |in operator ${operator.simpleString} """.stripMargin) - case _: Hint => + case _: UnresolvedHint => throw new IllegalStateException( "Internal error: logical hint operator should have been removed during analysis") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index df688fa0e58ae..9dfd84cbc9941 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -57,11 +57,11 @@ object ResolveHints { val newNode = CurrentOrigin.withOrigin(plan.origin) { plan match { case u: UnresolvedRelation if toBroadcast.exists(resolver(_, u.tableIdentifier.table)) => - BroadcastHint(plan) + ResolvedHint(plan, isBroadcastable = Option(true)) case r: SubqueryAlias if 
toBroadcast.exists(resolver(_, r.alias)) => - BroadcastHint(plan) + ResolvedHint(plan, isBroadcastable = Option(true)) - case _: BroadcastHint | _: View | _: With | _: SubqueryAlias => + case _: ResolvedHint | _: View | _: With | _: SubqueryAlias => // Don't traverse down these nodes. // For an existing broadcast hint, there is no point going down (if we do, we either // won't change the structure, or will introduce another broadcast hint that is useless. @@ -85,10 +85,10 @@ object ResolveHints { } def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { - case h: Hint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => + case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) => if (h.parameters.isEmpty) { // If there is no table alias specified, turn the entire subtree into a BroadcastHint. - BroadcastHint(h.child) + ResolvedHint(h.child, isBroadcastable = Option(true)) } else { // Otherwise, find within the subtree query plans that should be broadcasted. applyBroadcastHint(h.child, h.parameters.toSet) @@ -102,7 +102,7 @@ object ResolveHints { */ object RemoveAllHints extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { - case h: Hint => h.child + case h: UnresolvedHint => h.child } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 1802cd4bb131b..ae2f6bfa94ae7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -862,7 +862,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { // Note that some operators (e.g. project, aggregate, union) are being handled separately // (earlier in this rule). 
case _: AppendColumns => true - case _: BroadcastHint => true + case _: ResolvedHint => true case _: Distinct => true case _: Generate => true case _: Pivot => true diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index d3ef5ea840919..8931eb2c8f3b1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -478,7 +478,7 @@ object FoldablePropagation extends Rule[LogicalPlan] { case _: Distinct => true case _: AppendColumns => true case _: AppendColumnsWithObject => true - case _: BroadcastHint => true + case _: ResolvedHint => true case _: RepartitionByExpression => true case _: Repartition => true case _: Sort => true diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index f033fd4834c96..7d2e3a6fe7580 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -533,13 +533,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Add a [[Hint]] to a logical plan. + * Add a [[UnresolvedHint]] to a logical plan. 
*/ private def withHints( ctx: HintContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) { val stmt = ctx.hintStatement - Hint(stmt.hintName.getText, stmt.parameters.asScala.map(_.getText), query) + UnresolvedHint(stmt.hintName.getText, stmt.parameters.asScala.map(_.getText), query) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index d39b0ef7e1d8a..ef925f92ecc7e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -65,8 +65,8 @@ object PhysicalOperation extends PredicateHelper { val substitutedCondition = substitute(aliases)(condition) (fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases) - case BroadcastHint(child) => - collectProjectsAndFilters(child) + case h: ResolvedHint => + collectProjectsAndFilters(h.child) case other => (None, Nil, other, Map.empty) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala index 3d4efef953a64..81bb374cb0500 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala @@ -68,6 +68,11 @@ case class Statistics( s"isBroadcastable=$isBroadcastable" ).filter(_.nonEmpty).mkString(", ") } + + /** Must be called when computing stats for a join operator to reset hints. 
*/ + def resetHintsForJoin(): Statistics = copy( + isBroadcastable = false + ) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index d291ca0020838..9f34b371740bd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -364,7 +364,7 @@ case class Join( case _ => // Make sure we don't propagate isBroadcastable in other joins, because // they could explode the size. - super.computeStats(conf).copy(isBroadcastable = false) + super.computeStats(conf).resetHintsForJoin() } if (conf.cboEnabled) { @@ -375,26 +375,6 @@ case class Join( } } -/** - * A hint for the optimizer that we should broadcast the `child` if used in a join operator. - */ -case class BroadcastHint(child: LogicalPlan) extends UnaryNode { - override def output: Seq[Attribute] = child.output - - // set isBroadcastable to true so the child will be broadcasted - override def computeStats(conf: SQLConf): Statistics = - child.stats(conf).copy(isBroadcastable = true) -} - -/** - * A general hint for the child. This node will be eliminated post analysis. - * A pair of (name, parameters). - */ -case class Hint(name: String, parameters: Seq[String], child: LogicalPlan) extends UnaryNode { - override lazy val resolved: Boolean = false - override def output: Seq[Attribute] = child.output -} - /** * Insert some data into a table. Note that this plan is unresolved and has to be replaced by the * concrete implementations during analysis. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala new file mode 100644 index 0000000000000..9bcbfbb4d1397 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.internal.SQLConf + +/** + * A general hint for the child that is not yet resolved. This node is generated by the parser and + * should be eliminated post analysis. + * A pair of (name, parameters). + */ +case class UnresolvedHint(name: String, parameters: Seq[String], child: LogicalPlan) + extends UnaryNode { + + override lazy val resolved: Boolean = false + override def output: Seq[Attribute] = child.output +} + +/** + * A resolved hint node. The analyzer should convert all [[UnresolvedHint]] into [[ResolvedHint]]. 
+ */ +case class ResolvedHint( + child: LogicalPlan, + isBroadcastable: Option[Boolean] = None) + extends UnaryNode { + + override def output: Seq[Attribute] = child.output + + override def computeStats(conf: SQLConf): Statistics = { + val stats = child.stats(conf) + isBroadcastable.map(x => stats.copy(isBroadcastable = x)).getOrElse(stats) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala index d101e2227462d..bb914e11a139a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala @@ -28,68 +28,70 @@ class ResolveHintsSuite extends AnalysisTest { test("invalid hints should be ignored") { checkAnalysis( - Hint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")), + UnresolvedHint("some_random_hint_that_does_not_exist", Seq("TaBlE"), table("TaBlE")), testRelation, caseSensitive = false) } test("case-sensitive or insensitive parameters") { checkAnalysis( - Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")), - BroadcastHint(testRelation), + UnresolvedHint("MAPJOIN", Seq("TaBlE"), table("TaBlE")), + ResolvedHint(testRelation, isBroadcastable = Option(true)), caseSensitive = false) checkAnalysis( - Hint("MAPJOIN", Seq("table"), table("TaBlE")), - BroadcastHint(testRelation), + UnresolvedHint("MAPJOIN", Seq("table"), table("TaBlE")), + ResolvedHint(testRelation, isBroadcastable = Option(true)), caseSensitive = false) checkAnalysis( - Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")), - BroadcastHint(testRelation), + UnresolvedHint("MAPJOIN", Seq("TaBlE"), table("TaBlE")), + ResolvedHint(testRelation, isBroadcastable = Option(true)), caseSensitive = true) checkAnalysis( - Hint("MAPJOIN", Seq("table"), table("TaBlE")), + UnresolvedHint("MAPJOIN", Seq("table"), 
table("TaBlE")), testRelation, caseSensitive = true) } test("multiple broadcast hint aliases") { checkAnalysis( - Hint("MAPJOIN", Seq("table", "table2"), table("table").join(table("table2"))), - Join(BroadcastHint(testRelation), BroadcastHint(testRelation2), Inner, None), + UnresolvedHint("MAPJOIN", Seq("table", "table2"), table("table").join(table("table2"))), + Join(ResolvedHint(testRelation, isBroadcastable = Option(true)), + ResolvedHint(testRelation2, isBroadcastable = Option(true)), Inner, None), caseSensitive = false) } test("do not traverse past existing broadcast hints") { checkAnalysis( - Hint("MAPJOIN", Seq("table"), BroadcastHint(table("table").where('a > 1))), - BroadcastHint(testRelation.where('a > 1)).analyze, + UnresolvedHint("MAPJOIN", Seq("table"), + ResolvedHint(table("table").where('a > 1), isBroadcastable = Option(true))), + ResolvedHint(testRelation.where('a > 1), isBroadcastable = Option(true)).analyze, caseSensitive = false) } test("should work for subqueries") { checkAnalysis( - Hint("MAPJOIN", Seq("tableAlias"), table("table").as("tableAlias")), - BroadcastHint(testRelation), + UnresolvedHint("MAPJOIN", Seq("tableAlias"), table("table").as("tableAlias")), + ResolvedHint(testRelation, isBroadcastable = Option(true)), caseSensitive = false) checkAnalysis( - Hint("MAPJOIN", Seq("tableAlias"), table("table").subquery('tableAlias)), - BroadcastHint(testRelation), + UnresolvedHint("MAPJOIN", Seq("tableAlias"), table("table").subquery('tableAlias)), + ResolvedHint(testRelation, isBroadcastable = Option(true)), caseSensitive = false) // Negative case: if the alias doesn't match, don't match the original table name. 
checkAnalysis( - Hint("MAPJOIN", Seq("table"), table("table").as("tableAlias")), + UnresolvedHint("MAPJOIN", Seq("table"), table("table").as("tableAlias")), testRelation, caseSensitive = false) } test("do not traverse past subquery alias") { checkAnalysis( - Hint("MAPJOIN", Seq("table"), table("table").where('a > 1).subquery('tableAlias)), + UnresolvedHint("MAPJOIN", Seq("table"), table("table").where('a > 1).subquery('tableAlias)), testRelation.where('a > 1).analyze, caseSensitive = false) } @@ -102,7 +104,8 @@ class ResolveHintsSuite extends AnalysisTest { |SELECT /*+ BROADCAST(ctetable) */ * FROM ctetable """.stripMargin ), - BroadcastHint(testRelation.where('a > 1).select('a)).select('a).analyze, + ResolvedHint(testRelation.where('a > 1).select('a), isBroadcastable = Option(true)) + .select('a).analyze, caseSensitive = false) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index 589607e3ad5cb..a0a0daea7d075 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -321,15 +321,14 @@ class ColumnPruningSuite extends PlanTest { Project(Seq($"x.key", $"y.key"), Join( SubqueryAlias("x", input), - BroadcastHint(SubqueryAlias("y", input)), Inner, None)).analyze + ResolvedHint(SubqueryAlias("y", input)), Inner, None)).analyze val optimized = Optimize.execute(query) val expected = Join( Project(Seq($"x.key"), SubqueryAlias("x", input)), - BroadcastHint( - Project(Seq($"y.key"), SubqueryAlias("y", input))), + ResolvedHint(Project(Seq($"y.key"), SubqueryAlias("y", input))), Inner, None).analyze comparePlans(optimized, expected) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 950aa2379517e..d4d281e7e05db 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -798,12 +798,12 @@ class FilterPushdownSuite extends PlanTest { } test("broadcast hint") { - val originalQuery = BroadcastHint(testRelation) + val originalQuery = ResolvedHint(testRelation) .where('a === 2L && 'b + Rand(10).as("rnd") === 3) val optimized = Optimize.execute(originalQuery.analyze) - val correctAnswer = BroadcastHint(testRelation.where('a === 2L)) + val correctAnswer = ResolvedHint(testRelation.where('a === 2L)) .where('b + Rand(10).as("rnd") === 3) .analyze diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala index a43d78c7bd447..105407d43bf39 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala @@ -129,14 +129,14 @@ class JoinOptimizationSuite extends PlanTest { Project(Seq($"x.key", $"y.key"), Join( SubqueryAlias("x", input), - BroadcastHint(SubqueryAlias("y", input)), Cross, None)).analyze + ResolvedHint(SubqueryAlias("y", input)), Cross, None)).analyze val optimized = Optimize.execute(query) val expected = Join( Project(Seq($"x.key"), SubqueryAlias("x", input)), - BroadcastHint(Project(Seq($"y.key"), SubqueryAlias("y", input))), + ResolvedHint(Project(Seq($"y.key"), SubqueryAlias("y", input))), Cross, None).analyze comparePlans(optimized, expected) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index d78741d032f38..134e761460881 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -534,30 +534,31 @@ class PlanParserSuite extends PlanTest { comparePlans( parsePlan("SELECT /*+ HINT */ * FROM t"), - Hint("HINT", Seq.empty, table("t").select(star()))) + UnresolvedHint("HINT", Seq.empty, table("t").select(star()))) comparePlans( parsePlan("SELECT /*+ BROADCASTJOIN(u) */ * FROM t"), - Hint("BROADCASTJOIN", Seq("u"), table("t").select(star()))) + UnresolvedHint("BROADCASTJOIN", Seq("u"), table("t").select(star()))) comparePlans( parsePlan("SELECT /*+ MAPJOIN(u) */ * FROM t"), - Hint("MAPJOIN", Seq("u"), table("t").select(star()))) + UnresolvedHint("MAPJOIN", Seq("u"), table("t").select(star()))) comparePlans( parsePlan("SELECT /*+ STREAMTABLE(a,b,c) */ * FROM t"), - Hint("STREAMTABLE", Seq("a", "b", "c"), table("t").select(star()))) + UnresolvedHint("STREAMTABLE", Seq("a", "b", "c"), table("t").select(star()))) comparePlans( parsePlan("SELECT /*+ INDEX(t, emp_job_ix) */ * FROM t"), - Hint("INDEX", Seq("t", "emp_job_ix"), table("t").select(star()))) + UnresolvedHint("INDEX", Seq("t", "emp_job_ix"), table("t").select(star()))) comparePlans( parsePlan("SELECT /*+ MAPJOIN(`default.t`) */ * from `default.t`"), - Hint("MAPJOIN", Seq("default.t"), table("default.t").select(star()))) + UnresolvedHint("MAPJOIN", Seq("default.t"), table("default.t").select(star()))) comparePlans( parsePlan("SELECT /*+ MAPJOIN(t) */ a from t where true group by a order by a"), - Hint("MAPJOIN", Seq("t"), table("t").where(Literal(true)).groupBy('a)('a)).orderBy('a.asc)) + UnresolvedHint("MAPJOIN", Seq("t"), + table("t").where(Literal(true)).groupBy('a)('a)).orderBy('a.asc)) } } diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala index b06871f96f0d8..81b91e63b8f67 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala @@ -45,7 +45,7 @@ class BasicStatsEstimationSuite extends StatsEstimationTestBase { expectedStatsCboOn = filterStatsCboOn, expectedStatsCboOff = filterStatsCboOff) - val broadcastHint = BroadcastHint(filter) + val broadcastHint = ResolvedHint(filter, isBroadcastable = Option(true)) checkStats( broadcastHint, expectedStatsCboOn = filterStatsCboOn.copy(isBroadcastable = true), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 53773f18ce553..cbab029b87b2a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1174,7 +1174,7 @@ class Dataset[T] private[sql]( */ @scala.annotation.varargs def hint(name: String, parameters: String*): Dataset[T] = withTypedPlan { - Hint(name, parameters, logicalPlan) + UnresolvedHint(name, parameters, logicalPlan) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 73541c22c6308..5981b49da277e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -433,7 +433,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil case r: LogicalRDD => 
RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil - case BroadcastHint(child) => planLater(child) :: Nil + case h: ResolvedHint => planLater(h.child) :: Nil case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 5edf03666ac22..563eae0b6483f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedFunction} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.plans.logical.BroadcastHint +import org.apache.spark.sql.catalyst.plans.logical.ResolvedHint import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.internal.SQLConf @@ -1019,7 +1019,8 @@ object functions { * @since 1.5.0 */ def broadcast[T](df: Dataset[T]): Dataset[T] = { - Dataset[T](df.sparkSession, BroadcastHint(df.logicalPlan))(df.exprEnc) + Dataset[T](df.sparkSession, + ResolvedHint(df.logicalPlan, isBroadcastable = Option(true)))(df.exprEnc) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala index 26c45e092dc65..afb8ced53e25c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala @@ -157,7 +157,7 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils { } test("broadcast hint in SQL") { - import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, Join} + import 
org.apache.spark.sql.catalyst.plans.logical.{ResolvedHint, Join} spark.range(10).createOrReplaceTempView("t") spark.range(10).createOrReplaceTempView("u") @@ -170,12 +170,12 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils { val plan3 = sql(s"SELECT /*+ $name(v) */ * FROM t JOIN u ON t.id = u.id").queryExecution .optimizedPlan - assert(plan1.asInstanceOf[Join].left.isInstanceOf[BroadcastHint]) - assert(!plan1.asInstanceOf[Join].right.isInstanceOf[BroadcastHint]) - assert(!plan2.asInstanceOf[Join].left.isInstanceOf[BroadcastHint]) - assert(plan2.asInstanceOf[Join].right.isInstanceOf[BroadcastHint]) - assert(!plan3.asInstanceOf[Join].left.isInstanceOf[BroadcastHint]) - assert(!plan3.asInstanceOf[Join].right.isInstanceOf[BroadcastHint]) + assert(plan1.asInstanceOf[Join].left.isInstanceOf[ResolvedHint]) + assert(!plan1.asInstanceOf[Join].right.isInstanceOf[ResolvedHint]) + assert(!plan2.asInstanceOf[Join].left.isInstanceOf[ResolvedHint]) + assert(plan2.asInstanceOf[Join].right.isInstanceOf[ResolvedHint]) + assert(!plan3.asInstanceOf[Join].left.isInstanceOf[ResolvedHint]) + assert(!plan3.asInstanceOf[Join].right.isInstanceOf[ResolvedHint]) } }