Skip to content

Commit

Permalink
[Kernel]Add simple crc post commit for incremental crc writing. (#4134)
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->
This PR introduces a new post commit hook - ChecksumSimple, for writing
CRC file after txn commit.
CRC file will only be written only commit version - 1's snapshot reads
CRC during state construction

Other case will be handled in a separate PR

## How was this patch tested?

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

E2e test
## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
No
  • Loading branch information
huan233usc authored Feb 20, 2025
1 parent 8045052 commit 7e85686
Show file tree
Hide file tree
Showing 15 changed files with 799 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,14 @@ enum PostCommitHookType {
* perform this operation, reading previous checkpoint + logs is required to construct a new
* checkpoint, with latency scaling based on log size (typically seconds to minutes).
*/
CHECKPOINT
CHECKPOINT,
/**
* Writes a checksum file at the version committed by the transaction. This hook is present when
* all required table statistics (e.g. table size) for checksum file are known when a
* transaction commits. This operation has a minimal latency with no requirement of reading
* previous checkpoint or logs.
*/
CHECKSUM_SIMPLE
}

/** Invokes the post commit operation whose implementation must be thread safe. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import io.delta.kernel.internal.actions.DomainMetadata;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.checksum.CRCInfo;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
import io.delta.kernel.internal.metrics.SnapshotReportImpl;
import io.delta.kernel.internal.replay.CRCInfo;
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.snapshot.LogSegment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,19 @@
import io.delta.kernel.expressions.Column;
import io.delta.kernel.hook.PostCommitHook;
import io.delta.kernel.internal.actions.*;
import io.delta.kernel.internal.checksum.CRCInfo;
import io.delta.kernel.internal.data.TransactionStateRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.hook.CheckpointHook;
import io.delta.kernel.internal.hook.ChecksumSimpleHook;
import io.delta.kernel.internal.metrics.TransactionMetrics;
import io.delta.kernel.internal.metrics.TransactionReportImpl;
import io.delta.kernel.internal.replay.ConflictChecker;
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
import io.delta.kernel.internal.rowtracking.RowTracking;
import io.delta.kernel.internal.tablefeatures.TableFeatures;
import io.delta.kernel.internal.util.*;
import io.delta.kernel.metrics.TransactionMetricsResult;
import io.delta.kernel.metrics.TransactionReport;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterable;
Expand Down Expand Up @@ -342,6 +345,7 @@ private TransactionCommitResult doCommit(
dataAndMetadataActions.map(
action -> {
transactionMetrics.totalActionsCounter.increment();
// TODO: handle RemoveFiles.
if (!action.isNullAt(ADD_FILE_ORDINAL)) {
transactionMetrics.addFilesCounter.increment();
transactionMetrics.addFilesSizeInBytesCounter.increment(
Expand All @@ -362,6 +366,10 @@ private TransactionCommitResult doCommit(
postCommitHooks.add(new CheckpointHook(dataPath, commitAsVersion));
}

buildPostCommitCrcInfoIfCurrentCrcAvailable(
commitAsVersion, transactionMetrics.captureTransactionMetricsResult())
.ifPresent(crcInfo -> postCommitHooks.add(new ChecksumSimpleHook(crcInfo, logPath)));

return new TransactionCommitResult(commitAsVersion, postCommitHooks);
} catch (FileAlreadyExistsException e) {
throw e;
Expand Down Expand Up @@ -437,6 +445,36 @@ private void recordTransactionReport(
engine.getMetricsReporters().forEach(reporter -> reporter.report(transactionReport));
}

private Optional<CRCInfo> buildPostCommitCrcInfoIfCurrentCrcAvailable(
long commitAtVersion, TransactionMetricsResult metricsResult) {
if (isNewTable) {
return Optional.of(
new CRCInfo(
commitAtVersion,
metadata,
protocol,
metricsResult.getTotalAddFilesSizeInBytes(),
metricsResult.getNumAddFiles(),
Optional.of(txnId.toString())));
}

return readSnapshot
.getCurrentCrcInfo()
// in the case of a conflicting txn and successful retry the readSnapshot may not be
// commitVersion - 1
.filter(lastCrcInfo -> commitAtVersion == lastCrcInfo.getVersion() + 1)
.map(
lastCrcInfo ->
new CRCInfo(
commitAtVersion,
metadata,
protocol,
// TODO: handle RemoveFiles for calculating table size and num of files.
lastCrcInfo.getTableSizeBytes() + metricsResult.getTotalAddFilesSizeInBytes(),
lastCrcInfo.getNumFiles() + metricsResult.getNumAddFiles(),
Optional.of(txnId.toString())));
}

/**
* Get the part of the schema of the table that needs the statistics to be collected per file.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.checksum;

import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.internal.util.InternalUtils;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CRCInfo {
private static final Logger logger = LoggerFactory.getLogger(CRCInfo.class);

// Constants for schema field names
private static final String TABLE_SIZE_BYTES = "tableSizeBytes";
private static final String NUM_FILES = "numFiles";
private static final String NUM_METADATA = "numMetadata";
private static final String NUM_PROTOCOL = "numProtocol";
private static final String METADATA = "metadata";
private static final String PROTOCOL = "protocol";
private static final String TXN_ID = "txnId";

public static final StructType CRC_FILE_SCHEMA =
new StructType()
.add(TABLE_SIZE_BYTES, LongType.LONG)
.add(NUM_FILES, LongType.LONG)
.add(NUM_METADATA, LongType.LONG)
.add(NUM_PROTOCOL, LongType.LONG)
.add(METADATA, Metadata.FULL_SCHEMA)
.add(PROTOCOL, Protocol.FULL_SCHEMA)
.add(TXN_ID, StringType.STRING, /*nullable*/ true);

