Skip to content

Commit

Permalink
add checks
Browse files Browse the repository at this point in the history
  • Loading branch information
huan233usc committed Feb 19, 2025
1 parent 6b81868 commit daa03fd
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.delta.kernel.internal.checksum;

import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.data.ColumnarBatch;
Expand Down Expand Up @@ -87,12 +88,14 @@ public CRCInfo(
long tableSizeBytes,
long numFiles,
Optional<String> txnId) {
checkArgument(tableSizeBytes >= 0);
checkArgument(numFiles >=0 );
this.version = version;
this.metadata = requireNonNull(metadata);
this.protocol = requireNonNull(protocol);
this.tableSizeBytes = tableSizeBytes;
this.numFiles = numFiles;
this.txnId = txnId;
this.txnId = requireNonNull(txnId);
}

/** The version of the Delta table that this CRCInfo represents. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public ChecksumWriter(Path logPath) {

/** Writes a checksum file */
public void writeCheckSum(Engine engine, CRCInfo crcInfo) throws IOException {
checkArgument(crcInfo.getNumFiles() >= 0 && crcInfo.getTableSizeBytes() >= 0);
Path newChecksumPath = FileNames.checksumFile(logPath, crcInfo.getVersion());
logger.info("Writing checksum file to path: {}", newChecksumPath);
wrapEngineExceptionThrowsIO(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,21 +100,17 @@ class ChecksumSimpleComparisonSuite extends DeltaTableWriteSuiteBase with TestUt
}

/**
* Insert into a partitioned Spark table, copy the commit log to the Kernel table,
* and verify that the checksum files are consistent between Spark and Kernel.
* Insert into an unpartitioned Spark table, read the added files from the commit log,
* commit them to the Kernel table, and verify that the checksum files are consistent
* between Spark and Kernel.
* */
private def insertIntoPartitionedTableAndCheckCrc(
private def insertIntoUnpartitionedTableAndCheckCrc(
engine: Engine,
sparkTablePath: String,
kernelTablePath: String,
versionAtCommit: Long): Unit = {
var valueToAppend = "(0, 0)"
var addedPartition = Set(0)
(0L to versionAtCommit).foreach(i => {
val partitionValue = 2 * i
addedPartition = addedPartition + partitionValue.toInt
valueToAppend = valueToAppend + s",($i, $partitionValue)"
})
var valueToAppend = "(0)"
(0L to versionAtCommit).foreach(i => valueToAppend = valueToAppend + s",($i)")
spark.sql(
s"INSERT INTO delta.`$sparkTablePath` values $valueToAppend"
)
Expand All @@ -124,27 +120,27 @@ class ChecksumSimpleComparisonSuite extends DeltaTableWriteSuiteBase with TestUt
.createTransactionBuilder(engine, "test-engine", Operation.WRITE)
.build(engine)

convertSparkDeltaLogToKernelCommit(
txn,
engine,
sparkTablePath,
versionAtCommit,
Some(addedPartition)
)
convertSparkDeltaLogToKernelCommit(txn, engine, sparkTablePath, versionAtCommit)
assertChecksumEquals(engine, sparkTablePath, kernelTablePath, versionAtCommit)
}

/**
* Insert into an unpartitioned Spark table, copy the commit log to the Kernel table,
* and verify that the checksum files are consistent between Spark and Kernel.
* Insert into a partitioned Spark table, read the added files from the commit log,
* commit them to the Kernel table, and verify that the checksum files are consistent
* between Spark and Kernel.
* */
private def insertIntoUnpartitionedTableAndCheckCrc(
private def insertIntoPartitionedTableAndCheckCrc(
engine: Engine,
sparkTablePath: String,
kernelTablePath: String,
versionAtCommit: Long): Unit = {
var valueToAppend = "(0)"
(0L to versionAtCommit).foreach(i => valueToAppend = valueToAppend + s",($i)")
var valueToAppend = "(0, 0)"
var addedPartition = Set(0)
(0L to versionAtCommit).foreach(i => {
val partitionValue = 2 * i
addedPartition = addedPartition + partitionValue.toInt
valueToAppend = valueToAppend + s",($i, $partitionValue)"
})
spark.sql(
s"INSERT INTO delta.`$sparkTablePath` values $valueToAppend"
)
Expand All @@ -154,7 +150,13 @@ class ChecksumSimpleComparisonSuite extends DeltaTableWriteSuiteBase with TestUt
.createTransactionBuilder(engine, "test-engine", Operation.WRITE)
.build(engine)

convertSparkDeltaLogToKernelCommit(txn, engine, sparkTablePath, versionAtCommit)
convertSparkDeltaLogToKernelCommit(
txn,
engine,
sparkTablePath,
versionAtCommit,
Some(addedPartition)
)
assertChecksumEquals(engine, sparkTablePath, kernelTablePath, versionAtCommit)
}

Expand Down Expand Up @@ -272,49 +274,36 @@ class ChecksumSimpleComparisonSuite extends DeltaTableWriteSuiteBase with TestUt
)

while (columnarBatches.hasNext) {
processColumnarBatch(columnarBatches.next(), partition, addFiles)
collectAddFilesFromLogRows(columnarBatches.next(), partition, addFiles)
}

toCloseableIterator(addFiles.iterator())
}

private def processColumnarBatch(
batch: ColumnarBatch,
private def collectAddFilesFromLogRows(
logFileRows: ColumnarBatch,
partition: Option[String],
addFiles: util.ArrayList[DataFileStatus]): Unit = {
val rows = batch.getRows
val rows = logFileRows.getRows
while (rows.hasNext) {
val row = rows.next()
val addIndex = row.getSchema.indexOf("add")

if (!row.isNullAt(addIndex)) {
processAddFile(row.getStruct(addIndex), partition, addFiles)
val addFile = new AddFile(row.getStruct(addIndex))
if (partition.isEmpty ||
partition.get == VectorUtils
.toJavaMap(addFile.getPartitionValues)
.get(PARTITION_COLUMN)) {
addFiles.add(
new DataFileStatus(
addFile.getPath,
addFile.getSize,
addFile.getModificationTime,
Optional.empty() // TODO: populate stats once #4139 is fixed
)
)
}
}
}
}

/**
 * Wraps one `add` struct row as an [[AddFile]] and, when it passes the
 * optional partition filter, appends a matching [[DataFileStatus]] to
 * `addFiles`.
 *
 * @param addFileRow struct row taken from the `add` column of a log row
 * @param partition  partition value to keep; forwarded to `shouldIncludeFile`
 * @param addFiles   output list, mutated in place
 */
private def processAddFile(
    addFileRow: Row,
    partition: Option[String],
    addFiles: util.ArrayList[DataFileStatus]): Unit = {
  val candidate = new AddFile(addFileRow)
  val selected = shouldIncludeFile(candidate, partition)

  if (selected) {
    // Read the fields in the same order the status constructor consumes them.
    val filePath = candidate.getPath
    val fileSize = candidate.getSize
    val modifiedAt = candidate.getModificationTime
    val status = new DataFileStatus(
      filePath,
      fileSize,
      modifiedAt,
      Optional.empty() // TODO: populate stats once #4139 is fixed
    )
    addFiles.add(status)
  }
}

/**
 * Tells whether `addFile` passes the optional partition filter.
 *
 * @param addFile   add action whose partition values are inspected
 * @param partition partition value to match; `None` accepts every file
 * @return `true` when no filter is given, or when the file's value for
 *         `PARTITION_COLUMN` equals the requested one
 */
private def shouldIncludeFile(addFile: AddFile, partition: Option[String]): Boolean =
  // forall is vacuously true for None, so the partition map is only read when a filter exists.
  partition.forall(
    _ == VectorUtils.toJavaMap(addFile.getPartitionValues).get(PARTITION_COLUMN))

}

0 comments on commit daa03fd

Please sign in to comment.