Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spark] Fix stats with tightBounds check for AddFiles with deletionVectors #3633

Merged
merged 7 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 134 additions & 1 deletion spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,25 @@ object DeltaOperations {
transformer.transformToString(metric, allMetrics)
}
}

/**
* A transaction that commits AddFile actions with deletionVector should have column stats that
* are not tight bounds. An exception to this is ComputeStats operation, which recomputes stats
* on these files, and the new stats are tight bounds. Some other operations that merely take an
* existing AddFile action and commit a copy of it, not changing the deletionVector or stats,
* can then also recommit AddFile with deletionVector and tight bound stats that were recomputed
* before.
*
* An operation for which this can happen, and there is no way that it could be committing
* new deletion vectors, should set this to false to bypass this check.
* All other operations should set this to true, so that this is validated during commit.
*
* This is abstract to force the implementers of all operations to think about this setting.
* All operations should add a comment justifying this setting.
* Any operation that sets this to false should add a test in TightBoundsSuite.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to enforce the test? I feel like people might just cargo-cult the override + comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am afraid that has to be up to good reviewing practice...
I have found examples of some tests that e.g. use reflection to check the number of fields in some class and then comment there what needs to be done if a new one is added.
I can imagine using reflection to find all subclasses of DeltaOperation, listing the ones that already define this as false and have a test, and instruct in that test that if you add a new subclass that overrides it to false then you should add a test, but I think it's a bit overkill...

*/
def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean

}

abstract class OperationWithPredicates(name: String, val predicates: Seq[Expression])
Expand Down Expand Up @@ -133,13 +152,20 @@ object DeltaOperations {
DeltaOperationMetrics.WRITE_REPLACE_WHERE
}
override def changesData: Boolean = true

// This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats.
// DVs can be introduced by the replaceWhere operation.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

