diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
index fe04c046216..cc3908e80cd 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -489,12 +489,13 @@ class DeltaLog private(
 
     val relation = HadoopFsRelation(
       fileIndex,
-      partitionSchema = DeltaColumnMapping.dropColumnMappingMetadata(partitionSchema),
+      partitionSchema = DeltaColumnMapping.dropColumnMappingMetadata(
+        DeltaTableUtils.removeInternalMetadata(spark, partitionSchema)),
       // We pass all table columns as `dataSchema` so that Spark will preserve the partition column
       // locations. Otherwise, for any partition columns not in `dataSchema`, Spark would just
       // append them to the end of `dataSchema`.
       dataSchema = DeltaColumnMapping.dropColumnMappingMetadata(
-        ColumnWithDefaultExprUtils.removeDefaultExpressions(metadata.schema)),
+        DeltaTableUtils.removeInternalMetadata(spark, metadata.schema)),
       bucketSpec = None,
       fileFormat(snapshot.protocol, metadata),
       hadoopOptions)(spark)
@@ -542,7 +543,7 @@ class DeltaLog private(
       // locations. Otherwise, for any partition columns not in `dataSchema`, Spark would just
       // append them to the end of `dataSchema`
       dataSchema = DeltaColumnMapping.dropColumnMappingMetadata(
-        ColumnWithDefaultExprUtils.removeDefaultExpressions(
+        DeltaTableUtils.removeInternalMetadata(spark,
           SchemaUtils.dropNullTypeColumns(snapshotToUse.metadata.schema))),
       bucketSpec = bucketSpec,
       fileFormat(snapshotToUse.protocol, snapshotToUse.metadata),
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
index fcf6937b682..5eb19562908 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
@@ -21,7 +21,7 @@ import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex}
 import org.apache.spark.sql.delta.metering.DeltaLogging
-import org.apache.spark.sql.delta.sources.DeltaSourceUtils
+import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.sql.SparkSession
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform}
 import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 
 /**
  * Extractor Object for pulling out the table scan of a Delta table. It could be a full scan
@@ -418,4 +419,52 @@ object DeltaTableUtils extends PredicateHelper
       new Path(basePath, relativeChildPath)
     }
   }
+
+  /**
+   * A list of Spark internal metadata keys that we may save in a Delta table schema
+   * unintentionally due to SPARK-43123. We need to remove them before handing over the schema to
+   * Spark to avoid Spark interpreting table columns incorrectly.
+   *
+   * Hard-coded strings are used intentionally as we want to capture possible keys used before
+   * SPARK-43123, regardless of the Spark version. For example, if Spark changes any key string
+   * after SPARK-43123, the new string won't be leaked, but we still want to clean up the old key.
+   */
+  val SPARK_INTERNAL_METADATA_KEYS = Seq(
+    "__autoGeneratedAlias",
+    "__metadata_col",
+    "__supports_qualified_star", // A key used by an old version. Doesn't exist in the latest code.
+    "__qualified_access_only",
+    "__file_source_metadata_col",
+    "__file_source_constant_metadata_col",
+    "__file_source_generated_metadata_col"
+  )
+
+  /**
+   * Remove leaked metadata keys from the persisted table schema. Old versions might leak metadata
+   * unintentionally. This method removes all possible metadata keys to avoid Spark interpreting
+   * table columns incorrectly.
+   */
+  def removeInternalMetadata(spark: SparkSession, persistedSchema: StructType): StructType = {
+    val schema = ColumnWithDefaultExprUtils.removeDefaultExpressions(persistedSchema)
+    if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_SCHEMA_REMOVE_SPARK_INTERNAL_METADATA)) {
+      var updated = false
+      val updatedSchema = schema.map { field =>
+        if (SPARK_INTERNAL_METADATA_KEYS.exists(field.metadata.contains)) {
+          updated = true
+          val newMetadata = new MetadataBuilder().withMetadata(field.metadata)
+          SPARK_INTERNAL_METADATA_KEYS.foreach(newMetadata.remove)
+          field.copy(metadata = newMetadata.build())
+        } else {
+          field
+        }
+      }
+      if (updated) {
+        StructType(updatedSchema)
+      } else {
+        schema
+      }
+    } else {
+      schema
+    }
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
index a780259a9ef..8b187150d94 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
@@ -128,7 +128,7 @@ case class DeltaTableV2(
 
   private lazy val tableSchema: StructType =
     DeltaColumnMapping.dropColumnMappingMetadata(
-      ColumnWithDefaultExprUtils.removeDefaultExpressions(snapshot.schema))
+      DeltaTableUtils.removeInternalMetadata(spark, snapshot.schema))
 
   override def schema(): StructType = tableSchema
 
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala
index 1a1d687c013..425cec3573e 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala
@@ -95,7 +95,7 @@ class DeltaDataSource
         .getOrElse(snapshot.schema)
     }
 
-    val schemaToUse = ColumnWithDefaultExprUtils.removeDefaultExpressions(readSchema)
+    val schemaToUse = DeltaTableUtils.removeInternalMetadata(sqlContext.sparkSession, readSchema)
     if (schemaToUse.isEmpty) {
       throw DeltaErrors.schemaNotSetException
     }
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 9e704fd54d1..6e6c8b3125c 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -336,6 +336,16 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(true)
 
+  val DELTA_SCHEMA_REMOVE_SPARK_INTERNAL_METADATA =
+    buildConf("schema.removeSparkInternalMetadata")
+      .doc(
+        """Whether to remove leaked Spark internal metadata from the table schema before returning
+          |it to Spark. This internal metadata might have been stored unintentionally in tables
+          |created by old Spark versions.""".stripMargin)
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
   val DELTA_ASSUMES_DROP_CONSTRAINT_IF_EXISTS =
     buildConf("constraints.assumesDropIfExists.enabled")
       .doc("""If true, DROP CONSTRAINT quietly drops nonexistent constraints even without
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
index 5587d8f6f52..d673e10aee5 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
@@ -166,8 +166,7 @@ trait DeltaSourceBase extends Source
   protected var hasCheckedReadIncompatibleSchemaChangesOnStreamStart: Boolean = false
 
   override val schema: StructType = {
-    val schemaWithoutCDC =
-      ColumnWithDefaultExprUtils.removeDefaultExpressions(readSchemaAtSourceInit)
+    val schemaWithoutCDC = DeltaTableUtils.removeInternalMetadata(spark, readSchemaAtSourceInit)
     if (options.readChangeFeed) {
       CDCReader.cdcReadSchema(schemaWithoutCDC)
     } else {
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaTableUtilsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaTableUtilsSuite.scala
index 3d8de5a3517..3532f9208ad 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaTableUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaTableUtilsSuite.scala
@@ -18,11 +18,14 @@ package org.apache.spark.sql.delta
 
 import java.net.URI
 
+import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
 
 class DeltaTableUtilsSuite extends SharedSparkSession with DeltaSQLCommandTest {
 
@@ -49,6 +52,32 @@ class DeltaTableUtilsSuite extends SharedSparkSession with DeltaSQLCommandTest {
     assert(DeltaTableUtils.safeConcatPaths(basePathEmpty, "_delta_log") ==
       new Path("s3://my-bucket/_delta_log"))
   }
+
+  test("removeInternalMetadata") {
+    for (flag <- BOOLEAN_DOMAIN) {
+      withSQLConf(DeltaSQLConf.DELTA_SCHEMA_REMOVE_SPARK_INTERNAL_METADATA.key -> flag.toString) {
+        for (internalMetadataKey <- DeltaTableUtils.SPARK_INTERNAL_METADATA_KEYS) {
+          val metadata = new MetadataBuilder()
+            .putString(internalMetadataKey, "foo")
+            .putString("other", "bar")
+            .build()
+          val schema = StructType(Seq(StructField("foo", StringType, metadata = metadata)))
+          val newSchema = DeltaTableUtils.removeInternalMetadata(spark, schema)
+          newSchema.foreach { f =>
+            if (flag) {
+              // Flag on: should remove internal metadata
+              assert(!f.metadata.contains(internalMetadataKey))
+              // Should preserve non-internal metadata
+              assert(f.metadata.contains("other"))
+            } else {
+              // Flag off: no-op
+              assert(f.metadata == metadata)
+            }
+          }
+        }
+      }
+    }
+  }
 }
 
 private class MockS3FileSystem extends RawLocalFileSystem {
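Note on the new behavior: `removeInternalMetadata` first applies the existing `ColumnWithDefaultExprUtils.removeDefaultExpressions` cleanup and then, when `DELTA_SCHEMA_REMOVE_SPARK_INTERNAL_METADATA` is enabled (the default), strips the keys listed in `SPARK_INTERNAL_METADATA_KEYS` from every field's metadata. The sketch below is a minimal, standalone mirror of that stripping logic, using only Spark's `org.apache.spark.sql.types` API so it can be tried without a Delta build; the object and method names (`RemoveInternalMetadataSketch`, `stripInternalMetadata`) are invented for illustration and are not part of this patch.

import org.apache.spark.sql.types._

object RemoveInternalMetadataSketch {
  // Mirrors DeltaTableUtils.SPARK_INTERNAL_METADATA_KEYS from the patch above.
  private val internalKeys = Seq(
    "__autoGeneratedAlias",
    "__metadata_col",
    "__supports_qualified_star",
    "__qualified_access_only",
    "__file_source_metadata_col",
    "__file_source_constant_metadata_col",
    "__file_source_generated_metadata_col")

  // Drop the internal keys from each field's metadata; leave all other metadata untouched.
  def stripInternalMetadata(schema: StructType): StructType = {
    StructType(schema.map { field =>
      if (internalKeys.exists(field.metadata.contains)) {
        val builder = new MetadataBuilder().withMetadata(field.metadata)
        internalKeys.foreach(builder.remove)
        field.copy(metadata = builder.build())
      } else {
        field
      }
    })
  }

  def main(args: Array[String]): Unit = {
    val leaked = new MetadataBuilder()
      .putString("__metadata_col", "id")    // leaked internal key, should be removed
      .putString("comment", "user comment") // regular metadata, should survive
      .build()
    val schema = StructType(Seq(StructField("id", StringType, nullable = true, metadata = leaked)))

    val cleaned = stripInternalMetadata(schema)
    assert(!cleaned("id").metadata.contains("__metadata_col"))
    assert(cleaned("id").metadata.getString("comment") == "user comment")
    println(cleaned("id").metadata) // {"comment":"user comment"}
  }
}

Since `DeltaSQLConf.buildConf` normally prefixes keys with `spark.databricks.delta.`, the escape hatch added here should be reachable as `spark.databricks.delta.schema.removeSparkInternalMetadata` (set it to `false` to keep the persisted metadata as-is); treat that full key name as an assumption based on the usual prefix convention.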