public static Optional<CRCInfo> fromColumnarBatch(
long version, ColumnarBatch batch, int rowId, String crcFilePath) {
// Read required fields.
Protocol protocol =
Protocol.fromColumnVector(batch.getColumnVector(getSchemaIndex(PROTOCOL)), rowId);
Metadata metadata =
Metadata.fromColumnVector(batch.getColumnVector(getSchemaIndex(METADATA)), rowId);
long tableSizeBytes =
InternalUtils.requireNonNull(
batch.getColumnVector(getSchemaIndex(TABLE_SIZE_BYTES)), rowId, TABLE_SIZE_BYTES)
.getLong(rowId);
long numFiles =
InternalUtils.requireNonNull(
batch.getColumnVector(getSchemaIndex(NUM_FILES)), rowId, NUM_FILES)
.getLong(rowId);

// Read optional fields
ColumnVector txnIdColumnVector = batch.getColumnVector(getSchemaIndex(TXN_ID));
Optional<String> txnId =
txnIdColumnVector.isNullAt(rowId)
? Optional.empty()
: Optional.of(txnIdColumnVector.getString(rowId));

// protocol and metadata are nullable per fromColumnVector's implementation.
if (protocol == null || metadata == null) {
logger.warn("Invalid checksum file missing protocol and/or metadata: {}", crcFilePath);
return Optional.empty();
}
return Optional.of(new CRCInfo(version, metadata, protocol, tableSizeBytes, numFiles, txnId));
}

private final long version;
private final Metadata metadata;
private final Protocol protocol;
private final long tableSizeBytes;
private final long numFiles;
private final Optional<String> txnId;

public CRCInfo(
long version,
Metadata metadata,
Protocol protocol,
long tableSizeBytes,
long numFiles,
Optional<String> txnId) {
checkArgument(tableSizeBytes >= 0);
checkArgument(numFiles >= 0);
this.version = version;
this.metadata = requireNonNull(metadata);
this.protocol = requireNonNull(protocol);
this.tableSizeBytes = tableSizeBytes;
this.numFiles = numFiles;
this.txnId = requireNonNull(txnId);
}

/** The version of the Delta table that this CRCInfo represents. */
public long getVersion() {
return version;
}

/** The {@link Metadata} stored in this CRCInfo. */
public Metadata getMetadata() {
return metadata;
}

/** The {@link Protocol} stored in this CRCInfo. */
public Protocol getProtocol() {
return protocol;
}

