diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 7fbf7dc935c..d07b409c87f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -1174,6 +1174,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite fromProtocol = snapshot.protocol, toProtocol = p, isCreatingNewTable) + DeltaTableV2.withEnrichedUnsupportedTableException(catalogTable) { + deltaLog.protocolWrite(p) + } case d: DomainMetadata => numOfDomainMetadatas += 1 case _ => @@ -1181,6 +1184,11 @@ trait OptimisticTransactionImpl extends TransactionalWrite action } + // Validate protocol support, specifically writer features. + DeltaTableV2.withEnrichedUnsupportedTableException(catalogTable) { + deltaLog.protocolWrite(snapshot.protocol) + } + allActions = RowId.assignFreshRowIds(protocol, snapshot, allActions) allActions = DefaultRowCommitVersion .assignIfMissing(protocol, allActions, getFirstAttemptVersion) @@ -1340,6 +1348,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite protocolChanges.foreach { p => newProtocol = Some(p) recordProtocolChanges("delta.protocol.change", snapshot.protocol, p, isCreatingNewTable) + DeltaTableV2.withEnrichedUnsupportedTableException(catalogTable) { + deltaLog.protocolWrite(p) + } } // Now, we know that there is at most 1 Metadata change (stored in newMetadata) and at most 1 @@ -1430,6 +1441,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite } DeltaTableV2.withEnrichedUnsupportedTableException(catalogTable) { + newProtocol.foreach(deltaLog.protocolWrite) deltaLog.protocolWrite(snapshot.protocol) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala index 7ab12d0ee64..9c74a585b86 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.delta // scalastyle:off import.ordering.noEmptyLine import java.io.File +import java.nio.file.{Files, Paths, StandardOpenOption} import java.util.Locale import java.util.concurrent.TimeUnit @@ -35,11 +36,10 @@ import org.apache.spark.sql.delta.util.FileNames import org.apache.spark.sql.delta.util.FileNames.{deltaFile, DeltaFile} import org.apache.spark.sql.delta.util.JsonUtils -import org.apache.spark.{SparkConf, SparkThrowable} +import org.apache.spark.SparkConf import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.util.DateTimeConstants import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType @@ -3467,6 +3467,85 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest testV2CheckpointTableFeatureDrop(V2Checkpoint.Format.PARQUET, true, true) } + // Create a table for testing that has an unsupported feature. + private def withTestTableWithUnsupportedWriterFeature( + emptyTable: Boolean)(testCode: String => Unit): Unit = { + val tableName = "test_table" + withTable(tableName) { + if (emptyTable) { + sql(s"CREATE TABLE $tableName(id INT) USING DELTA") + } else { + sql(s"CREATE TABLE $tableName USING DELTA AS SELECT 1 AS id") + } + + sql(s"""ALTER TABLE $tableName + SET TBLPROPERTIES ('delta.minReaderVersion' = '3', 'delta.minWriterVersion' = '7')""") + + val deltaLogPath = DeltaLog.forTable(spark, TableIdentifier(tableName)).logPath + .toString.stripPrefix("file:") + + // scalastyle:off + val commitJson = + """{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1702304249309}} + |{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":[],"writerFeatures":["unsupportedWriter"]}}""".stripMargin + // scalastyle:on + + Files.write(Paths.get(deltaLogPath, "00000000000000000002.json"), commitJson.getBytes) + + testCode(tableName) + } + } + + // Test that write commands error out when unsupported features in the table protocol. + private def testUnsupportedFeature( + commandName: String, emptyTable: Boolean)(command: String => Unit): Unit = { + test(s"Writes using $commandName error out when unsupported writer features are present") { + withTestTableWithUnsupportedWriterFeature(emptyTable) { tableName => + intercept[DeltaUnsupportedTableFeatureException] { + command(tableName) + } + } + } + } + + testUnsupportedFeature("INSERT", emptyTable = true) { testTableName => + sql(s"INSERT INTO $testTableName VALUES (2)") + } + + testUnsupportedFeature("UPDATE", emptyTable = false) { testTableName => + sql(s"UPDATE $testTableName SET id = 2") + } + + testUnsupportedFeature("DELETE", emptyTable = false) { testTableName => + sql(s"DELETE FROM $testTableName WHERE id > 0") + } + + testUnsupportedFeature("MERGE", emptyTable = false) { testTableName => + sql(s"""MERGE INTO $testTableName t + |USING $testTableName s + |ON s.id = t.id + 100 + |WHEN NOT MATCHED THEN INSERT *""".stripMargin) + } + + testUnsupportedFeature("CREATE OR REPLACE TABLE", emptyTable = false) { testTableName => + sql(s"CREATE OR REPLACE TABLE $testTableName (other_column INT) USING DELTA") + } + + testUnsupportedFeature("ManualUpdate commit", emptyTable = true) { testTableName => + val deltaLog = DeltaLog.forTable(spark, TableIdentifier(testTableName)) + deltaLog.startTransaction(None) + .commit(Seq(DeltaTestUtils.createTestAddFile()), DeltaOperations.ManualUpdate) + } + + testUnsupportedFeature("SHALLOW CLONE", emptyTable = true) { testTableName => + val cloneSourceTableName = "clone_source_table" + withTable(cloneSourceTableName) { + sql(s"DELETE FROM $testTableName") + sql(s"CREATE TABLE $cloneSourceTableName USING delta AS SELECT 1337 as id") + sql(s"CREATE OR REPLACE TABLE $testTableName SHALLOW CLONE $cloneSourceTableName") + } + } + private def assertPropertiesAndShowTblProperties( deltaLog: DeltaLog, tableHasFeatures: Boolean = false): Unit = {