[SPARK-2094][SQL] "Exactly once" semantics for DDL and command statements #1071

Closed
Wants to merge 8 commits.
@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference}
import org.apache.spark.sql.catalyst.types.StringType

/**
@@ -26,35 +26,37 @@ import org.apache.spark.sql.catalyst.types.StringType
*/
abstract class Command extends LeafNode {
self: Product =>
def output: Seq[Attribute] = Seq.empty // TODO: SPARK-2081 should fix this
def output: Seq[Attribute] = Seq.empty
}

/**
* Returned for commands supported by a given parser, but not catalyst. In general these are DDL
* commands that are passed directly to another system.
*/
case class NativeCommand(cmd: String) extends Command
case class NativeCommand(cmd: String) extends Command {
override def output =
Seq(BoundReference(0, AttributeReference("result", StringType, nullable = false)()))
}

/**
* Commands of the form "SET (key) (= value)".
*/
case class SetCommand(key: Option[String], value: Option[String]) extends Command {
override def output = Seq(
AttributeReference("key", StringType, nullable = false)(),
AttributeReference("value", StringType, nullable = false)()
)
BoundReference(0, AttributeReference("key", StringType, nullable = false)()),
BoundReference(1, AttributeReference("value", StringType, nullable = false)()))
}

/**
* Returned by a parser when the users only wants to see what query plan would be executed, without
* actually performing the execution.
*/
case class ExplainCommand(plan: LogicalPlan) extends Command {
override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
override def output =
Seq(BoundReference(0, AttributeReference("plan", StringType, nullable = false)()))
}

/**
* Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
*/
case class CacheCommand(tableName: String, doCache: Boolean) extends Command

@@ -161,7 +161,7 @@ class FilterPushdownSuite extends OptimizerTest {

comparePlans(optimized, correctAnswer)
}

test("joins: push down left outer join #1") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
36 changes: 4 additions & 32 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.{SetCommand, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor

import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
@@ -147,14 +147,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*
* @group userf
*/
def sql(sqlText: String): SchemaRDD = {
val result = new SchemaRDD(this, parseSql(sqlText))
// We force query optimization to happen right away instead of letting it happen lazily like
// when using the query DSL. This is so DDL commands behave as expected. This is only
// generates the RDD lineage for DML queries, but do not perform any execution.
result.queryExecution.toRdd
result
}
def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))

/** Returns the specified table as a SchemaRDD */
def table(tableName: String): SchemaRDD =
@@ -280,35 +273,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected abstract class QueryExecution {
def logical: LogicalPlan

def eagerlyProcess(plan: LogicalPlan): RDD[Row] = plan match {
case SetCommand(key, value) =>
// Only this case needs to be executed eagerly. The other cases will
// be taken care of when the actual results are being extracted.
// In the case of HiveContext, sqlConf is overridden to also pass the
// pair into its HiveConf.
if (key.isDefined && value.isDefined) {
set(key.get, value.get)
}
// It doesn't matter what we return here, since this is only used
// to force the evaluation to happen eagerly. To query the results,
// one must use SchemaRDD operations to extract them.
emptyResult
case _ => executedPlan.execute()
}

lazy val analyzed = analyzer(logical)
lazy val optimizedPlan = optimizer(analyzed)
// TODO: Don't just pick the first one...
lazy val sparkPlan = planner(optimizedPlan).next()
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[Row] = {
logical match {
case s: SetCommand => eagerlyProcess(s)
case _ => executedPlan.execute()
}
}
lazy val toRdd: RDD[Row] = executedPlan.execute()

protected def stringOrError[A](f: => A): String =
try f.toString catch { case e: Throwable => e.toString }
@@ -330,7 +302,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* TODO: We only support primitive types, add support for nested types.
*/
private[sql] def inferSchema(rdd: RDD[Map[String, _]]): SchemaRDD = {
val schema = rdd.first.map { case (fieldName, obj) =>
val schema = rdd.first().map { case (fieldName, obj) =>
val dataType = obj.getClass match {
case c: Class[_] if c == classOf[java.lang.String] => StringType
case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType
@@ -50,6 +50,14 @@ private[sql] trait SchemaRDDLike {
@DeveloperApi
lazy val queryExecution = sqlContext.executePlan(logicalPlan)

logicalPlan match {
// For various commands (like DDL) and queries with side effects, we force query optimization to
// happen right away to let these side effects take place eagerly.
case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
queryExecution.toRdd
case _ =>
}
Contributor Author:

Realized that many SchemaRDD actions other than collect(), as well as the DSL methods, reuse logicalPlan and break the "exactly once" constraint when planning the logical plan: a new physical plan node for the DDL/command statement is created, causing the side effect to take place again.

So I replaced logicalPlan with the executed physical plan wrapped in a SparkLogicalPlan, to prevent multiple physical plan instantiations for the same DDL/command statement.
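
To make the failure mode concrete, here is a minimal toy sketch (illustrative types only, not Spark's actual classes) of why a lazy side-effect field runs "exactly once" only per physical plan instance, so re-planning on every action re-triggers the side effect, while planning once and reusing the node does not:

object ExactlyOnceSketch extends App {
  trait PhysicalCommandLike {
    // Evaluated at most once for this instance, on first access.
    protected lazy val sideEffectResult: Seq[Any] = runSideEffect()
    protected def runSideEffect(): Seq[Any]
    def execute(): Seq[Any] = sideEffectResult
  }

  class UncacheTable(table: String) extends PhysicalCommandLike {
    protected def runSideEffect(): Seq[Any] = {
      println(s"uncaching $table") // stands in for the real side effect
      Seq.empty
    }
  }

  // Re-planning per action creates a fresh instance each time, so the side effect fires twice.
  new UncacheTable("t").execute()
  new UncacheTable("t").execute()

  // Planning once and reusing the same node (what wrapping the executed plan achieves):
  // the side effect fires exactly once, no matter how many actions run against it.
  val planned = new UncacheTable("t")
  planned.execute()
  planned.execute()
}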

Contributor:

Good catch. This is probably the problem I was seeing with double "UNCACHE TABLE".


override def toString =
s"""${super.toString}
|== Query Plan ==
Expand Down
@@ -22,45 +22,69 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute}

trait PhysicalCommand {
/**
* A concrete command should override this lazy field to wrap up any side effects caused by the
* command or any other computation that should be evaluated exactly once. The value of this field
* can be used as the contents of the corresponding RDD generated from the physical plan of this
* command.
*
* The `execute()` method of all the physical command classes should reference `sideEffect` so
* that the command can be executed eagerly right after the command query is created.
*/
protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any]
Contributor:

By default, maybe we could have this return "OK" like Hive does?

Contributor Author:

Makes sense. And, a little off topic: why is SQLContext.emptyResult an RDD[Row] with a single empty row instead of simply an empty RDD[Row]?

Contributor:

+1 for following the same-name convention

Contributor Author:

@marmbrus Checked the Hive 0.12 source code and confirmed that the "OK" is actually a log message rather than the result. Thus we essentially can't distinguish a command with no result (e.g. CREATE TABLE) from a command that should return a result but whose result happens to be empty (e.g. SHOW TABLES when there are no tables). In the latter case, if we put an "OK" there, users (or, more likely, third-party programs) may mistake the "OK" string for part of the returned result. So I don't think we should put an "OK" here.

Contributor:

Okay, fine to not return OK.

Regarding emptyResult: that is a bad name, but it is used for things like SELECT 1. So, if we fix it to actually be empty, we need to create a singleEmptyRow RDD for use in those cases.

Contributor Author:

After some thought, I think it's not only about naming; the semantics are wrong: an RDD[Row] with a single empty row indicates that the schema of the result has no fields, while an empty RDD[Row] can fit schemas with any number of fields. And, for SELECT 1, shouldn't it be an RDD[Row] with a single row containing a 1?

There does exist a singleRowRdd in SparkStrategies, used to translate logical.NoRelation.

Currently emptyResult is only used in execution.CacheCommand and execution.NativeCommand, so I'll just fix the emptyResult definition and see if all the test suites pass.
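
For reference, here is a minimal sketch of the two interpretations being discussed (assumed helpers, not actual SQLContext members; assumes an in-scope SparkContext):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRow

// Truly empty: zero rows, compatible with a result schema of any arity.
def trulyEmpty(sc: SparkContext): RDD[Row] =
  sc.parallelize(Seq.empty[Row], 1)

// The current emptyResult-style value: one row with no fields, which only
// really fits a result schema that itself has no fields.
def oneEmptyRow(sc: SparkContext): RDD[Row] =
  sc.parallelize(Seq[Row](new GenericRow(Array.empty[Any])), 1)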

Contributor:

> After some thought, I think it's not only about naming; the semantics are wrong: an RDD[Row] with a single empty row indicates that the schema of the result has no fields, while an empty RDD[Row] can fit schemas with any number of fields. And, for SELECT 1, shouldn't it be an RDD[Row] with a single row containing a 1?

We use a Project to build the actual result.

However, looks like I already did the separation... so that plan sounds good to me :)

}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class SetCommandPhysical(key: Option[String], value: Option[String], output: Seq[Attribute])
(@transient context: SQLContext) extends LeafNode {
def execute(): RDD[Row] = (key, value) match {
// Set value for key k; the action itself would
// have been performed in QueryExecution eagerly.
case (Some(k), Some(v)) => context.emptyResult
case class SetCommandPhysical(
Contributor:

In general we have not been naming our physical operators differently; we have been relying on the package to differentiate them, as this is less redundant (e.g. logical.Project vs. execution.Project).

Contributor Author:

Agreed. I just followed @concretevitamin's naming style here; I'd be happy to rename all these physical commands.

key: Option[String], value: Option[String], output: Seq[Attribute])(
@transient context: SQLContext)
extends LeafNode with PhysicalCommand {

override protected[sql] lazy val sideEffectResult: Seq[(String, String)] = (key, value) match {
// Set value for key k.
case (Some(k), Some(v)) =>
context.set(k, v)
Array.empty[(String, String)]
Contributor:

Have we decided not to echo the newly set key-value pair?

Contributor Author:

I just followed the original logic here, but I do agree that echoing the pair would be more user-friendly.

Contributor Author:

Hmm... just confirmed that Hive 0.12 doesn't return anything in this case (not even an "OK" string), so I'd prefer to leave it as is to mimic Hive.

Contributor:

We already differ from Hive in the behavior of "SET"; I don't see a reason to stick to Hive semantics in this case.

Contributor Author:

OK, made the command return the newly set pair.
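
For illustration, the observable behavior after this change (a usage sketch assuming an in-scope SQLContext named sqlContext and placeholder keys; row rendering is approximate):

sqlContext.sql("SET some.key=some.value").collect()
// => roughly Array([some.key,some.value])               -- the newly set pair is echoed back

sqlContext.sql("SET some.key").collect()
// => roughly Array([some.key,some.value])               -- querying a key returns (key, value)

sqlContext.sql("SET some.nonexistent.key").collect()
// => roughly Array([some.nonexistent.key,<undefined>])  -- unknown keys map to "<undefined>"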


// Query the value bound to key k.
case (Some(k), None) =>
val resultString = context.getOption(k) match {
case Some(v) => s"$k=$v"
case None => s"$k is undefined"
}
context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](resultString))), 1)
case (Some(k), _) =>
Array(k -> context.getOption(k).getOrElse("<undefined>"))

// Query all key-value pairs that are set in the SQLConf of the context.
case (None, None) =>
val pairs = context.getAll
val rows = pairs.map { case (k, v) =>
new GenericRow(Array[Any](s"$k=$v"))
}.toSeq
// Assume config parameters can fit into one split (machine) ;)
context.sparkContext.parallelize(rows, 1)
// The only other case is invalid semantics and is impossible.
case _ => context.emptyResult
context.getAll

case _ =>
throw new IllegalArgumentException()
}

def execute(): RDD[Row] = {
val rows = sideEffectResult.map { case (k, v) => new GenericRow(Array[Any](k, v)) }
context.sparkContext.parallelize(rows, 1)
}

override def otherCopyArgs = context :: Nil
}

/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
(@transient context: SQLContext) extends UnaryNode {
case class ExplainCommandPhysical(
child: SparkPlan, output: Seq[Attribute])(
@transient context: SQLContext)
extends UnaryNode with PhysicalCommand {

// Actually "EXPLAIN" command doesn't cause any side effect.
override protected[sql] lazy val sideEffectResult: Seq[String] = this.toString.split("\n")

def execute(): RDD[Row] = {
val planString = new GenericRow(Array[Any](child.toString))
context.sparkContext.parallelize(Seq(planString))
val explanation = sideEffectResult.mkString("\n")
context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](explanation))), 1)
}

override def otherCopyArgs = context :: Nil
@@ -71,18 +95,19 @@ case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
*/
@DeveloperApi
case class CacheCommandPhysical(tableName: String, doCache: Boolean)(@transient context: SQLContext)
extends LeafNode {
extends LeafNode with PhysicalCommand {

lazy val commandSideEffect = {
override protected[sql] lazy val sideEffectResult = {
if (doCache) {
context.cacheTable(tableName)
} else {
context.uncacheTable(tableName)
}
Seq.empty[Any]
}

override def execute(): RDD[Row] = {
commandSideEffect
sideEffectResult
context.emptyResult
}

16 changes: 8 additions & 8 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -141,7 +141,7 @@ class SQLQuerySuite extends QueryTest {
sql("SELECT AVG(a),b FROM largeAndSmallInts group by b"),
Seq((2147483645.0,1),(2.0,2)))
}

test("count") {
checkAnswer(
sql("SELECT COUNT(*) FROM testData2"),
@@ -332,7 +332,7 @@ class SQLQuerySuite extends QueryTest {
(3, "C"),
(4, "D")))
}

test("system function upper()") {
checkAnswer(
sql("SELECT n,UPPER(l) FROM lowerCaseData"),
@@ -349,7 +349,7 @@ class SQLQuerySuite extends QueryTest {
(2, "ABC"),
(3, null)))
}

test("system function lower()") {
checkAnswer(
sql("SELECT N,LOWER(L) FROM upperCaseData"),
@@ -382,25 +382,25 @@ class SQLQuerySuite extends QueryTest {
sql(s"SET $testKey=$testVal")
checkAnswer(
sql("SET"),
Seq(Seq(s"$testKey=$testVal"))
Seq(Seq(testKey, testVal))
)

sql(s"SET ${testKey + testKey}=${testVal + testVal}")
checkAnswer(
sql("set"),
Seq(
Seq(s"$testKey=$testVal"),
Seq(s"${testKey + testKey}=${testVal + testVal}"))
Seq(testKey, testVal),
Seq(testKey + testKey, testVal + testVal))
)

// "set key"
checkAnswer(
sql(s"SET $testKey"),
Seq(Seq(s"$testKey=$testVal"))
Seq(Seq(testKey, testVal))
)
checkAnswer(
sql(s"SET $nonexistentKey"),
Seq(Seq(s"$nonexistentKey is undefined"))
Seq(Seq(nonexistentKey, "<undefined>"))
)
clear()
}