From bcd0ee2deb682aa5c8df488cb93dae003bc08921 Mon Sep 17 00:00:00 2001
From: Dhruv Arya
Date: Tue, 14 Nov 2023 11:11:06 -0800
Subject: [PATCH] Add Feature Phaseout support for V2 Checkpoints

This PR adds table feature phaseout support for V2 checkpoints. Users can now
downgrade their tables from v2 checkpoints to classic checkpoints, allowing
older clients to interact with these tables.

Closes delta-io/delta#2284

GitOrigin-RevId: 6321298103baeda6fbd9a0d9932090f915af4ee0
---
 .../PreDowngradeTableFeatureCommand.scala     |  32 ++++-
 .../apache/spark/sql/delta/TableFeature.scala |  26 ++++
 .../sql/delta/DeltaProtocolVersionSuite.scala | 135 ++++++++++++++++++
 3 files changed, 192 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala
index c894eba0092..8e48f5b7954 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala
@@ -15,9 +15,10 @@
  */
 package org.apache.spark.sql.delta
 
+import java.util.concurrent.TimeUnit
 import org.apache.spark.sql.delta.catalog.DeltaTableV2
-import org.apache.spark.sql.delta.commands.AlterTableUnsetPropertiesDeltaCommand
+import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand}
 import org.apache.spark.sql.delta.metering.DeltaLogging
 
 /**
@@ -87,3 +88,32 @@ case class TestLegacyReaderWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
     true
   }
 }
+
+case class V2CheckpointPreDowngradeCommand(table: DeltaTableV2)
+  extends PreDowngradeTableFeatureCommand
+  with DeltaLogging {
+  /**
+   * We set the checkpoint policy to classic to prevent any transactions from creating
+   * v2 checkpoints.
+   *
+   * @return True if it changed the checkpoint policy metadata property to classic.
+   *         False otherwise.
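+   *
+   * @note This pre-downgrade step is triggered by the feature removal command exercised in
+   *       the accompanying test suite, e.g. (the table path below is a placeholder):
+   *       {{{
+   *         ALTER TABLE delta.`/path/to/table` DROP FEATURE `v2Checkpoint`
+   *       }}}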
+   */
+  override def removeFeatureTracesIfNeeded(): Boolean = {
+
+    if (V2CheckpointTableFeature.validateRemoval(table.initialSnapshot)) return false
+
+    val startTimeNs = System.nanoTime()
+    val properties = Map(DeltaConfigs.CHECKPOINT_POLICY.key -> CheckpointPolicy.Classic.name)
+    AlterTableSetPropertiesDeltaCommand(table, properties).run(table.spark)
+
+    recordDeltaEvent(
+      table.deltaLog,
+      opType = "delta.v2CheckpointFeatureRemovalMetrics",
+      data =
+        Map(("downgradeTimeMs", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)))
+    )
+
+    true
+  }
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
index 922de40bc1f..1f77913890c 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
@@ -535,6 +535,7 @@ object ClusteringTableFeature extends WriterFeature("clustering") {
  */
 object V2CheckpointTableFeature
   extends ReaderWriterFeature(name = "v2Checkpoint")
+  with RemovableFeature
  with FeatureAutomaticallyEnabledByMetadata {
 
   override def automaticallyUpdateProtocolOfExistingTables: Boolean = true
@@ -545,6 +546,31 @@ object V2CheckpointTableFeature
   override def metadataRequiresFeatureToBeEnabled(
       metadata: Metadata,
       spark: SparkSession): Boolean = isV2CheckpointSupportNeededByMetadata(metadata)
+
+  override def validateRemoval(snapshot: Snapshot): Boolean = {
+    // Fail validation if v2 checkpoints are still enabled in the current snapshot
+    if (isV2CheckpointSupportNeededByMetadata(snapshot.metadata)) return false
+
+    // Validation also fails if the current snapshot might depend on a v2 checkpoint.
+    // NOTE: Empty and preloaded checkpoint providers never reference v2 checkpoints.
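+    // A LazyCompleteCheckpointProvider is only accepted when the provider it wraps is a
+    // classic PreloadedCheckpointProvider; any other provider may still reference a
+    // v2 checkpoint.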
+    snapshot.checkpointProvider match {
+      case p if p.isEmpty => true
+      case _: PreloadedCheckpointProvider => true
+      case lazyProvider: LazyCompleteCheckpointProvider =>
+        lazyProvider.underlyingCheckpointProvider.isInstanceOf[PreloadedCheckpointProvider]
+      case _ => false
+    }
+  }
+
+  override def actionUsesFeature(action: Action): Boolean = action match {
+    case m: Metadata => isV2CheckpointSupportNeededByMetadata(m)
+    case _: CheckpointMetadata => true
+    case _: SidecarFile => true
+    case _ => false
+  }
+
+  override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand =
+    V2CheckpointPreDowngradeCommand(table)
 }
 
 /**
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
index e450e8e86fc..3ef2e24d173 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
@@ -31,11 +31,13 @@ import org.apache.spark.sql.delta.commands.{AlterTableDropFeatureDeltaCommand, A
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
+import org.apache.spark.sql.delta.util.FileNames
 import org.apache.spark.sql.delta.util.FileNames.{deltaFile, DeltaFile}
 import org.apache.spark.sql.delta.util.JsonUtils
 
 import org.apache.spark.{SparkConf, SparkThrowable}
 import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.util.DateTimeConstants
 import org.apache.spark.sql.execution.streaming.MemoryStream
@@ -3139,6 +3141,139 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
       expectedDowngradedProtocol = protocolWithReaderFeature(TestRemovableReaderWriterFeature))
   }
 
+  private def dropV2CheckpointsTableFeature(spark: SparkSession, log: DeltaLog): Unit = {
+    spark.sql(s"ALTER TABLE delta.`${log.dataPath}` DROP FEATURE " +
+      s"`${V2CheckpointTableFeature.name}`")
+  }
+
+  private def testV2CheckpointTableFeatureDrop(
+      v2CheckpointFormat: V2Checkpoint.Format,
+      withInitialV2Checkpoint: Boolean,
+      forceMultiPartCheckpoint: Boolean = false): Unit = {
+    var confs = Seq(
+      DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.V2.name,
+      DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT.key -> v2CheckpointFormat.name
+    )
+    val expectedClassicCheckpointType = if (forceMultiPartCheckpoint) {
+      confs :+= DeltaSQLConf.DELTA_CHECKPOINT_PART_SIZE.key -> "1"
+      CheckpointInstance.Format.WITH_PARTS
+    } else {
+      CheckpointInstance.Format.SINGLE
+    }
+    withSQLConf(confs: _*) {
+      withTempPath { dir =>
+        val clock = new ManualClock(System.currentTimeMillis())
+        val targetLog = DeltaLog.forTable(spark, dir, clock)
+        val defaultRetentionPeriod =
+          DeltaConfigs.LOG_RETENTION.fromMetaData(targetLog.update().metadata).toString
+
+        val targetDF = spark.range(start = 0, end = 100, step = 1, numPartitions = 2)
+        targetDF.write.format("delta").save(dir.toString)
+
+        val initialCheckpointCount = if (withInitialV2Checkpoint) 1 else 0
+
+        if (withInitialV2Checkpoint) {
+          // Create a v2 checkpoint.
+          targetLog.checkpoint()
+        }
+
+        // Assert that the current checkpointing policy requires v2 checkpoint support.
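+        // Also assert that any checkpoints written so far are in v2 format.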
+        val preDowngradeSnapshot = targetLog.update()
+        assert(
+          DeltaConfigs.CHECKPOINT_POLICY
+            .fromMetaData(preDowngradeSnapshot.metadata)
+            .needsV2CheckpointSupport)
+        val checkpointFiles = targetLog.listFrom(0).filter(FileNames.isCheckpointFile)
+        assert(checkpointFiles.length == initialCheckpointCount)
+        checkpointFiles.foreach { f =>
+          assert(CheckpointInstance(f.getPath).format == CheckpointInstance.Format.V2)
+        }
+
+        // Dropping the feature should fail because
+        // 1. The checkpointing policy in metadata requires v2 checkpoint support.
+        // 2. When withInitialV2Checkpoint is true, there is also an existing v2 checkpoint.
+        val e1 = intercept[DeltaTableFeatureException] {
+          dropV2CheckpointsTableFeature(spark, targetLog)
+        }
+        checkError(
+          exception = e1,
+          errorClass = "DELTA_FEATURE_DROP_WAIT_FOR_RETENTION_PERIOD",
+          parameters = Map(
+            "feature" -> V2CheckpointTableFeature.name,
+            "logRetentionPeriodKey" -> "delta.logRetentionDuration",
+            "logRetentionPeriod" -> defaultRetentionPeriod,
+            "truncateHistoryLogRetentionPeriod" -> truncateHistoryDefaultLogRetention.toString))
+
+        val postCleanupCheckpointFiles =
+          targetLog.listFrom(0).filter(FileNames.isCheckpointFile).toList
+
+        // Assert that a new classic checkpoint has been created.
+        val uniqueCheckpointCount = postCleanupCheckpointFiles
+          .drop(initialCheckpointCount)
+          .map { checkpointFile =>
+            val checkpointInstance = CheckpointInstance(checkpointFile.getPath)
+
+            assert(checkpointInstance.format == expectedClassicCheckpointType)
+
+            checkpointInstance.version
+          }
+          // Count a multi-part checkpoint as a single checkpoint.
+          .toSet.size
+        // The drop feature command generates one classic checkpoint after v2 checkpoint cleanup.
+        val expectedClassicCheckpointCount = 1
+        assert(uniqueCheckpointCount == expectedClassicCheckpointCount)
+
+        spark.range(100, 120).write.format("delta").mode("append").save(dir.getCanonicalPath)
+
+        // V2 checkpoint-related traces have not been cleaned up yet, so the attempt should fail.
+        val e2 = intercept[DeltaTableFeatureException] {
+          dropV2CheckpointsTableFeature(spark, targetLog)
+        }
+        checkError(
+          exception = e2,
+          errorClass = "DELTA_FEATURE_DROP_HISTORICAL_VERSIONS_EXIST",
+          parameters = Map(
+            "feature" -> V2CheckpointTableFeature.name,
+            "logRetentionPeriodKey" -> "delta.logRetentionDuration",
+            "logRetentionPeriod" -> defaultRetentionPeriod,
+            "truncateHistoryLogRetentionPeriod" -> truncateHistoryDefaultLogRetention.toString))
+
+        // Pretend retention period has passed.
+        clock.advance(
+          targetLog.deltaRetentionMillis(targetLog.update().metadata) +
+            TimeUnit.HOURS.toMillis(1))
+
+        // History is now clean. We should be able to remove the feature.
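+        // The assertions below verify that the protocol no longer lists v2Checkpoint, that the
+        // checkpoint policy no longer requires v2 checkpoint support, and that only
+        // classic-format checkpoints remain in the log.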
+ dropV2CheckpointsTableFeature(spark, targetLog) + + val postDowngradeSnapshot = targetLog.update() + val protocol = postDowngradeSnapshot.protocol + assert(!protocol.readerFeatureNames.contains(V2CheckpointTableFeature.name)) + assert( + !DeltaConfigs.CHECKPOINT_POLICY + .fromMetaData(postDowngradeSnapshot.metadata) + .needsV2CheckpointSupport) + assert(targetLog.listFrom(0).filter(FileNames.isCheckpointFile).forall { f => + CheckpointInstance(f.getPath).format == expectedClassicCheckpointType + }) + } + } + } + + for ( + v2CheckpointFormat <- V2Checkpoint.Format.ALL; + withInitialV2Checkpoint <- BOOLEAN_DOMAIN) + test(s"Remove v2 Checkpoints Feature [v2CheckpointFormat: ${v2CheckpointFormat.name}; " + + s"withInitialV2Checkpoint: $withInitialV2Checkpoint; forceMultiPartCheckpoint: false]") { + testV2CheckpointTableFeatureDrop(v2CheckpointFormat, withInitialV2Checkpoint) + } + + test( + s"Remove v2 Checkpoints Feature [v2CheckpointFormat: ${V2Checkpoint.Format.PARQUET.name}; " + + s"withInitialV2Checkpoint: true; forceMultiPartCheckpoint: true]") { + testV2CheckpointTableFeatureDrop(V2Checkpoint.Format.PARQUET, true, true) + } + private def assertPropertiesAndShowTblProperties( deltaLog: DeltaLog, tableHasFeatures: Boolean = false): Unit = {