Skip to content

Commit

Permalink
Renamed physical plan classes for DDL/commands
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed Jun 12, 2014
1 parent 74789c1 commit cc64f32
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.execution

import org.apache.spark.sql.{SQLConf, SQLContext, execution}
import org.apache.spark.sql.{SQLContext, execution}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
Expand Down Expand Up @@ -156,7 +156,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => {
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
val prunePushedDownFilters =
if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
(filters: Seq[Expression]) => {
Expand Down Expand Up @@ -185,7 +185,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
filters,
prunePushedDownFilters,
ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil
}

case _ => Nil
}
Expand Down Expand Up @@ -237,12 +236,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case class CommandStrategy(context: SQLContext) extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.SetCommand(key, value) =>
Seq(execution.SetCommandPhysical(key, value, plan.output)(context))
Seq(execution.SetCommand(key, value, plan.output)(context))
case logical.ExplainCommand(child) =>
val executedPlan = context.executePlan(child).executedPlan
Seq(execution.ExplainCommandPhysical(executedPlan, plan.output)(context))
Seq(execution.ExplainCommand(executedPlan, plan.output)(context))
case logical.CacheCommand(tableName, cache) =>
Seq(execution.CacheCommandPhysical(tableName, cache)(context))
Seq(execution.CacheCommand(tableName, cache)(context))
case _ => Nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute}

trait PhysicalCommand {
trait Command {
/**
* A concrete command should override this lazy field to wrap up any side effects caused by the
* command or any other computation that should be evaluated exactly once. The value of this field
* can be used as the contents of the corresponding RDD generated from the physical plan of this
* command.
*
* The `execute()` method of all the physical command classes should reference `sideEffect` so
* that the command can be executed eagerly right after the command query is created.
* The `execute()` method of all the physical command classes should reference `sideEffectResult`
* so that the command can be executed eagerly right after the command query is created.
*/
protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any]
}
Expand All @@ -39,10 +39,10 @@ trait PhysicalCommand {
* :: DeveloperApi ::
*/
@DeveloperApi
case class SetCommandPhysical(
case class SetCommand(
key: Option[String], value: Option[String], output: Seq[Attribute])(
@transient context: SQLContext)
extends LeafNode with PhysicalCommand {
extends LeafNode with Command {

override protected[sql] lazy val sideEffectResult: Seq[(String, String)] = (key, value) match {
// Set value for key k.
Expand Down Expand Up @@ -74,10 +74,10 @@ case class SetCommandPhysical(
* :: DeveloperApi ::
*/
@DeveloperApi
case class ExplainCommandPhysical(
case class ExplainCommand(
child: SparkPlan, output: Seq[Attribute])(
@transient context: SQLContext)
extends UnaryNode with PhysicalCommand {
extends UnaryNode with Command {

// Actually "EXPLAIN" command doesn't cause any side effect.
override protected[sql] lazy val sideEffectResult: Seq[String] = this.toString.split("\n")
Expand All @@ -94,8 +94,8 @@ case class ExplainCommandPhysical(
* :: DeveloperApi ::
*/
@DeveloperApi
case class CacheCommandPhysical(tableName: String, doCache: Boolean)(@transient context: SQLContext)
extends LeafNode with PhysicalCommand {
case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: SQLContext)
extends LeafNode with Command {

override protected[sql] lazy val sideEffectResult = {
if (doCache) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql
package hive
package org.apache.spark.sql.hive

import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
import java.util.{ArrayList => JArrayList}
Expand All @@ -32,11 +31,13 @@ import org.apache.hadoop.hive.ql.session.SessionState

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.{Command => PhysicalCommand}

/**
* Starts up an instance of hive where metadata is stored locally. An in-process metadata data is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ private[hive] trait HiveStrategies {
case class HiveCommandStrategy(context: HiveContext) extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.NativeCommand(sql) =>
NativeCommandPhysical(sql, plan.output)(context) :: Nil
NativeCommand(sql, plan.output)(context) :: Nil
case _ => Nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,10 +434,10 @@ case class InsertIntoHiveTable(
* :: DeveloperApi ::
*/
@DeveloperApi
case class NativeCommandPhysical(
case class NativeCommand(
sql: String, output: Seq[Attribute])(
@transient context: HiveContext)
extends LeafNode with PhysicalCommand {
extends LeafNode with Command {

override protected[sql] lazy val sideEffectResult: Seq[String] = context.runSqlHive(sql)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@

package org.apache.spark.sql.hive.execution

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.logical.ExplainCommand
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.execution.ExplainCommandPhysical
import org.apache.spark.sql.{execution, Row}

/**
* A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution.
Expand Down Expand Up @@ -173,7 +171,7 @@ class HiveQuerySuite extends HiveComparisonTest {
}
assert(explanation.size == 1)

val explainCommandClassName = classOf[ExplainCommandPhysical].getSimpleName.stripSuffix("$")
val explainCommandClassName = classOf[execution.ExplainCommand].getSimpleName.stripSuffix("$")
assert(explanation.head.contains(explainCommandClassName))

TestHive.reset()
Expand Down

0 comments on commit cc64f32

Please sign in to comment.