From 669dca9c05f1ddb32ce1fd612ffd83eabb1cddd9 Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou <93710326+andreaschat-db@users.noreply.github.com> Date: Wed, 17 Jul 2024 02:53:14 +0200 Subject: [PATCH] [Spark] Improve Delta Protocol Transitions (#2848) #### Which Delta project/connector is this regarding? - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description Currently, protocol transitions can be hard to manage. A few examples: - It is hard to predict the output of certain operations. - Once a legacy protocol transitions to a Table Features protocol it is quite hard to transition back to a legacy protocol. - Adding a feature in a protocol and then removing it might lead to a different protocol. - Adding an explicit feature to a legacy protocol always leads to a table features protocol although it might not be necessary. - Dropping features from legacy protocols is not supported. As a result, the order the features are dropped matters. - Default protocol versions are ignored in some cases. - Enabling table features by default results in feature loss in legacy protocols. - CREATE TABLE ignores any legacy versions set if there is also a table feature in the definition. This PR proposes several protocol transition improvements in order to simplify user journeys. The high level proposal is the following: Two protocol representations with singular operational semantics. This means that we have two ways to represent a protocol: a) The legacy representation and b) the table features representation. The latter representation is more powerful than the former, i.e the table features representation can represent all legacy protocols but the opposite is not true. This is followed by three simple rules: 1. All operations should be allowed to be performed on both protocol representations and should yield equivalent results. 2. The result should always be represented with the weaker form when possible. 3. Conversely, if the result of an operation on a legacy protocol cannot be represented with the legacy representation, use the Table Features representation. **The PR introduces the following behavioural changes:** 1. Now all protocol operations are followed by denormalisation and then normalisation. Up to now, normalisation would only be performed after dropping a features. 2. Legacy features can now be dropped directly from a legacy protocol. The result is represented with table features if it cannot be represented with a legacy protocol. 3. Operations on table feature protocols now take into account the default versions. For example, enabling deletion vectors on table results to protocol `(3, 7, AppendOnly, Invariants, DeletionVectors)`. 5. Operations on table feature protocols now take into account any protocol versions set on the table. For example, creating a table with protocol `(1, 3)` and deletion vectors results to protocol `(3, 7, AppendOnly, Invariants, CheckConstraints, DeletionVectors)`. 6. It is not possible now to have a table features protocol without table features. For example, creating a table with `(3, 7)` and no table features is now normalised to `(1, 1)`. 7. Column Mapping can now be automatically enabled on legacy protocols when the mode is changed explicitly. ## How was this patch tested? Added `DeltaProtocolTransitionsSuite`. Also modified existing tests in `DeltaProtocolVersionSuite`. ## Does this PR introduce _any_ user-facing changes? Yes. --- .../resources/error/delta-error-classes.json | 14 - .../spark/sql/delta/DeltaColumnMapping.scala | 33 +- .../apache/spark/sql/delta/DeltaErrors.scala | 33 +- .../sql/delta/OptimisticTransaction.scala | 25 +- .../apache/spark/sql/delta/TableFeature.scala | 19 +- .../delta/actions/TableFeatureSupport.scala | 115 +-- .../spark/sql/delta/actions/actions.scala | 78 +- .../commands/alterDeltaTableCommands.scala | 22 +- .../sql/delta/sources/DeltaSQLConf.scala | 8 + .../spark/sql/delta/DeltaVariantSuite.scala | 5 +- .../io/delta/tables/DeltaTableSuite.scala | 16 +- .../spark/sql/delta/CloneTableSuiteBase.scala | 4 +- .../sql/delta/DeltaColumnMappingSuite.scala | 23 +- .../spark/sql/delta/DeltaErrorsSuite.scala | 40 +- .../delta/DeltaProtocolTransitionsSuite.scala | 683 ++++++++++++++++++ .../sql/delta/DeltaProtocolVersionSuite.scala | 401 +++++----- .../sql/delta/DeltaTableFeatureSuite.scala | 35 +- .../sql/delta/DeltaTimestampNTZSuite.scala | 6 +- .../sql/delta/RestoreTableSuiteBase.scala | 3 +- .../DropColumnMappingFeatureSuite.scala | 17 +- .../delta/schema/CheckConstraintsSuite.scala | 68 +- .../schema/InvariantEnforcementSuite.scala | 9 +- 22 files changed, 1142 insertions(+), 515 deletions(-) create mode 100644 spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolTransitionsSuite.scala diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json index 4924c8017ec..30290df5a3d 100644 --- a/spark/src/main/resources/error/delta-error-classes.json +++ b/spark/src/main/resources/error/delta-error-classes.json @@ -2514,20 +2514,6 @@ ], "sqlState" : "0AKDC" }, - "DELTA_UNSUPPORTED_COLUMN_MAPPING_PROTOCOL" : { - "message" : [ - "", - "Your current table protocol version does not support changing column mapping modes", - "using .", - "", - "Required Delta protocol version for column mapping:", - "", - "Your table's current Delta protocol version:", - "", - "" - ], - "sqlState" : "KD004" - }, "DELTA_UNSUPPORTED_COLUMN_MAPPING_SCHEMA_CHANGE" : { "message" : [ "", diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala index f1c784cd2ad..3729af5bf14 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala @@ -86,9 +86,6 @@ trait DeltaColumnMappingBase extends DeltaLogging { RowIdMetadataStructField.isRowIdColumn(field) || RowCommitVersion.MetadataStructField.isRowCommitVersionColumn(field) - def satisfiesColumnMappingProtocol(protocol: Protocol): Boolean = - protocol.isFeatureSupported(ColumnMappingTableFeature) - /** * Allow NameMapping -> NoMapping transition behind a feature flag. * Otherwise only NoMapping -> NameMapping is allowed. @@ -134,33 +131,9 @@ trait DeltaColumnMappingBase extends DeltaLogging { } val isChangingModeOnExistingTable = oldMappingMode != newMappingMode && !isCreatingNewTable - if (isChangingModeOnExistingTable) { - if (!allowMappingModeChange(oldMappingMode, newMappingMode)) { - throw DeltaErrors.changeColumnMappingModeNotSupported( - oldMappingMode.name, newMappingMode.name) - } else { - // legal mode change, now check if protocol is upgraded before or part of this txn - val caseInsensitiveMap = CaseInsensitiveMap(newMetadata.configuration) - val minReaderVersion = caseInsensitiveMap - .get(Protocol.MIN_READER_VERSION_PROP).map(_.toInt) - .getOrElse(oldProtocol.minReaderVersion) - val minWriterVersion = caseInsensitiveMap - .get(Protocol.MIN_WRITER_VERSION_PROP).map(_.toInt) - .getOrElse(oldProtocol.minWriterVersion) - var newProtocol = Protocol(minReaderVersion, minWriterVersion) - val satisfiesWriterVersion = minWriterVersion >= ColumnMappingTableFeature.minWriterVersion - val satisfiesReaderVersion = minReaderVersion >= ColumnMappingTableFeature.minReaderVersion - // This is an OR check because `readerFeatures` and `writerFeatures` can independently - // support table features. - if ((newProtocol.supportsReaderFeatures && satisfiesWriterVersion) || - (newProtocol.supportsWriterFeatures && satisfiesReaderVersion)) { - newProtocol = newProtocol.withFeature(ColumnMappingTableFeature) - } - - if (!satisfiesColumnMappingProtocol(newProtocol)) { - throw DeltaErrors.changeColumnMappingModeOnOldProtocol(oldProtocol) - } - } + if (isChangingModeOnExistingTable && !allowMappingModeChange(oldMappingMode, newMappingMode)) { + throw DeltaErrors.changeColumnMappingModeNotSupported( + oldMappingMode.name, newMappingMode.name) } val updatedMetadata = updateColumnMappingMetadata( diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index 64fab712313..3121578df68 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -2043,41 +2043,18 @@ trait DeltaErrorsBase mode.name)) } - def changeColumnMappingModeOnOldProtocol(oldProtocol: Protocol): Throwable = { - val requiredProtocol = { - if (oldProtocol.supportsReaderFeatures || oldProtocol.supportsWriterFeatures) { - Protocol( - TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION, - TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(ColumnMappingTableFeature) - } else { - ColumnMappingTableFeature.minProtocolVersion - } - } - - new DeltaColumnMappingUnsupportedException( - errorClass = "DELTA_UNSUPPORTED_COLUMN_MAPPING_PROTOCOL", - messageParameters = Array( - s"${DeltaConfigs.COLUMN_MAPPING_MODE.key}", - s"$requiredProtocol", - s"$oldProtocol", - columnMappingAdviceMessage(requiredProtocol))) - } - - private def columnMappingAdviceMessage( + protected def columnMappingAdviceMessage( requiredProtocol: Protocol = ColumnMappingTableFeature.minProtocolVersion): String = { + val readerVersion = requiredProtocol.minReaderVersion + val writerVersion = requiredProtocol.minWriterVersion s""" |Please enable Column Mapping on your Delta table with mapping mode 'name'. |You can use one of the following commands. | - |If your table is already on the required protocol version: |ALTER TABLE table_name SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name') | - |If your table is not on the required protocol version and requires a protocol upgrade: - |ALTER TABLE table_name SET TBLPROPERTIES ( - | 'delta.columnMapping.mode' = 'name', - | 'delta.minReaderVersion' = '${requiredProtocol.minReaderVersion}', - | 'delta.minWriterVersion' = '${requiredProtocol.minWriterVersion}') + |Note, if your table is not on the required protocol version it will be upgraded. + |Column mapping requires at least protocol ($readerVersion, $writerVersion) |""".stripMargin } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index e875e381c45..670a171afe5 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -538,6 +538,22 @@ trait OptimisticTransactionImpl extends TransactionalWrite val newProtocolForLatestMetadata = Protocol(readerVersionAsTableProp, writerVersionAsTableProp) + + // The user-supplied protocol version numbers are treated as a group of features + // that must all be enabled. This ensures that the feature-enabling behavior is the + // same on Table Features-enabled protocols as on legacy protocols, i.e., exactly + // the same set of features are enabled. + // + // This is useful for supporting protocol downgrades to legacy protocol versions. + // When the protocol versions are explicitly set on table features protocol we may + // normalize to legacy protocol versions. Legacy protocol versions can only be + // used if a table supports *exactly* the set of features in that legacy protocol + // version, with no "gaps". By merging in the protocol features from a particular + // protocol version, we may end up with such a "gap-free" protocol. E.g. if a table + // has only table feature "checkConstraints" (added by writer protocol version 3) + // but not "invariants" and "appendOnly", then setting the minWriterVersion to + // 2 or 3 will add "invariants" and "appendOnly", filling in the gaps for writer + // protocol version 3, and then we can downgrade to version 3. val proposedNewProtocol = protocolBeforeUpdate.merge(newProtocolForLatestMetadata) if (proposedNewProtocol != protocolBeforeUpdate) { @@ -620,16 +636,14 @@ trait OptimisticTransactionImpl extends TransactionalWrite Protocol( readerVersionForNewProtocol, TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION) - .merge(newProtocolBeforeAddingFeatures) - .withFeatures(newFeaturesFromTableConf)) + .withFeatures(newFeaturesFromTableConf) + .merge(newProtocolBeforeAddingFeatures)) } // We are done with protocol versions and features, time to remove related table properties. val configsWithoutProtocolProps = newMetadataTmp.configuration.filterNot { case (k, _) => TableFeatureProtocolUtils.isTableProtocolProperty(k) } - newMetadataTmp = newMetadataTmp.copy(configuration = configsWithoutProtocolProps) - // Table features Part 3: add automatically-enabled features by looking at the new table // metadata. // @@ -639,6 +653,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite setNewProtocolWithFeaturesEnabledByMetadata(newMetadataTmp) } + newMetadataTmp = newMetadataTmp.copy(configuration = configsWithoutProtocolProps) + Protocol.assertMetadataContainsNoProtocolProps(newMetadataTmp) + newMetadataTmp = MaterializedRowId.updateMaterializedColumnName( protocol, oldMetadata = snapshot.metadata, newMetadataTmp) newMetadataTmp = MaterializedRowCommitVersion.updateMaterializedColumnName( diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index fd59a860e5b..a18c71ecedb 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -332,8 +332,17 @@ object TableFeature { * Warning: Do not call `get` on this Map to get a specific feature because keys in this map are * in lower cases. Use [[featureNameToFeature]] instead. */ - private[delta] val allSupportedFeaturesMap: Map[String, TableFeature] = { - var features: Set[TableFeature] = Set( + private[delta] def allSupportedFeaturesMap: Map[String, TableFeature] = { + val testingFeaturesEnabled = + try { + SparkSession + .getActiveSession + .map(_.conf.get(DeltaSQLConf.TABLE_FEATURES_TEST_FEATURES_ENABLED)) + .getOrElse(true) + } catch { + case _ => true + } + var features: Set[TableFeature] = Set( AllowColumnDefaultsTableFeature, AppendOnlyTableFeature, ChangeDataFeedTableFeature, @@ -355,7 +364,7 @@ object TableFeature { InCommitTimestampTableFeature, VariantTypeTableFeature, CoordinatedCommitsTableFeature) - if (DeltaUtils.isTesting) { + if (DeltaUtils.isTesting && testingFeaturesEnabled) { features ++= Set( TestLegacyWriterFeature, TestLegacyReaderWriterFeature, @@ -405,8 +414,8 @@ object TableFeature { protected def getDroppedExplicitFeatureNames( newProtocol: Protocol, oldProtocol: Protocol): Option[Set[String]] = { - val newFeatureNames = newProtocol.readerAndWriterFeatureNames - val oldFeatureNames = oldProtocol.readerAndWriterFeatureNames + val newFeatureNames = newProtocol.implicitlyAndExplicitlySupportedFeatures.map(_.name) + val oldFeatureNames = oldProtocol.implicitlyAndExplicitlySupportedFeatures.map(_.name) Option(oldFeatureNames -- newFeatureNames).filter(_.nonEmpty) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala index 8ffd383b498..b9065499084 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/TableFeatureSupport.scala @@ -22,10 +22,10 @@ import scala.collection.mutable import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.DeltaOperations.Operation +import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION +import org.apache.spark.sql.delta.sources.DeltaSQLConf import com.fasterxml.jackson.annotation.JsonIgnore -import org.apache.spark.sql.SparkSession - /** * Trait to be mixed into the [[Protocol]] case class to enable Table Features. * @@ -229,25 +229,16 @@ trait TableFeatureSupport { this: Protocol => /** * Determine whether this protocol can be safely upgraded to a new protocol `to`. This means: - * - this protocol has reader protocol version less than or equals to `to`. - * - this protocol has writer protocol version less than or equals to `to`. * - all features supported by this protocol are supported by `to`. * * Examples regarding feature status: - * - from `[appendOnly]` to `[appendOnly]` => allowed - * - from `[appendOnly, changeDataFeed]` to `[appendOnly]` => not allowed - * - from `[appendOnly]` to `[appendOnly, changeDataFeed]` => allowed + * - from `[appendOnly]` to `[appendOnly]` => allowed. + * - from `[appendOnly, changeDataFeed]` to `[appendOnly]` => not allowed. + * - from `[appendOnly]` to `[appendOnly, changeDataFeed]` => allowed. */ - def canUpgradeTo(to: Protocol): Boolean = { - if (to.minReaderVersion < this.minReaderVersion) return false - if (to.minWriterVersion < this.minWriterVersion) return false - - val thisFeatures = - this.readerAndWriterFeatureNames ++ this.implicitlySupportedFeatures.map(_.name) - val toFeatures = to.readerAndWriterFeatureNames ++ to.implicitlySupportedFeatures.map(_.name) - // all features supported by `this` are supported by `to` - thisFeatures.subsetOf(toFeatures) - } + def canUpgradeTo(to: Protocol): Boolean = + // All features supported by `this` are supported by `to`. + implicitlyAndExplicitlySupportedFeatures.subsetOf(to.implicitlyAndExplicitlySupportedFeatures) /** * Determine whether this protocol can be safely downgraded to a new protocol `to`. @@ -287,12 +278,14 @@ trait TableFeatureSupport { this: Protocol => val mergedProtocol = Protocol(mergedReaderVersion, mergedWriterVersion) .withReaderFeatures(mergedReaderFeatures) .withWriterFeatures(mergedWriterFeatures) - - if (mergedProtocol.supportsReaderFeatures || mergedProtocol.supportsWriterFeatures) { - mergedProtocol.withFeatures(mergedImplicitFeatures) - } else { - mergedProtocol - } + .withFeatures(mergedImplicitFeatures) + + // The merged protocol is always normalized in order to represent the protocol + // with the weakest possible form. This enables backward compatibility. + // This is preceded by a denormalization step. This allows to fix invalid legacy Protocols. + // For example, (2, 3) is normalized to (1, 3). This is because there is no legacy feature + // in the set with reader version 2 unless the writer version is at least 5. + mergedProtocol.denormalizedNormalized } /** @@ -323,63 +316,77 @@ trait TableFeatureSupport { this: Protocol => * the feature exists in the protocol. There is a relevant validation at * [[AlterTableDropFeatureDeltaCommand]]. We also require targetFeature is removable. * - * When the feature to remove is the last explicit table feature of the table we also remove the - * TableFeatures feature and downgrade the protocol. + * After removing the feature we normalize the protocol. */ def removeFeature(targetFeature: TableFeature): Protocol = { require(targetFeature.isRemovable) + val currentProtocol = this.denormalized val newProtocol = targetFeature match { case f@(_: ReaderWriterFeature | _: LegacyReaderWriterFeature) => - removeReaderWriterFeature(f) + currentProtocol.removeReaderWriterFeature(f) case f@(_: WriterFeature | _: LegacyWriterFeature) => - removeWriterFeature(f) + currentProtocol.removeWriterFeature(f) case f => throw DeltaErrors.dropTableFeatureNonRemovableFeature(f.name) } - newProtocol.downgradeProtocolVersionsIfNeeded + newProtocol.normalized } + /** - * If the current protocol does not contain any non-legacy table features and the remaining - * set of legacy table features exactly matches a legacy protocol version, it downgrades the - * protocol to the minimum reader/writer versions required to support the protocol's legacy - * features. + * Protocol normalization is the process of converting a table features protocol to the weakest + * possible form. This primarily refers to converting a table features protocol to a legacy + * protocol. A Table Features protocol can be represented with the legacy representation only + * when the features set of the former exactly matches a legacy protocol. + * + * Normalization can also decrease the reader version of a table features protocol when it is + * higher than necessary. * - * Note, when a table is initialized with table features (3, 7), by default there are no legacy - * features. After we remove the last native feature we downgrade the protocol to (1, 1). + * For example: + * (1, 7, AppendOnly, Invariants, CheckConstraints) -> (1, 3) + * (3, 7, RowTracking) -> (1, 7, RowTracking) */ - def downgradeProtocolVersionsIfNeeded: Protocol = { - if (nativeReaderAndWriterFeatures.nonEmpty) { - val (minReaderVersion, minWriterVersion) = - TableFeatureProtocolUtils.minimumRequiredVersions(readerAndWriterFeatures) - // It is guaranteed by the definitions of WriterFeature and ReaderFeature, that we cannot - // end up with invalid protocol versions such as (3, 3). Nevertheless, - // we double check it here. - val newProtocol = - Protocol(minReaderVersion, minWriterVersion).withFeatures(readerAndWriterFeatures) - assert( - newProtocol.supportsWriterFeatures, - s"Downgraded protocol should at least support writer features, but got $newProtocol.") - return newProtocol - } + def normalized: Protocol = { + // Normalization can only be applied to table feature protocols. + if (!supportsWriterFeatures) return this val (minReaderVersion, minWriterVersion) = TableFeatureProtocolUtils.minimumRequiredVersions(readerAndWriterFeatures) val newProtocol = Protocol(minReaderVersion, minWriterVersion) - assert( - !newProtocol.supportsReaderFeatures && !newProtocol.supportsWriterFeatures, - s"Downgraded protocol should not support table features, but got $newProtocol.") - - // Ensure the legacy protocol supports features exactly as the current protocol. if (this.implicitlyAndExplicitlySupportedFeatures == newProtocol.implicitlyAndExplicitlySupportedFeatures) { newProtocol } else { - this + Protocol(minReaderVersion, TABLE_FEATURES_MIN_WRITER_VERSION) + .withFeatures(readerAndWriterFeatures) } } + /** + * Protocol denormalization is the process of converting a legacy protocol to the + * the equivalent table features protocol. This is the inverse of protocol normalization. + * It can be used to allow operations on legacy protocols that yield result which + * cannot be represented anymore by a legacy protocol. + */ + def denormalized: Protocol = { + // Denormalization can only be applied to legacy protocols. + if (supportsWriterFeatures) return this + + val (minReaderVersion, _) = + TableFeatureProtocolUtils.minimumRequiredVersions(implicitlySupportedFeatures.toSeq) + + Protocol(minReaderVersion, TABLE_FEATURES_MIN_WRITER_VERSION) + .withFeatures(implicitlySupportedFeatures) + } + + /** + * Helper method that applies both denormalization and normalization. This can be used to + * normalize invalid legacy protocols such as (2, 3), (1, 5). A legacy protocol is invalid + * when the version numbers are higher than required to support the implied feature set. + */ + def denormalizedNormalized: Protocol = denormalized.normalized + /** * Check if a `feature` is supported by this protocol. This means either (a) the protocol does * not support table features and implicitly supports the feature, or (b) the protocol supports diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala index 0d4af5dea30..1d76f3eb818 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala @@ -207,13 +207,22 @@ object Protocol { writerFeatures = if (supportsWriterFeatures(minWriterVersion)) Some(Set()) else None) } + /** Returns the required protocol for a given feature. Takes into account dependent features. */ def forTableFeature(tf: TableFeature): Protocol = { - val writerFeatures = Some(Set(tf.name)) // every table feature is a writer feature - val readerFeatures = if (tf.isReaderWriterFeature) writerFeatures else None - val minReaderVersion = if (readerFeatures.isDefined) TABLE_FEATURES_MIN_READER_VERSION else 1 + // Every table feature is a writer feature. + val writerFeatures = tf.requiredFeatures + tf + val readerFeatures = writerFeatures.filter(f => f.isReaderWriterFeature && !f.isLegacyFeature) + val writerFeaturesNames = writerFeatures.map(_.name) + val readerFeaturesNames = readerFeatures.map(_.name) + val minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION + val minReaderVersion = (readerFeatures.map(_.minReaderVersion) + 1).max - new Protocol(minReaderVersion, minWriterVersion, readerFeatures, writerFeatures) + new Protocol( + minReaderVersion, + minWriterVersion, + readerFeatures = Option(readerFeaturesNames).filter(_.nonEmpty), + writerFeatures = Some(writerFeaturesNames)) } /** @@ -254,7 +263,16 @@ object Protocol { val (readerVersion, writerVersion, enabledFeatures) = minProtocolComponentsFromMetadata(spark, metadata) - Protocol(readerVersion, writerVersion).withFeatures(enabledFeatures) + // New table protocols should always be denormalized and then normalized to convert the + // protocol to the weakest possible form. This means either converting a table features + // protocol to a legacy protocol or reducing the versions of a table features protocol. + // For example: + // 1) (3, 7, RowTracking) is normalized to (1, 7, RowTracking). + // 2) (3, 7, AppendOnly, Invariants) is normalized to (1, 2). + // 3) (2, 3) is normalized to (1, 3). + Protocol(readerVersion, writerVersion) + .withFeatures(enabledFeatures) + .denormalizedNormalized } /** @@ -363,7 +381,23 @@ object Protocol { val finalWriterVersion = Seq(1, writerVersionFromFeatures, writerVersionFromTableConfOpt.getOrElse(0)).max - (finalReaderVersion, finalWriterVersion, allEnabledFeatures) + // If the user explicitly sets the table versions, we need to take into account the + // relevant implicit features. + val implicitFeaturesFromTableConf = + (readerVersionFromTableConfOpt, writerVersionFromTableConfOpt) match { + case (Some(readerVersion), Some(writerVersion)) => + // We cannot have a table features reader version if the protocol does not + // support writer features. + val sanitizedReaderVersion = if (supportsWriterFeatures(writerVersion)) { + readerVersion + } else { + Math.min(2, readerVersion) + } + Protocol(sanitizedReaderVersion, writerVersion).implicitlySupportedFeatures + case _ => Set.empty + } + + (finalReaderVersion, finalWriterVersion, allEnabledFeatures ++ implicitFeaturesFromTableConf) } /** @@ -415,7 +449,7 @@ object Protocol { } /** Assert a table metadata contains no protocol-related table properties. */ - private def assertMetadataContainsNoProtocolProps(metadata: Metadata): Unit = { + def assertMetadataContainsNoProtocolProps(metadata: Metadata): Unit = { assert( !metadata.configuration.contains(MIN_READER_VERSION_PROP), "Should not have the " + @@ -452,16 +486,32 @@ object Protocol { spark: SparkSession, metadata: Metadata, current: Protocol): Option[Protocol] = { - assertMetadataContainsNoProtocolProps(metadata) - val (readerVersion, writerVersion, minRequiredFeatures) = minProtocolComponentsFromAutomaticallyEnabledFeatures(spark, metadata, current) + // If the user sets the protocol versions we need to take it account. In general, + // enabling legacy features on legacy protocols results to pumping up the protocol + // versions. However, setting table feature protocol versions while enabling + // legacy features results to only enabling the requested features. For example: + // 1) Create table with (1, 2), then ALTER TABLE with DeltaConfigs.CHANGE_DATA_FEED.key = true + // results to (1, 4). + // 2) Alternatively, Create table with (1, 2), then + // ALTER TABLE set versions (1, 7) and DeltaConfigs.CHANGE_DATA_FEED.key = true results + // to (1, 7, AppendOnly, Invariants, CDF). + val readerVersionFromConf = + Protocol.getReaderVersionFromTableConf(metadata.configuration).getOrElse(readerVersion) + val writerVersionFromConf = + Protocol.getWriterVersionFromTableConf(metadata.configuration).getOrElse(writerVersion) + + val finalReaderVersion = + Seq(readerVersion, readerVersionFromConf, current.minReaderVersion).max + val finalWriterVersion = + Seq(writerVersion, writerVersionFromConf, current.minWriterVersion).max + // Increment the reader and writer version to accurately add enabled legacy table features - // either to the implicitly enabled table features or the table feature lists - val required = Protocol( - readerVersion.max(current.minReaderVersion), writerVersion.max(current.minWriterVersion)) - .withFeatures(minRequiredFeatures) + // either to the implicitly enabled table features or the table feature lists. + val required = + Protocol(finalReaderVersion, finalWriterVersion).withFeatures(minRequiredFeatures) if (!required.canUpgradeTo(current)) { // When the current protocol does not satisfy metadata requirement, some additional features // must be supported by the protocol. We assert those features can actually perform the @@ -493,7 +543,7 @@ object Protocol { .collect { case f: FeatureAutomaticallyEnabledByMetadata => f } .partition(_.automaticallyUpdateProtocolOfExistingTables) if (nonAutoUpdateCapableFeatures.nonEmpty) { - // The "current features" we give the user are which from the original protocol, plus + // The "current features" we give to the user are from the original protocol, plus // features newly supported by table properties in the current transaction, plus // metadata-enabled features that are auto-update capable. The first two are provided by // `currentFeatures`. diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala index f636cb85de7..d1b521995fa 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala @@ -260,10 +260,12 @@ case class AlterTableUnsetPropertiesDeltaCommand( * If transactions do run for longer than this period while this command is run, then this * can lead to data corruption. * - * Note, legacy features can be removed as well, as long as the protocol supports Table Features. - * This will not downgrade protocol versions but only remove the feature from the - * supported features list. For example, removing legacyRWFeature from - * (3, 7, [legacyRWFeature], [legacyRWFeature]) will result in (3, 7, [], []) and not (1, 1). + * Note, legacy features can be removed as well. When removing a legacy feature from a legacy + * protocol, if the result cannot be represented with a legacy representation we use the + * table features representation. For example, removing Invariants from (1, 3) results to + * (1, 7, None, [AppendOnly, CheckConstraints]). Adding back Invariants to the protocol is + * normalized back to (1, 3). This allows to easily transitions back and forth between legacy + * protocols and table feature protocols. */ case class AlterTableDropFeatureDeltaCommand( table: DeltaTableV2, @@ -300,8 +302,12 @@ case class AlterTableDropFeatureDeltaCommand( } // Check whether the protocol contains the feature in either the writer features list or - // the reader+writer features list. - if (!table.initialSnapshot.protocol.readerAndWriterFeatureNames.contains(featureName)) { + // the reader+writer features list. Note, protocol needs to denormalized to allow dropping + // features from legacy protocols. + val protocol = table.initialSnapshot.protocol + val protocolContainsFeatureName = + protocol.implicitlyAndExplicitlySupportedFeatures.map(_.name).contains(featureName) + if (!protocolContainsFeatureName) { throw DeltaErrors.dropTableFeatureFeatureNotSupportedByProtocol(featureName) } @@ -311,7 +317,7 @@ case class AlterTableDropFeatureDeltaCommand( // Validate that the `removableFeature` is not a dependency of any other feature that is // enabled on the table. - dependentFeatureCheck(removableFeature, table.initialSnapshot.protocol) + dependentFeatureCheck(removableFeature, protocol) // The removableFeature.preDowngradeCommand needs to adhere to the following requirements: // @@ -376,7 +382,7 @@ case class AlterTableDropFeatureDeltaCommand( } } - txn.updateProtocol(txn.protocol.removeFeature(removableFeature)) + txn.updateProtocol(txn.protocol.denormalized.removeFeature(removableFeature)) txn.commit(Nil, DeltaOperations.DropTableFeature(featureName, truncateHistory)) Nil } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index fd79c052512..6ae3759d986 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -423,6 +423,14 @@ trait DeltaSQLConfBase { .checkValues(Set(1, 2, 3)) .createWithDefault(1) + val TABLE_FEATURES_TEST_FEATURES_ENABLED = + buildConf("tableFeatures.testFeatures.enabled") + .internal() + .doc("Controls whether test features are enabled in testing mode. " + + "This config is only used for testing purposes. ") + .booleanConf + .createWithDefault(true) + val DELTA_MAX_SNAPSHOT_LINEAGE_LENGTH = buildConf("maxSnapshotLineageLength") .internal() diff --git a/spark/src/test/scala-spark-master/org/apache/spark/sql/delta/DeltaVariantSuite.scala b/spark/src/test/scala-spark-master/org/apache/spark/sql/delta/DeltaVariantSuite.scala index f83be425b53..70cf9d711ca 100644 --- a/spark/src/test/scala-spark-master/org/apache/spark/sql/delta/DeltaVariantSuite.scala +++ b/spark/src/test/scala-spark-master/org/apache/spark/sql/delta/DeltaVariantSuite.scala @@ -52,10 +52,7 @@ class DeltaVariantSuite sql("CREATE TABLE tbl(s STRING, v VARIANT) USING DELTA") sql("INSERT INTO tbl (SELECT 'foo', parse_json(cast(id + 99 as string)) FROM range(1))") assert(spark.table("tbl").selectExpr("v::int").head == Row(99)) - assert( - getProtocolForTable("tbl") == - VariantTypeTableFeature.minProtocolVersion.withFeature(VariantTypeTableFeature) - ) + assert(getProtocolForTable("tbl").readerAndWriterFeatures.contains(VariantTypeTableFeature)) } } diff --git a/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala b/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala index 1a542d2b5dd..0c7eab420aa 100644 --- a/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala +++ b/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala @@ -22,7 +22,7 @@ import java.util.Locale import scala.language.postfixOps // scalastyle:off import.ordering.noEmptyLine -import org.apache.spark.sql.delta.{DeltaIllegalArgumentException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, TestReaderWriterFeature, TestWriterFeature} +import org.apache.spark.sql.delta.{AppendOnlyTableFeature, DeltaIllegalArgumentException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, InvariantsTableFeature, TestReaderWriterFeature, TestWriterFeature} import org.apache.spark.sql.delta.actions.{ Metadata, Protocol } import org.apache.spark.sql.delta.storage.LocalLogStore import org.apache.spark.sql.delta.test.DeltaSQLCommandTest @@ -574,13 +574,17 @@ class DeltaTableHadoopOptionsSuite extends QueryTest // update the protocol to support a writer feature. val table = DeltaTable.forPath(spark, path, fsOptions) table.addFeatureSupport(TestWriterFeature.name) - assert(log.update().protocol === Protocol(1, 7) - .merge(Protocol(1, 2)).withFeature(TestWriterFeature)) + assert(log.update().protocol === Protocol(1, 7).withFeatures(Seq( + AppendOnlyTableFeature, + InvariantsTableFeature, + TestWriterFeature))) table.addFeatureSupport(TestReaderWriterFeature.name) assert( - log.update().protocol === Protocol(3, 7) - .merge(Protocol(1, 2)) - .withFeatures(Seq(TestWriterFeature, TestReaderWriterFeature))) + log.update().protocol === Protocol(3, 7).withFeatures(Seq( + AppendOnlyTableFeature, + InvariantsTableFeature, + TestWriterFeature, + TestReaderWriterFeature))) // update the protocol again with invalid feature name. assert(intercept[DeltaTableFeatureException] { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala index 51abcbaa4e9..1ddc509e255 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/CloneTableSuiteBase.scala @@ -690,10 +690,10 @@ trait CloneTableSuiteBase extends QueryTest cloneTest("clones take protocol from the source", TAG_HAS_SHALLOW_CLONE, TAG_MODIFY_PROTOCOL, TAG_CHANGE_COLUMN_MAPPING_MODE) { (source, clone) => - // Change protocol versions of (read, write) = (2, 3). We cannot initialize this to (0, 0) + // Change protocol versions of (read, write) = (2, 5). We cannot initialize this to (0, 0) // because min reader and writer versions are at least 1. val defaultNewTableProtocol = Protocol.forNewTable(spark, metadataOpt = None) - val sourceProtocol = Protocol(2, 3) + val sourceProtocol = Protocol(2, 5) // Make sure this is actually an upgrade. Downgrades are not supported, and if it's the same // version, we aren't testing anything there. assert(sourceProtocol.minWriterVersion > defaultNewTableProtocol.minWriterVersion && diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala index d360629ae43..de02e1b66a5 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala @@ -1355,18 +1355,6 @@ class DeltaColumnMappingSuite extends QueryTest } } - test("legal mode change without explicit upgrade") { - val e = intercept[UnsupportedOperationException] { - withTable("t1") { - createTableWithSQLAPI("t1") - alterTableWithProps("t1", props = Map( - DeltaConfigs.COLUMN_MAPPING_MODE.key -> "name")) - } - } - assert(e.getMessage.contains("Your current table protocol version does not" + - " support changing column mapping modes")) - } - test("getPhysicalNameFieldMap") { // To keep things simple, we use schema `schemaWithPhysicalNamesNested` such that the // physical name is just the logical name repeated three times. @@ -1907,8 +1895,7 @@ class DeltaColumnMappingSuite extends QueryTest s"""CREATE TABLE $testTableName |USING DELTA |TBLPROPERTIES( - |'$minReaderKey' = '2', - |'$minWriterKey' = '7' + |'${DeltaConfigs.ROW_TRACKING_ENABLED.key}' = 'true' |) |AS SELECT * FROM RANGE(1) |""".stripMargin) @@ -1920,6 +1907,14 @@ class DeltaColumnMappingSuite extends QueryTest s"""ALTER TABLE $testTableName SET TBLPROPERTIES( |'$columnMappingMode'='name' |)""".stripMargin) + + val deltaLog = DeltaLog.forTable(spark, TableIdentifier(testTableName)) + assert(deltaLog.update().protocol === Protocol(2, 7).withFeatures(Seq( + AppendOnlyTableFeature, + InvariantsTableFeature, + ColumnMappingTableFeature, + RowTrackingFeature + ))) } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala index 4891eadf698..168f48161c5 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala @@ -25,6 +25,7 @@ import java.util.Locale import scala.sys.process.Process // scalastyle:off import.ordering.noEmptyLine +// scalastyle:off line.size.limit import org.apache.spark.sql.delta.DeltaErrors.generateDocsLink import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol} import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.{TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION} @@ -584,45 +585,6 @@ trait DeltaErrorsSuiteBase "'spark.databricks.delta.properties.defaults.columnMapping.mode' cannot be set to `id` " + "when using CONVERT TO DELTA.")) } - { - val oldAndNew = Seq( - (Protocol(2, 4), ColumnMappingTableFeature.minProtocolVersion), - ( - Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION), - Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(ColumnMappingTableFeature))) - for ((oldProtocol, newProtocol) <- oldAndNew) { - val e = intercept[DeltaColumnMappingUnsupportedException] { - throw DeltaErrors.changeColumnMappingModeOnOldProtocol(oldProtocol) - } - // scalastyle:off line.size.limit - checkErrorMessage(e, None, None, - Some( - s""" - |Your current table protocol version does not support changing column mapping modes - |using delta.columnMapping.mode. - | - |Required Delta protocol version for column mapping: - |${newProtocol.toString} - |Your table's current Delta protocol version: - |${oldProtocol.toString} - | - |Please enable Column Mapping on your Delta table with mapping mode 'name'. - |You can use one of the following commands. - | - |If your table is already on the required protocol version: - |ALTER TABLE table_name SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name') - | - |If your table is not on the required protocol version and requires a protocol upgrade: - |ALTER TABLE table_name SET TBLPROPERTIES ( - | 'delta.columnMapping.mode' = 'name', - | 'delta.minReaderVersion' = '${newProtocol.minReaderVersion}', - | 'delta.minWriterVersion' = '${newProtocol.minWriterVersion}') - |""".stripMargin) - ) - // scalastyle:off line.size.limit - } - } { val e = intercept[DeltaColumnMappingUnsupportedException] { throw DeltaErrors.schemaChangeDuringMappingModeChangeNotSupported( diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolTransitionsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolTransitionsSuite.scala new file mode 100644 index 00000000000..e94c1cae114 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolTransitionsSuite.scala @@ -0,0 +1,683 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN +import org.apache.spark.sql.delta.actions.Protocol +import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.{TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION} +import org.apache.spark.sql.delta.catalog.DeltaTableV2 +import org.apache.spark.sql.delta.commands.AlterTableDropFeatureDeltaCommand +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.delta.test.DeltaTestImplicits._ + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.test.SharedSparkSession + +class DeltaProtocolTransitionsSuite + extends QueryTest + with SharedSparkSession + with DeltaSQLCommandTest { + + override def beforeAll(): Unit = { + super.beforeAll() + spark.conf.set(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey, "false") + } + + protected def testProtocolTransition( + createTableColumns: Seq[(String, String)] = Seq.empty, + createTableGeneratedColumns: Seq[(String, String, String)] = Seq.empty, + createTableProperties: Seq[(String, String)] = Seq.empty, + alterTableProperties: Seq[(String, String)] = Seq.empty, + dropFeatures: Seq[TableFeature] = Seq.empty, + expectedProtocol: Protocol): Unit = { + + withTempDir { dir => + val deltaLog = DeltaLog.forTable(spark, dir) + + val tableBuilder = io.delta.tables.DeltaTable.create(spark) + tableBuilder.tableName(s"delta.`$dir`") + + createTableColumns.foreach { c => + tableBuilder.addColumn(c._1, c._2) + } + + createTableGeneratedColumns.foreach { c => + val columnBuilder = io.delta.tables.DeltaTable.columnBuilder(spark, c._1) + columnBuilder.dataType(c._2) + columnBuilder.generatedAlwaysAs(c._3) + tableBuilder.addColumn(columnBuilder.build()) + } + + createTableProperties.foreach { p => + tableBuilder.property(p._1, p._2) + } + + tableBuilder.location(dir.getCanonicalPath) + tableBuilder.execute() + + if (alterTableProperties.nonEmpty) { + sql( + s"""ALTER TABLE delta.`${deltaLog.dataPath}` + |SET TBLPROPERTIES ( + |${alterTableProperties.map(p => s"'${p._1}' = '${p._2}'").mkString(",")} + |)""".stripMargin) + } + + // Drop features. + dropFeatures.foreach { f => + sql(s"ALTER TABLE delta.`${deltaLog.dataPath}` DROP FEATURE ${f.name}") + } + + assert(deltaLog.update().protocol === expectedProtocol) + } + } + + test("CREATE TABLE default protocol versions") { + testProtocolTransition( + expectedProtocol = Protocol(1, 2)) + + // Setting table versions overrides protocol versions. + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 1.toString)), + expectedProtocol = Protocol(1, 1)) + } + + test("CREATE TABLE normalization") { + // Table features protocols without features are normalized to (1, 1). + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 3.toString), + ("delta.minWriterVersion", 7.toString)), + expectedProtocol = Protocol(1, 1)) + + // Default protocol is taken into account. + testProtocolTransition( + createTableProperties = Seq( + (s"delta.feature.${TestRemovableWriterFeature.name}", "supported")), + expectedProtocol = Protocol(1, 7).withFeatures(Seq( + InvariantsTableFeature, + AppendOnlyTableFeature, + TestRemovableWriterFeature))) + + // Default protocol is not taken into account because we explicitly set the protocol versions. + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 3.toString), + ("delta.minWriterVersion", 7.toString), + (s"delta.feature.${TestRemovableWriterFeature.name}", "supported")), + expectedProtocol = Protocol(1, 7).withFeature(TestRemovableWriterFeature)) + + // Reader version normalizes correctly. + testProtocolTransition( + createTableProperties = Seq( + (s"delta.feature.${TestRemovableWriterFeature.name}", "supported"), + (s"delta.feature.${ColumnMappingTableFeature.name}", "supported")), + expectedProtocol = + Protocol(2, 7).withFeatures(Seq( + AppendOnlyTableFeature, + InvariantsTableFeature, + TestRemovableWriterFeature, + ColumnMappingTableFeature))) + + // Reader version denormalizes correctly. + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 7.toString), + (s"delta.feature.${TestRemovableReaderWriterFeature.name}", "supported")), + expectedProtocol = Protocol(3, 7).withFeature(TestRemovableReaderWriterFeature)) + + // Reader version denormalizes correctly. + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 2.toString), + ("delta.minWriterVersion", 7.toString), + (s"delta.feature.${TestRemovableReaderWriterFeature.name}", "supported")), + expectedProtocol = Protocol(3, 7).withFeature(TestRemovableReaderWriterFeature)) + } + + for ((readerVersion, writerVersion) <- Seq((2, 1), (2, 2), (2, 3), (2, 4), (1, 5))) + test("Invalid legacy protocol normalization" + + s" - invalidProtocol($readerVersion, $writerVersion)") { + + val expectedReaderVersion = 1 + val expectedWriterVersion = Math.min(writerVersion, 4) + + withSQLConf(DeltaSQLConf.TABLE_FEATURES_TEST_FEATURES_ENABLED.key -> false.toString) { + // Base case. + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", readerVersion.toString), + ("delta.minWriterVersion", writerVersion.toString)), + expectedProtocol = Protocol(expectedReaderVersion, expectedWriterVersion)) + + // Invalid legacy versions are normalized in default confs. + withSQLConf( + DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_READER_VERSION.key -> readerVersion.toString, + DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION.key -> writerVersion.toString) { + testProtocolTransition( + expectedProtocol = Protocol(expectedReaderVersion, expectedWriterVersion)) + } + + // Invalid legacy versions are normalized in alter table. + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 1.toString)), + alterTableProperties = Seq( + ("delta.minReaderVersion", readerVersion.toString), + ("delta.minWriterVersion", writerVersion.toString)), + expectedProtocol = Protocol(expectedReaderVersion, expectedWriterVersion)) + } + } + + test("ADD FEATURE normalization") { + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 1.toString)), + alterTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 4.toString)), + expectedProtocol = Protocol(1, 4)) + + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 2.toString)), + alterTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 4.toString)), + expectedProtocol = Protocol(1, 4)) + + // Setting lower legacy versions is noop. + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 4.toString)), + alterTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 2.toString)), + expectedProtocol = Protocol(1, 4)) + + // Setting the same legacy versions is noop. + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 4.toString)), + alterTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 4.toString)), + expectedProtocol = Protocol(1, 4)) + + // Setting legacy versions is an ADD operation. + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 6.toString)), + alterTableProperties = Seq( + ("delta.minReaderVersion", 2.toString), + ("delta.minWriterVersion", 5.toString)), + expectedProtocol = Protocol(2, 6)) + + // The inverse of the above test. + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 2.toString), + ("delta.minWriterVersion", 5.toString)), + alterTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 6.toString)), + expectedProtocol = Protocol(2, 6)) + + // Adding a legacy protocol to a table features protocol adds the features + // of the former to the later. + testProtocolTransition( + createTableProperties = Seq( + (s"delta.feature.${TestWriterFeature.name}", "supported")), + alterTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 3.toString)), + expectedProtocol = Protocol(1, 7).withFeatures(Seq( + AppendOnlyTableFeature, + CheckConstraintsTableFeature, + InvariantsTableFeature, + TestWriterFeature))) + + // Variation of the above. + testProtocolTransition( + createTableProperties = Seq( + (s"delta.feature.${TestWriterFeature.name}", "supported"), + (s"delta.feature.${IdentityColumnsTableFeature.name}", "supported")), + alterTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 3.toString)), + expectedProtocol = Protocol(1, 7).withFeatures(Seq( + AppendOnlyTableFeature, + CheckConstraintsTableFeature, + InvariantsTableFeature, + IdentityColumnsTableFeature, + TestWriterFeature))) + + // New feature is added to the table protocol features. + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 3.toString)), + alterTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 7.toString), + (DeltaConfigs.CHANGE_DATA_FEED.key, true.toString)), + expectedProtocol = Protocol(1, 7).withFeatures(Seq( + AppendOnlyTableFeature, + InvariantsTableFeature, + CheckConstraintsTableFeature, + ChangeDataFeedTableFeature))) + + // Addition result is normalized. + testProtocolTransition( + createTableProperties = Seq( + (s"delta.feature.${InvariantsTableFeature.name}", "supported")), + alterTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 2.toString)), + expectedProtocol = Protocol(1, 2)) + + // Variation of the above. + testProtocolTransition( + createTableProperties = Seq( + (s"delta.feature.${CheckConstraintsTableFeature.name}", "supported")), + alterTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 2.toString)), + expectedProtocol = Protocol(1, 3)) + + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 2.toString)), + alterTableProperties = Seq( + (s"delta.feature.${CheckConstraintsTableFeature.name}", "supported")), + expectedProtocol = Protocol(1, 3)) + + withSQLConf(DeltaSQLConf.TABLE_FEATURES_TEST_FEATURES_ENABLED.key -> false.toString) { + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 4.toString)), + alterTableProperties = Seq( + (s"delta.feature.${ColumnMappingTableFeature.name}", "supported")), + expectedProtocol = Protocol(2, 5)) + + + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 4.toString)), + alterTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 7.toString), + (DeltaConfigs.COLUMN_MAPPING_MODE.key, "name")), + expectedProtocol = Protocol(2, 5)) + } + } + + test("DROP FEATURE normalization") { + // Can drop features on legacy protocols and the result is normalized. + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 3.toString)), + dropFeatures = Seq(CheckConstraintsTableFeature), + expectedProtocol = Protocol(1, 2)) + + // If the removal result does not match a legacy version use the denormalized form. + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 4.toString)), + dropFeatures = Seq(CheckConstraintsTableFeature), + expectedProtocol = Protocol(1, 7).withFeatures(Seq( + AppendOnlyTableFeature, + InvariantsTableFeature, + GeneratedColumnsTableFeature, + ChangeDataFeedTableFeature))) + + // Normalization after dropping a table feature. + testProtocolTransition( + createTableProperties = Seq( + (s"delta.feature.${TestRemovableWriterFeature.name}", "supported")), + dropFeatures = Seq(TestRemovableWriterFeature), + expectedProtocol = Protocol(1, 2)) + + // Variation of the above. Because the default protocol is overwritten the result + // is normalized to (1, 1). + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 7.toString), + (s"delta.feature.${TestRemovableWriterFeature.name}", "supported")), + dropFeatures = Seq(TestRemovableWriterFeature), + expectedProtocol = Protocol(1, 1)) + + // Reader version is normalized correctly to 2 after dropping the reader feature. + testProtocolTransition( + createTableProperties = Seq( + (s"delta.feature.${ColumnMappingTableFeature.name}", "supported"), + (s"delta.feature.${TestRemovableWriterFeature.name}", "supported"), + (s"delta.feature.${TestRemovableReaderWriterFeature.name}", "supported")), + dropFeatures = Seq(TestRemovableReaderWriterFeature), + expectedProtocol = Protocol(2, 7).withFeatures(Seq( + InvariantsTableFeature, + AppendOnlyTableFeature, + ColumnMappingTableFeature, + TestRemovableWriterFeature))) + + testProtocolTransition( + createTableProperties = Seq( + (s"delta.feature.${TestRemovableWriterFeature.name}", "supported"), + (s"delta.feature.${TestRemovableReaderWriterFeature.name}", "supported")), + dropFeatures = Seq(TestRemovableReaderWriterFeature), + expectedProtocol = Protocol(1, 7).withFeatures(Seq( + InvariantsTableFeature, + AppendOnlyTableFeature, + TestRemovableWriterFeature))) + + withSQLConf(DeltaSQLConf.TABLE_FEATURES_TEST_FEATURES_ENABLED.key -> false.toString) { + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 2.toString), + ("delta.minWriterVersion", 5.toString)), + dropFeatures = Seq(ColumnMappingTableFeature), + expectedProtocol = Protocol(1, 4)) + } + } + + test("Default Enabled native features") { + withSQLConf(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey -> "true") { + // Table protocol is taken into account when default table features exist. + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 4.toString)), + expectedProtocol = Protocol(3, 7).withFeatures(Seq( + DeletionVectorsTableFeature, + InvariantsTableFeature, + AppendOnlyTableFeature, + CheckConstraintsTableFeature, + ChangeDataFeedTableFeature, + GeneratedColumnsTableFeature))) + + // Default protocol versions are taken into account when default features exist. + testProtocolTransition( + expectedProtocol = Protocol(3, 7).withFeatures(Seq( + DeletionVectorsTableFeature, + InvariantsTableFeature, + AppendOnlyTableFeature))) + } + + withSQLConf( + DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_READER_VERSION.key -> 1.toString, + DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION.key -> 7.toString, + DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey -> "true") { + testProtocolTransition( + expectedProtocol = Protocol(3, 7).withFeature(DeletionVectorsTableFeature)) + } + } + + test("Default Enabled legacy features") { + testProtocolTransition( + createTableProperties = Seq((DeltaConfigs.CHANGE_DATA_FEED.key, true.toString)), + expectedProtocol = Protocol(1, 4)) + + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 3.toString), + (DeltaConfigs.CHANGE_DATA_FEED.key, true.toString)), + expectedProtocol = Protocol(1, 4)) + + withSQLConf(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true") { + testProtocolTransition(expectedProtocol = Protocol(1, 4)) + } + + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 7.toString), + (DeltaConfigs.CHANGE_DATA_FEED.key, true.toString)), + expectedProtocol = Protocol(1, 7).withFeature(ChangeDataFeedTableFeature)) + + withSQLConf( + DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_READER_VERSION.key -> 1.toString, + DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION.key -> 7.toString, + DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true") { + testProtocolTransition( + expectedProtocol = Protocol(1, 7).withFeature(ChangeDataFeedTableFeature)) + } + + withSQLConf( + DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_READER_VERSION.key -> 1.toString, + DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION.key -> 7.toString) { + testProtocolTransition( + createTableProperties = Seq((DeltaConfigs.CHANGE_DATA_FEED.key, true.toString)), + expectedProtocol = Protocol(1, 7).withFeature(ChangeDataFeedTableFeature)) + } + + withSQLConf(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true") { + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 7.toString)), + expectedProtocol = Protocol(1, 7).withFeature(ChangeDataFeedTableFeature)) + } + } + + test("Enabling legacy features on a table") { + testProtocolTransition( + createTableColumns = Seq(("id", "INT")), + createTableGeneratedColumns = Seq(("id2", "INT", "id + 1")), + expectedProtocol = Protocol(1, 4)) + + testProtocolTransition( + createTableColumns = Seq(("id", "INT")), + createTableGeneratedColumns = Seq(("id2", "INT", "id + 1")), + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 7.toString)), + expectedProtocol = Protocol(1, 7).withFeature(GeneratedColumnsTableFeature)) + + withSQLConf( + DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_READER_VERSION.key -> 1.toString, + DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION.key -> 7.toString) { + testProtocolTransition( + createTableColumns = Seq(("id", "INT")), + createTableGeneratedColumns = Seq(("id2", "INT", "id + 1")), + expectedProtocol = Protocol(1, 7).withFeature(GeneratedColumnsTableFeature)) + } + + testProtocolTransition( + alterTableProperties = Seq((DeltaConfigs.CHANGE_DATA_FEED.key, "true")), + expectedProtocol = Protocol(1, 4)) + + testProtocolTransition( + alterTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 7.toString), + (DeltaConfigs.CHANGE_DATA_FEED.key, true.toString)), + expectedProtocol = Protocol(1, 7).withFeatures(Seq( + InvariantsTableFeature, + AppendOnlyTableFeature, + ChangeDataFeedTableFeature))) + } + + test("Column Mapping does not require a manual protocol versions upgrade") { + testProtocolTransition( + createTableProperties = Seq((DeltaConfigs.COLUMN_MAPPING_MODE.key, "name")), + expectedProtocol = Protocol(2, 5)) + + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 7.toString), + (DeltaConfigs.COLUMN_MAPPING_MODE.key, "name")), + expectedProtocol = Protocol(2, 7).withFeature(ColumnMappingTableFeature)) + + withSQLConf( + DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_READER_VERSION.key -> 1.toString, + DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION.key -> 7.toString) { + testProtocolTransition( + createTableProperties = Seq((DeltaConfigs.COLUMN_MAPPING_MODE.key, "name")), + expectedProtocol = Protocol(2, 7).withFeature(ColumnMappingTableFeature)) + } + + testProtocolTransition( + alterTableProperties = Seq((DeltaConfigs.COLUMN_MAPPING_MODE.key, "name")), + expectedProtocol = Protocol(2, 5)) + + testProtocolTransition( + alterTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 7.toString), + (DeltaConfigs.COLUMN_MAPPING_MODE.key, "name")), + expectedProtocol = Protocol(2, 7).withFeatures(Seq( + InvariantsTableFeature, + AppendOnlyTableFeature, + ColumnMappingTableFeature))) + } + + private def validVersions = Seq((1, 1), (1, 2), (1, 3), (1, 4), (2, 5), (1, 7), (3, 7)) + private def invalidVersions = Seq((2, 2), (2, 3)) + for ((readerVersion, writerVersion) <- validVersions ++ invalidVersions) + test("Legacy features are added when setting legacy versions: " + + s"readerVersionToSet = $readerVersion, writerVersionToSet = $writerVersion") { + withTempDir { dir => + val deltaLog = DeltaLog.forTable(spark, dir) + + // Creates a table with (1, 7) versions with the given table feature. + sql( + s"""CREATE TABLE delta.`${deltaLog.dataPath}` (id bigint) USING delta + |TBLPROPERTIES ( + |delta.feature.${TestRemovableWriterFeature.name} = 'supported' + |)""".stripMargin) + + sql( + s""" + |ALTER TABLE delta.`${deltaLog.dataPath}` SET TBLPROPERTIES ( + | 'delta.minReaderVersion' = $readerVersion, + | 'delta.minWriterVersion' = $writerVersion + |)""".stripMargin) + + val expected = Protocol(readerVersion, writerVersion).implicitlySupportedFeatures ++ + Set(InvariantsTableFeature, AppendOnlyTableFeature, TestRemovableWriterFeature) + assert(deltaLog.update().protocol.readerAndWriterFeatureNames === expected.map(_.name)) + } + } + + for { + tableFeatureToAdd <- Seq(TestRemovableWriterFeature, TestRemovableReaderWriterFeature) + downgradeVersionToSet <- Seq(1, 2, 3, 4, 5, 6) + preemptiveVersionDowngrade <- BOOLEAN_DOMAIN + } test("Protocol versions are always downgraded to the minimum required " + + s"tableFeatureToAdd: ${tableFeatureToAdd.name}, " + + s"downgradeVersionToSet: $downgradeVersionToSet, " + + s"preemptiveVersionDowngrade: $preemptiveVersionDowngrade") { + withTempDir { dir => + val deltaLog = DeltaLog.forTable(spark, dir) + + sql( + s"""CREATE TABLE delta.`${deltaLog.dataPath}` (id bigint) USING delta + |TBLPROPERTIES ( + |delta.minReaderVersion = ${Math.max(tableFeatureToAdd.minReaderVersion, 1)}, + |delta.minWriterVersion = $TABLE_FEATURES_MIN_WRITER_VERSION, + |delta.feature.${tableFeatureToAdd.name} = 'supported', + |delta.feature.${ChangeDataFeedTableFeature.name} = 'supported' + |)""".stripMargin) + + val downgradeProtocolVersionsSQL = + s""" + |ALTER TABLE delta.`${deltaLog.dataPath}` SET TBLPROPERTIES ( + | 'delta.minReaderVersion' = 1, + | 'delta.minWriterVersion' = $downgradeVersionToSet + |)""".stripMargin + + if (preemptiveVersionDowngrade) sql(downgradeProtocolVersionsSQL) + + AlterTableDropFeatureDeltaCommand( + DeltaTableV2(spark, deltaLog.dataPath), + tableFeatureToAdd.name, + truncateHistory = tableFeatureToAdd.isReaderWriterFeature).run(spark) + + if (!preemptiveVersionDowngrade) sql(downgradeProtocolVersionsSQL) + + val expectedProtocol = if (downgradeVersionToSet < 4) { + Protocol(tableFeatureToAdd.minReaderVersion, 7).withFeature(ChangeDataFeedTableFeature) + .merge(Protocol(1, downgradeVersionToSet)) + } else { + Protocol(1, downgradeVersionToSet) + } + assert(deltaLog.update().protocol === expectedProtocol) + } + } + + for { + tableFeatureToAdd <- Seq(TestRemovableWriterFeature, TestRemovableReaderWriterFeature) + setLegacyVersions <- BOOLEAN_DOMAIN + downgradeAfterDrop <- if (setLegacyVersions) BOOLEAN_DOMAIN else Seq(false) + } test("SOP for downgrading to legacy protocol versions for tables created with features. " + + s"tableFeatureToAdd: ${tableFeatureToAdd.name}, setLegacyVersions: $setLegacyVersions, " + + s"downgradeAfterDrop: ${downgradeAfterDrop}") { + withTempDir { dir => + val deltaLog = DeltaLog.forTable(spark, dir) + + sql( + s"""CREATE TABLE delta.`${deltaLog.dataPath}` (id bigint) USING delta + |TBLPROPERTIES ( + |delta.minReaderVersion = $TABLE_FEATURES_MIN_READER_VERSION, + |delta.minWriterVersion = $TABLE_FEATURES_MIN_WRITER_VERSION, + |delta.feature.${tableFeatureToAdd.name} = 'supported', + |delta.feature.${ChangeDataFeedTableFeature.name} = 'supported' + |)""".stripMargin) + + val downgradeProtocolVersionsSQL = + s""" + |ALTER TABLE delta.`${deltaLog.dataPath}` SET TBLPROPERTIES ( + | 'delta.minReaderVersion' = 1, + | 'delta.minWriterVersion' = 4 + |)""".stripMargin + + if (setLegacyVersions && !downgradeAfterDrop) sql(downgradeProtocolVersionsSQL) + + AlterTableDropFeatureDeltaCommand( + DeltaTableV2(spark, deltaLog.dataPath), + tableFeatureToAdd.name, + truncateHistory = tableFeatureToAdd.isReaderWriterFeature).run(spark) + + if (setLegacyVersions && downgradeAfterDrop) sql(downgradeProtocolVersionsSQL) + + val expectedProtocol = if (setLegacyVersions) { + Protocol(1, 4) + } else { + Protocol(1, TABLE_FEATURES_MIN_WRITER_VERSION).withFeature(ChangeDataFeedTableFeature) + } + assert(deltaLog.update().protocol === expectedProtocol) + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala index 5a5025c1386..db1e20e2985 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala @@ -106,9 +106,8 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest readerVersion = 1, writerVersion = 1, features = Seq(TestLegacyReaderWriterFeature), - expectedProtocol = - Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestLegacyReaderWriterFeature)) + expectedProtocol = Protocol(2, TABLE_FEATURES_MIN_WRITER_VERSION) + .withFeature(TestLegacyReaderWriterFeature)) testEmptyFolder( readerVersion = 1, writerVersion = 1, @@ -120,33 +119,27 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest writerVersion = TABLE_FEATURES_MIN_WRITER_VERSION, features = Seq(TestLegacyReaderWriterFeature), expectedProtocol = - Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) + Protocol(2, TABLE_FEATURES_MIN_WRITER_VERSION) .withFeature(TestLegacyReaderWriterFeature)) testEmptyFolder( readerVersion = 1, writerVersion = 1, features = Seq(TestWriterFeature), sqlConfs = Seq((DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey, "true")), - expectedProtocol = Protocol( - 1, - TABLE_FEATURES_MIN_WRITER_VERSION) + expectedProtocol = Protocol(1, TABLE_FEATURES_MIN_WRITER_VERSION) .withFeatures(Seq(TestWriterFeature, ChangeDataFeedTableFeature))) testEmptyFolder( readerVersion = 1, writerVersion = 1, features = Seq(TestLegacyReaderWriterFeature), sqlConfs = Seq((DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey, "true")), - expectedProtocol = Protocol( - TABLE_FEATURES_MIN_READER_VERSION, - TABLE_FEATURES_MIN_WRITER_VERSION) + expectedProtocol = Protocol(2, TABLE_FEATURES_MIN_WRITER_VERSION) .withFeatures(Seq(TestLegacyReaderWriterFeature, ChangeDataFeedTableFeature))) testEmptyFolder( readerVersion = 1, writerVersion = 1, features = Seq(TestWriterFeature, TestLegacyReaderWriterFeature), - expectedProtocol = Protocol( - TABLE_FEATURES_MIN_READER_VERSION, - TABLE_FEATURES_MIN_WRITER_VERSION) + expectedProtocol = Protocol(2, TABLE_FEATURES_MIN_WRITER_VERSION) .withFeatures(Seq(TestWriterFeature, TestLegacyReaderWriterFeature))) } @@ -189,78 +182,43 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest } test("upgrade to support table features - no feature") { + // Setting a table feature versions to a protocol without table features is a noop. withTempDir { path => val log = createTableWithProtocol(Protocol(1, 1), path) - assert(log.snapshot.protocol === Protocol(1, 1)) + assert(log.update().protocol === Protocol(1, 1)) val table = io.delta.tables.DeltaTable.forPath(spark, path.getCanonicalPath) table.upgradeTableProtocol(1, TABLE_FEATURES_MIN_WRITER_VERSION) - assert( - log.snapshot.protocol === Protocol( - minReaderVersion = 1, - minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION, - readerFeatures = None, - writerFeatures = Some(Set()))) + assert(log.update().protocol === Protocol(1, 1)) table.upgradeTableProtocol( TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - assert( - log.snapshot.protocol === Protocol( - minReaderVersion = TABLE_FEATURES_MIN_READER_VERSION, - minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION, - readerFeatures = Some(Set()), - writerFeatures = Some(Set()))) + assert(log.update().protocol === Protocol(1, 1)) } } test("upgrade to support table features - writer-only feature") { + // Setting table feature versions to a protocol without table features is a noop. withTempDir { path => val log = createTableWithProtocol(Protocol(1, 2), path) - assert(log.snapshot.protocol === Protocol(1, 2)) + assert(log.update().protocol === Protocol(1, 2)) val table = io.delta.tables.DeltaTable.forPath(spark, path.getCanonicalPath) table.upgradeTableProtocol(1, TABLE_FEATURES_MIN_WRITER_VERSION) - assert( - log.snapshot.protocol === Protocol( - minReaderVersion = 1, - minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION, - readerFeatures = None, - writerFeatures = - Some(Set(AppendOnlyTableFeature, InvariantsTableFeature).map(_.name)))) + assert(log.update().protocol === Protocol(1, 2)) table.upgradeTableProtocol( TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - assert( - log.snapshot.protocol === Protocol( - minReaderVersion = TABLE_FEATURES_MIN_READER_VERSION, - minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION, - readerFeatures = Some(Set()), - writerFeatures = - Some(Set(AppendOnlyTableFeature, InvariantsTableFeature).map(_.name)))) + assert(log.update().protocol === Protocol(1, 2)) } } test("upgrade to support table features - many features") { withTempDir { path => val log = createTableWithProtocol(Protocol(2, 5), path) - assert(log.snapshot.protocol === Protocol(2, 5)) + assert(log.update().protocol === Protocol(2, 5)) val table = io.delta.tables.DeltaTable.forPath(spark, path.getCanonicalPath) table.upgradeTableProtocol(2, TABLE_FEATURES_MIN_WRITER_VERSION) - assert( - log.snapshot.protocol === Protocol( - minReaderVersion = 2, - minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION, - readerFeatures = None, - writerFeatures = Some(Set( - AppendOnlyTableFeature, - ChangeDataFeedTableFeature, - CheckConstraintsTableFeature, - ColumnMappingTableFeature, - GeneratedColumnsTableFeature, - InvariantsTableFeature, - TestLegacyWriterFeature, - TestRemovableLegacyWriterFeature, - TestLegacyReaderWriterFeature, - TestRemovableLegacyReaderWriterFeature) - .map(_.name)))) + // Setting table feature versions to a protocol without table features is a noop. + assert(log.update().protocol === Protocol(2, 5)) spark.sql( s"ALTER TABLE delta.`${path.getPath}` SET TBLPROPERTIES (" + s" delta.feature.${TestWriterFeature.name}='enabled'" + @@ -270,9 +228,9 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest TABLE_FEATURES_MIN_WRITER_VERSION) assert( log.snapshot.protocol === Protocol( - minReaderVersion = TABLE_FEATURES_MIN_READER_VERSION, + minReaderVersion = 2, minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION, - readerFeatures = Some(Set()), + readerFeatures = None, writerFeatures = Some( Set( AppendOnlyTableFeature, @@ -294,34 +252,22 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest withTempDir { path => val log = createTableWithProtocol(Protocol(1, 2), path) - assert(log.snapshot.protocol === Protocol(1, 2)) + assert(log.update().protocol === Protocol(1, 2)) sql( s"ALTER TABLE delta.`${path.getCanonicalPath}` " + "SET TBLPROPERTIES (delta.minWriterVersion = 3)") - assert(log.snapshot.protocol === Protocol(1, 3)) + assert(log.update().protocol === Protocol(1, 3)) assertPropertiesAndShowTblProperties(log) sql(s"ALTER TABLE delta.`${path.getCanonicalPath}` " + s"SET TBLPROPERTIES (delta.minWriterVersion=$TABLE_FEATURES_MIN_WRITER_VERSION)") - assert( - log.snapshot.protocol === Protocol( - minReaderVersion = 1, - minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION, - readerFeatures = None, - writerFeatures = Some( - Set(AppendOnlyTableFeature, CheckConstraintsTableFeature, InvariantsTableFeature) - .map(_.name)))) - assertPropertiesAndShowTblProperties(log, tableHasFeatures = true) - sql(s"ALTER TABLE delta.`${path.getCanonicalPath}` " + - s"SET TBLPROPERTIES (delta.minReaderVersion=$TABLE_FEATURES_MIN_READER_VERSION)") - assert( - log.snapshot.protocol === Protocol( - minReaderVersion = TABLE_FEATURES_MIN_READER_VERSION, - minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION, - readerFeatures = Some(Set()), - writerFeatures = Some( - Set(AppendOnlyTableFeature, CheckConstraintsTableFeature, InvariantsTableFeature) - .map(_.name)))) - assertPropertiesAndShowTblProperties(log, tableHasFeatures = true) + assert(log.update().protocol === Protocol(1, 3)) + assertPropertiesAndShowTblProperties(log, tableHasFeatures = false) + sql(s"""ALTER TABLE delta.`${path.getCanonicalPath}` SET TBLPROPERTIES ( + |delta.minReaderVersion=$TABLE_FEATURES_MIN_READER_VERSION, + |delta.minWriterVersion=$TABLE_FEATURES_MIN_WRITER_VERSION + |)""".stripMargin) + assert(log.update().protocol === Protocol(1, 3)) + assertPropertiesAndShowTblProperties(log, tableHasFeatures = false) } } @@ -821,15 +767,15 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest test("protocol downgrade is a no-op") { withTempDir { path => - val log = createTableWithProtocol(Protocol(2, 3), path) - assert(log.update().protocol === Protocol(2, 3)) + val log = createTableWithProtocol(Protocol(2, 5), path) + assert(log.update().protocol === Protocol(2, 5)) { // DeltaLog API. This API is internal-only and will fail when downgrade. val e = intercept[ProtocolDowngradeException] { log.upgradeProtocol(Protocol(1, 2)) } - assert(log.update().protocol == Protocol(2, 3)) + assert(log.update().protocol == Protocol(2, 5)) assert(e.getErrorClass.contains("DELTA_INVALID_PROTOCOL_DOWNGRADE")) } { // DeltaTable API @@ -837,7 +783,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest val events = Log4jUsageLogger.track { table.upgradeTableProtocol(1, 2) } - assert(log.update().protocol == Protocol(2, 3)) + assert(log.update().protocol == Protocol(2, 5)) assert(events.count(_.tags.get("opType").contains("delta.protocol.downgradeIgnored")) === 1) } { // SQL API @@ -845,7 +791,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest sql(s"ALTER TABLE delta.`${path.getCanonicalPath}` " + "SET TBLPROPERTIES (delta.minWriterVersion = 2)") } - assert(log.update().protocol == Protocol(2, 3)) + assert(log.update().protocol == Protocol(2, 5)) assert(events.count(_.tags.get("opType").contains("delta.protocol.downgradeIgnored")) === 1) } } @@ -1013,21 +959,6 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest } } - test("can create table using the latest protocol with conf") { - val readerVersion = Action.supportedProtocolVersion().minReaderVersion - val writerVersion = Action.supportedProtocolVersion().minWriterVersion - withTempDir { dir => - withSQLConf( - DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION.key -> writerVersion.toString, - DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_READER_VERSION.key -> readerVersion.toString) { - sql(s"CREATE TABLE delta.`${dir.getCanonicalPath}` (id bigint) USING delta") - val deltaLog = DeltaLog.forTable(spark, dir) - assert(deltaLog.snapshot.protocol === - Action.supportedProtocolVersion(withAllFeatures = false)) - } - } - } - test("can create table using features configured in session") { val readerVersion = Action.supportedProtocolVersion().minReaderVersion val writerVersion = Action.supportedProtocolVersion().minWriterVersion @@ -1059,9 +990,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest s" delta.feature.${TestLegacyReaderWriterFeature.name}='enabled'" + s")") val deltaLog = DeltaLog.forTable(spark, dir) - assert( - deltaLog.snapshot.protocol.minReaderVersion === - TABLE_FEATURES_MIN_READER_VERSION, + assert(deltaLog.snapshot.protocol.minReaderVersion === 2, "reader protocol version should support table features because we used the " + "'delta.feature.' config.") assert( @@ -1072,6 +1001,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest assert( deltaLog.snapshot.protocol.readerAndWriterFeatureNames === Set( AppendOnlyTableFeature, + InvariantsTableFeature, TestLegacyReaderWriterFeature, TestWriterFeature).map(_.name)) } @@ -1302,13 +1232,12 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest deltaLog.snapshot.protocol.minWriterVersion === TABLE_FEATURES_MIN_WRITER_VERSION) assert( deltaLog.snapshot.protocol.readerAndWriterFeatureNames === Set( - AppendOnlyTableFeature, TestWriterFeature).map(_.name)) + AppendOnlyTableFeature, InvariantsTableFeature, TestWriterFeature).map(_.name)) assertPropertiesAndShowTblProperties(deltaLog, tableHasFeatures = true) } } - test( - "table creation with legacy reader-writer features as table property") { + test("table creation with legacy reader-writer features as table property") { withTempDir { dir => val deltaLog = DeltaLog.forTable(spark, dir) sql( @@ -1316,9 +1245,11 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest "TBLPROPERTIES (DeLtA.fEaTurE.testLEGACYReaderWritER='eNAbled')") assert( - deltaLog.snapshot.protocol === Protocol( - TABLE_FEATURES_MIN_READER_VERSION, - TABLE_FEATURES_MIN_WRITER_VERSION).withFeature(TestLegacyReaderWriterFeature)) + deltaLog.update().protocol === Protocol(2, TABLE_FEATURES_MIN_WRITER_VERSION) + .withFeatures(Seq( + AppendOnlyTableFeature, + InvariantsTableFeature, + TestLegacyReaderWriterFeature))) } } @@ -1336,7 +1267,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest TABLE_FEATURES_MIN_WRITER_VERSION) assert( deltaLog.snapshot.protocol.readerAndWriterFeatureNames === - Set(TestWriterFeature.name)) + Set(AppendOnlyTableFeature.name, InvariantsTableFeature.name, TestWriterFeature.name)) assertPropertiesAndShowTblProperties(deltaLog, tableHasFeatures = true) } } @@ -1357,7 +1288,10 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest deltaLog.snapshot.protocol.minWriterVersion === TABLE_FEATURES_MIN_WRITER_VERSION) assert( deltaLog.snapshot.protocol.readerAndWriterFeatureNames === Set( - TestLegacyReaderWriterFeature, TestReaderWriterFeature).map(_.name)) + InvariantsTableFeature, + AppendOnlyTableFeature, + TestLegacyReaderWriterFeature, + TestReaderWriterFeature).map(_.name)) assertPropertiesAndShowTblProperties(deltaLog, tableHasFeatures = true) } } @@ -1375,12 +1309,10 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest assert( deltaLog.snapshot.protocol === Protocol( - minReaderVersion = TABLE_FEATURES_MIN_READER_VERSION, + minReaderVersion = 2, minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION, - readerFeatures = - Some(Set(TestLegacyReaderWriterFeature.name)), - writerFeatures = - Some(Set(TestLegacyReaderWriterFeature.name)))) + readerFeatures = None, + writerFeatures = Some(Set(TestLegacyReaderWriterFeature.name)))) assertPropertiesAndShowTblProperties(deltaLog, tableHasFeatures = true) } } @@ -1417,7 +1349,10 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest minReaderVersion = TABLE_FEATURES_MIN_READER_VERSION, minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION, readerFeatures = Some(Set(TestReaderWriterMetadataAutoUpdateFeature.name)), - writerFeatures = Some(Set(TestReaderWriterMetadataAutoUpdateFeature.name)))) + writerFeatures = Some(Set( + TestReaderWriterMetadataAutoUpdateFeature.name, + AppendOnlyTableFeature.name, + InvariantsTableFeature.name)))) assertPropertiesAndShowTblProperties(deltaLog, tableHasFeatures = true) } } @@ -1450,7 +1385,10 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest minReaderVersion = TABLE_FEATURES_MIN_READER_VERSION, minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION, readerFeatures = Some(Set(TestReaderWriterMetadataAutoUpdateFeature.name)), - writerFeatures = Some(Set(TestReaderWriterMetadataAutoUpdateFeature.name)))) + writerFeatures = Some(Set( + TestReaderWriterMetadataAutoUpdateFeature.name, + InvariantsTableFeature.name, + AppendOnlyTableFeature.name)))) assertPropertiesAndShowTblProperties(deltaLog, tableHasFeatures = true) } } @@ -1469,7 +1407,10 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest minReaderVersion = 1, minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION, readerFeatures = None, - writerFeatures = Some(Set(TestWriterFeature.name)))) + writerFeatures = Some(Set( + InvariantsTableFeature.name, + AppendOnlyTableFeature.name, + TestWriterFeature.name)))) assertPropertiesAndShowTblProperties(deltaLog, tableHasFeatures = true) } } @@ -1512,43 +1453,59 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest "legacy protocol, legacy feature, feature property", Map(s"delta.feature.${TestLegacyReaderWriterFeature.name}" -> "enabled"), expectedFinalProtocol = Some( - Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestLegacyReaderWriterFeature))) + Protocol(2, TABLE_FEATURES_MIN_WRITER_VERSION).withFeatures(Seq( + TestLegacyReaderWriterFeature, + AppendOnlyTableFeature, + InvariantsTableFeature)))) testCreateTable( "legacy protocol, legacy writer feature, feature property", Map(s"delta.feature.${TestLegacyWriterFeature.name}" -> "enabled"), expectedFinalProtocol = Some( - Protocol(1, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestLegacyWriterFeature))) + Protocol(1, TABLE_FEATURES_MIN_WRITER_VERSION).withFeatures(Seq( + TestLegacyWriterFeature, + AppendOnlyTableFeature, + InvariantsTableFeature )))) testCreateTable( "legacy protocol, native auto-update feature, metadata", Map(TestReaderWriterMetadataAutoUpdateFeature.TABLE_PROP_KEY -> "true"), expectedFinalProtocol = Some( Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestReaderWriterMetadataAutoUpdateFeature))) + .withFeatures(Seq( + TestReaderWriterMetadataAutoUpdateFeature, + AppendOnlyTableFeature, + InvariantsTableFeature)))) testCreateTable( "legacy protocol, native non-auto-update feature, metadata", Map(TestReaderWriterMetadataNoAutoUpdateFeature.TABLE_PROP_KEY -> "true"), expectedFinalProtocol = Some( Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestReaderWriterMetadataNoAutoUpdateFeature))) + .withFeatures(Seq( + TestReaderWriterMetadataNoAutoUpdateFeature, + AppendOnlyTableFeature, + InvariantsTableFeature)))) testCreateTable( "legacy protocol, native auto-update feature, feature property", Map(s"delta.feature.${TestReaderWriterMetadataAutoUpdateFeature.name}" -> "enabled"), expectedFinalProtocol = Some( Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestReaderWriterMetadataAutoUpdateFeature))) + .withFeatures(Seq( + TestReaderWriterMetadataAutoUpdateFeature, + AppendOnlyTableFeature, + InvariantsTableFeature)))) testCreateTable( "legacy protocol, native non-auto-update feature, feature property", Map(s"delta.feature.${TestReaderWriterMetadataNoAutoUpdateFeature.name}" -> "enabled"), expectedFinalProtocol = Some( Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestReaderWriterMetadataNoAutoUpdateFeature))) + .withFeatures(Seq( + TestReaderWriterMetadataNoAutoUpdateFeature, + AppendOnlyTableFeature, + InvariantsTableFeature)))) testCreateTable( "legacy protocol with supported version props, legacy feature, feature property", @@ -1558,9 +1515,9 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest DeltaConfigs.MIN_WRITER_VERSION.key -> TestLegacyReaderWriterFeature.minWriterVersion.toString, s"delta.feature.${TestLegacyReaderWriterFeature.name}" -> "enabled"), - expectedFinalProtocol = Some( - Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestLegacyReaderWriterFeature))) + expectedFinalProtocol = Some(Protocol( + TestLegacyReaderWriterFeature.minReaderVersion, + TestLegacyReaderWriterFeature.minWriterVersion))) testCreateTable( "legacy protocol with table feature version props, legacy feature, feature property", @@ -1569,8 +1526,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest DeltaConfigs.MIN_WRITER_VERSION.key -> TABLE_FEATURES_MIN_WRITER_VERSION.toString, s"delta.feature.${TestLegacyReaderWriterFeature.name}" -> "enabled"), expectedFinalProtocol = Some( - Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestLegacyReaderWriterFeature))) + Protocol(2, TABLE_FEATURES_MIN_WRITER_VERSION).withFeature(TestLegacyReaderWriterFeature))) testCreateTable( "legacy protocol with supported version props, native feature, feature property", @@ -1589,8 +1545,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest DeltaConfigs.MIN_WRITER_VERSION.key -> TABLE_FEATURES_MIN_WRITER_VERSION.toString, "delta.appendOnly" -> "true"), expectedFinalProtocol = Some( - Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(AppendOnlyTableFeature))) + Protocol(1, TABLE_FEATURES_MIN_WRITER_VERSION).withFeature(AppendOnlyTableFeature))) testCreateTable( "table features protocol, legacy feature, feature property", @@ -1599,8 +1554,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest DeltaConfigs.MIN_WRITER_VERSION.key -> TABLE_FEATURES_MIN_WRITER_VERSION.toString, s"delta.feature.${TestLegacyReaderWriterFeature.name}" -> "enabled"), expectedFinalProtocol = Some( - Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestLegacyReaderWriterFeature))) + Protocol(2, TABLE_FEATURES_MIN_WRITER_VERSION).withFeature(TestLegacyReaderWriterFeature))) testCreateTable( "table features protocol, native auto-update feature, metadata", @@ -1703,23 +1657,23 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest testAlterTable( name = "downgrade reader version is a no-op", - tableProtocol = Protocol(2, 2), + tableProtocol = Protocol(2, 5), props = Map(DeltaConfigs.MIN_READER_VERSION.key -> "1"), - expectedFinalProtocol = Some(Protocol(2, 2))) + expectedFinalProtocol = Some(Protocol(2, 5))) testAlterTable( name = "downgrade writer version is a no-op", - tableProtocol = Protocol(2, 2), + tableProtocol = Protocol(1, 3), props = Map(DeltaConfigs.MIN_WRITER_VERSION.key -> "1"), - expectedFinalProtocol = Some(Protocol(2, 2))) + expectedFinalProtocol = Some(Protocol(1, 3))) testAlterTable( name = "downgrade both reader and versions version is a no-op", - tableProtocol = Protocol(2, 2), + tableProtocol = Protocol(2, 5), props = Map( DeltaConfigs.MIN_READER_VERSION.key -> "1", DeltaConfigs.MIN_WRITER_VERSION.key -> "1"), - expectedFinalProtocol = Some(Protocol(2, 2))) + expectedFinalProtocol = Some(Protocol(2, 5))) testAlterTable( name = "downgrade reader but upgrade writer versions (legacy protocol)", @@ -1735,18 +1689,17 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest props = Map( DeltaConfigs.MIN_READER_VERSION.key -> "1", DeltaConfigs.MIN_WRITER_VERSION.key -> "7"), - expectedFinalProtocol = Some( - Protocol(2, 7).withFeatures( - Seq(AppendOnlyTableFeature, InvariantsTableFeature)))) // Features from writer version 2 + // There is no (2, 2) feature. Protocol versions are downgraded (1, 2). + expectedFinalProtocol = Some(Protocol(1, 2))) testAlterTable( name = "downgrade while enabling a feature will become an upgrade", - tableProtocol = Protocol(2, 2), + tableProtocol = Protocol(1, 2), props = Map( DeltaConfigs.MIN_READER_VERSION.key -> "1", DeltaConfigs.MIN_WRITER_VERSION.key -> "1", DeltaConfigs.CHANGE_DATA_FEED.key -> "true"), - expectedFinalProtocol = Some(Protocol(2, 4))) + expectedFinalProtocol = Some(Protocol(1, 4))) testAlterTable( "legacy protocol, legacy feature, metadata", @@ -1757,8 +1710,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest "legacy protocol, legacy feature, feature property", Map(s"delta.feature.${TestLegacyReaderWriterFeature.name}" -> "enabled"), expectedFinalProtocol = Some( - Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestLegacyReaderWriterFeature))) + Protocol(2, TABLE_FEATURES_MIN_WRITER_VERSION).withFeature(TestLegacyReaderWriterFeature))) testAlterTable( "legacy protocol, legacy writer feature, feature property", @@ -1823,8 +1775,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest DeltaConfigs.MIN_WRITER_VERSION.key -> TABLE_FEATURES_MIN_WRITER_VERSION.toString, s"delta.feature.${TestLegacyReaderWriterFeature.name}" -> "enabled"), expectedFinalProtocol = Some( - Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestLegacyReaderWriterFeature))) + Protocol(2, TABLE_FEATURES_MIN_WRITER_VERSION).withFeature(TestLegacyReaderWriterFeature))) testAlterTable( "legacy protocol with supported version props, native feature, feature property", @@ -1840,19 +1791,15 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest "table features protocol, legacy feature, metadata", Map("delta.appendOnly" -> "true"), expectedFinalProtocol = Some( - Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(AppendOnlyTableFeature)), - tableProtocol = - Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)) + Protocol(1, TABLE_FEATURES_MIN_WRITER_VERSION).withFeature(AppendOnlyTableFeature)), + tableProtocol = Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)) testAlterTable( "table features protocol, legacy feature, feature property", Map(s"delta.feature.${TestLegacyReaderWriterFeature.name}" -> "enabled"), expectedFinalProtocol = Some( - Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .withFeature(TestLegacyReaderWriterFeature)), - tableProtocol = - Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)) + Protocol(2, TABLE_FEATURES_MIN_WRITER_VERSION).withFeature(TestLegacyReaderWriterFeature)), + tableProtocol = Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)) testAlterTable( "table features protocol, native auto-update feature, metadata", @@ -1960,9 +1907,8 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest s" '${TestWriterMetadataNoAutoUpdateFeature.TABLE_PROP_KEY}' = 'true')") assert( deltaLog.update().protocol === - expectedProtocolOnCreation - .merge(TestWriterMetadataNoAutoUpdateFeature.minProtocolVersion) - .withFeature(TestWriterMetadataNoAutoUpdateFeature)) + Protocol(1, 7).withFeature(TestWriterMetadataNoAutoUpdateFeature) + .merge(TestWriterMetadataNoAutoUpdateFeature.minProtocolVersion)) } } @@ -2165,9 +2111,8 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest | 'delta.feature.testLegacyReaderWriter' = 'enabled' |)""".stripMargin) assert( - log.snapshot.protocol === Protocol( - TABLE_FEATURES_MIN_READER_VERSION, - TABLE_FEATURES_MIN_WRITER_VERSION).withFeature(TestLegacyReaderWriterFeature)) + log.update().protocol === Protocol(2, TABLE_FEATURES_MIN_WRITER_VERSION) + .withFeature(TestLegacyReaderWriterFeature)) } } @@ -2179,9 +2124,9 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest | 'delta.feature.testLegacyReaderWriter' = 'enabled' |)""".stripMargin) assert(log.snapshot.protocol === Protocol( - TABLE_FEATURES_MIN_READER_VERSION, + 2, TABLE_FEATURES_MIN_WRITER_VERSION, - readerFeatures = Some(Set(TestLegacyReaderWriterFeature.name)), + readerFeatures = None, writerFeatures = Some(Set(TestLegacyReaderWriterFeature.name)))) } } @@ -2269,7 +2214,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest } assert( log.update().protocol === - Protocol(2, 5).merge(Protocol(3, 7)).withFeature(TestReaderWriterFeature)) + Protocol(2, 5).merge(Protocol(3, 7).withFeature(TestReaderWriterFeature))) } } @@ -2356,7 +2301,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest assert(captureProtocolChangeEventBlob { sql( s"ALTER TABLE delta.`$dir` " + - s"SET TBLPROPERTIES (${DeltaConfigs.MIN_WRITER_VERSION.key} = '7')") + s"SET TBLPROPERTIES (${DeltaConfigs.MIN_WRITER_VERSION.key} = '3')") } === Map( "fromProtocol" -> Map( "minReaderVersion" -> 1, @@ -2365,8 +2310,8 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest ), "toProtocol" -> Map( "minReaderVersion" -> 1, - "minWriterVersion" -> 7, - "supportedFeatures" -> List("appendOnly", "invariants") + "minWriterVersion" -> 3, + "supportedFeatures" -> List("appendOnly", "checkConstraints", "invariants") ))) // Add feature @@ -2377,13 +2322,14 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest } === Map( "fromProtocol" -> Map( "minReaderVersion" -> 1, - "minWriterVersion" -> 7, - "supportedFeatures" -> List("appendOnly", "invariants") + "minWriterVersion" -> 3, + "supportedFeatures" -> List("appendOnly", "checkConstraints", "invariants") ), "toProtocol" -> Map( "minReaderVersion" -> 3, "minWriterVersion" -> 7, - "supportedFeatures" -> List("appendOnly", "deletionVectors", "invariants") + "supportedFeatures" -> + List("appendOnly", "checkConstraints", "deletionVectors", "invariants") ))) } } @@ -2471,18 +2417,19 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest featureProperty: String): Unit = { sql(s"""CREATE TABLE delta.`${deltaLog.dataPath}` (id bigint) USING delta |TBLPROPERTIES ( - |delta.minReaderVersion = $TABLE_FEATURES_MIN_READER_VERSION, |delta.feature.${feature.name} = 'supported', |$featureProperty = "true" |)""".stripMargin) - val expectedWriterFeatures = Some(Set(feature.name)) + val readerVersion = Math.max(feature.minReaderVersion, 1) + val expectedWriterFeatures = + Some(Set(feature.name, InvariantsTableFeature.name, AppendOnlyTableFeature.name)) val expectedReaderFeatures: Option[Set[String]] = - if (feature.isReaderWriterFeature) expectedWriterFeatures else Some(Set.empty) + if (supportsReaderFeatures(readerVersion)) Some(Set(feature.name)) else None assert( deltaLog.update().protocol === Protocol( - minReaderVersion = TABLE_FEATURES_MIN_READER_VERSION, + minReaderVersion = Math.max(feature.minReaderVersion, 1), minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION, readerFeatures = expectedReaderFeatures, writerFeatures = expectedWriterFeatures)) @@ -2505,7 +2452,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest // Writer feature is removed from the writer features set. val snapshot = deltaLog.update() - assert(snapshot.protocol === Protocol(1, 1)) + assert(snapshot.protocol === Protocol(1, 2)) assert(!snapshot.metadata.configuration.contains(featurePropertyKey)) assertPropertiesAndShowTblProperties(deltaLog) } @@ -2596,7 +2543,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest // Reader+writer feature is removed from the features set. val snapshot = deltaLog.update() - assert(snapshot.protocol === Protocol(1, 1)) + assert(snapshot.protocol === Protocol(1, 2)) assert(!snapshot.metadata.configuration.contains(featurePropertyKey)) assertPropertiesAndShowTblProperties(deltaLog) } else { @@ -2676,7 +2623,11 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest val protocol = deltaLog.update().protocol assert(protocol === protocolWithFeatures( - writerFeatures = Seq(TestWriterFeature, TestRemovableWriterFeature))) + writerFeatures = Seq( + AppendOnlyTableFeature, + InvariantsTableFeature, + TestWriterFeature, + TestRemovableWriterFeature))) val command = AlterTableDropFeatureDeltaCommand( DeltaTableV2(spark, deltaLog.dataPath), @@ -2688,7 +2639,10 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest minReaderVersion = 1, minWriterVersion = TABLE_FEATURES_MIN_WRITER_VERSION, readerFeatures = None, - writerFeatures = Some(Set(TestWriterFeature.name)))) + writerFeatures = Some(Set( + TestWriterFeature.name, + AppendOnlyTableFeature.name, + InvariantsTableFeature.name)))) } } @@ -2700,7 +2654,10 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest |delta.feature.${TestWriterMetadataNoAutoUpdateFeature.name} = 'supported' |)""".stripMargin) - val expectedProtocol = protocolWithWriterFeature(TestWriterMetadataNoAutoUpdateFeature) + val expectedProtocol = protocolWithFeatures(writerFeatures = Seq( + TestWriterMetadataNoAutoUpdateFeature, + AppendOnlyTableFeature, + InvariantsTableFeature)) assert(deltaLog.update().protocol === expectedProtocol) val command = AlterTableDropFeatureDeltaCommand( @@ -2770,23 +2727,13 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest test(s"Remove a feature not present in the protocol - withTableFeatures: $withTableFeatures") { withTempDir { dir => val deltaLog = DeltaLog.forTable(spark, dir) - val (minReaderVersion, minWriterVersion) = if (withTableFeatures) { - (TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - } else { - (1, 2) - } - sql( - s"""CREATE TABLE delta.`${dir.getCanonicalPath}` (id bigint) USING delta - |TBLPROPERTIES ( - |delta.minReaderVersion = $minReaderVersion, - |delta.minWriterVersion = $minWriterVersion)""".stripMargin) + sql(s"CREATE TABLE delta.`${dir.getCanonicalPath}` (id bigint) USING delta") - assert( - deltaLog.update().protocol === Protocol( - minReaderVersion = minReaderVersion, - minWriterVersion = minWriterVersion, - readerFeatures = if (withTableFeatures) Some(Set.empty) else None, - writerFeatures = if (withTableFeatures) Some(Set.empty) else None)) + assert(deltaLog.update().protocol === Protocol( + minReaderVersion = 1, + minWriterVersion = 2, + readerFeatures = None, + writerFeatures = None)) val command = AlterTableDropFeatureDeltaCommand( DeltaTableV2(spark, deltaLog.dataPath), @@ -2810,21 +2757,23 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest |delta.feature.${TestRemovableWriterFeature.name} = 'supported' |)""".stripMargin) + val expectedFeatures = + Seq(AppendOnlyTableFeature, InvariantsTableFeature, TestRemovableWriterFeature) val protocol = deltaLog.update().protocol - assert(protocol === protocolWithWriterFeature(TestRemovableWriterFeature)) + assert(protocol === protocolWithFeatures(writerFeatures = expectedFeatures)) val command = AlterTableDropFeatureDeltaCommand( DeltaTableV2(spark, deltaLog.dataPath), TestRemovableWriterFeature.name) command.run(spark) - assert(deltaLog.update().protocol === Protocol(1, 1)) + assert(deltaLog.update().protocol === Protocol(1, 2)) sql(s"""ALTER TABLE delta.`${dir.getCanonicalPath}` SET TBLPROPERTIES ( |delta.feature.${TestRemovableWriterFeature.name} = 'supported' |)""".stripMargin) val expectedProtocolAfterReintroduction = - protocolWithFeatures(writerFeatures = Seq(TestRemovableWriterFeature)) + protocolWithFeatures(writerFeatures = expectedFeatures) assert(deltaLog.update().protocol === expectedProtocolAfterReintroduction) } } @@ -2844,11 +2793,14 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest |)""".stripMargin) var protocol = deltaLog.update().protocol - assert(protocol === protocolWithWriterFeature(TestRemovableWriterFeature)) + assert(protocol === protocolWithFeatures(writerFeatures = Seq( + AppendOnlyTableFeature, + InvariantsTableFeature, + TestRemovableWriterFeature))) AlterTableDropFeatureDeltaCommand( DeltaTableV2(spark, deltaLog.dataPath), TestRemovableWriterFeature.name).run(spark) - assert(deltaLog.update().protocol === Protocol(1, 1)) + assert(deltaLog.update().protocol === Protocol(1, 2)) // Scenario-2: Create a table with `TestRemovableWriterFeatureWithDependency` feature. This // will enable 2 dependent features also. @@ -3046,7 +2998,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest // Reader+writer feature is removed from the features set. val snapshot = deltaLog.update() - assert(snapshot.protocol === Protocol(1, 1)) + assert(snapshot.protocol === Protocol(1, 2)) assert(!snapshot.metadata.configuration .contains(TestRemovableReaderWriterFeature.TABLE_PROP_KEY)) assertPropertiesAndShowTblProperties(deltaLog) @@ -3178,7 +3130,10 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest Map(TestRemovableWriterFeature.TABLE_PROP_KEY -> "true")).run(spark) val protocol = deltaLog.update().protocol - assert(protocol === protocolWithWriterFeature(TestRemovableWriterFeature)) + assert(protocol === protocolWithFeatures(writerFeatures = Seq( + AppendOnlyTableFeature, + InvariantsTableFeature, + TestRemovableWriterFeature))) val logs = Log4jUsageLogger.track { val featureName = quoteWith match { @@ -3187,7 +3142,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest case "backtick" => s"`${TestRemovableWriterFeature.name}`" } sql(s"ALTER TABLE $table DROP FEATURE $featureName") - assert(deltaLog.update().protocol === Protocol(1, 1)) + assert(deltaLog.update().protocol === Protocol(1, 2)) } // Test that the write downgrade command was invoked. val expectedOpType = "delta.test.TestWriterFeaturePreDowngradeCommand" @@ -3239,7 +3194,12 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest } val protocol = deltaLog.update().protocol - assert(protocol === protocolWithReaderFeature(TestRemovableReaderWriterFeature)) + assert(protocol === protocolWithFeatures( + readerFeatures = Seq(TestRemovableReaderWriterFeature), + writerFeatures = Seq( + AppendOnlyTableFeature, + InvariantsTableFeature, + TestRemovableReaderWriterFeature))) val logs = Log4jUsageLogger.track { val featureName = quoteWith match { @@ -3258,7 +3218,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest clock.advance(TimeUnit.HOURS.toMillis(1)) sql(s"ALTER TABLE $table DROP FEATURE $featureName TRUNCATE HISTORY") - assert(deltaLog.update().protocol === Protocol(1, 1)) + assert(deltaLog.update().protocol === Protocol(1, 2)) } // Validate the correct downgrade command was invoked. @@ -3401,7 +3361,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest initialMinWriterVersion = 7, featuresToAdd = Seq(TestRemovableWriterFeature, ColumnMappingTableFeature), featuresToRemove = Seq(TestRemovableWriterFeature), - expectedDowngradedProtocol = Protocol(3, 7).withFeature(ColumnMappingTableFeature)) + expectedDowngradedProtocol = Protocol(2, 7).withFeature(ColumnMappingTableFeature)) // Remove reader+writer legacy feature as well. testProtocolVersionDowngrade( @@ -3439,7 +3399,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest expectedDowngradedProtocol = protocolWithReaderFeature(TestRemovableReaderWriterFeature)) } - test(s"Can drop reader+writer feature when there is nothing to clean") { + test("Can drop reader+writer feature when there is nothing to clean") { withTempPath { dir => val clock = new ManualClock(System.currentTimeMillis()) val targetLog = DeltaLog.forTable(spark, dir, clock) @@ -3464,7 +3424,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest TestRemovableReaderWriterFeature.name, truncateHistory = true).run(spark) - assert(targetLog.update().protocol == Protocol(1, 1)) + assert(targetLog.update().protocol == Protocol(1, 2)) } } @@ -3553,7 +3513,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest featureName = TestRemovableWriterWithHistoryTruncationFeature.name, truncateHistory = truncateHistory).run(spark) - assert(deltaLog.update().protocol === Protocol(1, 1)) + assert(deltaLog.update().protocol === Protocol(1, 2)) } } @@ -3718,10 +3678,11 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest |)""".stripMargin) val protocol = deltaLog.update().protocol - assert(protocol.minReaderVersion == TABLE_FEATURES_MIN_READER_VERSION) - assert(protocol.minWriterVersion == TABLE_FEATURES_MIN_WRITER_VERSION) - assert(protocol.readerFeatures.get.contains(featureName) - === enableFeatureInitially) + assert(protocol.minReaderVersion == + (if (enableFeatureInitially) TABLE_FEATURES_MIN_READER_VERSION else 1)) + assert(protocol.minWriterVersion == + (if (enableFeatureInitially) TABLE_FEATURES_MIN_WRITER_VERSION else 1)) + assert(protocol.readerFeatures.isDefined === enableFeatureInitially) downgradeFailsWithException match { case Some(exceptionClass) => val e = intercept[DeltaTableFeatureException] { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala index 0c61df3bfdc..261aa06c6ee 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala @@ -133,6 +133,8 @@ class DeltaTableFeatureSuite Some(metadata)) .readerAndWriterFeatureNames === Set( + AppendOnlyTableFeature.name, + InvariantsTableFeature.name, TestWriterFeatureWithTransitiveDependency.name, TestFeatureWithDependency.name, TestReaderWriterFeature.name)) @@ -205,30 +207,13 @@ class DeltaTableFeatureSuite val tfProtocol2 = Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - assert( - tfProtocol1.merge(Protocol(1, 2)) === - tfProtocol1.withFeatures(Seq(AppendOnlyTableFeature, InvariantsTableFeature))) - assert( - tfProtocol2.merge(Protocol(2, 6)) === - tfProtocol2.withFeatures(Set( - AppendOnlyTableFeature, - InvariantsTableFeature, - ColumnMappingTableFeature, - ChangeDataFeedTableFeature, - CheckConstraintsTableFeature, - GeneratedColumnsTableFeature, - IdentityColumnsTableFeature, - TestLegacyWriterFeature, - TestLegacyReaderWriterFeature, - TestRemovableLegacyWriterFeature, - TestRemovableLegacyReaderWriterFeature))) + assert(tfProtocol1.merge(Protocol(1, 2)) === Protocol(1, 2)) + assert(tfProtocol2.merge(Protocol(2, 6)) === Protocol(2, 6)) } test("protocol upgrade compatibility") { assert(Protocol(1, 1).canUpgradeTo(Protocol(1, 1))) assert(Protocol(1, 1).canUpgradeTo(Protocol(2, 1))) - assert(!Protocol(1, 2).canUpgradeTo(Protocol(1, 1))) - assert(!Protocol(2, 2).canUpgradeTo(Protocol(2, 1))) assert( Protocol(1, 1).canUpgradeTo( Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION))) @@ -267,10 +252,6 @@ class DeltaTableFeatureSuite TestLegacyReaderWriterFeature, TestRemovableLegacyWriterFeature, TestRemovableLegacyReaderWriterFeature)))) - // Features are identical but protocol versions are lower, thus `canUpgradeTo` is `false`. - assert( - !Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION) - .canUpgradeTo(Protocol(1, 1))) } test("protocol downgrade compatibility") { @@ -295,8 +276,8 @@ class DeltaTableFeatureSuite .canDowngradeTo(Protocol(1, 1), droppedFeatureName = TestReaderWriterFeature.name)) assert( tableFeatureProtocol - .merge(Protocol(2, 5)) .withFeatures(Seq(TestReaderWriterFeature, TestRemovableLegacyReaderWriterFeature)) + .merge(Protocol(2, 5)) .canDowngradeTo(Protocol(2, 5), droppedFeatureName = TestReaderWriterFeature.name)) // Downgraded protocol must be able to support all legacy table features. assert( @@ -305,8 +286,8 @@ class DeltaTableFeatureSuite .canDowngradeTo(Protocol(2, 4), droppedFeatureName = TestWriterFeature.name)) assert( tableFeatureProtocol - .merge(Protocol(2, 5)) .withFeatures(Seq(TestWriterFeature, AppendOnlyTableFeature, ColumnMappingTableFeature)) + .merge(Protocol(2, 5)) .canDowngradeTo(Protocol(2, 5), droppedFeatureName = TestWriterFeature.name)) } @@ -354,6 +335,8 @@ class DeltaTableFeatureSuite val log = DeltaLog.forTable(spark, TableIdentifier("tbl")) val protocol = log.update().protocol assert(protocol.readerAndWriterFeatureNames === Set( + AppendOnlyTableFeature.name, + InvariantsTableFeature.name, ColumnMappingTableFeature.name, TestWriterFeature.name)) } @@ -407,6 +390,8 @@ class DeltaTableFeatureSuite commandName, targetTableName = "tbl", sourceTableName = "tbl", tblProperties)) val protocol = log.update().protocol assert(protocol.readerAndWriterFeatureNames === Set( + AppendOnlyTableFeature.name, + InvariantsTableFeature.name, ChangeDataFeedTableFeature.name, TestWriterFeature.name)) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimestampNTZSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimestampNTZSuite.scala index ab7a7493997..2233684da8c 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimestampNTZSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimestampNTZSuite.scala @@ -48,7 +48,8 @@ class DeltaTimestampNTZSuite extends QueryTest new Timestamp(2022 - 1900, 0, 2, 3, 4, 5, 123456000), LocalDateTime.of(2022, 1, 2, 3, 4, 5, 123456000))) assert(getProtocolForTable("tbl") == - TimestampNTZTableFeature.minProtocolVersion.withFeature(TimestampNTZTableFeature) + TimestampNTZTableFeature.minProtocolVersion.withFeatures(Seq( + AppendOnlyTableFeature, InvariantsTableFeature, TimestampNTZTableFeature)) ) } } @@ -114,7 +115,8 @@ class DeltaTimestampNTZSuite extends QueryTest new Timestamp(2022 - 1900, 0, 2, 3, 4, 5, 123456000), LocalDateTime.of(2022, 1, 2, 3, 4, 5, 123456000))) assert(getProtocolForTable("delta_test") == - TimestampNTZTableFeature.minProtocolVersion.withFeature(TimestampNTZTableFeature) + TimestampNTZTableFeature.minProtocolVersion.withFeatures(Seq( + AppendOnlyTableFeature, InvariantsTableFeature, TimestampNTZTableFeature)) ) } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/RestoreTableSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/RestoreTableSuiteBase.scala index f76df40109d..1361aa94132 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/RestoreTableSuiteBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/RestoreTableSuiteBase.scala @@ -216,7 +216,8 @@ trait RestoreTableSuiteBase extends QueryTest with SharedSparkSession val deltaLog = DeltaLog.forTable(spark, path) val oldProtocolVersion = deltaLog.snapshot.protocol // Update table to latest version. - deltaLog.upgradeProtocol(oldProtocolVersion.merge(Protocol())) + deltaLog.upgradeProtocol( + oldProtocolVersion.merge(Protocol().withFeature(TestReaderWriterFeature))) val newProtocolVersion = deltaLog.snapshot.protocol assert(newProtocolVersion.minReaderVersion > oldProtocolVersion.minReaderVersion && newProtocolVersion.minWriterVersion > oldProtocolVersion.minWriterVersion, diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/DropColumnMappingFeatureSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/DropColumnMappingFeatureSuite.scala index 713af3bf5d0..0e9ac064806 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/DropColumnMappingFeatureSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/DropColumnMappingFeatureSuite.scala @@ -71,9 +71,13 @@ class DropColumnMappingFeatureSuite extends RemoveColumnMappingSuiteUtils { |USING delta |TBLPROPERTIES ('delta.columnMapping.mode' = 'name') |""".stripMargin) - val e = intercept[DeltaTableFeatureException] { + val e = intercept[DeltaAnalysisException] { + // Try to drop column mapping. dropColumnMappingTableFeature() } + checkError(e, + errorClass = "DELTA_INVALID_COLUMN_NAMES_WHEN_REMOVING_COLUMN_MAPPING", + parameters = Map("invalidColumnNames" -> "col1 with special chars ,;{}()\n\t=")) } test("drop column mapping from a table without table feature") { @@ -82,17 +86,12 @@ class DropColumnMappingFeatureSuite extends RemoveColumnMappingSuiteUtils { |USING delta |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name', | '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = 'false', - | 'delta.minReaderVersion' = '1', - | 'delta.minWriterVersion' = '1') + | 'delta.minReaderVersion' = '3', + | 'delta.minWriterVersion' = '7') |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn | FROM RANGE(0, $totalRows, 1, $numFiles) |""".stripMargin) - val e = intercept[DeltaTableFeatureException] { - dropColumnMappingTableFeature() - } - checkError(e, - errorClass = "DELTA_FEATURE_DROP_FEATURE_NOT_PRESENT", - parameters = Map("feature" -> "columnMapping")) + testDroppingColumnMapping() } test("drop column mapping from a table with table feature") { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala index 24fdf211e3e..7f14b19e3b2 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/schema/CheckConstraintsSuite.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.delta.schema import scala.collection.JavaConverters._ -import org.apache.spark.sql.delta.DeltaLog +import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog} +import org.apache.spark.sql.delta.actions.Protocol import org.apache.spark.sql.delta.constraints.CharVarcharConstraint import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest @@ -460,37 +461,46 @@ class CheckConstraintsSuite extends QueryTest } test("drop table feature") { - withTable("table") { - sql("CREATE TABLE table (a INT, b INT) USING DELTA " + - "TBLPROPERTIES ('delta.feature.checkConstraints' = 'supported')") - sql("ALTER TABLE table ADD CONSTRAINT c1 CHECK (a > 0)") - sql("ALTER TABLE table ADD CONSTRAINT c2 CHECK (b > 0)") - - val error1 = intercept[AnalysisException] { - sql("ALTER TABLE table DROP FEATURE checkConstraints") - } - checkError( - error1, - errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE", - parameters = Map("constraints" -> "`c1`, `c2`") - ) - val deltaLog = DeltaLog.forTable(spark, TableIdentifier("table")) - assert(deltaLog.update().protocol.readerAndWriterFeatureNames.contains("checkConstraints")) + withSQLConf( + DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey -> false.toString) { + withTable("table") { + sql("CREATE TABLE table (a INT, b INT) USING DELTA " + + "TBLPROPERTIES ('delta.feature.checkConstraints' = 'supported')") + sql("ALTER TABLE table ADD CONSTRAINT c1 CHECK (a > 0)") + sql("ALTER TABLE table ADD CONSTRAINT c2 CHECK (b > 0)") + + val error1 = intercept[AnalysisException] { + sql("ALTER TABLE table DROP FEATURE checkConstraints") + } + checkError( + error1, + errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE", + parameters = Map("constraints" -> "`c1`, `c2`") + ) + val deltaLog = DeltaLog.forTable(spark, TableIdentifier("table")) + val featureNames1 = + deltaLog.update().protocol.implicitlyAndExplicitlySupportedFeatures.map(_.name) + assert(featureNames1.contains("checkConstraints")) + + sql("ALTER TABLE table DROP CONSTRAINT c1") + val error2 = intercept[AnalysisException] { + sql("ALTER TABLE table DROP FEATURE checkConstraints") + } + checkError( + error2, + errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE", + parameters = Map("constraints" -> "`c2`") + ) + val featureNames2 = + deltaLog.update().protocol.implicitlyAndExplicitlySupportedFeatures.map(_.name) + assert(featureNames2.contains("checkConstraints")) - sql("ALTER TABLE table DROP CONSTRAINT c1") - val error2 = intercept[AnalysisException] { + sql("ALTER TABLE table DROP CONSTRAINT c2") sql("ALTER TABLE table DROP FEATURE checkConstraints") + val featureNames3 = + deltaLog.update().protocol.implicitlyAndExplicitlySupportedFeatures.map(_.name) + assert(!featureNames3.contains("checkConstraints")) } - checkError( - error2, - errorClass = "DELTA_CANNOT_DROP_CHECK_CONSTRAINT_FEATURE", - parameters = Map("constraints" -> "`c2`") - ) - assert(deltaLog.update().protocol.readerAndWriterFeatureNames.contains("checkConstraints")) - - sql("ALTER TABLE table DROP CONSTRAINT c2") - sql("ALTER TABLE table DROP FEATURE checkConstraints") - assert(!deltaLog.update().protocol.readerAndWriterFeatureNames.contains("checkConstraints")) } } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/schema/InvariantEnforcementSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/schema/InvariantEnforcementSuite.scala index 90035f3917c..157f444b47e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/schema/InvariantEnforcementSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/schema/InvariantEnforcementSuite.scala @@ -407,14 +407,9 @@ class InvariantEnforcementSuite extends QueryTest val newMetadata = txn.metadata.copy( configuration = txn.metadata.configuration + ("delta.constraints.mychk" -> "valueA < valueB")) - assert(txn.protocol.minWriterVersion === writerVersion) txn.commit(Seq(newMetadata), DeltaOperations.ManualUpdate) - val upVersion = if (TableFeatureProtocolUtils.supportsWriterFeatures(writerVersion)) { - TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION - } else { - CheckConstraintsTableFeature.minWriterVersion - } - assert(table.deltaLog.unsafeVolatileSnapshot.protocol.minWriterVersion === upVersion) + assert(table.deltaLog.update().protocol.minWriterVersion === + CheckConstraintsTableFeature.minWriterVersion) spark.sql("INSERT INTO constraint VALUES (50, 100, null)") val e = intercept[InvariantViolationException] { spark.sql("INSERT INTO constraint VALUES (100, 50, null)")