diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 7abeb032964e1..a0e25775da6dd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.{errors, trees}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
 import org.apache.spark.sql.catalyst.trees.TreeNode
 
 /**
@@ -36,7 +36,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
 case class UnresolvedRelation(
     databaseName: Option[String],
     tableName: String,
-    alias: Option[String] = None) extends BaseRelation {
+    alias: Option[String] = None) extends LeafNode {
   override def output = Nil
   override lazy val resolved = false
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala
deleted file mode 100644
index 582334aa42590..0000000000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.plans.logical
-
-abstract class BaseRelation extends LeafNode {
-  self: Product =>
-
-  def tableName: String
-}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index f302b0f20cb43..738231cc13eb3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -29,8 +29,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
   protected class Estimates {
     lazy val childrenEstimations = children.map(_.estimates)
     lazy val cardinality: Long = childrenEstimations.map(_.cardinality).sum
-    lazy val numTuples: Long = childrenEstimations.map(_.numTuples).sum
-    lazy val size: Long = childrenEstimations.map(_.size).sum
+    lazy val sizeInBytes: Long = childrenEstimations.map(_.sizeInBytes).sum
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 41920c00b5a2c..df76195723071 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -46,9 +46,6 @@ trait SQLConf {
    */
   private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt
 
-  /** A comma-separated list of table names marked to be broadcasted during joins. */
-  private[spark] def joinBroadcastTables: String = get(JOIN_BROADCAST_TABLES, "")
-
   /** ********************** SQLConf functionality methods ************ */
 
   @transient
@@ -94,7 +91,6 @@ trait SQLConf {
 object SQLConf {
   val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size"
   val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
-  val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables"
 
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 4abd89955bd27..568a64951def3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -170,11 +170,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group userf
    */
   def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
-    val name = tableName
-    val newPlan = rdd.logicalPlan transform {
-      case s @ SparkLogicalPlan(ExistingRdd(_, _), _) => s.copy(tableName = name)
-    }
-    catalog.registerTable(None, tableName, newPlan)
+    catalog.registerTable(None, tableName, rdd.logicalPlan)
  }
 
   /**
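Reviewer note: with `numTuples` and `size` merged into a single `sizeInBytes`, the default `Estimates` above only sums over children, so each leaf relation is expected to override it (as `ParquetRelation` and `MetastoreRelation` do later in this patch). A minimal sketch of what such an override looks like — the relation name and the constant are hypothetical, not part of this patch:

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode

// Hypothetical leaf relation with a known, fixed footprint.
case class FixedSizeRelation(output: Seq[Attribute]) extends LeafNode {
  self: Product =>

  // Leaf nodes report their own size; non-leaf operators inherit the
  // child-summing defaults from LogicalPlan.Estimates.
  @transient override lazy val estimates = new Estimates {
    override lazy val sizeInBytes: Long = 4L * 1024 * 1024 // hypothetical: 4 MB
  }
}
```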
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 27dc091b85812..b07f0df7de4b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.trees
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical._
 
 /**
@@ -66,8 +66,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
  * linking.
  */
 @DeveloperApi
-case class SparkLogicalPlan(alreadyPlanned: SparkPlan, tableName: String = "SparkLogicalPlan")
-  extends BaseRelation with MultiInstanceRelation {
+case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
+  extends LogicalPlan with MultiInstanceRelation {
 
   def output = alreadyPlanned.output
   override def references = Set.empty
@@ -78,8 +78,7 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan, tableName: String = "Spar
       alreadyPlanned match {
         case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
         case _ => sys.error("Multiple instance of the same relation detected.")
-      }, tableName)
-      .asInstanceOf[this.type]
+      }).asInstanceOf[this.type]
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 809ccf5b54013..58cc3f92cf851 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{SQLContext, execution}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
 import org.apache.spark.sql.parquet._
@@ -61,8 +61,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil
     }
 
-    def broadcastTables: Seq[String] = sqlContext.joinBroadcastTables.split(",").toBuffer
-
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ExtractEquiJoinKeys(
           Inner,
@@ -70,20 +68,18 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           rightKeys,
           condition,
           left,
-          right @ PhysicalOperation(_, _, b: BaseRelation))
-        if broadcastTables.contains(b.tableName)
-          || (right.estimates.size <= sqlContext.autoConvertJoinSize) =>
-        broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
+          right)
+          if right.estimates.sizeInBytes <= sqlContext.autoConvertJoinSize =>
+        broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
 
       case ExtractEquiJoinKeys(
           Inner,
           leftKeys,
           rightKeys,
           condition,
-          left @ PhysicalOperation(_, _, b: BaseRelation),
+          left,
           right)
-        if broadcastTables.contains(b.tableName)
-          || (left.estimates.size <= sqlContext.autoConvertJoinSize) =>
+          if left.estimates.sizeInBytes <= sqlContext.autoConvertJoinSize =>
         broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
@@ -285,7 +281,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.ExistingRdd(Nil, singleRowRdd) :: Nil
       case logical.Repartition(expressions, child) =>
         execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
-      case SparkLogicalPlan(existingPlan, _) => existingPlan :: Nil
+      case SparkLogicalPlan(existingPlan) => existingPlan :: Nil
       case _ => Nil
     }
   }
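With `joinBroadcastTables` gone, broadcast selection above is purely size-driven: an inner equi-join side qualifies whenever its `estimates.sizeInBytes` is at or below `spark.sql.auto.convert.join.size` (default `10000`, per the SQLConf change). A hypothetical session showing the knob in action — the table names and threshold are illustrative:

```scala
import org.apache.spark.sql.test.TestSQLContext._

// Raise the threshold at runtime; setting it to 0 disables the conversion.
set("spark.sql.auto.convert.join.size", "1048576") // hypothetical 1 MB cap

val joined = sql("SELECT * FROM big b JOIN small s ON b.key = s.key")
// If `small` estimates at or under the cap, the physical plan contains a
// BroadcastHashJoin (BuildRight for a small right side) instead of a
// shuffle-based hash join.
println(joined.queryExecution.sparkPlan)
```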
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 91610ea95e747..6e24bc951d612 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -46,16 +46,16 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
  *
  * @param path The path to the Parquet file.
  */
-// TODO: make me a BaseRelation? For HashJoin strategy.
 private[sql] case class ParquetRelation(
     path: String,
-    @transient conf: Option[Configuration] = None) extends LeafNode with MultiInstanceRelation {
+    @transient conf: Option[Configuration] = None)
+  extends LeafNode with MultiInstanceRelation {
 
   self: Product =>
 
   @transient override lazy val estimates = new Estimates {
     // TODO: investigate getting encoded column statistics in the parquet file?
-    override lazy val size: Long = {
+    override lazy val sizeInBytes: Long = {
       val hdfsPath = new Path(path)
       val fs = hdfsPath.getFileSystem(conf.getOrElse(ContextUtil.getConfiguration(new Job())))
       fs.getContentSummary(hdfsPath).getLength // TODO: in bytes or system-dependent?
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index e7e3fa9e7a617..025c396ef0629 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -19,9 +19,6 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.catalyst.plans.{LeftOuter, RightOuter, FullOuter, Inner}
-import org.apache.spark.sql.execution._
-import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 
 class JoinSuite extends QueryTest {
@@ -29,14 +26,6 @@ class JoinSuite extends QueryTest {
   // Ensures tables are loaded.
   TestData
 
-  test("parquet") {
-    val data = parquetFile("../../points.parquet") // local file!
-    val sizes = data.logicalPlan.collect { case j: ParquetRelation =>
-      j.newInstance.estimates.size // also works without .newInstance
-    }.toSeq
-    assert(sizes.size === 1 && sizes(0) > 0)
-  }
-
   test("equi-join is hash-join") {
     val x = testData2.as('x)
     val y = testData2.as('y)
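The Parquet estimate above defers to the filesystem for the on-disk footprint; Hadoop's `ContentSummary.getLength` is a byte count, which answers the inline TODO. A standalone sketch of the same lookup, independent of Spark:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Total size in bytes of all files under `dir`, as the filesystem reports
// it — the same call ParquetRelation's Estimates override relies on.
def directorySizeInBytes(dir: String, conf: Configuration = new Configuration()): Long = {
  val path = new Path(dir)
  path.getFileSystem(conf).getContentSummary(path).getLength
}
```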
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 7b6a48cf269d7..88e65b8838bf7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -253,7 +253,7 @@ private[hive] case class MetastoreRelation
     (databaseName: String, tableName: String, alias: Option[String])
     (val table: TTable, val partitions: Seq[TPartition])
     (@transient hiveConf: HiveConf, @transient path: Path)
-  extends BaseRelation {
+  extends LeafNode {
   self: Product =>
 
@@ -271,9 +271,8 @@ private[hive] case class MetastoreRelation
 
   // TODO: are there any stats in hiveQlTable.getSkewedInfo that we can use?
   @transient override lazy val estimates = new Estimates {
-    // Size getters adapted from
-    // https://github.com/apache/hive/blob/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SizeBasedBigTableSelectorForAutoSMJ.java
-    override lazy val size: Long =
+    // Size getters adapted from SizeBasedBigTableSelectorForAutoSMJ.java in Hive (version 0.13).
+    override lazy val sizeInBytes: Long =
       maybeGetSize(hiveConf, hiveQlTable.getProperty("totalSize"), path)
 
   private[this] def maybeGetSize(conf: HiveConf, size: String, path: Path): Long = {
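The body of `maybeGetSize` falls outside the hunk above. Going by the Hive utility the new comment cites (`SizeBasedBigTableSelectorForAutoSMJ`), a fallback of roughly this shape is what one would expect — a reader's sketch mirroring the signature shown, not the patch's actual implementation:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf

// Prefer the metastore's `totalSize` table property; if it is missing or
// unparseable, fall back to the filesystem size of the table's path.
// (Sketch only: the body here is assumed, not taken from the patch.)
def maybeGetSize(conf: HiveConf, size: String, path: Path): Long = {
  val fromProperty =
    try Option(size).map(_.toLong).getOrElse(-1L)
    catch { case _: NumberFormatException => -1L }
  if (fromProperty > 0) fromProperty
  else path.getFileSystem(conf).getContentSummary(path).getLength
}
```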
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/EstimatesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/EstimatesSuite.scala
new file mode 100644
index 0000000000000..f67646e81dc73
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/EstimatesSuite.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.execution.BroadcastHashJoin
+import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTestData}
+import org.apache.spark.util.Utils
+
+class EstimatesSuite extends QueryTest {
+
+  test("estimates the size of a test ParquetRelation") {
+    ParquetTestData.writeFile()
+    val testRDD = parquetFile(ParquetTestData.testDir.toString)
+
+    val sizes = testRDD.logicalPlan.collect { case j: ParquetRelation =>
+      (j.estimates.sizeInBytes, j.newInstance.estimates.sizeInBytes)
+    }
+    assert(sizes.size === 1)
+    assert(sizes(0)._1 == sizes(0)._2, "after .newInstance, estimates are different from before")
+    assert(sizes(0)._1 > 0)
+
+    Utils.deleteRecursively(ParquetTestData.testDir)
+  }
+
+  test("estimates the size of a test MetastoreRelation") {
+    val rdd = hql("""SELECT * FROM src""")
+    val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation =>
+      mr.estimates.sizeInBytes
+    }
+    assert(sizes.size === 1 && sizes(0) > 0)
+  }
+
+  test("auto converts to broadcast hash join, by size estimate of a relation") {
+    def mkTest(
+        before: () => Unit,
+        after: () => Unit,
+        query: String,
+        expectedAnswer: Seq[Any],
+        ct: ClassTag[_]) = {
+      before()
+
+      var rdd = hql(query)
+
+      // Assert both relations have a size smaller than the threshold.
+      val sizes = rdd.queryExecution.analyzed.collect {
+        case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.estimates.sizeInBytes
+      }
+      assert(sizes.size === 2 && sizes.forall(_ <= autoConvertJoinSize),
+        "query should contain two relations, each of which has size smaller than autoConvertJoinSize")
+
+      // Using `sparkPlan` because for relevant patterns in HashJoin to be
+      // matched, other strategies need to be applied.
+      var bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
+      assert(bhj.size === 1,
+        s"actual query plans do not contain broadcast join: ${rdd.queryExecution}")
+
+      checkAnswer(rdd, expectedAnswer)
+
+      // TODO(zongheng): synchronize on TestHive.settings, or use Sequential/Stepwise.
+      val tmp = autoConvertJoinSize
+      hql("""SET spark.sql.auto.convert.join.size=0""")
+      rdd = hql(query)
+      bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
+      assert(bhj.isEmpty)
+
+      hql(s"""SET spark.sql.auto.convert.join.size=$tmp""")
+
+      after()
+    }
+
+    /** Tests for ParquetRelation */
+    val parquetQuery =
+      """SELECT a.mystring, b.myint
+        |FROM psrc a
+        |JOIN psrc b
+        |ON a.mylong = 0 AND a.mylong = b.mylong""".stripMargin
+    val parquetAnswer = Seq(("abc", 5))
+    def parquetBefore(): Unit = {
+      ParquetTestData.writeFile()
+      val testRDD = parquetFile(ParquetTestData.testDir.toString)
+      testRDD.registerAsTable("psrc")
+    }
+    mkTest(
+      parquetBefore, reset, parquetQuery, parquetAnswer, implicitly[ClassTag[ParquetRelation]])
+
+    /** Tests for MetastoreRelation */
+    val metastoreQuery = """SELECT * FROM src a JOIN src b ON a.key = 238 AND a.key = b.key"""
+    val metastoreAnswer = Seq.fill(4)((238, "val_238", 238, "val_238"))
+    mkTest(
+      () => (), () => (), metastoreQuery, metastoreAnswer, implicitly[ClassTag[MetastoreRelation]])
+  }
+
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index b4dbf2b115799..6c8fe4b196dea 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -132,7 +132,7 @@ abstract class HiveComparisonTest
       answer: Seq[String]): Seq[String] = {
 
     def isSorted(plan: LogicalPlan): Boolean = plan match {
-      case _: Join | _: Aggregate | _: BaseRelation | _: Generate | _: Sample | _: Distinct => false
+      case _: Join | _: Aggregate | _: Generate | _: Sample | _: Distinct => false
       case PhysicalOperation(_, _, Sort(_, _)) => true
       case _ => plan.children.iterator.exists(isSorted)
     }
- test("estimates the size of a MetastoreRelation") { - val rdd = hql("""SELECT * FROM src""") - println(s"${rdd.queryExecution}") - val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation => - mr.estimates.size - }.toSeq - assert(sizes.size === 1 && sizes(0) > 0) - } - createQueryTest("between", "SELECT * FROM src WHERE key Between 1 and 2") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala index 91ad59d7f82c0..3bfe49a760be5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala @@ -35,7 +35,7 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft override def beforeAll() { // write test data - ParquetTestData.writeFile + ParquetTestData.writeFile() testRDD = parquetFile(ParquetTestData.testDir.toString) testRDD.registerAsTable("testsource") }