From 30fa2c5842c32aebbc05b8cad9f6313c3db7df20 Mon Sep 17 00:00:00 2001
From: Jackie Zhang
Date: Tue, 18 Apr 2023 21:32:56 -0700
Subject: [PATCH] Fix the streaming unsafe escape flag for non-additive schema
 changes.

GitOrigin-RevId: 9dfe9417689563810fa63fd10c035937ea8c6d5b
---
 .../resources/error/delta-error-classes.json  |  5 +-
 .../apache/spark/sql/delta/DeltaErrors.scala  | 10 ++-
 .../spark/sql/delta/schema/SchemaUtils.scala  |  7 +-
 .../spark/sql/delta/sources/DeltaSource.scala | 24 ++++-
 .../spark/sql/delta/DeltaErrorsSuite.scala    |  8 +-
 .../delta/DeltaSourceColumnMappingSuite.scala | 88 +++++++++++++++++--
 6 files changed, 124 insertions(+), 18 deletions(-)

diff --git a/core/src/main/resources/error/delta-error-classes.json b/core/src/main/resources/error/delta-error-classes.json
index d082ad0eab0..a5e72eef0d8 100644
--- a/core/src/main/resources/error/delta-error-classes.json
+++ b/core/src/main/resources/error/delta-error-classes.json
@@ -1566,6 +1566,7 @@
   "DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE" : {
     "message" : [
       "Streaming read is not supported on tables with read-incompatible schema changes (e.g. rename or drop or datatype changes).",
+      "For further information and possible next steps to resolve this issue, please review the documentation at <docLink>",
       "Read schema: <readSchema>. Incompatible data schema: <incompatibleSchema>."
     ],
     "sqlState" : "42KD4"
   },
   "DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_SCHEMA_LOG" : {
     "message" : [
       "Streaming read is not supported on tables with read-incompatible schema changes (e.g. rename or drop or datatype changes).",
-      "Read schema: <readSchema>. Incompatible data schema: <incompatibleSchema>.",
       "Please provide a 'schemaTrackingLocation' to enable non-additive schema evolution for Delta stream processing.",
-      "See <docLink> for more details."
+      "See <docLink> for more details.",
+      "Read schema: <readSchema>. Incompatible data schema: <incompatibleSchema>."
], "sqlState" : "42KD4" }, diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index ce65551d9fa..d91455baf35 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -101,7 +101,8 @@ trait DocsPath { "icebergClassMissing", "tableFeatureReadRequiresWriteException", "tableFeatureRequiresHigherReaderProtocolVersion", - "tableFeatureRequiresHigherWriterProtocolVersion" + "tableFeatureRequiresHigherWriterProtocolVersion", + "blockStreamingReadsWithIncompatibleColumnMappingSchemaChanges" ) } @@ -2557,12 +2558,13 @@ trait DeltaErrorsBase readSchema: StructType, incompatibleSchema: StructType, detectedDuringStreaming: Boolean): Throwable = { + val docLink = "/versioning.html#column-mapping" val enableNonAdditiveSchemaEvolution = spark.sessionState.conf.getConf( DeltaSQLConf.DELTA_STREAMING_ENABLE_NON_ADDITIVE_SCHEMA_EVOLUTION) new DeltaStreamingColumnMappingSchemaIncompatibleException( readSchema, incompatibleSchema, - "", + generateDocsLink(spark.sparkContext.getConf, docLink), enableNonAdditiveSchemaEvolution, additionalProperties = Map( "detectedDuringStreaming" -> detectedDuringStreaming.toString @@ -3109,7 +3111,7 @@ class DeltaStreamingColumnMappingSchemaIncompatibleException( "DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE" }, messageParameters = Array( + docLink, readSchema.json, - incompatibleSchema.json, - docLink) + incompatibleSchema.json) ) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala b/core/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala index 8f05a9d48b4..820d3cd0db0 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala @@ -231,7 +231,7 @@ object SchemaUtils extends DeltaLogging { * As the Delta snapshots update, the schema may change as well. This method defines whether the * new schema of a Delta table can be used with a previously analyzed LogicalPlan. 
   * rules are to return false if:
-   *   - Dropping any column that was present in the existing schema
+   *   - Dropping any column that was present in the existing schema (unless allowMissingColumns)
   *   - Any change of datatype
   *   - If `forbidTightenNullability` = true:
   *     - Forbids tightening the nullability (existing nullable=true -> read nullable=false)
@@ -249,7 +249,8 @@ def isReadCompatible(
       existingSchema: StructType,
       readSchema: StructType,
-      forbidTightenNullability: Boolean = false): Boolean = {
+      forbidTightenNullability: Boolean = false,
+      allowMissingColumns: Boolean = false): Boolean = {
 
     def isNullabilityCompatible(existingNullable: Boolean, readNullable: Boolean): Boolean = {
       if (forbidTightenNullability) {
@@ -287,7 +288,7 @@
         "Delta tables don't allow field names that only differ by case")
       // scalastyle:on caselocale
 
-      if (!existingFieldNames.subsetOf(newFields)) {
+      if (!allowMissingColumns && !existingFieldNames.subsetOf(newFields)) {
         // Dropped a column that was present in the DataFrame schema
         return false
       }
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
index 4d773410502..5587d8f6f52 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
@@ -131,13 +131,18 @@ trait DeltaSourceBase extends Source
   protected lazy val forceEnableUnsafeReadOnNullabilityChange =
     spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STREAM_UNSAFE_READ_ON_NULLABILITY_CHANGE)
 
+  /**
+   * Whether we are streaming from a table with column mapping enabled.
+   */
+  protected val isStreamingFromColumnMappingTable: Boolean =
+    snapshotAtSourceInit.metadata.columnMappingMode != NoMapping
+
   /**
    * Whether we should explicitly verify column mapping related schema changes such as rename or
    * drop columns.
    */
   protected lazy val shouldVerifyColumnMappingSchemaChanges =
-    snapshotAtSourceInit.metadata.columnMappingMode != NoMapping &&
-      !forceEnableStreamingReadOnColumnMappingSchemaChanges
+    isStreamingFromColumnMappingTable && !forceEnableStreamingReadOnColumnMappingSchemaChanges
 
   /**
    * The persisted schema from the schema log that must be used to read data files in this Delta
@@ -531,7 +536,20 @@
     // because we don't ever want to read back any nulls when the read schema is non-nullable.
     val shouldForbidTightenNullability = !forceEnableUnsafeReadOnNullabilityChange
     if (!SchemaUtils.isReadCompatible(
-        schemaChange, schema, forbidTightenNullability = shouldForbidTightenNullability)) {
+        schemaChange, schema,
+        forbidTightenNullability = shouldForbidTightenNullability,
+        // If a user is streaming from a column mapping table and has enabled the unsafe flag to
+        // ignore column mapping schema changes, we can relax the standard check to allow columns
+        // in the schema change that are missing from the read schema, because the only case in
+        // which that happens is a rename/drop, and by enabling the flag the user has explicitly
+        // chosen to unblock the stream anyway.
+        // This is only allowed when we are "backfilling", i.e. the stream progress is older than
+        // the analyzed table version. Any schema change past the analysis should still throw an
+        // exception, because additive schema changes MUST be taken into account.
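+        // Illustration with assumed versions: if the stream was analyzed at table version 10 but
+        // is still replaying data written at version 5, a column dropped between versions 5 and
+        // 10 is legitimately absent from the read schema; with the flag set, we tolerate that.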
+        allowMissingColumns =
+          isStreamingFromColumnMappingTable &&
+            forceEnableStreamingReadOnColumnMappingSchemaChanges &&
+            backfilling
+        )) {
       // Only schema changes later than the current read snapshot/schema can be retried; in other
       // words, backfills could never be retryable, because we have no way to refresh
       // the latest schema to "catch up" when the schema change happens earlier than current read
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
index c0437455e70..1b969f10104 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
@@ -97,7 +97,13 @@ trait DeltaErrorsSuiteBase
       DeltaErrors.tableFeatureRequiresHigherReaderProtocolVersion(
         feature = "feature",
         currentVersion = 1,
-        requiredVersion = 7)
+        requiredVersion = 7),
+    "blockStreamingReadsWithIncompatibleColumnMappingSchemaChanges" ->
+      DeltaErrors.blockStreamingReadsWithIncompatibleColumnMappingSchemaChanges(
+        spark,
+        StructType.fromDDL("id int"),
+        StructType.fromDDL("id2 int"),
+        detectedDuringStreaming = true)
   )
 
   def otherMessagesToTest: Map[String, String] = Map(
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceColumnMappingSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceColumnMappingSuite.scala
index fb5a905c391..531af7ace73 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceColumnMappingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceColumnMappingSuite.scala
@@ -27,12 +27,13 @@ import org.apache.spark.sql.delta.sources.{DeltaSource, DeltaSQLConf}
 import org.apache.spark.sql.delta.test.DeltaColumnMappingSelectedTestMixin
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
 import org.apache.spark.sql.delta.util.JsonUtils
+import org.apache.commons.io.FileUtils
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingExecutionRelation}
-import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQuery, StreamTest}
+import org.apache.spark.sql.streaming.{DataStreamReader, StreamTest}
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.Utils
 
@@ -214,11 +215,11 @@ trait ColumnMappingStreamingBlockedWorkflowSuiteBase extends ColumnMappingStream
       writeDeltaData(0 until 5, deltaLog, Some(StructType.fromDDL("id string, name string")))
     }
 
-    def df: DataFrame = dropCDCFields(dsr.load(inputDir.getCanonicalPath))
+    def createNewDf(): DataFrame = dropCDCFields(dsr.load(inputDir.getCanonicalPath))
 
     val checkpointDir = new File(inputDir, "_checkpoint")
 
-    testStream(df)(
+    testStream(createNewDf())(
      StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
       ProcessAllAvailable(),
       CheckAnswer((0 until 5).map(i => (i.toString, i.toString)): _*),
@@ -247,7 +248,7 @@ trait ColumnMappingStreamingBlockedWorkflowSuiteBase extends ColumnMappingStream
     )
 
     // but should not block after restarting, now in column mapping mode
-    testStream(df)(
+    testStream(createNewDf())(
       StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
       ProcessAllAvailable(),
       // Sink is reinitialized, only 10-15 are ingested
       CheckAnswer((10 until 15).map(i => (i.toString, i.toString)): _*),
@@ -258,7 +259,7 @@ trait ColumnMappingStreamingBlockedWorkflowSuiteBase extends ColumnMappingStream
     // use a different checkpoint to simulate a clean stream restart
     val checkpointDir2 = new File(inputDir, "_checkpoint2")
 
-    testStream(df)(
+    testStream(createNewDf())(
       StartStream(checkpointLocation = checkpointDir2.getCanonicalPath),
       ProcessAllAvailable(),
       // Since the latest schema contains the additional column, it is null for previous batches.
@@ -630,6 +631,83 @@ trait ColumnMappingStreamingBlockedWorkflowSuiteBase extends ColumnMappingStream
       )
     }
   }
+
+  test("unsafe flag can unblock drop or rename column") {
+    // The unsafe flag should unblock the stream both during the stream AND during stream restart.
+    withTempDir { inputDir =>
+      Seq(
+        s"ALTER TABLE delta.`${inputDir.getCanonicalPath}` DROP COLUMN value",
+        s"ALTER TABLE delta.`${inputDir.getCanonicalPath}` RENAME COLUMN value TO value2"
+      ).foreach { schemaChangeQuery =>
+        FileUtils.deleteDirectory(inputDir)
+        val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI))
+        withColumnMappingConf("none") {
+          writeDeltaData(0 until 5, deltaLog,
+            Some(StructType.fromDDL("id string, value string")))
+        }
+
+        def createNewDf(): DataFrame = dropCDCFields(dsr.load(inputDir.getCanonicalPath))
+
+        val checkpointDir = new File(inputDir, s"_checkpoint_${schemaChangeQuery.hashCode}")
+        val isRename = schemaChangeQuery.contains("RENAME")
+        testStream(createNewDf())(
+          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+          ProcessAllAvailable(),
+          CheckAnswer((0 until 5).map(i => (i.toString, i.toString)): _*),
+          Execute { _ =>
+            sql(
+              s"""
+                 |ALTER TABLE delta.`${inputDir.getCanonicalPath}`
+                 |SET TBLPROPERTIES (
+                 |  ${DeltaConfigs.COLUMN_MAPPING_MODE.key} = "name",
+                 |  ${DeltaConfigs.MIN_READER_VERSION.key} = "2",
+                 |  ${DeltaConfigs.MIN_WRITER_VERSION.key} = "5")""".stripMargin)
+            // Add another schema change to ensure that, even after enabling the flag, we still
+            // hit a schema change with more columns than the read schema, so `verifySchemaChange`
+            // would see it and complain.
+            sql(s"ALTER TABLE delta.`${inputDir.getCanonicalPath}` ADD COLUMN (random STRING)")
+            sql(schemaChangeQuery)
+            writeDeltaData(5 until 10, deltaLog)
+          },
+          ProcessAllAvailableIgnoreError,
+          ExistingRetryableInStreamSchemaChangeFailure
+        )
+
+        // Without the flag, it would still fail
+        testStream(createNewDf())(
+          StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+          ProcessAllAvailableIgnoreError,
+          CheckAnswer(Nil: _*),
+          ExpectStreamStartInCompatibleSchemaFailure
+        )
+
+        val checkExpectedResult = if (isRename) {
+          CheckAnswer((5 until 10).map(i => (i.toString, i.toString, i.toString)): _*)
+        } else {
+          CheckAnswer((5 until 10).map(i => (i.toString, i.toString)): _*)
+        }
+
+        withSQLConf(DeltaSQLConf
+            .DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES
+            .key -> "true") {
+          testStream(createNewDf())(
+            StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+            // The processing will pass, ignoring any columns missing from the read schema
+            // during the backfill.
+            ProcessAllAvailable(),
+            // Results reflect the dropped or renamed column.
+            checkExpectedResult,
+            Execute { _ =>
+              // But any schema change past the stream analysis would still cause an exception
+              // as usual, which is critical to avoid data loss.
+              sql(s"ALTER TABLE delta.`${inputDir.getCanonicalPath}` ADD COLUMN (random2 STRING)")
+            },
+            ProcessAllAvailableIgnoreError,
+            ExistingRetryableInStreamSchemaChangeFailure
+          )
+        }
+      }
+    }
+  }
 }
 
 trait DeltaSourceColumnMappingSuiteBase extends DeltaColumnMappingSelectedTestMixin {
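
To make the predicate this patch wires into the source's schema-change verification concrete, below is a minimal, self-contained Scala sketch of the gating logic. The names `UnsafeEscapeFlagSketch` and `isReadCompatibleSimplified` are illustrative stand-ins rather than Delta APIs, and only the dropped-column rule of `SchemaUtils.isReadCompatible` is modeled here; the datatype and nullability checks are omitted.

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Illustrative stand-in, NOT a Delta API: models only the dropped-column rule
// of SchemaUtils.isReadCompatible and the three-way gate added by this patch.
object UnsafeEscapeFlagSketch {

  def isReadCompatibleSimplified(
      dataSchema: StructType,
      readSchema: StructType,
      allowMissingColumns: Boolean): Boolean = {
    val readNames = readSchema.fieldNames.toSet
    // A column present in the data but absent from the read schema means a drop occurred.
    val noColumnsDropped = dataSchema.fieldNames.forall(readNames.contains)
    noColumnsDropped || allowMissingColumns
  }

  def main(args: Array[String]): Unit = {
    // The stream was analyzed after `value` was dropped: the read schema only has
    // `id`, while backfilled data files written before the drop still carry `value`.
    val readSchema = StructType(Seq(StructField("id", StringType)))
    val backfilledDataSchema = readSchema.add(StructField("value", StringType))

    val isStreamingFromColumnMappingTable = true // column mapping enabled on the table
    val unsafeFlagEnabled = true                 // the unsafe escape flag is set

    // Missing columns are tolerated only when all three conditions hold.
    def allowMissing(backfilling: Boolean): Boolean =
      isStreamingFromColumnMappingTable && unsafeFlagEnabled && backfilling

    // While backfilling, the column dropped before analysis is tolerated.
    assert(isReadCompatibleSimplified(
      backfilledDataSchema, readSchema, allowMissing(backfilling = true)))

    // Past the analyzed version (no longer backfilling), the same change must still
    // fail, which is why the in-stream ADD COLUMN in the new test keeps throwing.
    assert(!isReadCompatibleSimplified(
      backfilledDataSchema, readSchema, allowMissing(backfilling = false)))
  }
}

Keeping `backfilling` in the conjunction is what preserves correctness here: once the stream catches up to the analyzed table version, a subsequent (even additive) schema change must fail the check again, so newly added columns are never silently dropped from the output.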