Skip to content

Commit

Permalink
incrementally compute allfiles
Browse files Browse the repository at this point in the history
  • Loading branch information
dhruvarya-db committed Nov 23, 2024
1 parent ec0ab0d commit 1b47d4d
Show file tree
Hide file tree
Showing 9 changed files with 897 additions and 22 deletions.
11 changes: 11 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,17 @@ object Checkpoints
if (spark.conf.get(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED)) {
snapshot.validateChecksum(Map("context" -> "writeCheckpoint"))
}
// Verify allFiles in checksum during checkpoint if we are not doing so already on every
// commit.
val allFilesInCRCEnabled = Snapshot.allFilesInCrcWritePathEnabled(spark, snapshot)
val shouldVerifyAllFilesInCRCEveryCommit =
Snapshot.allFilesInCrcVerificationEnabled(spark, snapshot)
if (allFilesInCRCEnabled && !shouldVerifyAllFilesInCRCEveryCommit) {
snapshot.checksumOpt.foreach { checksum =>
snapshot.validateFileListAgainstCRC(
checksum, contextOpt = Some("triggeredFromCheckpoint"))
}
}

val hadoopConf = deltaLog.newDeltaHadoopConf()

Expand Down
210 changes: 205 additions & 5 deletions spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.spark.sql.delta

import java.io.FileNotFoundException
import java.nio.charset.StandardCharsets.UTF_8
import java.util.TimeZone

