From 4ce09aa8aa820bbbbbaa0f3f084a6cff1d4e6195 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Thu, 1 May 2014 12:26:45 +0800
Subject: [PATCH] Made in-memory compression configurable via SparkConf

---
 .../scala/org/apache/spark/sql/SQLContext.scala    |  7 +++++--
 .../spark/sql/columnar/ColumnBuilder.scala         | 17 +++++++++++++----
 .../columnar/InMemoryColumnarTableScan.scala       |  8 ++++++--
 .../sql/columnar/NullableColumnBuilder.scala       |  4 ++--
 .../compression/CompressibleColumnBuilder.scala    | 12 +++++++++++-
 .../apache/spark/sql/execution/SparkPlan.scala     |  2 +-
 .../columnar/InMemoryColumnarQuerySuite.scala      |  6 +++---
 .../TestCompressibleColumnBuilder.scala            |  2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala      |  2 +-
 .../apache/spark/sql/hive/HiveStrategies.scala     |  2 +-
 10 files changed, 44 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e25201a6c1775..0e8102c240774 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -162,8 +162,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
   /** Caches the specified table in-memory. */
   def cacheTable(tableName: String): Unit = {
     val currentTable = catalog.lookupRelation(None, tableName)
+    val useCompression =
+      sparkContext.conf.getBoolean("spark.sql.inMemoryCompression.enabled", false)
     val asInMemoryRelation =
-      InMemoryColumnarTableScan(currentTable.output, executePlan(currentTable).executedPlan)
+      InMemoryColumnarTableScan(
+        currentTable.output, executePlan(currentTable).executedPlan, useCompression)
 
     catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
   }
@@ -173,7 +176,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match {
       // This is kind of a hack to make sure that if this was just an RDD registered as a table,
       // we reregister the RDD as a table.
-      case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd)) =>
+      case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd, _)) =>
        inMem.cachedColumnBuffers.unpersist()
        catalog.unregisterTable(None, tableName)
        catalog.registerTable(None, tableName, SparkLogicalPlan(e))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
index 048ee66bff44b..4be048cd742d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -28,7 +28,7 @@ private[sql] trait ColumnBuilder {
   /**
    * Initializes with an approximate lower bound on the expected number of elements in this column.
    */
-  def initialize(initialSize: Int, columnName: String = "")
+  def initialize(initialSize: Int, columnName: String = "", useCompression: Boolean = false)
 
   /**
    * Appends `row(ordinal)` to the column builder.
@@ -55,7 +55,11 @@ private[sql] class BasicColumnBuilder[T <: DataType, JvmType](
 
   protected var buffer: ByteBuffer = _
 
-  override def initialize(initialSize: Int, columnName: String = "") = {
+  override def initialize(
+      initialSize: Int,
+      columnName: String = "",
+      useCompression: Boolean = false) = {
+
     val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
     this.columnName = columnName
@@ -130,7 +134,12 @@ private[sql] object ColumnBuilder {
     }
   }
 
-  def apply(typeId: Int, initialSize: Int = 0, columnName: String = ""): ColumnBuilder = {
+  def apply(
+      typeId: Int,
+      initialSize: Int = 0,
+      columnName: String = "",
+      useCompression: Boolean = false): ColumnBuilder = {
+
     val builder = (typeId match {
       case INT.typeId => new IntColumnBuilder
       case LONG.typeId => new LongColumnBuilder
@@ -144,7 +153,7 @@ private[sql] object ColumnBuilder {
       case GENERIC.typeId => new GenericColumnBuilder
     }).asInstanceOf[ColumnBuilder]
 
-    builder.initialize(initialSize, columnName)
+    builder.initialize(initialSize, columnName, useCompression)
     builder
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 8a24733047423..fdf28e1bb1261 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -20,8 +20,12 @@ package org.apache.spark.sql.columnar
 import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
 import org.apache.spark.sql.execution.{SparkPlan, LeafNode}
 import org.apache.spark.sql.Row
+import org.apache.spark.SparkConf
 
-private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan)
+private[sql] case class InMemoryColumnarTableScan(
+    attributes: Seq[Attribute],
+    child: SparkPlan,
+    useCompression: Boolean)
   extends LeafNode {
 
   override def output: Seq[Attribute] = attributes
@@ -30,7 +34,7 @@ private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], ch
     val output = child.output
     val cached = child.execute().mapPartitions { iterator =>
       val columnBuilders = output.map { attribute =>
-        ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name)
+        ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name, useCompression)
       }.toArray
 
       var row: Row = null
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
index 2a3b6fc1e46d3..d008806eedbe1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
@@ -40,12 +40,12 @@ private[sql] trait NullableColumnBuilder extends ColumnBuilder {
   private var pos: Int = _
   private var nullCount: Int = _
 
-  abstract override def initialize(initialSize: Int, columnName: String) {
+  abstract override def initialize(initialSize: Int, columnName: String, useCompression: Boolean) {
     nulls = ByteBuffer.allocate(1024)
     nulls.order(ByteOrder.nativeOrder())
     pos = 0
     nullCount = 0
-    super.initialize(initialSize, columnName)
+    super.initialize(initialSize, columnName, useCompression)
   }
 
   abstract override def appendFrom(row: Row, ordinal: Int) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
index 0f808f68f2eec..4c6675c3c87bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
@@ -47,7 +47,17 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
 
   import CompressionScheme._
 
-  val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder[T])
+  var compressionEncoders: Seq[Encoder[T]] = _
+
+  abstract override def initialize(initialSize: Int, columnName: String, useCompression: Boolean) {
+    compressionEncoders =
+      if (useCompression) {
+        schemes.filter(_.supports(columnType)).map(_.encoder[T])
+      } else {
+        Seq(PassThrough.encoder)
+      }
+    super.initialize(initialSize, columnName, useCompression)
+  }
 
   protected def isWorthCompressing(encoder: Encoder[T]) = {
     encoder.compressionRatio < 0.8
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 50124dd407447..235a9b1692460 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -77,7 +77,7 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
     SparkLogicalPlan(
       alreadyPlanned match {
         case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
-        case scan @ InMemoryColumnarTableScan(output, child) =>
+        case scan @ InMemoryColumnarTableScan(output, _, _) =>
          scan.copy(attributes = output.map(_.newInstance))
        case _ => sys.error("Multiple instance of the same relation detected.")
      }).asInstanceOf[this.type]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index d926e1049e6a6..31c5dfba92954 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -28,14 +28,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {
 
   test("simple columnar query") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
 
     checkAnswer(scan, testData.collect().toSeq)
   }
 
   test("projection") {
     val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
 
     checkAnswer(scan, testData.collect().map {
       case Row(key: Int, value: String) => value -> key
@@ -44,7 +44,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
 
   test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
     val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
-    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
 
     checkAnswer(scan, testData.collect().toSeq)
     checkAnswer(scan, testData.collect().toSeq)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
index 81bf5e99d19b9..6d688ea95cfc0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
@@ -38,7 +38,7 @@ object TestCompressibleColumnBuilder {
       scheme: CompressionScheme) = {
 
     val builder = new TestCompressibleColumnBuilder(columnStats, columnType, Seq(scheme))
-    builder.initialize(0)
+    builder.initialize(0, "", useCompression = true)
     builder
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6c907887db79e..ba837a274c51c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -130,7 +130,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         castChildOutput(p, table, child)
 
       case p @ logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
-        _, HiveTableScan(_, table, _))), _, child, _) =>
+        _, HiveTableScan(_, table, _), _)), _, child, _) =>
         castChildOutput(p, table, child)
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index d9a6e0e88932e..b2157074a41bf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -45,7 +45,7 @@ private[hive] trait HiveStrategies {
       case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
-        _, HiveTableScan(_, table, _))), partition, child, overwrite) =>
+        _, HiveTableScan(_, table, _), _)), partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case _ => Nil
     }
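Usage note (not part of the diff): a minimal sketch of how an application might enable
the new setting once this patch is applied, assuming the Spark 1.0-era SchemaRDD API
(createSchemaRDD / registerAsTable). The key "spark.sql.inMemoryCompression.enabled" and
its default of false are taken from the changes above; the application name, the Record
case class, and the "records" table are invented purely for illustration.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Illustrative only: a tiny schema for the hypothetical "records" table.
case class Record(key: Int, value: String)

object InMemoryCompressionExample {
  def main(args: Array[String]): Unit = {
    // cacheTable reads the flag from SparkConf, so set it before building the context.
    val conf = new SparkConf()
      .setAppName("InMemoryCompressionExample")
      .setMaster("local")
      .set("spark.sql.inMemoryCompression.enabled", "true")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext._ // brings in createSchemaRDD and sql()

    // Register a small table, then cache it. With the flag enabled, the column
    // builders consider every compression scheme supported by each column type;
    // with it disabled (the default) they fall back to the PassThrough encoder.
    val records = sc.parallelize(1 to 100).map(i => Record(i, s"val_$i"))
    records.registerAsTable("records")
    sqlContext.cacheTable("records")

    sql("SELECT COUNT(*) FROM records").collect().foreach(println)
    sc.stop()
  }
}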