Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-3537][SPARK-3914][SQL] Refines in-memory columnar table statistics #2860

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,17 @@ package org.apache.spark.sql.catalyst.expressions
* of the name, or the expected nullability).
*/
object AttributeMap {
def apply[A](kvs: Seq[(Attribute, A)]) =
new AttributeMap(kvs.map(kv => (kv._1.exprId, (kv._1, kv._2))).toMap)
def apply[A](kvs: Seq[(Attribute, A)]) = new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap)
}

class AttributeMap[A](baseMap: Map[ExprId, (Attribute, A)])
extends Map[Attribute, A] with Serializable {

override def get(k: Attribute): Option[A] = baseMap.get(k.exprId).map(_._2)

override def + [B1 >: A](kv: (Attribute, B1)): Map[Attribute, B1] =
(baseMap.map(_._2) + kv).toMap
override def + [B1 >: A](kv: (Attribute, B1)): Map[Attribute, B1] = baseMap.values.toMap + kv

override def iterator: Iterator[(Attribute, A)] = baseMap.map(_._2).iterator
override def iterator: Iterator[(Attribute, A)] = baseMap.valuesIterator

override def -(key: Attribute): Map[Attribute, A] = (baseMap.map(_._2) - key).toMap
override def -(key: Attribute): Map[Attribute, A] = baseMap.values.toMap - key
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,24 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.types.StructType
import org.apache.spark.sql.catalyst.trees

/**
* Estimates of various statistics. The default estimation logic simply lazily multiplies the
* corresponding statistic produced by the children. To override this behavior, override
* `statistics` and assign it an overridden version of `Statistics`.
*
* '''NOTE''': concrete and/or overridden versions of statistics fields should pay attention to the
* performance of the implementations. The reason is that estimations might get triggered in
* performance-critical processes, such as query plan planning.
*
* @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
* defaults to the product of children's `sizeInBytes`.
*/
private[sql] case class Statistics(sizeInBytes: BigInt)

abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
self: Product =>

/**
* Estimates of various statistics. The default estimation logic simply lazily multiplies the
* corresponding statistic produced by the children. To override this behavior, override
* `statistics` and assign it an overridden version of `Statistics`.
*
* '''NOTE''': concrete and/or overridden versions of statistics fields should pay attention to the
* performance of the implementations. The reason is that estimations might get triggered in
* performance-critical processes, such as query plan planning.
*
* @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
* defaults to the product of children's `sizeInBytes`.
*/
case class Statistics(
sizeInBytes: BigInt
)
lazy val statistics: Statistics = {
def statistics: Statistics = {
if (children.size == 0) {
throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeMap, Attribute, Attri
import org.apache.spark.sql.catalyst.types._

private[sql] class ColumnStatisticsSchema(a: Attribute) extends Serializable {
val upperBound = AttributeReference(a.name + ".upperBound", a.dataType, nullable = false)()
val lowerBound = AttributeReference(a.name + ".lowerBound", a.dataType, nullable = false)()
val nullCount = AttributeReference(a.name + ".nullCount", IntegerType, nullable = false)()
val upperBound = AttributeReference(a.name + ".upperBound", a.dataType, nullable = true)()
val lowerBound = AttributeReference(a.name + ".lowerBound", a.dataType, nullable = true)()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upper/lower bound can be null for types like string.

val nullCount = AttributeReference(a.name + ".nullCount", IntegerType, nullable = false)()
val count = AttributeReference(a.name + ".count", IntegerType, nullable = false)()
val sizeInBytes = AttributeReference(a.name + ".sizeInBytes", LongType, nullable = false)()

val schema = Seq(lowerBound, upperBound, nullCount)
val schema = Seq(lowerBound, upperBound, nullCount, count, sizeInBytes)
}

private[sql] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Serializable {
Expand All @@ -45,10 +47,21 @@ private[sql] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Seri
* brings significant performance penalty.
*/
private[sql] sealed trait ColumnStats extends Serializable {
protected var count = 0
protected var nullCount = 0
protected var sizeInBytes = 0L

/**
* Gathers statistics information from `row(ordinal)`.
*/
def gatherStats(row: Row, ordinal: Int): Unit
def gatherStats(row: Row, ordinal: Int): Unit = {
if (row.isNullAt(ordinal)) {
nullCount += 1
// 4 bytes for null position
sizeInBytes += 4
}
count += 1
}

/**
* Column statistics represented as a single row, currently including closed lower bound, closed
Expand All @@ -65,163 +78,154 @@ private[sql] class NoopColumnStats extends ColumnStats {
}

private[sql] class ByteColumnStats extends ColumnStats {
var upper = Byte.MinValue
var lower = Byte.MaxValue
var nullCount = 0
protected var upper = Byte.MinValue
protected var lower = Byte.MaxValue

override def gatherStats(row: Row, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getByte(ordinal)
if (value > upper) upper = value
if (value < lower) lower = value
} else {
nullCount += 1
sizeInBytes += BYTE.defaultSize
}
}

def collectedStatistics = Row(lower, upper, nullCount)
def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
}

private[sql] class ShortColumnStats extends ColumnStats {
var upper = Short.MinValue
var lower = Short.MaxValue
var nullCount = 0
protected var upper = Short.MinValue
protected var lower = Short.MaxValue

override def gatherStats(row: Row, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getShort(ordinal)
if (value > upper) upper = value
if (value < lower) lower = value
} else {
nullCount += 1
sizeInBytes += SHORT.defaultSize
}
}

def collectedStatistics = Row(lower, upper, nullCount)
def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
}

private[sql] class LongColumnStats extends ColumnStats {
var upper = Long.MinValue
var lower = Long.MaxValue
var nullCount = 0
protected var upper = Long.MinValue
protected var lower = Long.MaxValue

override def gatherStats(row: Row, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getLong(ordinal)
if (value > upper) upper = value
if (value < lower) lower = value
} else {
nullCount += 1
sizeInBytes += LONG.defaultSize
}
}

def collectedStatistics = Row(lower, upper, nullCount)
def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
}

private[sql] class DoubleColumnStats extends ColumnStats {
var upper = Double.MinValue
var lower = Double.MaxValue
var nullCount = 0
protected var upper = Double.MinValue
protected var lower = Double.MaxValue

override def gatherStats(row: Row, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getDouble(ordinal)
if (value > upper) upper = value
if (value < lower) lower = value
} else {
nullCount += 1
sizeInBytes += DOUBLE.defaultSize
}
}

def collectedStatistics = Row(lower, upper, nullCount)
def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
}

private[sql] class FloatColumnStats extends ColumnStats {
var upper = Float.MinValue
var lower = Float.MaxValue
var nullCount = 0
protected var upper = Float.MinValue
protected var lower = Float.MaxValue

override def gatherStats(row: Row, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getFloat(ordinal)
if (value > upper) upper = value
if (value < lower) lower = value
} else {
nullCount += 1
sizeInBytes += FLOAT.defaultSize
}
}

def collectedStatistics = Row(lower, upper, nullCount)
def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
}

private[sql] class IntColumnStats extends ColumnStats {
var upper = Int.MinValue
var lower = Int.MaxValue
var nullCount = 0
protected var upper = Int.MinValue
protected var lower = Int.MaxValue

override def gatherStats(row: Row, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getInt(ordinal)
if (value > upper) upper = value
if (value < lower) lower = value
} else {
nullCount += 1
sizeInBytes += INT.defaultSize
}
}

def collectedStatistics = Row(lower, upper, nullCount)
def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
}

private[sql] class StringColumnStats extends ColumnStats {
var upper: String = null
var lower: String = null
var nullCount = 0
protected var upper: String = null
protected var lower: String = null

override def gatherStats(row: Row, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row.getString(ordinal)
if (upper == null || value.compareTo(upper) > 0) upper = value
if (lower == null || value.compareTo(lower) < 0) lower = value
} else {
nullCount += 1
sizeInBytes += STRING.actualSize(row, ordinal)
}
}

def collectedStatistics = Row(lower, upper, nullCount)
def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
}

private[sql] class DateColumnStats extends ColumnStats {
var upper: Date = null
var lower: Date = null
var nullCount = 0
protected var upper: Date = null
protected var lower: Date = null

override def gatherStats(row: Row, ordinal: Int) {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row(ordinal).asInstanceOf[Date]
if (upper == null || value.compareTo(upper) > 0) upper = value
if (lower == null || value.compareTo(lower) < 0) lower = value
} else {
nullCount += 1
sizeInBytes += DATE.defaultSize
}
}

def collectedStatistics = Row(lower, upper, nullCount)
def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
}

private[sql] class TimestampColumnStats extends ColumnStats {
var upper: Timestamp = null
var lower: Timestamp = null
var nullCount = 0
protected var upper: Timestamp = null
protected var lower: Timestamp = null

override def gatherStats(row: Row, ordinal: Int): Unit = {
super.gatherStats(row, ordinal)
if (!row.isNullAt(ordinal)) {
val value = row(ordinal).asInstanceOf[Timestamp]
if (upper == null || value.compareTo(upper) > 0) upper = value
if (lower == null || value.compareTo(lower) < 0) lower = value
} else {
nullCount += 1
sizeInBytes += TIMESTAMP.defaultSize
}
}

def collectedStatistics = Row(lower, upper, nullCount)
def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
}
Loading