Commit

Drop Type widening feature: read Parquet footers to collect files to rewrite
johanl-db committed May 24, 2024
1 parent 039a29a commit 8fe1968
Showing 6 changed files with 130 additions and 46 deletions.
@@ -23,9 +23,9 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, DeltaReorgTableCommand, DeltaReorgTableMode, DeltaReorgTableSpec}
import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingCommand
import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics
import org.apache.spark.sql.delta.managedcommit.ManagedCommitUtils
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
import org.apache.spark.sql.util.ScalaExtensions._

@@ -309,8 +309,9 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2)
* @return Return the number of files rewritten.
*/
private def rewriteFilesIfNeeded(): Long = {
val numFilesToRewrite = TypeWidening.numFilesRequiringRewrite(table.initialSnapshot)
if (numFilesToRewrite == 0L) return 0L
if (!TypeWideningMetadata.containsTypeWideningMetadata(table.initialSnapshot.schema)) {
return 0L
}

// Wrap `table` in a ResolvedTable that can be passed to DeltaReorgTableCommand. The catalog &
// table ID won't be used by DeltaReorgTableCommand.
@@ -323,8 +324,9 @@ case class TypeWideningPreDowngradeCommand(table: DeltaTableV2)
reorgTableSpec = DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None)
)(Nil)

reorg.run(table.spark)
numFilesToRewrite
val rows = reorg.run(table.spark)
val metrics = rows.head.getAs[OptimizeMetrics](1)
metrics.numFilesRemoved
}
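
The method above no longer pre-counts the files to rewrite: it returns early when the schema carries no type widening metadata and otherwise reports the count from the OptimizeMetrics produced by DeltaReorgTableCommand. A minimal sketch of that extraction, assuming the metrics struct sits at column index 1 of the returned row as in the change above (the helper name is illustrative):

import org.apache.spark.sql.Row
import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics

// Sketch only: pull the rewrite count out of the rows returned by the reorg command.
def rewrittenFileCount(rows: Seq[Row]): Long =
  rows.headOption
    .map(_.getAs[OptimizeMetrics](1).numFilesRemoved)
    .getOrElse(0L)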

/**
29 changes: 0 additions & 29 deletions spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
@@ -111,33 +111,4 @@ object TypeWidening {
)
}
}

/**
* Filter the given list of files to only keep files that were written before the latest type
* change, if any. These older files contain a column or field with a type that is different than
* in the current table schema and must be rewritten when dropping the type widening table feature
* to make the table readable by readers that don't support the feature.
*/
def filterFilesRequiringRewrite(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] =
TypeWideningMetadata.getLatestTypeChangeVersion(snapshot.metadata.schema) match {
case Some(latestVersion) =>
files.filter(_.defaultRowCommitVersion match {
case Some(version) => version < latestVersion
// Files written before the type widening table feature was added to the table don't
// have a defaultRowCommitVersion. That does mean they were written before the latest
// type change.
case None => true
})
case None =>
Seq.empty
}


/**
* Return the number of files that were written before the latest type change and that then
* contain a column or field with a type that is different from the current table schema.
*/
def numFilesRequiringRewrite(snapshot: Snapshot): Long = {
filterFilesRequiringRewrite(snapshot, snapshot.allFiles.collect()).size
}
}
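
For contrast with the footer-based filtering added by this commit, the removed helpers above relied on defaultRowCommitVersion to decide which files predate the latest type change. A condensed sketch of that heuristic, assuming the same TypeWideningMetadata and AddFile APIs referenced above:

import org.apache.spark.sql.delta.{Snapshot, TypeWideningMetadata}
import org.apache.spark.sql.delta.actions.AddFile

// Sketch only: a file needs a rewrite if it was committed before the latest type change;
// files with no defaultRowCommitVersion predate the type widening feature entirely.
def writtenBeforeLatestTypeChange(snapshot: Snapshot, file: AddFile): Boolean =
  TypeWideningMetadata.getLatestTypeChangeVersion(snapshot.metadata.schema).exists {
    latestVersion => file.defaultRowCommitVersion.forall(_ < latestVersion)
  }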
@@ -16,7 +16,7 @@

package org.apache.spark.sql.delta.commands

import org.apache.spark.sql.delta.{Snapshot, TypeWidening}
import org.apache.spark.sql.delta.{DeltaColumnMapping, Snapshot}
import org.apache.spark.sql.delta.actions.AddFile

import org.apache.spark.sql.{Row, SparkSession}
@@ -97,14 +97,15 @@ sealed trait DeltaReorgOperation {
* Collects files that need to be processed by the reorg operation from the list of candidate
* files.
*/
def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile]
def filterFilesToReorg(spark: SparkSession, snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile]
}

/**
* Reorg operation to purge files with soft deleted rows.
*/
class DeltaPurgeOperation extends DeltaReorgOperation {
override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] =
override def filterFilesToReorg(spark: SparkSession, snapshot: Snapshot, files: Seq[AddFile])
: Seq[AddFile] =
files.filter { file =>
(file.deletionVector != null && file.numPhysicalRecords.isEmpty) ||
file.numDeletedRecords > 0L
@@ -115,7 +116,8 @@ class DeltaPurgeOperation {
* Reorg operation to upgrade the iceberg compatibility version of a table.
*/
class DeltaUpgradeUniformOperation(icebergCompatVersion: Int) extends DeltaReorgOperation {
override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] = {
override def filterFilesToReorg(spark: SparkSession, snapshot: Snapshot, files: Seq[AddFile])
: Seq[AddFile] = {
def shouldRewriteToBeIcebergCompatible(file: AddFile): Boolean = {
if (file.tags == null) return true
val icebergCompatVersion = file.tags.getOrElse(AddFile.Tags.ICEBERG_COMPAT_VERSION.name, "0")
@@ -129,7 +131,12 @@ class DeltaUpgradeUniformOperation(icebergCompatVersion: Int) extends DeltaReorg
* Internal reorg operation to rewrite files to conform to the current table schema when dropping
* the type widening table feature.
*/
class DeltaRewriteTypeWideningOperation extends DeltaReorgOperation {
override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] =
TypeWidening.filterFilesRequiringRewrite(snapshot, files)
class DeltaRewriteTypeWideningOperation extends DeltaReorgOperation with ReorgTableHelper {
override def filterFilesToReorg(spark: SparkSession, snapshot: Snapshot, files: Seq[AddFile])
: Seq[AddFile] = {
val physicalSchema = DeltaColumnMapping.renameColumns(snapshot.schema)
filterParquetFiles(spark, snapshot, files) {
schema => fileHasDifferentTypes(schema, physicalSchema)
}
}
}
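
The trait change threads a SparkSession into filterFilesToReorg so that operations like the one above can read Parquet footers. Because DeltaReorgOperation is sealed, a new operation would have to live in the same source file as the existing ones; the sketch below only illustrates the updated signature (the class name and size threshold are hypothetical):

// Hypothetical operation: rewrite every file smaller than `maxSize` bytes.
class DeltaRewriteSmallFilesOperation(maxSize: Long) extends DeltaReorgOperation {
  override def filterFilesToReorg(
      spark: SparkSession,
      snapshot: Snapshot,
      files: Seq[AddFile]): Seq[AddFile] =
    files.filter(_.size < maxSize)
}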
Expand Up @@ -264,8 +264,10 @@ class OptimizeExecutor(
val partitionSchema = txn.metadata.partitionSchema

val filesToProcess = optimizeContext.reorg match {
case Some(reorgOperation) => reorgOperation.filterFilesToReorg(txn.snapshot, candidateFiles)
case None => filterCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
case Some(reorgOperation) =>
reorgOperation.filterFilesToReorg(sparkSession, txn.snapshot, candidateFiles)
case None =>
filterCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
}
val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq

@@ -0,0 +1,86 @@
package org.apache.spark.sql.delta.commands

import org.apache.spark.sql.delta.Snapshot
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.commands.optimize.OptimizeRunner.generateCandidateFileMap
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.util.DeltaFileOperations
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetToSparkSchemaConverter}
import org.apache.spark.sql.types.{AtomicType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration

trait ReorgTableHelper {
/**
* Determine whether `fileSchema` has any column whose type differs from the corresponding
* column in `tablePhysicalSchema`.
*/
protected def fileHasDifferentTypes(
fileSchema: StructType,
tablePhysicalSchema: StructType): Boolean = {
SchemaMergingUtils.transformColumns(fileSchema, tablePhysicalSchema) {
case (_, StructField(_, fileType: AtomicType, _, _),
Some(StructField(_, tableType: AtomicType, _, _)), _) if fileType != tableType =>
return true
case (_, field, _, _) => field
}
false
}

/**
* Apply a filter on the list of AddFile to only keep the files whose physical Parquet
* schema satisfies the given filter function.
*/
def filterParquetFiles(
spark: SparkSession, snapshot: Snapshot, files: Seq[AddFile])(
filterFileFn: StructType => Boolean)
: Seq[AddFile] = {
val serializedConf = new SerializableConfiguration(spark.sessionState.newHadoopConf())
val ignoreCorruptFiles = spark.sessionState.conf.ignoreCorruptFiles
val assumeBinaryIsString = spark.sessionState.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = spark.sessionState.conf.isParquetINT96AsTimestamp
val dataPath = new Path(snapshot.deltaLog.dataPath.toString)

filterParquetFiles(files, dataPath, serializedConf.value, ignoreCorruptFiles,
assumeBinaryIsString, assumeInt96IsTimestamp)(filterFileFn)
}

protected def filterParquetFiles(
files: Seq[AddFile],
dataPath: Path,
configuration: Configuration,
ignoreCorruptFiles: Boolean,
assumeBinaryIsString: Boolean,
assumeInt96IsTimestamp: Boolean)(
filterFileFn: StructType => Boolean): Seq[AddFile] = {
val nameToAddFileMap = generateCandidateFileMap(dataPath, files)

val fileStatuses = nameToAddFileMap.map { case (absPath, addFile) =>
new FileStatus(
/* length */ addFile.size,
/* isDir */ false,
/* blockReplication */ 0,
/* blockSize */ 1,
/* modificationTime */ addFile.modificationTime,
new Path(absPath)
)
}

val footers = DeltaFileOperations.readParquetFootersInParallel(
configuration,
fileStatuses.toList,
ignoreCorruptFiles)

val converter =
new ParquetToSparkSchemaConverter(assumeBinaryIsString, assumeInt96IsTimestamp)

val filesNeedToRewrite = footers.filter { footer =>
val fileSchema = ParquetFileFormat.readSchemaFromFooter(footer, converter)
filterFileFn(fileSchema)
}.map(_.getFile.toString)
filesNeedToRewrite.map(absPath => nameToAddFileMap(absPath))
}
}
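
The helper compares leaf-level atomic types between a file's footer schema and the table's physical schema, short-circuiting on the first mismatch. An illustrative use of fileHasDifferentTypes under that contract, assuming the member is accessible from an object that mixes in the trait (the object name and schemas are examples only):

import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}

// Example only: a file still written with INT after an INT -> LONG widening is
// flagged for rewrite; a file already written with LONG is not.
object ReorgTableHelperExample extends ReorgTableHelper {
  def demo(): Unit = {
    val intFileSchema  = StructType(Seq(StructField("value", IntegerType)))
    val longFileSchema = StructType(Seq(StructField("value", LongType)))
    val tableSchema    = StructType(Seq(StructField("value", LongType)))

    assert(fileHasDifferentTypes(intFileSchema, tableSchema))
    assert(!fileHasDifferentTypes(longFileSchema, tableSchema))
  }
}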
@@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit

import com.databricks.spark.util.Log4jUsageLogger
import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate
import org.apache.spark.sql.delta.actions.RemoveFile
import org.apache.spark.sql.delta.actions.{RemoveFile, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.AlterTableDropFeatureDeltaCommand
import org.apache.spark.sql.delta.commands.cdc.CDCReader
@@ -72,6 +72,7 @@ trait DeltaTypeWideningTestMixin extends SharedSparkSession with DeltaDMLTestUti
protected override def sparkConf: SparkConf = {
super.sparkConf
.set(DeltaConfigs.ENABLE_TYPE_WIDENING.defaultTablePropertyKey, "true")
.set(TableFeatureProtocolUtils.defaultPropertyKey(TimestampNTZTableFeature), "supported")
// Ensure we don't silently cast test inputs to null on overflow.
.set(SQLConf.ANSI_ENABLED.key, "true")
}
@@ -1012,7 +1013,7 @@ trait DeltaTypeWideningStatsTests {
* Tests covering adding and removing the type widening table feature. Dropping the table feature
* also includes rewriting data files with the old type and removing type widening metadata.
*/
trait DeltaTypeWideningTableFeatureTests {
trait DeltaTypeWideningTableFeatureTests extends DeltaTypeWideningTestCases {
self: QueryTest
with ParquetTest
with RowTrackingTestUtils
@@ -1346,6 +1347,21 @@ trait DeltaTypeWideningTableFeatureTests {
}
}

for {
testCase <- supportedTestCases
}
test(s"drop feature after type change ${testCase.fromType.sql} -> ${testCase.toType.sql}") {
append(testCase.initialValuesDF.repartition(2))
sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN value TYPE ${testCase.toType.sql}")
append(testCase.additionalValuesDF.repartition(3))
dropTableFeature(
expectedOutcome = ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE,
expectedNumFilesRewritten = 2,
expectedColumnTypes = Map("value" -> testCase.toType)
)
checkAnswer(readDeltaTable(tempPath), testCase.expectedResult)
}

test("drop feature after a type change with schema evolution") {
setupManualClock()
sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA")
@@ -1448,7 +1464,7 @@ trait DeltaTypeWideningTableFeatureTests {

dropTableFeature(
expectedOutcome = ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE,
expectedNumFilesRewritten = 3,
expectedNumFilesRewritten = 2,
expectedColumnTypes = Map("a" -> IntegerType)
)
checkAnswer(readDeltaTable(tempPath),
