Skip to content

Commit

Permalink
[Spark] Validate writer features in all commit operations
Browse files Browse the repository at this point in the history
Add protocol checks to all commit paths where they were missing. Previously it was possible, for example, to clone into a table even though a writer feature on that table was not supported.

Add new tests for all the DML commands.

Closes #2373

GitOrigin-RevId: b45a480eb6a887689f46e84596676d14042e616f
  • Loading branch information
olaky authored and vkorukanti committed Dec 20, 2023
1 parent 930f741 commit e0835ba
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1174,13 +1174,21 @@ trait OptimisticTransactionImpl extends TransactionalWrite
fromProtocol = snapshot.protocol,
toProtocol = p,
isCreatingNewTable)
DeltaTableV2.withEnrichedUnsupportedTableException(catalogTable) {
deltaLog.protocolWrite(p)
}
case d: DomainMetadata =>
numOfDomainMetadatas += 1
case _ =>
}
action
}

// Validate protocol support, specifically writer features.
DeltaTableV2.withEnrichedUnsupportedTableException(catalogTable) {
deltaLog.protocolWrite(snapshot.protocol)
}

allActions = RowId.assignFreshRowIds(protocol, snapshot, allActions)
allActions = DefaultRowCommitVersion
.assignIfMissing(protocol, allActions, getFirstAttemptVersion)
Expand Down Expand Up @@ -1340,6 +1348,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite
protocolChanges.foreach { p =>
newProtocol = Some(p)
recordProtocolChanges("delta.protocol.change", snapshot.protocol, p, isCreatingNewTable)
DeltaTableV2.withEnrichedUnsupportedTableException(catalogTable) {
deltaLog.protocolWrite(p)
}
}

// Now, we know that there is at most 1 Metadata change (stored in newMetadata) and at most 1
Expand Down Expand Up @@ -1430,6 +1441,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
}

DeltaTableV2.withEnrichedUnsupportedTableException(catalogTable) {
newProtocol.foreach(deltaLog.protocolWrite)
deltaLog.protocolWrite(snapshot.protocol)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.spark.sql.delta

// scalastyle:off import.ordering.noEmptyLine
import java.io.File
import java.nio.file.{Files, Paths, StandardOpenOption}
import java.util.Locale
import java.util.concurrent.TimeUnit

Expand All @@ -35,11 +36,10 @@ import org.apache.spark.sql.delta.util.FileNames
import org.apache.spark.sql.delta.util.FileNames.{deltaFile, DeltaFile}
import org.apache.spark.sql.delta.util.JsonUtils

import org.apache.spark.{SparkConf, SparkThrowable}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.util.DateTimeConstants
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
Expand Down Expand Up @@ -3467,6 +3467,85 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
testV2CheckpointTableFeatureDrop(V2Checkpoint.Format.PARQUET, true, true)
}

/**
 * Creates a Delta table named `test_table`, injects a commit whose protocol action lists the
 * writer feature `unsupportedWriter`, and then runs `testCode` against that table.
 *
 * The table is created through SQL and its protocol is raised to reader version 3 / writer
 * version 7 via `ALTER TABLE ... SET TBLPROPERTIES`. A hand-written commit file containing a
 * metadata action and a protocol action is then written directly into the table's log directory.
 *
 * @param emptyTable when true the table is created with an `id INT` column and no rows;
 *                   otherwise it is created with `CREATE TABLE ... AS SELECT 1 AS id`.
 * @param testCode   body to run; receives the name of the prepared table.
 */