// scalastyle:off import.ordering.noEmptyLine
import scala.collection.immutable.ListMap
Expand Down Expand Up @@ -135,6 +136,7 @@ trait RecordChecksum extends DeltaLogging {
* `versionToCompute - 1` or a snapshot. Note that the snapshot may
* belong to any version and this method will only use the snapshot if
* it corresponds to `versionToCompute - 1`.
* @param includeAddFilesInCrc True if the new checksum should include [[AddFile]]s.
* @return Either the new checksum or an error code string if the checksum could not be computed.
*/
// scalastyle:off argcount
Expand All @@ -147,7 +149,8 @@ trait RecordChecksum extends DeltaLogging {
protocol: Protocol,
operationName: String,
txnIdOpt: Option[String],
previousVersionState: Either[Snapshot, VersionChecksum]
previousVersionState: Either[Snapshot, VersionChecksum],
includeAddFilesInCrc: Boolean
): Either[String, VersionChecksum] = {
// scalastyle:on argcount
if (!deltaLog.incrementalCommitEnabled) {
Expand Down Expand Up @@ -207,7 +210,8 @@ trait RecordChecksum extends DeltaLogging {
oldVersionChecksum,
oldSnapshot,
actions,
ignoreAddFilesInOperation
ignoreAddFilesInOperation,
includeAddFilesInCrc
)
}

Expand All @@ -234,7 +238,8 @@ trait RecordChecksum extends DeltaLogging {
oldVersionChecksum: VersionChecksum,
oldSnapshot: Option[Snapshot],
actions: Seq[Action],
ignoreAddFiles: Boolean
ignoreAddFiles: Boolean,
includeAllFilesInCRC: Boolean
) : Either[String, VersionChecksum] = {
// scalastyle:on argcount
oldSnapshot.foreach(s => require(s.version == (attemptVersion - 1)))
Expand Down Expand Up @@ -281,6 +286,75 @@ trait RecordChecksum extends DeltaLogging {
val domainMetadata = incrementallyComputeDomainMetadatas(
oldSnapshot, oldVersionChecksum, attemptVersion, actions)

val computeAddFiles = if (includeAllFilesInCRC) {
incrementallyComputeAddFiles(
oldSnapshot = oldSnapshot,
oldVersionChecksum = oldVersionChecksum,
attemptVersion = attemptVersion,
numFilesAfterCommit = numFiles,
actionsToCommit = actions)
} else if (numFiles == 0) {
// If the table becomes empty after the commit, addFiles should be empty.
Option(Nil)
} else {
None
}

val allFiles = computeAddFiles.filter { files =>
val computedNumFiles = files.size
val computedTableSizeBytes = files.map(_.size).sum
// Validate checksum of Incrementally computed files against the computed checksum from
// incremental commits.
if (computedNumFiles != numFiles || computedTableSizeBytes != tableSizeBytes) {
val filePathsFromPreviousVersion = oldVersionChecksum.allFiles
.orElse {
recordFrameProfile("Delta", "VersionChecksum.computeNewChecksum.allFiles") {
oldSnapshot.map(_.allFiles.collect().toSeq)
}
}
.getOrElse(Seq.empty)
.map(_.path)
val addFilePathsInThisCommit = actions.collect { case af: AddFile => af.path }
val removeFilePathsInThisCommit = actions.collect { case rf: RemoveFile => rf.path }
logWarning(log"Incrementally computed files does not match the incremental checksum " +
log"for commit attempt: ${MDC(DeltaLogKeys.VERSION, attemptVersion)}. " +
log"addFilePathsInThisCommit: [${MDC(DeltaLogKeys.PATHS,
addFilePathsInThisCommit.mkString(","))}], " +
log"removeFilePathsInThisCommit: [${MDC(DeltaLogKeys.PATHS2,
removeFilePathsInThisCommit.mkString(","))}], " +
log"filePathsFromPreviousVersion: [${MDC(DeltaLogKeys.PATHS3,
filePathsFromPreviousVersion.mkString(","))}], " +
log"computedFiles: [${MDC(DeltaLogKeys.PATHS4,
files.map(_.path).mkString(","))}]")
val eventData = Map(
"attemptVersion" -> attemptVersion,
"expectedNumFiles" -> numFiles,
"expectedTableSizeBytes" -> tableSizeBytes,
"computedNumFiles" -> computedNumFiles,
"computedTableSizeBytes" -> computedTableSizeBytes,
"numAddFilePathsInThisCommit" -> addFilePathsInThisCommit.size,
"numRemoveFilePathsInThisCommit" -> removeFilePathsInThisCommit.size,
"numFilesInPreviousVersion" -> filePathsFromPreviousVersion.size,
"operationName" -> operationName,
"addFilePathsInThisCommit" -> JsonUtils.toJson(addFilePathsInThisCommit.take(10)),
"removeFilePathsInThisCommit" -> JsonUtils.toJson(removeFilePathsInThisCommit.take(10)),
"filePathsFromPreviousVersion" -> JsonUtils.toJson(filePathsFromPreviousVersion.take(10)),
"computedFiles" -> JsonUtils.toJson(files.take(10))
)
recordDeltaEvent(
deltaLog,
opType = "delta.allFilesInCrc.checksumMismatch.aggregated",
data = eventData)
if (Utils.isTesting) {
throw new IllegalStateException("Incrementally Computed State failed checksum check" +
s" for commit $attemptVersion [$eventData]")
}
false
} else {
true
}
}

Right(VersionChecksum(
txnId = txnIdOpt,
tableSizeBytes = tableSizeBytes,
Expand All @@ -292,8 +366,8 @@ trait RecordChecksum extends DeltaLogging {
protocol = protocol,
setTransactions = setTransactions,
domainMetadata = domainMetadata,
histogramOpt = None,
allFiles = None
allFiles = allFiles,
histogramOpt = None
))
}

Expand Down Expand Up @@ -390,6 +464,62 @@ trait RecordChecksum extends DeltaLogging {
// "domain metadatas not stored" as [[Some]] vs. [[None]].
Some(logReplay.getDomainMetadatas.toSeq).filter(_.size <= threshold)
}

/**
 * Derives the [[AddFile]]s of commit `attemptVersion` by replaying that commit's actions on top
 * of the file list of the previous version.
 *
 * @param oldSnapshot snapshot for version `attemptVersion - 1`, when one is available
 * @param oldVersionChecksum [[VersionChecksum]] for version `attemptVersion - 1`
 * @param attemptVersion the version being committed
 * @param numFilesAfterCommit number of files in the table once `attemptVersion` is committed
 * @param actionsToCommit the actions that make up commit `attemptVersion`
 * @return the incrementally computed [[AddFile]]s for `attemptVersion`, or [[None]] when either
 *         file count exceeds the configured threshold or no previous file list can be obtained
 */
