Made in-memory compression configurable via SparkConf
liancheng committed May 1, 2014
1 parent d537a36 commit 4ce09aa
Showing 10 changed files with 44 additions and 18 deletions.
7 changes: 5 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -162,8 +162,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
   /** Caches the specified table in-memory. */
   def cacheTable(tableName: String): Unit = {
     val currentTable = catalog.lookupRelation(None, tableName)
+    val useCompression =
+      sparkContext.conf.getBoolean("spark.sql.inMemoryCompression.enabled", false)
     val asInMemoryRelation =
-      InMemoryColumnarTableScan(currentTable.output, executePlan(currentTable).executedPlan)
+      InMemoryColumnarTableScan(
+        currentTable.output, executePlan(currentTable).executedPlan, useCompression)
 
     catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
   }
@@ -173,7 +176,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match {
       // This is kind of a hack to make sure that if this was just an RDD registered as a table,
       // we reregister the RDD as a table.
-      case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd)) =>
+      case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd, _)) =>
        inMem.cachedColumnBuffers.unpersist()
        catalog.unregisterTable(None, tableName)
        catalog.registerTable(None, tableName, SparkLogicalPlan(e))
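
For reference, a minimal usage sketch of the new flag, assuming a hypothetical driver program and an already-registered table named "people"; only the config key and cacheTable come from this commit, the rest is illustrative setup:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Hypothetical driver-side setup; the flag defaults to false when unset.
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("compressed-cache-example")
      .set("spark.sql.inMemoryCompression.enabled", "true")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Assumes a table named "people" has already been registered.
    sqlContext.cacheTable("people") // column buffers are now built with compression enabled
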
@@ -28,7 +28,7 @@ private[sql] trait ColumnBuilder {
   /**
    * Initializes with an approximate lower bound on the expected number of elements in this column.
    */
-  def initialize(initialSize: Int, columnName: String = "")
+  def initialize(initialSize: Int, columnName: String = "", useCompression: Boolean = false)
 
   /**
    * Appends `row(ordinal)` to the column builder.
@@ -55,7 +55,11 @@ private[sql] class BasicColumnBuilder[T <: DataType, JvmType](
 
   protected var buffer: ByteBuffer = _
 
-  override def initialize(initialSize: Int, columnName: String = "") = {
+  override def initialize(
+      initialSize: Int,
+      columnName: String = "",
+      useCompression: Boolean = false) = {
+
     val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
     this.columnName = columnName
 
@@ -130,7 +134,12 @@ private[sql] object ColumnBuilder {
     }
   }
 
-  def apply(typeId: Int, initialSize: Int = 0, columnName: String = ""): ColumnBuilder = {
+  def apply(
+      typeId: Int,
+      initialSize: Int = 0,
+      columnName: String = "",
+      useCompression: Boolean = false): ColumnBuilder = {
+
     val builder = (typeId match {
       case INT.typeId => new IntColumnBuilder
       case LONG.typeId => new LongColumnBuilder
@@ -144,7 +153,7 @@
       case GENERIC.typeId => new GenericColumnBuilder
     }).asInstanceOf[ColumnBuilder]
 
-    builder.initialize(initialSize, columnName)
+    builder.initialize(initialSize, columnName, useCompression)
    builder
  }
}
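
Because `useCompression` gets a default of `false` in both `initialize` and `apply`, existing internal call sites keep compiling unchanged. A small sketch of the two call forms (these are `private[sql]` internals, so this is illustrative only; `INT` is the columnar `ColumnType` for integers, as in the match above):

    // Existing callers keep compiling: useCompression defaults to false (no compression).
    val plainBuilder = ColumnBuilder(INT.typeId, 0, "id")

    // New callers opt in explicitly.
    val compressedBuilder = ColumnBuilder(INT.typeId, 0, "id", useCompression = true)
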
@@ -20,8 +20,12 @@ package org.apache.spark.sql.columnar
 import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
 import org.apache.spark.sql.execution.{SparkPlan, LeafNode}
 import org.apache.spark.sql.Row
+import org.apache.spark.SparkConf
 
-private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan)
+private[sql] case class InMemoryColumnarTableScan(
+    attributes: Seq[Attribute],
+    child: SparkPlan,
+    useCompression: Boolean)
   extends LeafNode {
 
   override def output: Seq[Attribute] = attributes
@@ -30,7 +34,7 @@ private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], ch
     val output = child.output
     val cached = child.execute().mapPartitions { iterator =>
       val columnBuilders = output.map { attribute =>
-        ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name)
+        ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name, useCompression)
       }.toArray
 
       var row: Row = null
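
The new `useCompression` field has no default value, so every construction site must now pass it explicitly: `SQLContext.cacheTable` above passes the value read from SparkConf, while the test suite further down hard-codes `true`. A minimal hypothetical construction, mirroring the tests (`plan` stands for any already-executed SparkPlan):

    // Hypothetical: `plan` is an arbitrary SparkPlan, e.g. executePlan(...).executedPlan.
    val scan = InMemoryColumnarTableScan(plan.output, plan, useCompression = true)
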
@@ -40,12 +40,12 @@ private[sql] trait NullableColumnBuilder extends ColumnBuilder {
   private var pos: Int = _
   private var nullCount: Int = _
 
-  abstract override def initialize(initialSize: Int, columnName: String) {
+  abstract override def initialize(initialSize: Int, columnName: String, useCompression: Boolean) {
     nulls = ByteBuffer.allocate(1024)
     nulls.order(ByteOrder.nativeOrder())
     pos = 0
     nullCount = 0
-    super.initialize(initialSize, columnName)
+    super.initialize(initialSize, columnName, useCompression)
   }
 
   abstract override def appendFrom(row: Row, ordinal: Int) {
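
The `abstract override` / `super.initialize` chain is Scala's stackable-trait pattern, which is why the new parameter has to be threaded through every layer: each trait overrides the same signature, does its own setup, and forwards to the next trait in the linearization. A generic, self-contained illustration of that mechanism (all names here are made up and unrelated to Spark's classes):

    trait Builder {
      def initialize(size: Int, name: String, useCompression: Boolean): Unit
    }

    class BasicBuilder extends Builder {
      def initialize(size: Int, name: String, useCompression: Boolean): Unit =
        println(s"basic: buffer allocated for column '$name', size=$size")
    }

    trait NullableLayer extends Builder {
      // abstract override = stackable trait: run this layer's setup, then forward all arguments.
      abstract override def initialize(size: Int, name: String, useCompression: Boolean): Unit = {
        println("nullable layer: null bitmap prepared")
        super.initialize(size, name, useCompression)
      }
    }

    trait CompressionLayer extends Builder {
      abstract override def initialize(size: Int, name: String, useCompression: Boolean): Unit = {
        println(s"compression layer: enabled=$useCompression")
        super.initialize(size, name, useCompression)
      }
    }

    object StackingDemo extends App {
      // Linearization runs CompressionLayer, then NullableLayer, then BasicBuilder.
      val builder = new BasicBuilder with NullableLayer with CompressionLayer
      builder.initialize(16, "id", useCompression = true)
    }
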
@@ -47,7 +47,17 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
 
   import CompressionScheme._
 
-  val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder[T])
+  var compressionEncoders: Seq[Encoder[T]] = _
+
+  abstract override def initialize(initialSize: Int, columnName: String, useCompression: Boolean) {
+    compressionEncoders =
+      if (useCompression) {
+        schemes.filter(_.supports(columnType)).map(_.encoder[T])
+      } else {
+        Seq(PassThrough.encoder)
+      }
+    super.initialize(initialSize, columnName, useCompression)
+  }
 
   protected def isWorthCompressing(encoder: Encoder[T]) = {
     encoder.compressionRatio < 0.8
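
When compression is disabled, the builder now falls back to the `PassThrough` encoder only, so no candidate schemes are evaluated at all. When it is enabled, `isWorthCompressing` keeps an encoder only if it expects to shave off at least 20% of the raw size. A back-of-the-envelope illustration of that ratio check (the byte counts are made up; only the 0.8 threshold comes from the code above):

    // Hypothetical sizes for one column batch, in bytes.
    val uncompressedSize = 4096.0
    val compressedSize   = 3000.0

    // Same shape as an encoder's compressionRatio: compressed size over uncompressed size.
    val compressionRatio = compressedSize / uncompressedSize  // ~0.73

    // Mirrors isWorthCompressing: only compress when we save more than ~20%.
    val worthCompressing = compressionRatio < 0.8             // true
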
@@ -77,7 +77,7 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
     SparkLogicalPlan(
       alreadyPlanned match {
         case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
-        case scan @ InMemoryColumnarTableScan(output, child) =>
+        case scan @ InMemoryColumnarTableScan(output, _, _) =>
          scan.copy(attributes = output.map(_.newInstance))
        case _ => sys.error("Multiple instance of the same relation detected.")
      }).asInstanceOf[this.type]
@@ -28,14 +28,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {
 
   test("simple columnar query") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
 
     checkAnswer(scan, testData.collect().toSeq)
   }
 
   test("projection") {
     val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
 
     checkAnswer(scan, testData.collect().map {
       case Row(key: Int, value: String) => value -> key
@@ -44,7 +44,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
 
   test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
 
     checkAnswer(scan, testData.collect().toSeq)
     checkAnswer(scan, testData.collect().toSeq)
@@ -38,7 +38,7 @@ object TestCompressibleColumnBuilder {
       scheme: CompressionScheme) = {
 
     val builder = new TestCompressibleColumnBuilder(columnStats, columnType, Seq(scheme))
-    builder.initialize(0)
+    builder.initialize(0, "", useCompression = true)
    builder
  }
}
@@ -130,7 +130,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       castChildOutput(p, table, child)
 
     case p @ logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
-        _, HiveTableScan(_, table, _))), _, child, _) =>
+        _, HiveTableScan(_, table, _), _)), _, child, _) =>
      castChildOutput(p, table, child)
  }
 
@@ -45,7 +45,7 @@ private[hive] trait HiveStrategies {
       case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
-        _, HiveTableScan(_, table, _))), partition, child, overwrite) =>
+        _, HiveTableScan(_, table, _), _)), partition, child, overwrite) =>
        InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
      case _ => Nil
    }
