Refactors.
- Remove BaseRelation from Catalyst and clean up related code (e.g.
  unmake SparkLogicalPlan a BaseRelation).
- Remove broadcastTables from SQLConf and clean up related code.
- Add EstimatesSuite.
- Address some review comments.
concretevitamin committed Jul 29, 2014
1 parent 5bf5586 commit 84301a4
Showing 14 changed files with 136 additions and 95 deletions.
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.{errors, trees}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.catalyst.trees.TreeNode

/**
@@ -36,7 +36,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
case class UnresolvedRelation(
databaseName: Option[String],
tableName: String,
alias: Option[String] = None) extends BaseRelation {
alias: Option[String] = None) extends LeafNode {
override def output = Nil
override lazy val resolved = false
}

This file was deleted.

@@ -29,8 +29,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
protected class Estimates {
lazy val childrenEstimations = children.map(_.estimates)
lazy val cardinality: Long = childrenEstimations.map(_.cardinality).sum
lazy val numTuples: Long = childrenEstimations.map(_.numTuples).sum
lazy val size: Long = childrenEstimations.map(_.size).sum
lazy val sizeInBytes: Long = childrenEstimations.map(_.sizeInBytes).sum
}

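The per-plan size statistic is now a single `sizeInBytes` field; leaf relations supply their own value by overriding the lazy `estimates` member, exactly as ParquetRelation and MetastoreRelation do further down in this diff. A minimal sketch of that pattern, assuming the `LeafNode`/`Estimates` API shown above — `FileBackedRelation` and `fileLengthInBytes` are made-up names used only for illustration:

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode

// Hypothetical leaf relation: report a known on-disk length as its estimate.
// Non-leaf operators keep the default definitions above, which sum the
// children's sizeInBytes values.
case class FileBackedRelation(output: Seq[Attribute], fileLengthInBytes: Long) extends LeafNode {
  @transient override lazy val estimates = new Estimates {
    override lazy val sizeInBytes: Long = fileLengthInBytes
  }
}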
4 changes: 0 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -46,9 +46,6 @@ trait SQLConf {
*/
private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt

/** A comma-separated list of table names marked to be broadcasted during joins. */
private[spark] def joinBroadcastTables: String = get(JOIN_BROADCAST_TABLES, "")

/** ********************** SQLConf functionality methods ************ */

@transient
@@ -94,7 +91,6 @@ trait SQLConf {
object SQLConf {
val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size"
val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables"

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
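With the broadcast-table list gone, `spark.sql.auto.convert.join.size` is the only knob that drives the auto-broadcast conversion. A hedged usage sketch, mirroring how the EstimatesSuite added below toggles the threshold through a SET statement; it assumes test code inside Spark's own packages, where the private[spark] getter is visible:

import org.apache.spark.sql.hive.test.TestHive._

val saved = autoConvertJoinSize                      // 10000 by default, per SQLConf above
hql("SET spark.sql.auto.convert.join.size=0")        // below any estimate: disables auto-broadcast
// ... run join queries and inspect queryExecution.sparkPlan ...
hql(s"SET spark.sql.auto.convert.join.size=$saved")  // restore the threshold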
@@ -170,11 +170,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group userf
*/
def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
val name = tableName
val newPlan = rdd.logicalPlan transform {
case s @ SparkLogicalPlan(ExistingRdd(_, _), _) => s.copy(tableName = name)
}
catalog.registerTable(None, tableName, newPlan)
catalog.registerTable(None, tableName, rdd.logicalPlan)
}

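Registration no longer rewrites the plan to stamp a table name into SparkLogicalPlan; the logical plan is stored in the catalog as-is under the given name. A usage sketch, with `sqlContext` and the SchemaRDD `people` as placeholder names:

// Placeholder setup: a SchemaRDD created elsewhere.
// val people: SchemaRDD = ...
sqlContext.registerRDDAsTable(people, "people")      // stores people.logicalPlan directly
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")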
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical._

/**
@@ -66,8 +66,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
* linking.
*/
@DeveloperApi
case class SparkLogicalPlan(alreadyPlanned: SparkPlan, tableName: String = "SparkLogicalPlan")
extends BaseRelation with MultiInstanceRelation {
case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
extends LogicalPlan with MultiInstanceRelation {

def output = alreadyPlanned.output
override def references = Set.empty
@@ -78,8 +78,7 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan, tableName: String = "Spar
alreadyPlanned match {
case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
case _ => sys.error("Multiple instance of the same relation detected.")
}, tableName)
.asInstanceOf[this.type]
}).asInstanceOf[this.type]
}
}

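SparkLogicalPlan keeps its MultiInstanceRelation behavior: when the same wrapped RDD appears more than once in a query, newInstance rewraps the ExistingRdd with fresh attribute ids so the occurrences do not alias each other; the only change here is that no tableName is threaded through the copy. The case that exercises this is a self-join, as in JoinSuite's "equi-join is hash-join" test below. A sketch, assuming the TestSQLContext and TestData imports used in that suite:

import org.apache.spark.sql.TestData._
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.test.TestSQLContext._

// Both sides come from the same registered RDD, so each side must receive
// its own attribute instances before the join can be resolved and planned.
val x = testData2.as('x)
val y = testData2.as('y)
val selfJoin = x.join(y, Inner, Some("x.a".attr === "y.a".attr))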
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{SQLContext, execution}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
import org.apache.spark.sql.parquet._
@@ -61,29 +61,25 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil
}

def broadcastTables: Seq[String] = sqlContext.joinBroadcastTables.split(",").toBuffer

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case ExtractEquiJoinKeys(
Inner,
leftKeys,
rightKeys,
condition,
left,
right @ PhysicalOperation(_, _, b: BaseRelation))
if broadcastTables.contains(b.tableName)
|| (right.estimates.size <= sqlContext.autoConvertJoinSize) =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
right)
if right.estimates.sizeInBytes <= sqlContext.autoConvertJoinSize =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)

case ExtractEquiJoinKeys(
Inner,
leftKeys,
rightKeys,
condition,
left @ PhysicalOperation(_, _, b: BaseRelation),
left,
right)
if broadcastTables.contains(b.tableName)
|| (left.estimates.size <= sqlContext.autoConvertJoinSize) =>
if left.estimates.sizeInBytes <= sqlContext.autoConvertJoinSize =>
broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)

case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
@@ -285,7 +281,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.ExistingRdd(Nil, singleRowRdd) :: Nil
case logical.Repartition(expressions, child) =>
execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
case SparkLogicalPlan(existingPlan, _) => existingPlan :: Nil
case SparkLogicalPlan(existingPlan) => existingPlan :: Nil
case _ => Nil
}
}
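Join-side selection is now purely estimate-driven; no table-name hints remain. A condensed sketch of the rule encoded by the two cases above — not the real planner code, and it assumes the `estimates` member is visible from the caller; `threshold` stands in for sqlContext.autoConvertJoinSize:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{BuildLeft, BuildRight, BuildSide}

// The right side is preferred when both qualify, matching the case order above.
def chooseBroadcastSide(left: LogicalPlan, right: LogicalPlan, threshold: Int): Option[BuildSide] =
  if (right.estimates.sizeInBytes <= threshold) Some(BuildRight)
  else if (left.estimates.sizeInBytes <= threshold) Some(BuildLeft)
  else None                                          // fall through to the shuffled hash join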
@@ -46,16 +46,16 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
*
* @param path The path to the Parquet file.
*/
// TODO: make me a BaseRelation? For HashJoin strategy.
private[sql] case class ParquetRelation(
path: String,
@transient conf: Option[Configuration] = None) extends LeafNode with MultiInstanceRelation {
@transient conf: Option[Configuration] = None)
extends LeafNode with MultiInstanceRelation {

self: Product =>

@transient override lazy val estimates = new Estimates {
// TODO: investigate getting encoded column statistics in the parquet file?
override lazy val size: Long = {
override lazy val sizeInBytes: Long = {
val hdfsPath = new Path(path)
val fs = hdfsPath.getFileSystem(conf.getOrElse(ContextUtil.getConfiguration(new Job())))
fs.getContentSummary(hdfsPath).getLength // TODO: in bytes or system-dependent?
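On the TODO above: Hadoop's ContentSummary.getLength sums the file lengths under the path in bytes and does not account for replication (getSpaceConsumed is the replication-aware figure). A standalone sketch of the same lookup against a placeholder path, using only the Hadoop API:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Placeholder path; in ParquetRelation above, the path comes from the relation
// and the Configuration from the supplied job context when one is present.
val parquetPath = new Path("/tmp/points.parquet")
val fs = parquetPath.getFileSystem(new Configuration())
val sizeInBytes: Long = fs.getContentSummary(parquetPath).getLength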
11 changes: 0 additions & 11 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -19,24 +19,13 @@ package org.apache.spark.sql

import org.apache.spark.sql.TestData._
import org.apache.spark.sql.catalyst.plans.{LeftOuter, RightOuter, FullOuter, Inner}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._

class JoinSuite extends QueryTest {

// Ensures tables are loaded.
TestData

test("parquet") {
val data = parquetFile("../../points.parquet") // local file!
val sizes = data.logicalPlan.collect { case j: ParquetRelation =>
j.newInstance.estimates.size // also works without .newInstance
}.toSeq
assert(sizes.size === 1 && sizes(0) > 0)
}

test("equi-join is hash-join") {
val x = testData2.as('x)
val y = testData2.as('y)
@@ -253,7 +253,7 @@ private[hive] case class MetastoreRelation
(databaseName: String, tableName: String, alias: Option[String])
(val table: TTable, val partitions: Seq[TPartition])
(@transient hiveConf: HiveConf, @transient path: Path)
extends BaseRelation {
extends LeafNode {

self: Product =>

@@ -271,9 +271,8 @@ private[hive] case class MetastoreRelation

// TODO: are there any stats in hiveQlTable.getSkewedInfo that we can use?
@transient override lazy val estimates = new Estimates {
// Size getters adapted from
// https://github.com/apache/hive/blob/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SizeBasedBigTableSelectorForAutoSMJ.java
override lazy val size: Long =
// Size getters adapted from SizeBasedBigTableSelectorForAutoSMJ.java in Hive (version 0.13).
override lazy val sizeInBytes: Long =
maybeGetSize(hiveConf, hiveQlTable.getProperty("totalSize"), path)

private[this] def maybeGetSize(conf: HiveConf, size: String, path: Path): Long = {
113 changes: 113 additions & 0 deletions sql/hive/src/test/scala/org/apache/spark/sql/hive/EstimatesSuite.scala
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive

import scala.reflect.ClassTag

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.execution.BroadcastHashJoin
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTestData}
import org.apache.spark.util.Utils

class EstimatesSuite extends QueryTest {

test("estimates the size of a test ParquetRelation") {
ParquetTestData.writeFile()
val testRDD = parquetFile(ParquetTestData.testDir.toString)

val sizes = testRDD.logicalPlan.collect { case j: ParquetRelation =>
(j.estimates.sizeInBytes, j.newInstance.estimates.sizeInBytes)
}
assert(sizes.size === 1)
assert(sizes(0)._1 == sizes(0)._2, "after .newInstance, estimates are different from before")
assert(sizes(0)._1 > 0)

Utils.deleteRecursively(ParquetTestData.testDir)
}

test("estimates the size of a test MetastoreRelation") {
val rdd = hql("""SELECT * FROM src""")
val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation =>
mr.estimates.sizeInBytes
}
assert(sizes.size === 1 && sizes(0) > 0)
}

test("auto converts to broadcast hash join, by size estimate of a relation") {
def mkTest(
before: () => Unit,
after: () => Unit,
query: String,
expectedAnswer: Seq[Any],
ct: ClassTag[_]) = {
before()

var rdd = hql(query)

// Assert src has a size smaller than the threshold.
val sizes = rdd.queryExecution.analyzed.collect {
case r if ct.runtimeClass.isAssignableFrom(r.getClass) =>
r.estimates.sizeInBytes
}
assert(sizes.size === 2 && sizes(0) <= autoConvertJoinSize,
s"query should contain two relations, each of which has size smaller than autoConvertSize")

// Using `sparkPlan` because for relevant patterns in HashJoin to be
// matched, other strategies need to be applied.
var bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
assert(bhj.size === 1,
s"actual query plans do not contain broadcast join: ${rdd.queryExecution}")

checkAnswer(rdd, expectedAnswer)

// TODO(zongheng): synchronize on TestHive.settings, or use Sequential/Stepwise.
val tmp = autoConvertJoinSize
hql("""SET spark.sql.auto.convert.join.size=0""")
rdd = hql(query)
bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
assert(bhj.isEmpty)

hql(s"""SET spark.sql.auto.convert.join.size=$tmp""")

after()
}

/** Tests for ParquetRelation */
val parquetQuery =
"""SELECT a.mystring, b.myint
|FROM psrc a
|JOIN psrc b
|ON a.mylong = 0 AND a.mylong = b.mylong""".stripMargin
val parquetAnswer = Seq(("abc", 5))
def parquetBefore(): Unit = {
ParquetTestData.writeFile()
val testRDD = parquetFile(ParquetTestData.testDir.toString)
testRDD.registerAsTable("psrc")
}
mkTest(
parquetBefore, reset, parquetQuery, parquetAnswer, implicitly[ClassTag[ParquetRelation]])

/** Tests for MetastoreRelation */
val metastoreQuery = """SELECT * FROM src a JOIN src b ON a.key = 238 AND a.key = b.key"""
val metastoreAnswer = Seq.fill(4)((238, "val_238", 238, "val_238"))
mkTest(
() => (), () => (), metastoreQuery, metastoreAnswer, implicitly[ClassTag[MetastoreRelation]])
}

}
@@ -132,7 +132,7 @@ abstract class HiveComparisonTest
answer: Seq[String]): Seq[String] = {

def isSorted(plan: LogicalPlan): Boolean = plan match {
case _: Join | _: Aggregate | _: BaseRelation | _: Generate | _: Sample | _: Distinct => false
case _: Join | _: Aggregate | _: Generate | _: Sample | _: Distinct => false
case PhysicalOperation(_, _, Sort(_, _)) => true
case _ => plan.children.iterator.exists(isSorted)
}
@@ -51,28 +51,6 @@ class HiveQuerySuite extends HiveComparisonTest {
"Incorrect number of rows in created table")
}

// TODO: put me in a separate EstimateSuite?
test("BHJ by size") {
hql("""SET spark.sql.join.broadcastTables=""") // reset broadcast tables
// TODO: use two different tables?
// assume src has small size
val rdd = hql("""SELECT * FROM src a JOIN src b ON a.key = b.key""")
val physical = rdd.queryExecution.sparkPlan
val bhj = physical.collect { case j: BroadcastHashJoin => j }
println(s"${rdd.queryExecution}")
assert(bhj.size === 1)
}

// TODO: put me in a separate EstimateSuite?
test("estimates the size of a MetastoreRelation") {
val rdd = hql("""SELECT * FROM src""")
println(s"${rdd.queryExecution}")
val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation =>
mr.estimates.size
}.toSeq
assert(sizes.size === 1 && sizes(0) > 0)
}

createQueryTest("between",
"SELECT * FROM src WHERE key Between 1 and 2")

@@ -35,7 +35,7 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft

override def beforeAll() {
// write test data
ParquetTestData.writeFile
ParquetTestData.writeFile()
testRDD = parquetFile(ParquetTestData.testDir.toString)
testRDD.registerAsTable("testsource")
}