private def incrementallyComputeAddFiles(
    oldSnapshot: Option[Snapshot],
    oldVersionChecksum: VersionChecksum,
    attemptVersion: Long,
    numFilesAfterCommit: Long,
    actionsToCommit: Seq[Action]): Option[Seq[AddFile]] = {

  // Both the pre-commit and the post-commit file lists are enumerated below, so give up as soon
  // as either of them is larger than the configured limit.
  val maxFilesForIncrementalAllFiles =
    spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_THRESHOLD_FILES)
  val largestFileCount = Math.max(numFilesAfterCommit, oldVersionChecksum.numFiles)

  if (largestFileCount > maxFilesForIncrementalAllFiles) {
    None
  } else {
    // Prefer the file list stored in the previous CRC. When the CRC has none, fall back to the
    // snapshot for `attemptVersion - 1`. That snapshot may be absent (for example when concurrent
    // commits have landed in between); in that case nothing is computed and the result is None.
    val previousFiles = oldVersionChecksum.allFiles.orElse {
      recordFrameProfile("Delta", "VersionChecksum.incrementallyComputeAddFiles") {
        oldSnapshot.map(_.allFiles.collect().toSeq)
      }
    }

    previousFiles.map { baseFiles =>
      val toCanonical = new DeltaLog.CanonicalPathFunction(() => deltaLog.newDeltaHadoopConf())
      // Rewrite file paths into canonical form so that adds and removes of the same file line up
      // during replay; every other action passes through untouched.
      val withCanonicalPath: Action => Action = {
        case add: AddFile => add.copy(path = toCanonical(add.path))
        case remove: RemoveFile => remove.copy(path = toCanonical(remove.path))
        case other => other
      }

      // Only the resulting AddFiles are read back, so the retention settings for RemoveFile and
      // SetTransaction are irrelevant here.
      val replay = new InMemoryLogReplay(
        minFileRetentionTimestamp = 0,
        minSetTransactionRetentionTimestamp = None)
      replay.append(attemptVersion - 1, baseFiles.map(withCanonicalPath).iterator)
      replay.append(attemptVersion, actionsToCommit.map(withCanonicalPath).iterator)
      replay.allFiles
    }
  }
}
}

