Added physical plan for DDL and commands to ensure the "exactly once" semantics
liancheng committed Jun 12, 2014
1 parent fe78b8b commit 0ad343a
Showing 10 changed files with 166 additions and 138 deletions.
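The heart of the change: command side effects move into a `lazy val` (`sideEffectResult`) on each physical command node, and the `SchemaRDD` machinery forces that value once at construction time. A `lazy val` initializer runs at most once per instance, so forcing it eagerly yields exactly-once execution no matter how many times `execute()` is later called. A minimal, self-contained sketch of the pattern (illustrative only; `DemoCommand` is hypothetical and not part of this patch):

```scala
// Sketch of the exactly-once pattern this commit adopts: the side effect lives in a
// lazy val, so its initializer runs at most once per instance; forcing it eagerly at
// construction time makes it run exactly once.
class DemoCommand {
  // Hypothetical stand-in for PhysicalCommand.sideEffectResult.
  lazy val sideEffectResult: Seq[String] = {
    println("side effect runs")  // observed exactly once
    Seq("done")
  }

  def execute(): Seq[String] = sideEffectResult
}

object Demo extends App {
  val cmd = new DemoCommand
  cmd.execute()  // prints "side effect runs" and returns Seq("done")
  cmd.execute()  // silent: the cached result is reused
}
```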
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference}
 import org.apache.spark.sql.catalyst.types.StringType

 /**
@@ -26,35 +26,37 @@ import org.apache.spark.sql.catalyst.types.StringType
  */
 abstract class Command extends LeafNode {
   self: Product =>
-  def output: Seq[Attribute] = Seq.empty // TODO: SPARK-2081 should fix this
+  def output: Seq[Attribute] = Seq.empty
 }

 /**
  * Returned for commands supported by a given parser, but not catalyst. In general these are DDL
  * commands that are passed directly to another system.
  */
-case class NativeCommand(cmd: String) extends Command
+case class NativeCommand(cmd: String) extends Command {
+  override def output =
+    Seq(BoundReference(0, AttributeReference("result", StringType, nullable = false)()))
+}

 /**
  * Commands of the form "SET (key) (= value)".
  */
 case class SetCommand(key: Option[String], value: Option[String]) extends Command {
   override def output = Seq(
-    AttributeReference("key", StringType, nullable = false)(),
-    AttributeReference("value", StringType, nullable = false)()
-  )
+    BoundReference(0, AttributeReference("key", StringType, nullable = false)()),
+    BoundReference(1, AttributeReference("value", StringType, nullable = false)()))
 }

 /**
  * Returned by a parser when the user only wants to see what query plan would be executed,
  * without actually performing the execution.
  */
 case class ExplainCommand(plan: LogicalPlan) extends Command {
-  override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
+  override def output =
+    Seq(BoundReference(0, AttributeReference("plan", StringType, nullable = false)()))
 }

 /**
  * Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" commands.
  */
 case class CacheCommand(tableName: String, doCache: Boolean) extends Command

@@ -161,7 +161,7 @@ class FilterPushdownSuite extends OptimizerTest {
 
     comparePlans(optimized, correctAnswer)
   }
-
+
   test("joins: push down left outer join #1") {
     val x = testRelation.subquery('x)
     val y = testRelation.subquery('y)
sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala (36 changes: 4 additions & 32 deletions)
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical.{SetCommand, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 
 import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
@@ -147,14 +147,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * @group userf
    */
-  def sql(sqlText: String): SchemaRDD = {
-    val result = new SchemaRDD(this, parseSql(sqlText))
-    // We force query optimization to happen right away instead of letting it happen lazily like
-    // when using the query DSL. This is so DDL commands behave as expected. This is only
-    // generates the RDD lineage for DML queries, but do not perform any execution.
-    result.queryExecution.toRdd
-    result
-  }
+  def sql(sqlText: String): SchemaRDD = new SchemaRDD(this, parseSql(sqlText))
 
   /** Returns the specified table as a SchemaRDD */
   def table(tableName: String): SchemaRDD =
@@ -280,35 +273,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected abstract class QueryExecution {
     def logical: LogicalPlan
 
-    def eagerlyProcess(plan: LogicalPlan): RDD[Row] = plan match {
-      case SetCommand(key, value) =>
-        // Only this case needs to be executed eagerly. The other cases will
-        // be taken care of when the actual results are being extracted.
-        // In the case of HiveContext, sqlConf is overridden to also pass the
-        // pair into its HiveConf.
-        if (key.isDefined && value.isDefined) {
-          set(key.get, value.get)
-        }
-        // It doesn't matter what we return here, since this is only used
-        // to force the evaluation to happen eagerly. To query the results,
-        // one must use SchemaRDD operations to extract them.
-        emptyResult
-      case _ => executedPlan.execute()
-    }
-
     lazy val analyzed = analyzer(logical)
     lazy val optimizedPlan = optimizer(analyzed)
     // TODO: Don't just pick the first one...
     lazy val sparkPlan = planner(optimizedPlan).next()
     lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
 
     /** Internal version of the RDD. Avoids copies and has no schema */
-    lazy val toRdd: RDD[Row] = {
-      logical match {
-        case s: SetCommand => eagerlyProcess(s)
-        case _ => executedPlan.execute()
-      }
-    }
+    lazy val toRdd: RDD[Row] = executedPlan.execute()
 
     protected def stringOrError[A](f: => A): String =
       try f.toString catch { case e: Throwable => e.toString }
@@ -330,7 +302,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * TODO: We only support primitive types, add support for nested types.
    */
   private[sql] def inferSchema(rdd: RDD[Map[String, _]]): SchemaRDD = {
-    val schema = rdd.first.map { case (fieldName, obj) =>
+    val schema = rdd.first().map { case (fieldName, obj) =>
       val dataType = obj.getClass match {
         case c: Class[_] if c == classOf[java.lang.String] => StringType
         case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType
@@ -50,6 +50,14 @@ private[sql] trait SchemaRDDLike {
   @DeveloperApi
   lazy val queryExecution = sqlContext.executePlan(logicalPlan)
 
+  logicalPlan match {
+    // For various commands (like DDL) and queries with side effects, we force query optimization to
+    // happen right away to let these side effects take place eagerly.
+    case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
+      queryExecution.toRdd
+    case _ =>
+  }
+
   override def toString =
     s"""${super.toString}
        |== Query Plan ==
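With the hook above, eager evaluation now lives in `SchemaRDDLike` rather than in `SQLContext.sql`. Roughly, usage looks like this (a sketch, not taken from the patch; it assumes an existing `SparkContext` named `sc` and a registered table `src`):

```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)  // assumes an existing SparkContext `sc`

// Command plans are forced at SchemaRDD construction:
// the SET takes effect here, exactly once.
sqlContext.sql("SET spark.sql.shuffle.partitions=10")

// Ordinary queries stay lazy: constructing the SchemaRDD runs nothing...
val query = sqlContext.sql("SELECT * FROM src")  // assumes a table `src` is registered
// ...until an action is invoked.
// query.collect()
```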
@@ -22,45 +22,69 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute}
 
+trait PhysicalCommand {
+  /**
+   * A concrete command should override this lazy field to wrap up any side effects caused by the
+   * command or any other computation that should be evaluated exactly once. The value of this field
+   * can be used as the contents of the corresponding RDD generated from the physical plan of this
+   * command.
+   *
+   * The `execute()` method of all the physical command classes should reference `sideEffectResult`
+   * so that the command can be executed eagerly right after the command query is created.
+   */
+  protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any]
+}
+
 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class SetCommandPhysical(key: Option[String], value: Option[String], output: Seq[Attribute])
-                             (@transient context: SQLContext) extends LeafNode {
-  def execute(): RDD[Row] = (key, value) match {
-    // Set value for key k; the action itself would
-    // have been performed in QueryExecution eagerly.
-    case (Some(k), Some(v)) => context.emptyResult
+case class SetCommandPhysical(
+    key: Option[String], value: Option[String], output: Seq[Attribute])(
+    @transient context: SQLContext)
+  extends LeafNode with PhysicalCommand {
+
+  override protected[sql] lazy val sideEffectResult: Seq[(String, String)] = (key, value) match {
+    // Set value for key k.
+    case (Some(k), Some(v)) =>
+      context.set(k, v)
+      Array.empty[(String, String)]
+
     // Query the value bound to key k.
-    case (Some(k), None) =>
-      val resultString = context.getOption(k) match {
-        case Some(v) => s"$k=$v"
-        case None => s"$k is undefined"
-      }
-      context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](resultString))), 1)
+    case (Some(k), _) =>
+      Array(k -> context.getOption(k).getOrElse("<undefined>"))
+
     // Query all key-value pairs that are set in the SQLConf of the context.
     case (None, None) =>
-      val pairs = context.getAll
-      val rows = pairs.map { case (k, v) =>
-        new GenericRow(Array[Any](s"$k=$v"))
-      }.toSeq
-      // Assume config parameters can fit into one split (machine) ;)
-      context.sparkContext.parallelize(rows, 1)
-    // The only other case is invalid semantics and is impossible.
-    case _ => context.emptyResult
+      context.getAll
+
+    case _ =>
+      throw new IllegalArgumentException()
   }
 
+  def execute(): RDD[Row] = {
+    val rows = sideEffectResult.map { case (k, v) => new GenericRow(Array[Any](k, v)) }
+    context.sparkContext.parallelize(rows, 1)
+  }
+
   override def otherCopyArgs = context :: Nil
 }

 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
-                                 (@transient context: SQLContext) extends UnaryNode {
+case class ExplainCommandPhysical(
+    child: SparkPlan, output: Seq[Attribute])(
+    @transient context: SQLContext)
+  extends UnaryNode with PhysicalCommand {
+
+  // Actually "EXPLAIN" command doesn't cause any side effect.
+  override protected[sql] lazy val sideEffectResult: Seq[String] = child.toString.split("\n")
+
   def execute(): RDD[Row] = {
-    val planString = new GenericRow(Array[Any](child.toString))
-    context.sparkContext.parallelize(Seq(planString))
+    val explanation = sideEffectResult.mkString("\n")
+    context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](explanation))), 1)
   }
 
   override def otherCopyArgs = context :: Nil
@@ -71,18 +95,19 @@ case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
  */
 @DeveloperApi
 case class CacheCommandPhysical(tableName: String, doCache: Boolean)(@transient context: SQLContext)
-  extends LeafNode {
+  extends LeafNode with PhysicalCommand {
 
-  lazy val commandSideEffect = {
+  override protected[sql] lazy val sideEffectResult = {
     if (doCache) {
       context.cacheTable(tableName)
     } else {
       context.uncacheTable(tableName)
     }
+    Seq.empty[Any]
   }
 
   override def execute(): RDD[Row] = {
-    commandSideEffect
+    sideEffectResult
     context.emptyResult
   }
 
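For a new command, the contract is: put the side effect inside `sideEffectResult` and derive `execute()`'s RDD from it, so repeated `execute()` calls can never rerun the effect. A hedged sketch of a hypothetical command following the same shape as `CacheCommandPhysical`, written as if added to the same execution/commands.scala file so `LeafNode` and `PhysicalCommand` are in scope (the command name and the `clear()` helper are assumptions, not part of this patch):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.expressions.Attribute

// Hypothetical example, not part of this patch: a command that resets the SQLConf.
// Assumes SQLConf's clear() is reachable on the context, as the test suite's clear() suggests.
case class ClearConfCommandPhysical(output: Seq[Attribute])(@transient context: SQLContext)
  extends LeafNode with PhysicalCommand {

  // The side effect lives in the lazy val: evaluated at most once per plan node.
  override protected[sql] lazy val sideEffectResult: Seq[Any] = {
    context.clear()
    Seq.empty[Any]
  }

  def execute(): RDD[Row] = {
    sideEffectResult      // referenced, never re-run
    context.emptyResult
  }

  override def otherCopyArgs = context :: Nil
}
```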
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala (16 changes: 8 additions & 8 deletions)
@@ -141,7 +141,7 @@ class SQLQuerySuite extends QueryTest {
       sql("SELECT AVG(a),b FROM largeAndSmallInts group by b"),
       Seq((2147483645.0,1),(2.0,2)))
   }
-
+
   test("count") {
     checkAnswer(
       sql("SELECT COUNT(*) FROM testData2"),
@@ -332,7 +332,7 @@ class SQLQuerySuite extends QueryTest {
         (3, "C"),
         (4, "D")))
   }
-
+
   test("system function upper()") {
     checkAnswer(
       sql("SELECT n,UPPER(l) FROM lowerCaseData"),
@@ -349,7 +349,7 @@ class SQLQuerySuite extends QueryTest {
         (2, "ABC"),
         (3, null)))
   }
-
+
   test("system function lower()") {
     checkAnswer(
       sql("SELECT N,LOWER(L) FROM upperCaseData"),
@@ -382,25 +382,25 @@ class SQLQuerySuite extends QueryTest {
     sql(s"SET $testKey=$testVal")
     checkAnswer(
       sql("SET"),
-      Seq(Seq(s"$testKey=$testVal"))
+      Seq(Seq(testKey, testVal))
     )
 
     sql(s"SET ${testKey + testKey}=${testVal + testVal}")
     checkAnswer(
       sql("set"),
       Seq(
-        Seq(s"$testKey=$testVal"),
-        Seq(s"${testKey + testKey}=${testVal + testVal}"))
+        Seq(testKey, testVal),
+        Seq(testKey + testKey, testVal + testVal))
     )
 
     // "set key"
     checkAnswer(
       sql(s"SET $testKey"),
-      Seq(Seq(s"$testKey=$testVal"))
+      Seq(Seq(testKey, testVal))
     )
     checkAnswer(
       sql(s"SET $nonexistentKey"),
-      Seq(Seq(s"$nonexistentKey is undefined"))
+      Seq(Seq(nonexistentKey, "<undefined>"))
     )
     clear()
   }
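Since SET now returns proper (key, value) rows instead of pre-formatted "key=value" strings, callers can read the two columns directly. A small consumption sketch (assumes a `SQLContext` named `sqlContext`; not from the patch):

```scala
// Read SET output field-by-field instead of parsing "key=value" strings.
val settings: Array[(String, String)] =
  sqlContext.sql("SET").collect().map(row => (row.getString(0), row.getString(1)))

settings.foreach { case (k, v) => println(s"$k -> $v") }
```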