Skip to content

Commit

Permalink
[SPARK-17549][SQL] Only collect table size stat in driver for cached …
Browse files Browse the repository at this point in the history
…relation.

This reverts commit 9ac68db. Turns out
the original fix was correct.

Original change description:
The existing code caches all stats for all columns for each partition
in the driver; for a large relation, this causes extreme memory usage,
which leads to gc hell and application failures.

It seems that only the size in bytes of the data is actually used in the
driver, so instead just colllect that. In executors, the full stats are
still kept, but that's not a big problem; we expect the data to be distributed
and thus not really incur in too much memory pressure in each individual
executor.

There are also potential improvements on the executor side, since the data
being stored currently is very wasteful (e.g. storing boxed types vs.
primitive types for stats). But that's a separate issue.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #15304 from vanzin/SPARK-17549.2.
  • Loading branch information
Marcelo Vanzin committed Oct 4, 2016
1 parent 068c198 commit 8d969a2
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.sql.execution.columnar

import scala.collection.JavaConverters._

import org.apache.commons.lang3.StringUtils

import org.apache.spark.network.util.JavaUtils
Expand All @@ -31,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.CollectionAccumulator
import org.apache.spark.util.LongAccumulator


object InMemoryRelation {
Expand Down Expand Up @@ -63,8 +61,7 @@ case class InMemoryRelation(
@transient child: SparkPlan,
tableName: Option[String])(
@transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
val batchStats: CollectionAccumulator[InternalRow] =
child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
extends logical.LeafNode with MultiInstanceRelation {

override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
Expand All @@ -74,21 +71,12 @@ case class InMemoryRelation(
@transient val partitionStatistics = new PartitionStatistics(output)

override lazy val statistics: Statistics = {
if (batchStats.value.isEmpty) {
if (batchStats.value == 0L) {
// Underlying columnar RDD hasn't been materialized, no useful statistics information
// available, return the default statistics.
Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
} else {
// Underlying columnar RDD has been materialized, required information has also been
// collected via the `batchStats` accumulator.
val sizeOfRow: Expression =
BindReferences.bindReference(
output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
partitionStatistics.schema)

val sizeInBytes =
batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
Statistics(sizeInBytes = sizeInBytes)
Statistics(sizeInBytes = batchStats.value.longValue)
}
}

Expand Down Expand Up @@ -139,10 +127,10 @@ case class InMemoryRelation(
rowCount += 1
}

batchStats.add(totalSize)

val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
.flatMap(_.values))

batchStats.add(stats)
CachedBatch(rowCount, columnBuilders.map { builder =>
JavaUtils.bufferToArray(builder.build())
}, stats)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,4 +232,18 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
val columnTypes2 = List.fill(length2)(IntegerType)
val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
}

test("SPARK-17549: cached table size should be correctly calculated") {
val data = spark.sparkContext.parallelize(1 to 10, 5).toDF()
val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None)

// Materialize the data.
val expectedAnswer = data.collect()
checkAnswer(cached, expectedAnswer)

// Check that the right size was calculated.
assert(cached.batchStats.value === expectedAnswer.size * INT.defaultSize)
}

}

0 comments on commit 8d969a2

Please sign in to comment.