[Spark] Improve Delta Protocol Transitions (#2848)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Currently, protocol transitions can be hard to manage. A few examples:
- It is hard to predict the output of certain operations.
- Once a legacy protocol transitions to a Table Features protocol, it is quite hard to transition back to a legacy protocol.
- Adding a feature to a protocol and then removing it might lead to a different protocol.
- Adding an explicit feature to a legacy protocol always leads to a table features protocol, even when this is not necessary.
- Dropping features from legacy protocols is not supported. As a result, the order in which features are dropped matters.
- Default protocol versions are ignored in some cases.
- Enabling table features by default results in feature loss in legacy protocols.
- CREATE TABLE ignores any legacy protocol versions set if there is also a table feature in the definition.

This PR proposes several protocol transition improvements in order to simplify user journeys. The high-level proposal is the following:

Two protocol representations with a single set of operational semantics. This means that we have two ways to represent a protocol: (a) the legacy representation and (b) the table features representation. The latter is strictly more powerful: the table features representation can represent all legacy protocols, but the opposite is not true. This is paired with three simple rules:

1. All operations can be performed on both protocol representations and yield equivalent results.
2. The result is always represented in the weaker (legacy) form when possible.
3. Conversely, if the result of an operation on a legacy protocol cannot be represented in the legacy representation, the Table Features representation is used (see the sketch after this list).
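
To make the rules concrete, here is a minimal, self-contained Scala sketch of rules 2 and 3. This is illustrative only: `legacyWriterFeatures`, `normalize`, and the truncated version table are hypothetical names for this sketch, not code from this PR.

```scala
object ProtocolSketch extends App {
  // Legacy writer protocol version -> the exact feature set it implies
  // (truncated for the sketch; the real mapping covers more versions).
  val legacyWriterFeatures: Map[Int, Set[String]] = Map(
    1 -> Set.empty[String],
    2 -> Set("appendOnly", "invariants"),
    3 -> Set("appendOnly", "invariants", "checkConstraints"))

  // Rules 2 and 3: prefer the legacy form (Left: writer version) when the
  // feature set matches a legacy version exactly; otherwise fall back to the
  // table features form (Right: explicit feature set).
  def normalize(features: Set[String]): Either[Int, Set[String]] =
    legacyWriterFeatures
      .collectFirst { case (version, fs) if fs == features => Left(version) }
      .getOrElse(Right(features))

  println(normalize(Set("appendOnly", "invariants")))  // Left(2): legacy (1, 2)
  println(normalize(Set("appendOnly", "rowTracking"))) // Right(...): table features
}
```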

**The PR introduces the following behavioural changes:**

1. All protocol operations are now followed by denormalisation and then normalisation. Up to now, normalisation was only performed after dropping a feature.
2. Legacy features can now be dropped directly from a legacy protocol. The result is represented with table features if it cannot be represented with a legacy protocol.
3. Operations on table features protocols now take into account the default protocol versions. For example, enabling deletion vectors on a table results in protocol `(3, 7, AppendOnly, Invariants, DeletionVectors)`.
4. Operations on table features protocols now take into account any protocol versions set on the table. For example, creating a table with protocol `(1, 3)` and deletion vectors results in protocol `(3, 7, AppendOnly, Invariants, CheckConstraints, DeletionVectors)` (see the SQL example after this list).
5. It is no longer possible to have a table features protocol without table features. For example, creating a table with `(3, 7)` and no table features is now normalised to `(1, 1)`.
6. Column Mapping can now be automatically enabled on legacy protocols when the mode is changed explicitly.
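
Behaviours 4 and 5 can be observed with plain SQL. This is a sketch, assuming a Spark session with the Delta Lake extensions enabled; table names are placeholders:

```scala
// Behaviour 4: explicitly set legacy versions are merged in as the features
// they imply, so the resulting table features protocol has no "gaps".
spark.sql("""
  CREATE TABLE t (id INT) USING delta
  TBLPROPERTIES (
    'delta.minReaderVersion' = '1',
    'delta.minWriterVersion' = '3',
    'delta.enableDeletionVectors' = 'true')""")
// Expected protocol: (3, 7, AppendOnly, Invariants, CheckConstraints, DeletionVectors)

// Behaviour 5: a table features protocol with no table features normalizes away.
spark.sql("""
  CREATE TABLE s (id INT) USING delta
  TBLPROPERTIES (
    'delta.minReaderVersion' = '3',
    'delta.minWriterVersion' = '7')""")
// Expected protocol: (1, 1)
```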

## How was this patch tested?
Added `DeltaProtocolTransitionsSuite`. Also modified existing tests in
`DeltaProtocolVersionSuite`.
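
A minimal sketch of the style of assertion the new suite exercises. This is illustrative, not copied from the suite; `protocolOf` is an assumed helper that reads the table's current protocol.

```scala
// Behaviour 2: dropping a legacy feature directly from a legacy protocol.
// Legacy (1, 3) implies {appendOnly, invariants, checkConstraints}; removing
// checkConstraints leaves exactly the writer-version-2 set, so the protocol
// normalizes back to the legacy form (1, 2).
test("drop checkConstraints from legacy protocol (1, 3)") {
  withTempDir { dir =>
    val path = dir.getCanonicalPath
    sql(s"""CREATE TABLE delta.`$path` (id INT) USING delta TBLPROPERTIES (
           |  'delta.minReaderVersion' = '1',
           |  'delta.minWriterVersion' = '3')""".stripMargin)
    sql(s"ALTER TABLE delta.`$path` DROP FEATURE checkConstraints")
    assert(protocolOf(path) == Protocol(1, 2))
  }
}
```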

## Does this PR introduce _any_ user-facing changes?

Yes.
andreaschat-db authored Jul 17, 2024
1 parent 4430dc1 commit 669dca9
Showing 22 changed files with 1,142 additions and 515 deletions.
14 changes: 0 additions & 14 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -2514,20 +2514,6 @@
],
"sqlState" : "0AKDC"
},
"DELTA_UNSUPPORTED_COLUMN_MAPPING_PROTOCOL" : {
"message" : [
"",
"Your current table protocol version does not support changing column mapping modes",
"using <config>.",
"",
"Required Delta protocol version for column mapping:",
"<requiredVersion>",
"Your table's current Delta protocol version:",
"<currentVersion>",
"<advice>"
],
"sqlState" : "KD004"
},
"DELTA_UNSUPPORTED_COLUMN_MAPPING_SCHEMA_CHANGE" : {
"message" : [
"",
@@ -86,9 +86,6 @@ trait DeltaColumnMappingBase extends DeltaLogging {
RowIdMetadataStructField.isRowIdColumn(field) ||
RowCommitVersion.MetadataStructField.isRowCommitVersionColumn(field)

def satisfiesColumnMappingProtocol(protocol: Protocol): Boolean =
protocol.isFeatureSupported(ColumnMappingTableFeature)

/**
* Allow NameMapping -> NoMapping transition behind a feature flag.
* Otherwise only NoMapping -> NameMapping is allowed.
@@ -134,33 +131,9 @@ trait DeltaColumnMappingBase extends DeltaLogging {
}

val isChangingModeOnExistingTable = oldMappingMode != newMappingMode && !isCreatingNewTable
if (isChangingModeOnExistingTable) {
if (!allowMappingModeChange(oldMappingMode, newMappingMode)) {
throw DeltaErrors.changeColumnMappingModeNotSupported(
oldMappingMode.name, newMappingMode.name)
} else {
// legal mode change, now check if protocol is upgraded before or part of this txn
val caseInsensitiveMap = CaseInsensitiveMap(newMetadata.configuration)
val minReaderVersion = caseInsensitiveMap
.get(Protocol.MIN_READER_VERSION_PROP).map(_.toInt)
.getOrElse(oldProtocol.minReaderVersion)
val minWriterVersion = caseInsensitiveMap
.get(Protocol.MIN_WRITER_VERSION_PROP).map(_.toInt)
.getOrElse(oldProtocol.minWriterVersion)
var newProtocol = Protocol(minReaderVersion, minWriterVersion)
val satisfiesWriterVersion = minWriterVersion >= ColumnMappingTableFeature.minWriterVersion
val satisfiesReaderVersion = minReaderVersion >= ColumnMappingTableFeature.minReaderVersion
// This is an OR check because `readerFeatures` and `writerFeatures` can independently
// support table features.
if ((newProtocol.supportsReaderFeatures && satisfiesWriterVersion) ||
(newProtocol.supportsWriterFeatures && satisfiesReaderVersion)) {
newProtocol = newProtocol.withFeature(ColumnMappingTableFeature)
}

if (!satisfiesColumnMappingProtocol(newProtocol)) {
throw DeltaErrors.changeColumnMappingModeOnOldProtocol(oldProtocol)
}
}
if (isChangingModeOnExistingTable && !allowMappingModeChange(oldMappingMode, newMappingMode)) {
throw DeltaErrors.changeColumnMappingModeNotSupported(
oldMappingMode.name, newMappingMode.name)
}

val updatedMetadata = updateColumnMappingMetadata(
33 changes: 5 additions & 28 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -2043,41 +2043,18 @@ trait DeltaErrorsBase
mode.name))
}

def changeColumnMappingModeOnOldProtocol(oldProtocol: Protocol): Throwable = {
val requiredProtocol = {
if (oldProtocol.supportsReaderFeatures || oldProtocol.supportsWriterFeatures) {
Protocol(
TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION,
TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION)
.withFeature(ColumnMappingTableFeature)
} else {
ColumnMappingTableFeature.minProtocolVersion
}
}

new DeltaColumnMappingUnsupportedException(
errorClass = "DELTA_UNSUPPORTED_COLUMN_MAPPING_PROTOCOL",
messageParameters = Array(
s"${DeltaConfigs.COLUMN_MAPPING_MODE.key}",
s"$requiredProtocol",
s"$oldProtocol",
columnMappingAdviceMessage(requiredProtocol)))
}

private def columnMappingAdviceMessage(
protected def columnMappingAdviceMessage(
requiredProtocol: Protocol = ColumnMappingTableFeature.minProtocolVersion): String = {
val readerVersion = requiredProtocol.minReaderVersion
val writerVersion = requiredProtocol.minWriterVersion
s"""
|Please enable Column Mapping on your Delta table with mapping mode 'name'.
|You can use one of the following commands.
|
|If your table is already on the required protocol version:
|ALTER TABLE table_name SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name')
|
|If your table is not on the required protocol version and requires a protocol upgrade:
|ALTER TABLE table_name SET TBLPROPERTIES (
| 'delta.columnMapping.mode' = 'name',
| 'delta.minReaderVersion' = '${requiredProtocol.minReaderVersion}',
| 'delta.minWriterVersion' = '${requiredProtocol.minWriterVersion}')
|Note, if your table is not on the required protocol version it will be upgraded.
|Column mapping requires at least protocol ($readerVersion, $writerVersion)
|""".stripMargin
}

@@ -538,6 +538,22 @@ trait OptimisticTransactionImpl extends TransactionalWrite

val newProtocolForLatestMetadata =
Protocol(readerVersionAsTableProp, writerVersionAsTableProp)

// The user-supplied protocol version numbers are treated as a group of features
// that must all be enabled. This ensures that the feature-enabling behavior is the
// same on Table Features-enabled protocols as on legacy protocols, i.e., exactly
// the same set of features are enabled.
//
// This is useful for supporting protocol downgrades to legacy protocol versions.
// When the protocol versions are explicitly set on table features protocol we may
// normalize to legacy protocol versions. Legacy protocol versions can only be
// used if a table supports *exactly* the set of features in that legacy protocol
// version, with no "gaps". By merging in the protocol features from a particular
// protocol version, we may end up with such a "gap-free" protocol. E.g. if a table
// has only table feature "checkConstraints" (added by writer protocol version 3)
// but not "invariants" and "appendOnly", then setting the minWriterVersion to
// 2 or 3 will add "invariants" and "appendOnly", filling in the gaps for writer
// protocol version 3, and then we can downgrade to version 3.
val proposedNewProtocol = protocolBeforeUpdate.merge(newProtocolForLatestMetadata)

if (proposedNewProtocol != protocolBeforeUpdate) {
@@ -620,16 +636,14 @@ trait OptimisticTransactionImpl extends TransactionalWrite
Protocol(
readerVersionForNewProtocol,
TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION)
.merge(newProtocolBeforeAddingFeatures)
.withFeatures(newFeaturesFromTableConf))
.withFeatures(newFeaturesFromTableConf)
.merge(newProtocolBeforeAddingFeatures))
}

// We are done with protocol versions and features, time to remove related table properties.
val configsWithoutProtocolProps = newMetadataTmp.configuration.filterNot {
case (k, _) => TableFeatureProtocolUtils.isTableProtocolProperty(k)
}
newMetadataTmp = newMetadataTmp.copy(configuration = configsWithoutProtocolProps)

// Table features Part 3: add automatically-enabled features by looking at the new table
// metadata.
//
@@ -639,6 +653,9 @@
setNewProtocolWithFeaturesEnabledByMetadata(newMetadataTmp)
}

newMetadataTmp = newMetadataTmp.copy(configuration = configsWithoutProtocolProps)
Protocol.assertMetadataContainsNoProtocolProps(newMetadataTmp)

newMetadataTmp = MaterializedRowId.updateMaterializedColumnName(
protocol, oldMetadata = snapshot.metadata, newMetadataTmp)
newMetadataTmp = MaterializedRowCommitVersion.updateMaterializedColumnName(
19 changes: 14 additions & 5 deletions spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
@@ -332,8 +332,17 @@ object TableFeature {
* Warning: Do not call `get` on this Map to get a specific feature because keys in this map are
* in lower cases. Use [[featureNameToFeature]] instead.
*/
private[delta] val allSupportedFeaturesMap: Map[String, TableFeature] = {
var features: Set[TableFeature] = Set(
private[delta] def allSupportedFeaturesMap: Map[String, TableFeature] = {
val testingFeaturesEnabled =
try {
SparkSession
.getActiveSession
.map(_.conf.get(DeltaSQLConf.TABLE_FEATURES_TEST_FEATURES_ENABLED))
.getOrElse(true)
} catch {
case _ => true
}
var features: Set[TableFeature] = Set(
AllowColumnDefaultsTableFeature,
AppendOnlyTableFeature,
ChangeDataFeedTableFeature,
@@ -355,7 +364,7 @@
InCommitTimestampTableFeature,
VariantTypeTableFeature,
CoordinatedCommitsTableFeature)
if (DeltaUtils.isTesting) {
if (DeltaUtils.isTesting && testingFeaturesEnabled) {
features ++= Set(
TestLegacyWriterFeature,
TestLegacyReaderWriterFeature,
@@ -405,8 +414,8 @@ object TableFeature {
protected def getDroppedExplicitFeatureNames(
newProtocol: Protocol,
oldProtocol: Protocol): Option[Set[String]] = {
val newFeatureNames = newProtocol.readerAndWriterFeatureNames
val oldFeatureNames = oldProtocol.readerAndWriterFeatureNames
val newFeatureNames = newProtocol.implicitlyAndExplicitlySupportedFeatures.map(_.name)
val oldFeatureNames = oldProtocol.implicitlyAndExplicitlySupportedFeatures.map(_.name)
Option(oldFeatureNames -- newFeatureNames).filter(_.nonEmpty)
}

@@ -22,10 +22,10 @@ import scala.collection.mutable

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.DeltaOperations.Operation
import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import com.fasterxml.jackson.annotation.JsonIgnore

import org.apache.spark.sql.SparkSession

/**
* Trait to be mixed into the [[Protocol]] case class to enable Table Features.
*
@@ -229,25 +229,16 @@ trait TableFeatureSupport { this: Protocol =>

/**
* Determine whether this protocol can be safely upgraded to a new protocol `to`. This means:
* - this protocol has reader protocol version less than or equals to `to`.
* - this protocol has writer protocol version less than or equals to `to`.
* - all features supported by this protocol are supported by `to`.
*
* Examples regarding feature status:
* - from `[appendOnly]` to `[appendOnly]` => allowed
* - from `[appendOnly, changeDataFeed]` to `[appendOnly]` => not allowed
* - from `[appendOnly]` to `[appendOnly, changeDataFeed]` => allowed
* - from `[appendOnly]` to `[appendOnly]` => allowed.
* - from `[appendOnly, changeDataFeed]` to `[appendOnly]` => not allowed.
* - from `[appendOnly]` to `[appendOnly, changeDataFeed]` => allowed.
*/
def canUpgradeTo(to: Protocol): Boolean = {
if (to.minReaderVersion < this.minReaderVersion) return false
if (to.minWriterVersion < this.minWriterVersion) return false

val thisFeatures =
this.readerAndWriterFeatureNames ++ this.implicitlySupportedFeatures.map(_.name)
val toFeatures = to.readerAndWriterFeatureNames ++ to.implicitlySupportedFeatures.map(_.name)
// all features supported by `this` are supported by `to`
thisFeatures.subsetOf(toFeatures)
}
def canUpgradeTo(to: Protocol): Boolean =
// All features supported by `this` are supported by `to`.
implicitlyAndExplicitlySupportedFeatures.subsetOf(to.implicitlyAndExplicitlySupportedFeatures)

/**
* Determine whether this protocol can be safely downgraded to a new protocol `to`.
@@ -287,12 +278,14 @@ trait TableFeatureSupport { this: Protocol =>
val mergedProtocol = Protocol(mergedReaderVersion, mergedWriterVersion)
.withReaderFeatures(mergedReaderFeatures)
.withWriterFeatures(mergedWriterFeatures)

if (mergedProtocol.supportsReaderFeatures || mergedProtocol.supportsWriterFeatures) {
mergedProtocol.withFeatures(mergedImplicitFeatures)
} else {
mergedProtocol
}
.withFeatures(mergedImplicitFeatures)

// The merged protocol is always normalized in order to represent the protocol
// with the weakest possible form. This enables backward compatibility.
// This is preceded by a denormalization step, which makes it possible to fix invalid
// legacy protocols.
// For example, (2, 3) is normalized to (1, 3). This is because there is no legacy feature
// in the set with reader version 2 unless the writer version is at least 5.
mergedProtocol.denormalizedNormalized
}

/**
@@ -323,63 +316,77 @@
* the feature exists in the protocol. There is a relevant validation at
* [[AlterTableDropFeatureDeltaCommand]]. We also require targetFeature is removable.
*
* When the feature to remove is the last explicit table feature of the table we also remove the
* TableFeatures feature and downgrade the protocol.
* After removing the feature we normalize the protocol.
*/
def removeFeature(targetFeature: TableFeature): Protocol = {
require(targetFeature.isRemovable)
val currentProtocol = this.denormalized
val newProtocol = targetFeature match {
case f@(_: ReaderWriterFeature | _: LegacyReaderWriterFeature) =>
removeReaderWriterFeature(f)
currentProtocol.removeReaderWriterFeature(f)
case f@(_: WriterFeature | _: LegacyWriterFeature) =>
removeWriterFeature(f)
currentProtocol.removeWriterFeature(f)
case f =>
throw DeltaErrors.dropTableFeatureNonRemovableFeature(f.name)
}
newProtocol.downgradeProtocolVersionsIfNeeded
newProtocol.normalized
}


/**
* If the current protocol does not contain any non-legacy table features and the remaining
* set of legacy table features exactly matches a legacy protocol version, it downgrades the
* protocol to the minimum reader/writer versions required to support the protocol's legacy
* features.
* Protocol normalization is the process of converting a table features protocol to the weakest
* possible form. This primarily refers to converting a table features protocol to a legacy
* protocol. A Table Features protocol can be represented with the legacy representation only
* when the features set of the former exactly matches a legacy protocol.
*
* Normalization can also decrease the reader version of a table features protocol when it is
* higher than necessary.
*
* Note, when a table is initialized with table features (3, 7), by default there are no legacy
* features. After we remove the last native feature we downgrade the protocol to (1, 1).
* For example:
* (1, 7, AppendOnly, Invariants, CheckConstraints) -> (1, 3)
* (3, 7, RowTracking) -> (1, 7, RowTracking)
*/
def downgradeProtocolVersionsIfNeeded: Protocol = {
if (nativeReaderAndWriterFeatures.nonEmpty) {
val (minReaderVersion, minWriterVersion) =
TableFeatureProtocolUtils.minimumRequiredVersions(readerAndWriterFeatures)
// It is guaranteed by the definitions of WriterFeature and ReaderFeature, that we cannot
// end up with invalid protocol versions such as (3, 3). Nevertheless,
// we double check it here.
val newProtocol =
Protocol(minReaderVersion, minWriterVersion).withFeatures(readerAndWriterFeatures)
assert(
newProtocol.supportsWriterFeatures,
s"Downgraded protocol should at least support writer features, but got $newProtocol.")
return newProtocol
}
def normalized: Protocol = {
// Normalization can only be applied to table feature protocols.
if (!supportsWriterFeatures) return this

val (minReaderVersion, minWriterVersion) =
TableFeatureProtocolUtils.minimumRequiredVersions(readerAndWriterFeatures)
val newProtocol = Protocol(minReaderVersion, minWriterVersion)

assert(
!newProtocol.supportsReaderFeatures && !newProtocol.supportsWriterFeatures,
s"Downgraded protocol should not support table features, but got $newProtocol.")

// Ensure the legacy protocol supports features exactly as the current protocol.
if (this.implicitlyAndExplicitlySupportedFeatures ==
newProtocol.implicitlyAndExplicitlySupportedFeatures) {
newProtocol
} else {
this
Protocol(minReaderVersion, TABLE_FEATURES_MIN_WRITER_VERSION)
.withFeatures(readerAndWriterFeatures)
}
}

/**
* Protocol denormalization is the process of converting a legacy protocol to the
* equivalent table features protocol. This is the inverse of protocol normalization.
* It can be used to allow operations on legacy protocols that yield results which
* can no longer be represented by a legacy protocol.
*/
def denormalized: Protocol = {
// Denormalization can only be applied to legacy protocols.
if (supportsWriterFeatures) return this

val (minReaderVersion, _) =
TableFeatureProtocolUtils.minimumRequiredVersions(implicitlySupportedFeatures.toSeq)

Protocol(minReaderVersion, TABLE_FEATURES_MIN_WRITER_VERSION)
.withFeatures(implicitlySupportedFeatures)
}

/**
* Helper method that applies both denormalization and normalization. This can be used to
* normalize invalid legacy protocols such as (2, 3), (1, 5). A legacy protocol is invalid
* when the version numbers are higher than required to support the implied feature set.
*/
def denormalizedNormalized: Protocol = denormalized.normalized

/**
* Check if a `feature` is supported by this protocol. This means either (a) the protocol does
* not support table features and implicitly supports the feature, or (b) the protocol supports