private def withTestTableWithUnsupportedWriterFeature(
    emptyTable: Boolean)(testCode: String => Unit): Unit = {
  val tableName = "test_table"
  withTable(tableName) {
    if (emptyTable) {
      sql(s"CREATE TABLE $tableName(id INT) USING DELTA")
    } else {
      sql(s"CREATE TABLE $tableName USING DELTA AS SELECT 1 AS id")
    }

    sql(s"""ALTER TABLE $tableName
      SET TBLPROPERTIES ('delta.minReaderVersion' = '3', 'delta.minWriterVersion' = '7')""")

    // Paths.get needs a plain local path, so the "file:" scheme prefix is removed.
    val deltaLogPath = DeltaLog.forTable(spark, TableIdentifier(tableName)).logPath
      .toString.stripPrefix("file:")

    // scalastyle:off
    val commitJson =
      """{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1702304249309}}
        |{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":[],"writerFeatures":["unsupportedWriter"]}}""".stripMargin
    // scalastyle:on

    // NOTE(review): the file name hard-codes version 2, which presumably relies on the CREATE
    // being version 0 and the ALTER being version 1 -- confirm if the setup above changes.
    // Encode explicitly as UTF-8 so the written bytes do not depend on the JVM default charset.
    Files.write(
      Paths.get(deltaLogPath, "00000000000000000002.json"),
      commitJson.getBytes(java.nio.charset.StandardCharsets.UTF_8))

    testCode(tableName)
  }
}

// Registers a test which checks that a write command fails with a
// DeltaUnsupportedTableFeatureException when it is run against the table prepared by
// withTestTableWithUnsupportedWriterFeature.
//
// commandName: label used in the generated test name.
// emptyTable:  forwarded to withTestTableWithUnsupportedWriterFeature.
// command:     the write to attempt; receives the name of the prepared table.
private def testUnsupportedFeature(
    commandName: String, emptyTable: Boolean)(command: String => Unit): Unit = {
  val testTitle =
    s"Writes using $commandName error out when unsupported writer features are present"
  test(testTitle) {
    withTestTableWithUnsupportedWriterFeature(emptyTable) { preparedTable =>
      intercept[DeltaUnsupportedTableFeatureException](command(preparedTable))
    }
  }
}

// One registration per write path; each command below is expected to be rejected by
// testUnsupportedFeature's intercept.

testUnsupportedFeature("INSERT", emptyTable = true) { table =>
  sql(s"INSERT INTO $table VALUES (2)")
}

testUnsupportedFeature("UPDATE", emptyTable = false) { table =>
  sql(s"UPDATE $table SET id = 2")
}

testUnsupportedFeature("DELETE", emptyTable = false) { table =>
  sql(s"DELETE FROM $table WHERE id > 0")
}

testUnsupportedFeature("MERGE", emptyTable = false) { table =>
  // Self-merge with a join condition on id + 100; only a NOT MATCHED insert clause is given.
  val mergeStatement =
    s"""MERGE INTO $table t
       |USING $table s
       |ON s.id = t.id + 100
       |WHEN NOT MATCHED THEN INSERT *""".stripMargin
  sql(mergeStatement)
}

testUnsupportedFeature("CREATE OR REPLACE TABLE", emptyTable = false) { table =>
  sql(s"CREATE OR REPLACE TABLE $table (other_column INT) USING DELTA")
}

testUnsupportedFeature("ManualUpdate commit", emptyTable = true) { table =>
  // Commit directly through the transaction API instead of going through a SQL command.
  val txn = DeltaLog.forTable(spark, TableIdentifier(table)).startTransaction(None)
  txn.commit(Seq(DeltaTestUtils.createTestAddFile()), DeltaOperations.ManualUpdate)
}

testUnsupportedFeature("SHALLOW CLONE", emptyTable = true) { table =>
  val cloneSource = "clone_source_table"
  withTable(cloneSource) {
    // NOTE(review): this DELETE also runs inside the intercepted block; if it raises the
    // expected exception itself, the CLONE statement below is never reached -- confirm that
    // the clone path is what actually triggers the failure.
    sql(s"DELETE FROM $table")
    sql(s"CREATE TABLE $cloneSource USING delta AS SELECT 1337 as id")
    sql(s"CREATE OR REPLACE TABLE $table SHALLOW CLONE $cloneSource")
  }
}

private def assertPropertiesAndShowTblProperties(
deltaLog: DeltaLog,
tableHasFeatures: Boolean = false): Unit = {
Expand Down

0 comments on commit e0835ba

Please sign in to comment.