Skip to content

Commit

Permalink
[SPARK-20854][SQL] Extend hint syntax to support expressions
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

SQL hint syntax:
* support expressions such as strings, numbers, etc., instead of only identifiers, as is currently the case.
* support multiple hints, which was missing compared to the DataFrame syntax.

DataFrame API:
* support any parameters in DataFrame.hint instead of just strings

## How was this patch tested?
Existing tests. New tests in PlanParserSuite. New suite DataFrameHintSuite.

Author: Bogdan Raducanu <bogdan@databricks.com>

Closes #18086 from bogdanrdc/SPARK-20854.
  • Loading branch information
bogdanrdc authored and cloud-fan committed Jun 1, 2017
1 parent 8efc6e9 commit 2134196
Show file tree
Hide file tree
Showing 9 changed files with 225 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ querySpecification
(RECORDREADER recordReader=STRING)?
fromClause?
(WHERE where=booleanExpression)?)
| ((kind=SELECT hint? setQuantifier? namedExpressionSeq fromClause?
| ((kind=SELECT (hints+=hint)* setQuantifier? namedExpressionSeq fromClause?
| fromClause (kind=SELECT setQuantifier? namedExpressionSeq)?)
lateralView*
(WHERE where=booleanExpression)?
Expand All @@ -381,12 +381,12 @@ querySpecification
;

hint
: '/*+' hintStatement '*/'
: '/*+' hintStatements+=hintStatement (','? hintStatements+=hintStatement)* '*/'
;

hintStatement
: hintName=identifier
| hintName=identifier '(' parameters+=identifier (',' parameters+=identifier)* ')'
| hintName=identifier '(' parameters+=primaryExpression (',' parameters+=primaryExpression)* ')'
;

fromClause
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis

import java.util.Locale

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
Expand Down Expand Up @@ -91,7 +92,12 @@ object ResolveHints {
ResolvedHint(h.child, HintInfo(isBroadcastable = Option(true)))
} else {
// Otherwise, find within the subtree query plans that should be broadcasted.
applyBroadcastHint(h.child, h.parameters.toSet)
// Reduce every hint parameter to a table name before searching the subtree;
// any parameter that is neither a string nor an unresolved attribute is rejected.
applyBroadcastHint(h.child, h.parameters.map {
  // A plain string is used as the table name as-is.
  case tableName: String => tableName
  // An unresolved attribute contributes its name.
  case tableId: UnresolvedAttribute => tableId.name
  // Report both the offending value and its runtime class; the closing parenthesis
  // belongs inside the message so the rendered text is balanced.
  case unsupported => throw new AnalysisException("Broadcast hint parameter should be " +
    s"an identifier or string but was $unsupported (${unsupported.getClass})")
}.toSet)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,9 @@ package object dsl {

def analyze: LogicalPlan =
EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(logicalPlan))

/**
 * Wraps this logical plan in an [[UnresolvedHint]] carrying the given hint name
 * and parameters.
 *
 * @param name the name of the hint
 * @param parameters the hint parameters, passed through unchanged
 */
def hint(name: String, parameters: Any*): LogicalPlan = {
  UnresolvedHint(name = name, parameters = parameters, child = logicalPlan)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
val withWindow = withDistinct.optionalMap(windows)(withWindows)

// Hint
withWindow.optionalMap(hint)(withHints)
hints.asScala.foldRight(withWindow)(withHints)
}
}

Expand Down Expand Up @@ -533,13 +533,16 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}

/**
* Add a [[UnresolvedHint]] to a logical plan.
* Add [[UnresolvedHint]]s to a logical plan.
*/
private def withHints(
ctx: HintContext,
query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
val stmt = ctx.hintStatement
UnresolvedHint(stmt.hintName.getText, stmt.parameters.asScala.map(_.getText), query)
var plan = query
ctx.hintStatements.asScala.reverse.foreach { case stmt =>
plan = UnresolvedHint(stmt.hintName.getText, stmt.parameters.asScala.map(expression), plan)
}
plan
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import org.apache.spark.sql.internal.SQLConf
/**
* A general hint for the child that is not yet resolved. This node is generated by the parser and
* should be removed. This node will be eliminated post analysis.
* A pair of (name, parameters).
* @param name the name of the hint
* @param parameters the parameters of the hint
* @param child the [[LogicalPlan]] on which this hint applies
*/
case class UnresolvedHint(name: String, parameters: Seq[String], child: LogicalPlan)
case class UnresolvedHint(name: String, parameters: Seq[Any], child: LogicalPlan)
extends UnaryNode {

override lazy val resolved: Boolean = false
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.analysis.AnalysisTest
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._

/**
 * Checks that the plan DSL's `hint` method produces an [[UnresolvedHint]] that carries
 * its parameters through unchanged, whatever their types.
 */
class DSLHintSuite extends AnalysisTest {
  lazy val a = 'a.int
  lazy val b = 'b.string
  lazy val c = 'c.string
  lazy val r1 = LocalRelation(a, b, c)

  test("various hint parameters") {
    // No parameters at all.
    val withoutParams = r1.hint("hint1")
    comparePlans(withoutParams, UnresolvedHint("hint1", Seq.empty, r1))

    // A number and a string.
    val withLiterals = r1.hint("hint1", 1, "a")
    comparePlans(withLiterals, UnresolvedHint("hint1", Seq(1, "a"), r1))

    // A number and an attribute reference.
    val withAttribute = r1.hint("hint1", 1, $"a")
    comparePlans(withAttribute, UnresolvedHint("hint1", Seq(1, $"a"), r1))

    // Sequences given as individual parameters stay nested.
    val withSequences = r1.hint("hint1", Seq(1, 2, 3), Seq($"a", $"b", $"c"))
    comparePlans(
      withSequences,
      UnresolvedHint("hint1", Seq(Seq(1, 2, 3), Seq($"a", $"b", $"c")), r1))
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.parser

import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedGenerator, UnresolvedInlineTable, UnresolvedRelation, UnresolvedTableValuedFunction}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedGenerator, UnresolvedInlineTable, UnresolvedRelation, UnresolvedTableValuedFunction}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
Expand Down Expand Up @@ -527,47 +527,117 @@ class PlanParserSuite extends PlanTest {
val m = intercept[ParseException] {
parsePlan("SELECT /*+ HINT() */ * FROM t")
}.getMessage
assert(m.contains("no viable alternative at input"))

// Hive compatibility: No database.
val m2 = intercept[ParseException] {
parsePlan("SELECT /*+ MAPJOIN(default.t) */ * from default.t")
}.getMessage
assert(m2.contains("mismatched input '.' expecting {')', ','}"))
assert(m.contains("mismatched input"))

// Disallow space as the delimiter.
val m3 = intercept[ParseException] {
parsePlan("SELECT /*+ INDEX(a b c) */ * from default.t")
}.getMessage
assert(m3.contains("mismatched input 'b' expecting {')', ','}"))
assert(m3.contains("mismatched input 'b' expecting"))

comparePlans(
parsePlan("SELECT /*+ HINT */ * FROM t"),
UnresolvedHint("HINT", Seq.empty, table("t").select(star())))

comparePlans(
parsePlan("SELECT /*+ BROADCASTJOIN(u) */ * FROM t"),
UnresolvedHint("BROADCASTJOIN", Seq("u"), table("t").select(star())))
UnresolvedHint("BROADCASTJOIN", Seq($"u"), table("t").select(star())))

comparePlans(
parsePlan("SELECT /*+ MAPJOIN(u) */ * FROM t"),
UnresolvedHint("MAPJOIN", Seq("u"), table("t").select(star())))
UnresolvedHint("MAPJOIN", Seq($"u"), table("t").select(star())))

comparePlans(
parsePlan("SELECT /*+ STREAMTABLE(a,b,c) */ * FROM t"),
UnresolvedHint("STREAMTABLE", Seq("a", "b", "c"), table("t").select(star())))
UnresolvedHint("STREAMTABLE", Seq($"a", $"b", $"c"), table("t").select(star())))

comparePlans(
parsePlan("SELECT /*+ INDEX(t, emp_job_ix) */ * FROM t"),
UnresolvedHint("INDEX", Seq("t", "emp_job_ix"), table("t").select(star())))
UnresolvedHint("INDEX", Seq($"t", $"emp_job_ix"), table("t").select(star())))

comparePlans(
parsePlan("SELECT /*+ MAPJOIN(`default.t`) */ * from `default.t`"),
UnresolvedHint("MAPJOIN", Seq("default.t"), table("default.t").select(star())))
UnresolvedHint("MAPJOIN", Seq(UnresolvedAttribute.quoted("default.t")),
table("default.t").select(star())))

