Skip to content


[SPARK-23445] ColumnStat refactoring
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Refactor ColumnStat to be more flexible.

* Split `ColumnStat` and `CatalogColumnStat` just like `CatalogStatistics` is split from `Statistics`. This detaches how the statistics are stored from how they are processed in the query plan. `CatalogColumnStat` keeps `min` and `max` as `String`, making it not depend on dataType information.
* For `CatalogColumnStat`, parse column names from property names in the metastore (`KEY_VERSION` property), not from metastore schema. This means that `CatalogColumnStat`s can be created for columns even if the schema itself is not stored in the metastore.
* Make all fields optional. `min`, `max` and `histogram` for columns were optional already. Having them all optional is more consistent, and gives flexibility to e.g. drop some of the fields through transformations if they are difficult / impossible to calculate.

The added flexibility will make it possible to have alternative implementations for stats, and separates stats collection from stats and estimation processing in plans.

## How was this patch tested?

Refactored existing tests to work with refactored `ColumnStat` and `CatalogColumnStat`.
New tests added in `StatisticsSuite` checking that backwards / forwards compatibility is not broken.

Author: Juliusz Sompolski <>

Closes #20624 from juliuszsompolski/SPARK-23445.
  • Loading branch information
juliuszsompolski authored and gatorsmile committed Feb 27, 2018
1 parent 7ec8365 commit 8077bb0
Show file tree
Hide file tree
Showing 22 changed files with 995 additions and 872 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import
import java.util.Date

import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
Expand All @@ -30,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -361,15 +363,16 @@ object CatalogTable {
case class CatalogStatistics(
sizeInBytes: BigInt,
rowCount: Option[BigInt] = None,
colStats: Map[String, ColumnStat] = Map.empty) {
colStats: Map[String, CatalogColumnStat] = Map.empty) {

* Convert [[CatalogStatistics]] to [[Statistics]], and match column stats to attributes based
* on column names.
def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean): Statistics = {
if (cboEnabled && rowCount.isDefined) {
val attrStats = AttributeMap(planOutput.flatMap(a => colStats.get( -> _)))
val attrStats = AttributeMap(planOutput
.flatMap(a => colStats.get( -> _.toPlanStat(, a.dataType))))
// Estimate size as number of rows * row size.
val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
Expand All @@ -387,6 +390,143 @@ case class CatalogStatistics(

* This class of statistics for a column is used in [[CatalogTable]] to interact with metastore.
case class CatalogColumnStat(
distinctCount: Option[BigInt] = None,
min: Option[String] = None,
max: Option[String] = None,
nullCount: Option[BigInt] = None,
avgLen: Option[Long] = None,
maxLen: Option[Long] = None,
histogram: Option[Histogram] = None) {

* Returns a map from string to string that can be used to serialize the column stats.
* The key is the name of the column and name of the field (e.g. "colName.distinctCount"),
* and the value is the string representation for the value.
* min/max values are stored as Strings. They can be deserialized using
* [[CatalogColumnStat.fromExternalString]].
* As part of the protocol, the returned map always contains a key called "version".
* Any of the fields that are null (None) won't appear in the map.
def toMap(colName: String): Map[String, String] = {
val map = new scala.collection.mutable.HashMap[String, String]
map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", "1")
distinctCount.foreach { v =>
map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", v.toString)
nullCount.foreach { v =>
map.put(s"${colName}.${CatalogColumnStat.KEY_NULL_COUNT}", v.toString)
avgLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_AVG_LEN}", v.toString) }
maxLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_LEN}", v.toString) }
min.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
max.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
histogram.foreach { h =>
map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", HistogramSerializer.serialize(h))

/** Convert [[CatalogColumnStat]] to [[ColumnStat]]. */
def toPlanStat(
colName: String,
dataType: DataType): ColumnStat =
distinctCount = distinctCount,
min =, colName, dataType)),
max =, colName, dataType)),
nullCount = nullCount,
avgLen = avgLen,
maxLen = maxLen,
histogram = histogram)

object CatalogColumnStat extends Logging {

// List of string keys used to serialize CatalogColumnStat
val KEY_VERSION = "version"
private val KEY_DISTINCT_COUNT = "distinctCount"
private val KEY_MIN_VALUE = "min"
private val KEY_MAX_VALUE = "max"
private val KEY_NULL_COUNT = "nullCount"
private val KEY_AVG_LEN = "avgLen"
private val KEY_MAX_LEN = "maxLen"
private val KEY_HISTOGRAM = "histogram"

* Converts from string representation of data type to the corresponding Catalyst data type.
def fromExternalString(s: String, name: String, dataType: DataType): Any = {
dataType match {
case BooleanType => s.toBoolean
case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
case ByteType => s.toByte
case ShortType => s.toShort
case IntegerType => s.toInt
case LongType => s.toLong
case FloatType => s.toFloat
case DoubleType => s.toDouble
case _: DecimalType => Decimal(s)
// This version of Spark does not use min/max for binary/string types so we ignore it.
case BinaryType | StringType => null
case _ =>
throw new AnalysisException("Column statistics deserialization is not supported for " +
s"column $name of data type: $dataType.")

* Converts the given value from Catalyst data type to string representation of external
* data type.
def toExternalString(v: Any, colName: String, dataType: DataType): String = {
val externalValue = dataType match {
case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int])
case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long])
case BooleanType | _: IntegralType | FloatType | DoubleType => v
case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
// This version of Spark does not use min/max for binary/string types so we ignore it.
case _ =>
throw new AnalysisException("Column statistics serialization is not supported for " +
s"column $colName of data type: $dataType.")

* Creates a [[CatalogColumnStat]] object from the given map.
* This is used to deserialize column stats from some external storage.
* The serialization side is defined in [[CatalogColumnStat.toMap]].
def fromMap(
table: String,
colName: String,
map: Map[String, String]): Option[CatalogColumnStat] = {

try {
distinctCount = map.get(s"${colName}.${KEY_DISTINCT_COUNT}").map(v => BigInt(v.toLong)),
min = map.get(s"${colName}.${KEY_MIN_VALUE}"),
max = map.get(s"${colName}.${KEY_MAX_VALUE}"),
nullCount = map.get(s"${colName}.${KEY_NULL_COUNT}").map(v => BigInt(v.toLong)),
avgLen = map.get(s"${colName}.${KEY_AVG_LEN}").map(_.toLong),
maxLen = map.get(s"${colName}.${KEY_MAX_LEN}").map(_.toLong),
histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize)
} catch {
case NonFatal(e) =>
logWarning(s"Failed to parse column statistics for column ${colName} in table $table", e)

case class CatalogTableType private(name: String)
object CatalogTableType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,11 @@ object StarSchemaDetection extends PredicateHelper {
stats.rowCount match {
case Some(rowCount) if rowCount >= 0 =>
if (stats.attributeStats.nonEmpty && stats.attributeStats.contains(col)) {
val colStats = stats.attributeStats.get(col)
if (colStats.get.nullCount > 0) {
val colStats = stats.attributeStats.get(col).get
if (!colStats.hasCountStats || colStats.nullCount.get > 0) {
} else {
val distinctCount = colStats.get.distinctCount
val distinctCount = colStats.distinctCount.get
val relDiff = math.abs((distinctCount.toDouble / rowCount.toDouble) - 1.0d)
// ndvMaxErr adjusted based on TPCDS 1TB data results
relDiff <= conf.ndvMaxError * 2
Expand Down

0 comments on commit 8077bb0

Please sign in to comment.