public long getNumFiles() {
return numFiles;
}

public long getTableSizeBytes() {
return tableSizeBytes;
}

public Optional<String> getTxnId() {
return txnId;
}

/**
* Encode as a {@link Row} object with the schema {@link CRCInfo#CRC_FILE_SCHEMA}.
*
* @return {@link Row} object with the schema {@link CRCInfo#CRC_FILE_SCHEMA}
*/
public Row toRow() {
Map<Integer, Object> values = new HashMap<>();
// Add required fields
values.put(getSchemaIndex(TABLE_SIZE_BYTES), tableSizeBytes);
values.put(getSchemaIndex(NUM_FILES), numFiles);
values.put(getSchemaIndex(NUM_METADATA), 1L);
values.put(getSchemaIndex(NUM_PROTOCOL), 1L);
values.put(getSchemaIndex(METADATA), metadata.toRow());
values.put(getSchemaIndex(PROTOCOL), protocol.toRow());

// Add optional fields
txnId.ifPresent(txn -> values.put(getSchemaIndex(TXN_ID), txn));
return new GenericRow(CRC_FILE_SCHEMA, values);
}

private static int getSchemaIndex(String fieldName) {
return CRC_FILE_SCHEMA.indexOf(fieldName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.replay;
package io.delta.kernel.internal.checksum;

import static io.delta.kernel.internal.util.FileNames.*;
import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
Expand Down Expand Up @@ -96,7 +96,7 @@ private static Optional<CRCInfo> readChecksumFile(Engine engine, Path filePath)
.getJsonHandler()
.readJsonFiles(
singletonCloseableIterator(FileStatus.of(filePath.toString())),
CRCInfo.FULL_SCHEMA,
CRCInfo.CRC_FILE_SCHEMA,
Optional.empty())) {
// We do this instead of iterating through the rows or using `getSingularRow` so we
// can use the existing fromColumnVector methods in Protocol, Metadata, Format etc
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.checksum;

import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO;
import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.util.FileNames;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Writers for writing checksum files from a snapshot */
public class ChecksumWriter {

private static final Logger logger = LoggerFactory.getLogger(ChecksumWriter.class);

private final Path logPath;

public ChecksumWriter(Path logPath) {
this.logPath = requireNonNull(logPath);
}

/** Writes a checksum file */
public void writeCheckSum(Engine engine, CRCInfo crcInfo) throws IOException {
Path newChecksumPath = FileNames.checksumFile(logPath, crcInfo.getVersion());
logger.info("Writing checksum file to path: {}", newChecksumPath);
wrapEngineExceptionThrowsIO(
() -> {
engine
.getJsonHandler()
.writeJsonFileAtomically(
newChecksumPath.toString(),
singletonCloseableIterator(crcInfo.toRow()),
false /* overwrite */);
logger.info("Write checksum file `{}` succeeds", newChecksumPath);
return null;
},
"Write checksum file `%s`",
newChecksumPath);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.hook;

import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.engine.Engine;
import io.delta.kernel.hook.PostCommitHook;
import io.delta.kernel.internal.checksum.CRCInfo;
import io.delta.kernel.internal.checksum.ChecksumWriter;
import io.delta.kernel.internal.fs.Path;
import java.io.IOException;

/**
* A post-commit hook that writes a new checksum file at the version committed by the transaction.
* This hook performs a simple checksum operation without requiring previous checkpoint or log
* reading.
*/
public class ChecksumSimpleHook implements PostCommitHook {

private final CRCInfo crcInfo;
private final Path logPath;

public ChecksumSimpleHook(CRCInfo crcInfo, Path logPath) {
this.crcInfo = requireNonNull(crcInfo);
this.logPath = requireNonNull(logPath);
}

@Override
public void threadSafeInvoke(Engine engine) throws IOException {
checkArgument(engine != null);
new ChecksumWriter(logPath).writeCheckSum(engine, crcInfo);
}

@Override
public PostCommitHookType getType() {
return PostCommitHookType.CHECKSUM_SIMPLE;
}
}
Loading

0 comments on commit 7e85686

Please sign in to comment.