comparePlans(
parsePlan("SELECT /*+ MAPJOIN(t) */ a from t where true group by a order by a"),
UnresolvedHint("MAPJOIN", Seq("t"),
UnresolvedHint("MAPJOIN", Seq($"t"),
table("t").where(Literal(true)).groupBy('a)('a)).orderBy('a.asc))
}

test("SPARK-20854: select hint syntax with expressions") {
  // A function call is accepted as a hint parameter next to a plain identifier.
  // (This assertion used to appear twice verbatim; the redundant copy is removed.)
  comparePlans(
    parsePlan("SELECT /*+ HINT1(a, array(1, 2, 3)) */ * from t"),
    UnresolvedHint("HINT1", Seq($"a",
      UnresolvedFunction("array", Literal(1) :: Literal(2) :: Literal(3) :: Nil, false)),
      table("t").select(star())
    )
  )

  // Identifiers, numeric literals and string literals can be mixed in one hint.
  comparePlans(
    parsePlan("SELECT /*+ HINT1(a, 5, 'a', b) */ * from t"),
    UnresolvedHint("HINT1", Seq($"a", Literal(5), Literal("a"), $"b"),
      table("t").select(star())
    )
  )

  // Parenthesized lists are expected to parse into struct-creating expressions.
  comparePlans(
    parsePlan("SELECT /*+ HINT1('a', (b, c), (1, 2)) */ * from t"),
    UnresolvedHint("HINT1",
      Seq(Literal("a"),
        CreateStruct($"b" :: $"c" :: Nil),
        CreateStruct(Literal(1) :: Literal(2) :: Nil)),
      table("t").select(star())
    )
  )
}

test("SPARK-20854: multiple hints") {
  val selectAll = table("t").select(star())

  // Expected plan for HINT1 followed by hint2: the first hint is the outermost node.
  val twoHints =
    UnresolvedHint("HINT1", Seq($"a", Literal(1)),
      UnresolvedHint("hint2", Seq($"b", Literal(2)), selectAll))

  // Space-separated, comma-separated and separate hint comments all parse to the same plan.
  Seq(
    "SELECT /*+ HINT1(a, 1) hint2(b, 2) */ * from t",
    "SELECT /*+ HINT1(a, 1),hint2(b, 2) */ * from t",
    "SELECT /*+ HINT1(a, 1) */ /*+ hint2(b, 2) */ * from t"
  ).foreach { query =>
    comparePlans(parsePlan(query), twoHints)
  }

  // Two hints in one comment followed by a third hint in its own comment.
  val threeHints =
    UnresolvedHint("HINT1", Seq($"a", Literal(1)),
      UnresolvedHint("hint2", Seq($"b", Literal(2)),
        UnresolvedHint("hint3", Seq($"c", Literal(3)), selectAll)))
  comparePlans(
    parsePlan("SELECT /*+ HINT1(a, 1), hint2(b, 2) */ /*+ hint3(c, 3) */ * from t"),
    threeHints)
}
}
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,7 @@ class Dataset[T] private[sql](
* @since 2.2.0
*/
@scala.annotation.varargs
def hint(name: String, parameters: String*): Dataset[T] = withTypedPlan {
def hint(name: String, parameters: Any*): Dataset[T] = withTypedPlan {
UnresolvedHint(name, parameters, logicalPlan)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.test.SharedSQLContext

/**
 * Checks that `Dataset.hint` wraps the dataset's logical plan in an [[UnresolvedHint]]
 * that carries its parameters through unchanged, whatever their types.
 */
class DataFrameHintSuite extends PlanTest with SharedSQLContext {
  import testImplicits._
  lazy val df = spark.range(10)

  /** Compares the not-yet-analyzed logical plan of `df` against `expected`. */
  private def check(df: Dataset[_], expected: LogicalPlan) = {
    val actual = df.queryExecution.logical
    comparePlans(actual, expected)
  }

  test("various hint parameters") {
    // Every expected hint wraps the plan of the un-hinted dataset.
    val basePlan = df.logicalPlan

    // No parameters at all.
    check(df.hint("hint1"), UnresolvedHint("hint1", Seq.empty, basePlan))

    // A number and a string.
    check(df.hint("hint1", 1, "a"), UnresolvedHint("hint1", Seq(1, "a"), basePlan))

    // A number and a column.
    check(df.hint("hint1", 1, $"a"), UnresolvedHint("hint1", Seq(1, $"a"), basePlan))

    // Sequences given as individual parameters stay nested.
    check(
      df.hint("hint1", Seq(1, 2, 3), Seq($"a", $"b", $"c")),
      UnresolvedHint("hint1", Seq(Seq(1, 2, 3), Seq($"a", $"b", $"c")), basePlan))
  }
}

0 comments on commit 2134196

Please sign in to comment.