fix test
huan233usc committed Feb 20, 2025
1 parent a6be52b commit eb1bca1
Showing 3 changed files with 25 additions and 77 deletions.
@@ -383,9 +383,7 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
protected def verifyWrittenContent(
path: String,
expSchema: StructType,
- expData: Seq[TestRow],
- expPartitionColumns: Seq[String] = Seq(),
- version: Option[Long] = Option.empty): Unit = {
+ expData: Seq[TestRow]): Unit = {
val actSchema = tableSchema(path)
assert(actSchema === expSchema)

@@ -396,12 +394,7 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
// Spark reads the timestamp partition columns in local timezone vs. Kernel reads in UTC. We
// need to set the timezone to UTC before reading the data using Spark to make the tests pass
withSparkTimeZone("UTC") {
- val resultSpark = spark
-   .sql(s"SELECT * FROM delta.`$path`" + {
-     if (version.isDefined) s" VERSION AS OF ${version.get}" else ""
-   })
-   .collect()
-   .map(TestRow(_))
+ val resultSpark = spark.sql(s"SELECT * FROM delta.`$path`").collect().map(TestRow(_))
checkAnswer(resultSpark, expData)
}
}
@@ -17,17 +17,20 @@ package io.delta.kernel.defaults
import java.io.File
import java.nio.file.Files
import java.util.{Locale, Optional}

import scala.collection.immutable.Seq
import scala.jdk.CollectionConverters.{asScalaBufferConverter, asScalaSetConverter}
import scala.language.implicitConversions

import io.delta.kernel.{Transaction, TransactionCommitResult}
import io.delta.kernel.data.Row
import io.delta.kernel.defaults.utils.TestRow
import io.delta.kernel.engine.Engine
import io.delta.kernel.hook.PostCommitHook.PostCommitHookType
import io.delta.kernel.internal.{SnapshotImpl, TransactionImpl}
import io.delta.kernel.internal.actions.Metadata
- import io.delta.kernel.internal.checksum.CRCInfo
+ import io.delta.kernel.internal.checksum.{ChecksumReader, CRCInfo}
+ import io.delta.kernel.internal.fs.Path
import io.delta.kernel.internal.util.Utils.singletonCloseableIterator
import io.delta.kernel.types.StructType
import io.delta.kernel.utils.{CloseableIterable, FileStatus}
@@ -49,67 +52,20 @@ class DeltaTableWriteWithCrcSuite extends DeltaTableWritesSuite {
override def verifyWrittenContent(
path: String,
expSchema: StructType,
- expData: Seq[TestRow],
- expPartitionColumns: Seq[String] = Seq(),
- version: Option[Long] = Option.empty): Unit = {
- val actSchema = tableSchema(path)
- assert(actSchema === expSchema)
-
- // verify data using Kernel reader
- checkTable(path, expData)
-
- // verify data using Spark reader.
- // Spark reads the timestamp partition columns in local timezone vs. Kernel reads in UTC. We
- // need to set the timezone to UTC before reading the data using Spark to make the tests pass
- withSparkTimeZone("UTC") {
- val resultSpark = spark
-   .sql(s"SELECT * FROM delta.`$path`" + {
-     if (version.isDefined) s" VERSION AS OF ${version.get}" else ""
-   })
-   .collect()
-   .map(TestRow(_))
- checkAnswer(resultSpark, expData)
- }
-
- checkChecksumContent(path, version, expSchema, expPartitionColumns)
+ expData: Seq[TestRow]): Unit = {
+ super.verifyWrittenContent(path, expSchema, expData)
+ checkChecksum(path, expSchema)
}

- def checkChecksumContent(
+ def checkChecksum(
tablePath: String,
- version: Option[Long],
- expSchema: StructType,
- expPartitionColumns: Seq[String]): Unit = {
- val checksumVersion = version.getOrElse(latestSnapshot(tablePath, defaultEngine).getVersion)
- val checksumFile = new File(f"$tablePath/_delta_log/$checksumVersion%020d.crc")
-
- assert(Files.exists(checksumFile.toPath), s"Checksum file not found: ${checksumFile.getPath}")
-
- val columnarBatches = defaultEngine
-   .getJsonHandler()
-   .readJsonFiles(
-     singletonCloseableIterator(FileStatus.of(checksumFile.getPath)),
-     CRCInfo.CRC_FILE_SCHEMA,
-     Optional.empty())
-
- assert(columnarBatches.hasNext, "Empty checksum file")
- val crcRow = columnarBatches.next()
- assert(crcRow.getSize === 1, s"Expected single row, found ${crcRow.getSize}")
-
- val metadata = Metadata.fromColumnVector(
-   crcRow.getColumnVector(CRCInfo.CRC_FILE_SCHEMA.indexOf("metadata")),
-   /* rowId= */ 0)
-
- assert(
-   metadata.getSchema === expSchema,
-   s"Schema mismatch.\nExpected: $expSchema\nActual: ${metadata.getSchema}")
-
- val normalizedPartitions = expPartitionColumns.map(_.toLowerCase(Locale.ROOT)).toSet
- assert(
-   metadata.getPartitionColNames.asScala === normalizedPartitions,
-   s"Partition columns mismatch.\n" +
-   s"Expected: $normalizedPartitions\n" +
-   s"Actual: ${metadata.getPartitionColNames.asScala}")
-
- assert(!columnarBatches.hasNext, "Unexpected additional data in checksum file")
+ expSchema: StructType): Unit = {
+ val checksumVersion = latestSnapshot(tablePath, defaultEngine).getVersion
+ val crcInfo = ChecksumReader.getCRCInfo(
+   defaultEngine,
+   new Path(f"$tablePath/_delta_log/"),
+   checksumVersion,
+   checksumVersion)
+ assert(crcInfo.isPresent)
}
}
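
Aside, not part of this commit: the simplified helper only asserts that ChecksumReader.getCRCInfo returns a present Optional. If a test also wanted to validate the schema recorded in the .crc file, as the deleted checkChecksumContent did by parsing the JSON directly, a minimal sketch could look like the following. It assumes CRCInfo exposes the snapshot metadata through a getMetadata() accessor, which is an assumption; only isPresent is confirmed by the diff above.

    // Sketch only: an extended check on top of the committed checkChecksum.
    // Assumes CRCInfo exposes the snapshot Metadata via getMetadata() (hypothetical here);
    // the diff above only relies on Optional.isPresent.
    def checkChecksumSchema(tablePath: String, expSchema: StructType): Unit = {
      val checksumVersion = latestSnapshot(tablePath, defaultEngine).getVersion
      val crcInfo = ChecksumReader.getCRCInfo(
        defaultEngine,
        new Path(f"$tablePath/_delta_log/"),
        checksumVersion,
        checksumVersion)
      assert(crcInfo.isPresent, s"No checksum file found for version $checksumVersion")
      // Compare the schema stored in the checksum against the expected write schema.
      assert(crcInfo.get().getMetadata.getSchema === expSchema)
    }
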
@@ -352,7 +352,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false)

verifyCommitInfo(tablePath, version = 0, Seq("Part1", "part2"))
- verifyWrittenContent(tablePath, schema, Seq.empty, Seq("Part1", "part2"))
+ verifyWrittenContent(tablePath, schema, Seq.empty)
}
}

@@ -463,7 +463,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa

verifyCommitResult(commitResult0, expVersion = 0, expIsReadyForCheckpoint = false)
verifyCommitInfo(tblPath, version = 0, testPartitionColumns, operation = WRITE)
- verifyWrittenContent(tblPath, testPartitionSchema, expData, testPartitionColumns)
+ verifyWrittenContent(tblPath, testPartitionSchema, expData)
}
}

@@ -485,7 +485,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa

verifyCommitResult(commitResult0, expVersion = 0, expIsReadyForCheckpoint = false)
verifyCommitInfo(tblPath, version = 0, partitionCols, operation = WRITE)
- verifyWrittenContent(tblPath, testPartitionSchema, expData, partitionCols)
+ verifyWrittenContent(tblPath, testPartitionSchema, expData)
}
{
val commitResult1 = appendData(
@@ -498,7 +498,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa

verifyCommitResult(commitResult1, expVersion = 1, expIsReadyForCheckpoint = false)
verifyCommitInfo(tblPath, version = 1, partitionCols = null, operation = WRITE)
- verifyWrittenContent(tblPath, testPartitionSchema, expData, partitionCols)
+ verifyWrittenContent(tblPath, testPartitionSchema, expData)
}
}
}
@@ -564,8 +564,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
verifyWrittenContent(
tblPath,
schema,
- if (i == 0) expV0Data else expV0Data ++ expV1Data,
- partCols)
+ if (i == 0) expV0Data else expV0Data ++ expV1Data)
}
}
}
@@ -620,7 +619,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
// delete all commit files before version 30 in both cases and expect the read to pass as
// there is a checkpoint at version 30 and should be used for state reconstruction.
deleteDeltaFilesBefore(tblPath, beforeVersion = 30)
- verifyWrittenContent(tblPath, schema, expData, partCols)
+ verifyWrittenContent(tblPath, schema, expData)
}
}
}
@@ -742,7 +741,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa

verifyCommitResult(commitResult, expVersion = i, i % checkpointInterval == 0)
verifyCommitInfo(tblPath, version = i, partitionCols = null, operation = WRITE)
- verifyWrittenContent(tblPath, schema, expData, partCols)
+ verifyWrittenContent(tblPath, schema, expData)
}

assertCheckpointExists(tblPath, atVersion = checkpointInterval)