object RecordChecksum {
Expand Down Expand Up @@ -523,6 +653,76 @@ trait ValidateChecksum extends DeltaLogging { self: Snapshot =>
false
}

/**
 * Validates the file list of this snapshot against `checksum.allFiles`.
 *
 * The files obtained via state reconstruction are compared with the files recorded in the
 * checksum. A difference that is limited to the `stats` field is tolerated when, for every file,
 * the stats are equal after being parsed with the parser built from `statsSchema`. On any other
 * mismatch the regular checksum validation is run as well, so that the recorded event tells
 * whether the problem is specific to the file list or affects incremental commits in general.
 *
 * @param checksum the [[VersionChecksum]] to validate against; validation succeeds right away
 *                 when it carries no file list
 * @param contextOpt optional caller context; recorded in the emitted event and used as a prefix
 *                   of the context handed to `validateChecksum`
 * @return true if validation succeeds, false otherwise
 * @throws IllegalStateException on a mismatch when `Utils.isTesting` is set, so that issues can
 *                               be caught during development
 */
def validateFileListAgainstCRC(checksum: VersionChecksum, contextOpt: Option[String]): Boolean = {
  val fileSortKey = (f: AddFile) => (f.path, f.modificationTime, f.size)
  // A `match` keeps this early exit a plain local return; `getOrElse { return ... }` would be a
  // non-local return out of a closure.
  val filesFromCrc = checksum.allFiles match {
    case Some(files) => files.sortBy(fileSortKey)
    case None => return true
  }
  val filesFromStateReconstruction = recordFrameProfile("Delta", "snapshot.allFiles") {
    allFilesViaStateReconstruction.collect().toSeq.sortBy(fileSortKey)
  }
  if (filesFromCrc == filesFromStateReconstruction) return true

  val filesFromCrcWithoutStats = filesFromCrc.map(_.copy(stats = ""))
  val filesFromStateReconstructionWithoutStats =
    filesFromStateReconstruction.map(_.copy(stats = ""))
  val mismatchWithStatsOnly =
    filesFromCrcWithoutStats == filesFromStateReconstructionWithoutStats

  if (mismatchWithStatsOnly) {
    // Only the stats strings differ: compare them again after parsing, so that differences in
    // the raw JSON text alone are not reported as a mismatch.
    val filesFromStateReconstructionMap =
      filesFromStateReconstruction.map(af => (af.path, af)).toMap
    val parser = DeltaFileProviderUtils.createJsonStatsParser(statsSchema)
    // `exists` stops at the first file whose parsed stats differ instead of parsing the stats of
    // every remaining file.
    val normalizedStatsDiffer = filesFromCrc.exists { addFile =>
      val statsFromSRParsed = parser(filesFromStateReconstructionMap(addFile.path).stats)
      val statsFromCrcParsed = parser(addFile.stats)
      statsFromSRParsed != statsFromCrcParsed
    }
    if (!normalizedStatsDiffer) return true
  }

  // If incremental all-files-in-crc validation fails, then there is a possibility that the
  // issue is not just with incremental all-files-in-crc computation but with overall incremental
  // commits. So run the incremental commit crc validation and find out whether that is also
  // failing.
  val contextForIncrementalCommitCheck = contextOpt.map(c => s"$c.").getOrElse("") +
    "delta.allFilesInCrc.checksumMismatch.validateFileListAgainstCRC"
  val (incrementalCommitCrcValidationPassed, errorForIncrementalCommitCrcValidation) =
    try {
      (validateChecksum(Map("context" -> contextForIncrementalCommitCheck)), "")
    } catch {
      case NonFatal(e) =>
        // `getMessage` may be null; fall back to `toString` so the event never records "null".
        (false, Option(e.getMessage).getOrElse(e.toString))
    }

  val eventData = Map(
    "version" -> version,
    "mismatchWithStatsOnly" -> mismatchWithStatsOnly,
    "filesCountFromCrc" -> filesFromCrc.size,
    "filesCountFromStateReconstruction" -> filesFromStateReconstruction.size,
    "filesFromCrc" -> JsonUtils.toJson(filesFromCrc),
    "incrementalCommitCrcValidationPassed" -> incrementalCommitCrcValidationPassed,
    "errorForIncrementalCommitCrcValidation" -> errorForIncrementalCommitCrcValidation,
    "context" -> contextOpt.getOrElse("")
  )
  val message = s"Incremental state reconstruction validation failed for version " +
    s"$version [${eventData.mkString(",")}]"
  // A failed validation is logged as a warning rather than at info level.
  logWarning(message)
  recordDeltaEvent(
    this.deltaLog,
    opType = "delta.allFilesInCrc.checksumMismatch.differentAllFiles",
    data = eventData)
  if (Utils.isTesting) throw new IllegalStateException(message)
  false
}
/**
* Validates the given `checksum` against [[Snapshot.computedState]].
* Returns an tuple of Maps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2535,6 +2535,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite
protected def incrementallyDeriveChecksum(
attemptVersion: Long,
currentTransactionInfo: CurrentTransactionInfo): Option[VersionChecksum] = {
// Don't include [[AddFile]]s in CRC if this commit is modifying the schema of table in some
// way. This is to make sure we don't carry any DROPPED column from previous CRC to this CRC
// forever and can start fresh from next commit.
// If the oldSnapshot itself is missing, we don't incrementally compute the checksum.
val allFilesInCrcWritePathEnabled =
Snapshot.allFilesInCrcWritePathEnabled(spark, snapshot) &&
(snapshot.version == -1 || snapshot.metadata.schema == metadata.schema)

incrementallyDeriveChecksum(
spark,
Expand All @@ -2545,7 +2552,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
protocol = currentTransactionInfo.protocol,
operationName = currentTransactionInfo.op.name,
txnIdOpt = Some(currentTransactionInfo.txnId),
previousVersionState = scala.Left(snapshot)
previousVersionState = scala.Left(snapshot),
includeAddFilesInCrc = allFilesInCrcWritePathEnabled
).toOption
}

Expand Down
Loading

0 comments on commit 1b47d4d

Please sign in to comment.