case class RemoveColumnMapping(
override val userMetadata: Option[String] = None) extends Operation("REMOVE COLUMN MAPPING") {
override def parameters: Map[String, Any] = Map()

override val operationMetrics: Set[String] = DeltaOperationMetrics.REMOVE_COLUMN_MAPPING

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Recorded during streaming inserts. */
Expand All @@ -154,6 +180,9 @@ object DeltaOperations {
)
override val operationMetrics: Set[String] = DeltaOperationMetrics.STREAMING_UPDATE
override def changesData: Boolean = true

// This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}
/** Recorded while deleting certain partitions. */
case class Delete(predicate: Seq[Expression])
Expand All @@ -175,12 +204,18 @@ object DeltaOperations {
strMetrics ++ dvMetrics
}
override def changesData: Boolean = true

// This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}
/** Recorded when truncating the table. */
case class Truncate() extends Operation("TRUNCATE") {
override val parameters: Map[String, Any] = Map.empty
override val operationMetrics: Set[String] = DeltaOperationMetrics.TRUNCATE
override def changesData: Boolean = true

// This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Recorded when converting a table into a Delta table. */
Expand All @@ -198,6 +233,9 @@ object DeltaOperations {
sourceFormat.map("sourceFormat" -> _)
override val operationMetrics: Set[String] = DeltaOperationMetrics.CONVERT
override def changesData: Boolean = true

// This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Represents the predicates and action type (insert, update, delete) for a Merge clause */
Expand Down Expand Up @@ -265,6 +303,9 @@ object DeltaOperations {
}

override def changesData: Boolean = true

// This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

object Merge {
Expand Down Expand Up @@ -296,6 +337,9 @@ object DeltaOperations {
val dvMetrics = transformDeletionVectorMetrics(metrics)
super.transformMetrics(metrics) ++ dvMetrics
}

// This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}
/** Recorded when the table is created. */
case class CreateTable(
Expand All @@ -317,6 +361,9 @@ object DeltaOperations {
DeltaOperationMetrics.WRITE
}
override def changesData: Boolean = asSelect

// This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}
/** Recorded when the table is replaced. */
case class ReplaceTable(
Expand All @@ -341,12 +388,18 @@ object DeltaOperations {
DeltaOperationMetrics.WRITE
}
override def changesData: Boolean = true

// This operation shouldn't be introducing AddFile actions with DVs and non-tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}
/** Recorded when the table properties are set. */
val OP_SET_TBLPROPERTIES = "SET TBLPROPERTIES"
case class SetTableProperties(
properties: Map[String, String]) extends Operation(OP_SET_TBLPROPERTIES) {
override val parameters: Map[String, Any] = Map("properties" -> JsonUtils.toJson(properties))

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}
/** Recorded when the table properties are unset. */
case class UnsetTableProperties(
Expand All @@ -355,6 +408,9 @@ object DeltaOperations {
override val parameters: Map[String, Any] = Map(
"properties" -> JsonUtils.toJson(propKeys),
"ifExists" -> ifExists)

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}
/** Recorded when dropping a table feature. */
case class DropTableFeature(
Expand All @@ -363,6 +419,9 @@ object DeltaOperations {
override val parameters: Map[String, Any] = Map(
"featureName" -> featureName,
"truncateHistory" -> truncateHistory)

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}
/** Recorded when columns are added. */
case class AddColumns(
Expand All @@ -375,6 +434,9 @@ object DeltaOperations {
"column" -> structFieldToMap(columnPath, column)
) ++ colPosition.map("position" -> _.toString)
}))

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Recorded when columns are dropped. */
Expand All @@ -384,6 +446,9 @@ object DeltaOperations {

override val parameters: Map[String, Any] = Map(
"columns" -> JsonUtils.toJson(colsToDrop.map(UnresolvedAttribute(_).name)))

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Recorded when column is renamed */
Expand All @@ -394,6 +459,9 @@ object DeltaOperations {
"oldColumnPath" -> UnresolvedAttribute(oldColumnPath).name,
"newColumnPath" -> UnresolvedAttribute(newColumnPath).name
)

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Recorded when columns are changed. */
Expand All @@ -406,13 +474,19 @@ object DeltaOperations {
override val parameters: Map[String, Any] = Map(
"column" -> JsonUtils.toJson(structFieldToMap(columnPath, newColumn))
) ++ colPosition.map("position" -> _)

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}
/** Recorded when columns are replaced. */
case class ReplaceColumns(
columns: Seq[StructField]) extends Operation("REPLACE COLUMNS") {

override val parameters: Map[String, Any] = Map(
"columns" -> JsonUtils.toJson(columns.map(structFieldToMap(Seq.empty, _))))

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

case class UpgradeProtocol(newProtocol: Protocol) extends Operation("UPGRADE PROTOCOL") {
Expand All @@ -422,15 +496,24 @@ object DeltaOperations {
"readerFeatures" -> newProtocol.readerFeatures,
"writerFeatures" -> newProtocol.writerFeatures
)))

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

object ManualUpdate extends Operation("Manual Update") {
override val parameters: Map[String, Any] = Map.empty

// Unsafe manual update disables checks.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false
}

/** A commit without any actions. Could be used to force creation of new checkpoints. */
object EmptyCommit extends Operation("Empty Commit") {
override val parameters: Map[String, Any] = Map.empty

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

case class UpdateColumnMetadata(
Expand All @@ -442,18 +525,27 @@ object DeltaOperations {
case (path, field) => structFieldToMap(path, field)
}))
}

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

case class UpdateSchema(oldSchema: StructType, newSchema: StructType)
extends Operation("UPDATE SCHEMA") {
override val parameters: Map[String, Any] = Map(
"oldSchema" -> JsonUtils.toJson(oldSchema),
"newSchema" -> JsonUtils.toJson(newSchema))

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

case class AddConstraint(
constraintName: String, expr: String) extends Operation("ADD CONSTRAINT") {
override val parameters: Map[String, Any] = Map("name" -> constraintName, "expr" -> expr)

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

case class DropConstraint(
Expand All @@ -465,11 +557,19 @@ object DeltaOperations {
Map("name" -> constraintName, "existed" -> "false")
}
}

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Recorded when recomputing stats on the table. */
case class ComputeStats(predicate: Seq[Expression])
extends OperationWithPredicates("COMPUTE STATS", predicate)
extends OperationWithPredicates("COMPUTE STATS", predicate) {

// ComputeStats operation commits AddFiles with recomputed stats which are always tight bounds,
// even when DVs are present. This check should be disabled.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false
}

/** Recorded when restoring a Delta table to an older version. */
val OP_RESTORE = "RESTORE"
Expand All @@ -482,6 +582,10 @@ object DeltaOperations {
override def changesData: Boolean = true

override val operationMetrics: Set[String] = DeltaOperationMetrics.RESTORE

// Restore operation commits AddFiles with files, DVs and stats from the version it restores to.
// It can happen that tight bound stats were recomputed before by ComputeStats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false
}

sealed abstract class OptimizeOrReorg(override val name: String, predicates: Seq[Expression])
Expand Down Expand Up @@ -517,6 +621,9 @@ object DeltaOperations {
)

override val operationMetrics: Set[String] = DeltaOperationMetrics.OPTIMIZE

// This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Recorded when cloning a Delta table into a new location. */
Expand All @@ -531,6 +638,10 @@ object DeltaOperations {
)
override def changesData: Boolean = true
override val operationMetrics: Set[String] = DeltaOperationMetrics.CLONE

// Clone operation commits AddFiles with files, DVs and stats copied over from the source table.
// It can happen that tight bound stats were recomputed before by ComputeStats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false
}

/**
Expand All @@ -548,6 +659,9 @@ object DeltaOperations {
) ++ specifiedRetentionMillis.map("specifiedRetentionMillis" -> _)

override val operationMetrics: Set[String] = DeltaOperationMetrics.VACUUM_START

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/**
Expand All @@ -559,6 +673,9 @@ object DeltaOperations {
)

override val operationMetrics: Set[String] = DeltaOperationMetrics.VACUUM_END

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Recorded when running REORG on the table. */
Expand All @@ -570,6 +687,9 @@ object DeltaOperations {
)

override val operationMetrics: Set[String] = DeltaOperationMetrics.OPTIMIZE

// This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Recorded when clustering columns are changed on clustered tables. */
Expand All @@ -579,6 +699,9 @@ object DeltaOperations {
override val parameters: Map[String, Any] = Map(
"oldClusteringColumns" -> oldClusteringColumns,
"newClusteringColumns" -> newClusteringColumns)

// This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/** Recorded when we backfill a Delta table's existing AddFiles with row tracking data. */
Expand All @@ -587,6 +710,10 @@ object DeltaOperations {
override val parameters: Map[String, Any] = Map(
"batchId" -> JsonUtils.toJson(batchId)
)

// RowTrackingBackfill operation commits AddFiles with files, DVs and stats copied over.
// It can happen that tight bound stats were recomputed before by ComputeStats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = false
}

private def structFieldToMap(colPath: Seq[String], field: StructField): Map[String, Any] = {
Expand All @@ -610,6 +737,9 @@ object DeltaOperations {
/** Dummy operation only for testing with arbitrary operation names */
case class TestOperation(operationName: String = "TEST") extends Operation(operationName) {
override val parameters: Map[String, Any] = Map.empty

// Perform the check for testing.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}

/**
Expand All @@ -630,6 +760,9 @@ object DeltaOperations {
case class UpgradeUniformProperties(properties: Map[String, String]) extends Operation(
OP_UPGRADE_UNIFORM_BY_REORG) {
override val parameters: Map[String, Any] = Map("properties" -> JsonUtils.toJson(properties))

// This operation shouldn't be introducing AddFile actions with DVs and non tight bounds stats.
override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
}
}

Expand Down
Loading
Loading