diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index f2f95dfe27e69..f92dd18320ded 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.{SQLConf, SQLContext, execution}
+import org.apache.spark.sql.{SQLContext, execution}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
@@ -156,7 +156,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
       case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
         InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
-      case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => {
+      case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
         val prunePushedDownFilters =
           if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
             (filters: Seq[Expression]) => {
@@ -185,7 +185,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           filters,
           prunePushedDownFilters,
           ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil
-      }
       case _ => Nil
     }
   }
@@ -237,12 +236,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   case class CommandStrategy(context: SQLContext) extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.SetCommand(key, value) =>
-        Seq(execution.SetCommandPhysical(key, value, plan.output)(context))
+        Seq(execution.SetCommand(key, value, plan.output)(context))
       case logical.ExplainCommand(child) =>
         val executedPlan = context.executePlan(child).executedPlan
-        Seq(execution.ExplainCommandPhysical(executedPlan, plan.output)(context))
+        Seq(execution.ExplainCommand(executedPlan, plan.output)(context))
       case logical.CacheCommand(tableName, cache) =>
-        Seq(execution.CacheCommandPhysical(tableName, cache)(context))
+        Seq(execution.CacheCommand(tableName, cache)(context))
       case _ => Nil
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index da28b4a879056..e9efe28723745 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -22,15 +22,15 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute}
 
-trait PhysicalCommand {
+trait Command {
   /**
    * A concrete command should override this lazy field to wrap up any side effects caused by the
    * command or any other computation that should be evaluated exactly once. The value of this field
    * can be used as the contents of the corresponding RDD generated from the physical plan of this
    * command.
    *
-   * The `execute()` method of all the physical command classes should reference `sideEffect` so
-   * that the command can be executed eagerly right after the command query is created.
+   * The `execute()` method of all the physical command classes should reference `sideEffectResult`
+   * so that the command can be executed eagerly right after the command query is created.
    */
   protected[sql] lazy val sideEffectResult: Seq[Any] = Seq.empty[Any]
 }
@@ -39,10 +39,10 @@ trait PhysicalCommand {
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class SetCommandPhysical(
+case class SetCommand(
     key: Option[String], value: Option[String], output: Seq[Attribute])(
     @transient context: SQLContext)
-  extends LeafNode with PhysicalCommand {
+  extends LeafNode with Command {
 
   override protected[sql] lazy val sideEffectResult: Seq[(String, String)] = (key, value) match {
     // Set value for key k.
@@ -74,10 +74,10 @@ case class SetCommandPhysical(
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class ExplainCommandPhysical(
+case class ExplainCommand(
     child: SparkPlan, output: Seq[Attribute])(
     @transient context: SQLContext)
-  extends UnaryNode with PhysicalCommand {
+  extends UnaryNode with Command {
 
   // Actually "EXPLAIN" command doesn't cause any side effect.
   override protected[sql] lazy val sideEffectResult: Seq[String] = this.toString.split("\n")
@@ -94,8 +94,8 @@ case class ExplainCommandPhysical(
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class CacheCommandPhysical(tableName: String, doCache: Boolean)(@transient context: SQLContext)
-  extends LeafNode with PhysicalCommand {
+case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: SQLContext)
+  extends LeafNode with Command {
 
   override protected[sql] lazy val sideEffectResult = {
     if (doCache) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index b3c87513a59c0..d6ceaaf0fe080 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -15,8 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql
-package hive
+package org.apache.spark.sql.hive
 
 import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
 import java.util.{ArrayList => JArrayList}
@@ -32,11 +31,13 @@ import org.apache.hadoop.hive.ql.session.SessionState
 
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.sql.execution.{Command => PhysicalCommand}
 
 /**
  * Starts up an instance of hive where metadata is stored locally. An in-process metadata data is
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index e0ea7826fa70d..3ca8ac708bf8f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -79,7 +79,7 @@ private[hive] trait HiveStrategies {
   case class HiveCommandStrategy(context: HiveContext) extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.NativeCommand(sql) =>
-        NativeCommandPhysical(sql, plan.output)(context) :: Nil
+        NativeCommand(sql, plan.output)(context) :: Nil
       case _ => Nil
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala
index e5fbd1c6f15f5..a839231449161 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala
@@ -434,10 +434,10 @@ case class InsertIntoHiveTable(
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class NativeCommandPhysical(
+case class NativeCommand(
     sql: String, output: Seq[Attribute])(
     @transient context: HiveContext)
-  extends LeafNode with PhysicalCommand {
+  extends LeafNode with Command {
 
   override protected[sql] lazy val sideEffectResult: Seq[String] = context.runSqlHive(sql)
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index ca06c5c25800b..9233b2e9b6e88 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -17,11 +17,9 @@
 
 package org.apache.spark.sql.hive.execution
 
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.plans.logical.ExplainCommand
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.sql.execution.ExplainCommandPhysical
+import org.apache.spark.sql.{execution, Row}
 
 /**
  * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution.
@@ -173,7 +171,7 @@ class HiveQuerySuite extends HiveComparisonTest {
     }
 
     assert(explanation.size == 1)
-    val explainCommandClassName = classOf[ExplainCommandPhysical].getSimpleName.stripSuffix("$")
+    val explainCommandClassName = classOf[execution.ExplainCommand].getSimpleName.stripSuffix("$")
     assert(explanation.head.contains(explainCommandClassName))
 
     TestHive.reset()
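
Note for reviewers: the heart of the renamed `Command` trait is its `sideEffectResult` lazy val, which is what gives command statements their exactly-once semantics. Below is a minimal standalone sketch of that pattern, assuming nothing from Spark; `DemoCommand`, `DemoSetCommand`, and `DemoCommandApp` are illustrative names, not part of this patch.

```scala
// Sketch of the exactly-once pattern behind execution.Command (hypothetical
// stand-in names; the real trait lives in commands.scala above).
trait DemoCommand {
  // Concrete commands wrap their side effects in this lazy val; Scala
  // guarantees the initializer runs at most once per instance.
  protected lazy val sideEffectResult: Seq[Any] = Seq.empty[Any]

  // execute() references sideEffectResult, so forcing it right after the
  // plan is built runs the command eagerly, while re-running the plan later
  // returns the cached result instead of repeating the side effect.
  def execute(): Seq[Any] = sideEffectResult
}

case class DemoSetCommand(key: String, value: String) extends DemoCommand {
  override protected lazy val sideEffectResult: Seq[Any] = {
    println(s"SET $key=$value") // the side effect, performed exactly once
    Seq(s"$key=$value")
  }
}

object DemoCommandApp extends App {
  val cmd = DemoSetCommand("spark.sql.shuffle.partitions", "10")
  cmd.execute() // prints "SET ..." and caches the result
  cmd.execute() // silent; returns the cached Seq
}
```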
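A second point worth calling out: with the `Physical` suffix gone, `HiveContext.scala` would have both the logical and the physical `Command` types in scope, so the patch aliases the physical one at the import site (`import org.apache.spark.sql.execution.{Command => PhysicalCommand}`). The following is a hedged sketch of that Scala import-renaming mechanism using stand-in objects, not Spark's real packages.

```scala
// Illustrative only: two unrelated containers that both define a Command
// type, mirroring the logical/physical name clash the patch works around.
object logicalside { trait Command { def sql: String } }
object physicalside { trait Command { def run(): Seq[Any] } }

object ImportRenameDemo {
  import logicalside.Command                        // plain name stays logical
  import physicalside.{Command => PhysicalCommand}  // physical side is aliased

  // Both types are now usable side by side without fully qualified names.
  def plan(c: Command): String = s"planning: ${c.sql}"
  def run(c: PhysicalCommand): Seq[Any] = c.run()
}
```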