[Kernel] Add simple crc post commit for incremental crc writing. #4134

Merged: 78 commits, Feb 20, 2025
Changes from 73 commits
Commits (78)
1c05213
basic crc simple writer
huan233usc Feb 7, 2025
ca06a90
first test
huan233usc Feb 7, 2025
3caed70
add tests
huan233usc Feb 7, 2025
3b7dc2c
add tests
huan233usc Feb 10, 2025
2f58dfa
fix scala format
huan233usc Feb 10, 2025
e7f4beb
add unit test
huan233usc Feb 10, 2025
52bda11
add doc
huan233usc Feb 10, 2025
d1626dc
format java
huan233usc Feb 10, 2025
2a1092d
format scala
huan233usc Feb 10, 2025
6c8069f
fix test
huan233usc Feb 10, 2025
471e172
fix test
huan233usc Feb 10, 2025
8c3c33b
refactor 1
huan233usc Feb 10, 2025
5d0b5a7
refactor 2
huan233usc Feb 10, 2025
6796fa5
remove redundant doc
huan233usc Feb 10, 2025
2a93c49
fix doc
huan233usc Feb 10, 2025
826f623
fix scala
huan233usc Feb 10, 2025
ebaf2c8
resolve comment
huan233usc Feb 12, 2025
000a36d
fix indent
huan233usc Feb 12, 2025
0c425be
fix test
huan233usc Feb 12, 2025
e972c31
fix indent
huan233usc Feb 12, 2025
73df627
fix indent
huan233usc Feb 12, 2025
1df8af5
rename method
huan233usc Feb 12, 2025
a3ba086
fix indent
huan233usc Feb 12, 2025
ee72c53
fix build
huan233usc Feb 12, 2025
c8cf5b1
fix test
huan233usc Feb 12, 2025
d922524
fix test naming
huan233usc Feb 14, 2025
d2fc815
add doc to tests, simply the test code
huan233usc Feb 14, 2025
d1504f0
rename params
huan233usc Feb 14, 2025
78477d1
fix comment
huan233usc Feb 14, 2025
7f7e9e7
fix comment
huan233usc Feb 14, 2025
bbd9ea3
add doc
huan233usc Feb 14, 2025
76a5c38
move to row to crc
huan233usc Feb 18, 2025
b723b7d
Merge branch 'master' into crc_simple
huan233usc Feb 18, 2025
f85c845
fix java
huan233usc Feb 18, 2025
97f36fc
fix indent
huan233usc Feb 18, 2025
56b2e7e
format scala
huan233usc Feb 18, 2025
a06bf98
format scala
huan233usc Feb 18, 2025
1e7828b
share utils code
huan233usc Feb 18, 2025
4cddb67
share utils code
huan233usc Feb 18, 2025
dc4c94c
refactor test
huan233usc Feb 18, 2025
bf6613b
refactor test
huan233usc Feb 18, 2025
6b81868
Merge branch 'master' into crc_simple
huan233usc Feb 18, 2025
daa03fd
add checks
huan233usc Feb 19, 2025
0ac1005
resolve review
huan233usc Feb 19, 2025
63f2f46
Refactor testcase to subclassing
huan233usc Feb 19, 2025
622d28d
revert unnecessary code
huan233usc Feb 19, 2025
13bb0a7
revert unnecessary code 2/n
huan233usc Feb 19, 2025
972109a
revert unnecessary code 3/n
huan233usc Feb 19, 2025
c52c58e
revert unnecessary code 4/n
huan233usc Feb 19, 2025
bba00bd
add null check
huan233usc Feb 19, 2025
93b9aeb
fixing doc
huan233usc Feb 19, 2025
a11badd
fixing doc
huan233usc Feb 19, 2025
cf33624
fixing doc
huan233usc Feb 19, 2025
a6189d7
fixing header
huan233usc Feb 19, 2025
26cd8b8
fixing java
huan233usc Feb 19, 2025
c0178ec
group method
huan233usc Feb 19, 2025
cff4b84
merge head
huan233usc Feb 19, 2025
658f770
revert
huan233usc Feb 19, 2025
6c70ba5
revert
huan233usc Feb 19, 2025
abc2bd9
revert
huan233usc Feb 19, 2025
305997e
revert
huan233usc Feb 19, 2025
e7710cb
typo
huan233usc Feb 19, 2025
0bec898
grouping
huan233usc Feb 19, 2025
d392927
todo
huan233usc Feb 19, 2025
f60b9c6
fix doc
huan233usc Feb 19, 2025
c4ae5e5
fix check
huan233usc Feb 19, 2025
a6be52b
fix test
huan233usc Feb 20, 2025
eb1bca1
fix test
huan233usc Feb 20, 2025
941d9c8
fix test
huan233usc Feb 20, 2025
a14d42e
utils method
huan233usc Feb 20, 2025
c4462a7
fix scala
huan233usc Feb 20, 2025
744beaf
fix test
huan233usc Feb 20, 2025
562cfe4
revert
huan233usc Feb 20, 2025
b639110
add docs
huan233usc Feb 20, 2025
5a34d7e
patch https://github.com/delta-io/delta/commit/aae9d649fd6a9ca4b87c74…
huan233usc Feb 20, 2025
b9b4aac
remove patch file
huan233usc Feb 20, 2025
7900389
Merge branch 'master' into crc_simple
huan233usc Feb 20, 2025
a2e2883
resolve commit
huan233usc Feb 20, 2025
@@ -33,7 +33,14 @@ enum PostCommitHookType {
* perform this operation, reading previous checkpoint + logs is required to construct a new
* checkpoint, with latency scaling based on log size (typically seconds to minutes).
*/
CHECKPOINT
CHECKPOINT,
/**
* Writes a checksum file at the version committed by the transaction. This hook is present when
* all required table statistics (e.g. table size) for the checksum file are known when the
* transaction commits. This operation has minimal latency and does not require reading the
* previous checkpoint or logs.
*/
CHECKSUM_SIMPLE

Collaborator: yay I find these docs super clear!

}

/** Invokes the post commit operation whose implementation must be thread safe. */
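
Note (illustrative, not part of this diff): a connector that commits through Kernel receives these hooks on the commit result and invokes them once the commit has succeeded. The sketch below is a minimal example of that dispatch loop; it assumes TransactionCommitResult lives in io.delta.kernel and exposes its hooks via a getPostCommitHooks() accessor, neither of which is shown in this diff.

import io.delta.kernel.TransactionCommitResult;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.hook.PostCommitHook;

/** Hypothetical helper showing how a connector might drive post-commit hooks. */
final class PostCommitHookRunner {
  static void runHooks(Engine engine, TransactionCommitResult result) {
    // Assumption: the commit result exposes its hooks via getPostCommitHooks().
    for (PostCommitHook hook : result.getPostCommitHooks()) {
      try {
        // CHECKSUM_SIMPLE writes the checksum file; CHECKPOINT writes a checkpoint.
        hook.threadSafeInvoke(engine);
      } catch (Exception e) {
        // A hook failure does not roll back the already-committed version.
      }
    }
  }
}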
@@ -25,10 +25,10 @@
import io.delta.kernel.internal.actions.DomainMetadata;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.checksum.CRCInfo;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
import io.delta.kernel.internal.metrics.SnapshotReportImpl;
import io.delta.kernel.internal.replay.CRCInfo;
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.snapshot.LogSegment;
@@ -29,16 +29,19 @@
import io.delta.kernel.expressions.Column;
import io.delta.kernel.hook.PostCommitHook;
import io.delta.kernel.internal.actions.*;
import io.delta.kernel.internal.checksum.CRCInfo;
import io.delta.kernel.internal.data.TransactionStateRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.hook.CheckpointHook;
import io.delta.kernel.internal.hook.ChecksumSimpleHook;
import io.delta.kernel.internal.metrics.TransactionMetrics;
import io.delta.kernel.internal.metrics.TransactionReportImpl;
import io.delta.kernel.internal.replay.ConflictChecker;
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
import io.delta.kernel.internal.rowtracking.RowTracking;
import io.delta.kernel.internal.tablefeatures.TableFeatures;
import io.delta.kernel.internal.util.*;
import io.delta.kernel.metrics.TransactionMetricsResult;
import io.delta.kernel.metrics.TransactionReport;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterable;
@@ -342,6 +345,7 @@ private TransactionCommitResult doCommit(
dataAndMetadataActions.map(
action -> {
transactionMetrics.totalActionsCounter.increment();
// TODO: handle RemoveFiles.
if (!action.isNullAt(ADD_FILE_ORDINAL)) {
transactionMetrics.addFilesCounter.increment();
transactionMetrics.addFilesSizeInBytesCounter.increment(
@@ -362,6 +366,10 @@
postCommitHooks.add(new CheckpointHook(dataPath, commitAsVersion));
}

buildPostCommitCrcInfoIfCurrentCrcAvailable(
commitAsVersion, transactionMetrics.captureTransactionMetricsResult())
.ifPresent(crcInfo -> postCommitHooks.add(new ChecksumSimpleHook(crcInfo, logPath)));

return new TransactionCommitResult(commitAsVersion, postCommitHooks);
} catch (FileAlreadyExistsException e) {
throw e;
@@ -437,6 +445,36 @@ private void recordTransactionReport(
engine.getMetricsReporters().forEach(reporter -> reporter.report(transactionReport));
}

private Optional<CRCInfo> buildPostCommitCrcInfoIfCurrentCrcAvailable(
long commitAtVersion, TransactionMetricsResult metricsResult) {
if (isNewTable) {
return Optional.of(
new CRCInfo(
commitAtVersion,
metadata,
protocol,
metricsResult.getTotalAddFilesSizeInBytes(),
metricsResult.getNumAddFiles(),
Optional.of(txnId.toString())));
}

return readSnapshot
.getCurrentCrcInfo()
// in the case of a conflicting txn and successful retry the readSnapshot may not be
// commitVersion - 1
.filter(lastCrcInfo -> commitAtVersion == lastCrcInfo.getVersion() + 1)

Collaborator: nit: want to add a comment like "// in the case of a conflicting txn and successful retry the readSnapshot may not be commitVersion - 1". When I first saw this it wasn't immediately intuitive to me why we needed to filter.

Collaborator: Side note, in those cases, is it possible for us to see if there was a CRC written and use that for the simple crc write? This can be a P2+ but maybe we should track it somewhere (i.e. create a GitHub issue?)

Collaborator (author): This could be a good optimization, filed #4177 to track it.

Collaborator: This should all just be a part of the rebase process. When we rebase, we need to list + read all the JSONs and do various conflict checking. We might as well list + read the latest CRC file, too.

.map(
lastCrcInfo ->
new CRCInfo(
commitAtVersion,
metadata,
protocol,
// TODO: handle RemoveFiles for calculating table size and num of files.
lastCrcInfo.getTableSizeBytes() + metricsResult.getTotalAddFilesSizeInBytes(),

Collaborator: add a TODO that in the future we will need to handle RemoveFiles.

lastCrcInfo.getNumFiles() + metricsResult.getNumAddFiles(),
Optional.of(txnId.toString())));
}

/**
* Get the part of the schema of the table that needs the statistics to be collected per file.
*
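
Note (illustrative, not part of this diff): the incremental bookkeeping in buildPostCommitCrcInfoIfCurrentCrcAvailable simply adds this commit's add-file metrics onto the previous CRC, and only does so when that CRC sits at exactly commitVersion - 1. A self-contained sketch with made-up numbers:

/** Hypothetical numbers illustrating the incremental CRC arithmetic (not PR code). */
final class IncrementalCrcExample {
  public static void main(String[] args) {
    long previousCrcVersion = 9L;       // version of the last available CRC file
    long previousTableSizeBytes = 1_000_000L;
    long previousNumFiles = 100L;

    long commitVersion = 10L;           // version this transaction committed at
    long addFilesSizeInBytes = 50_000L; // from the transaction metrics result
    long numAddFiles = 5L;

    // The hook is only added when the previous CRC is exactly one version behind;
    // after a conflict retry the read snapshot can be older, in which case we skip.
    if (commitVersion == previousCrcVersion + 1) {
      long newTableSizeBytes = previousTableSizeBytes + addFilesSizeInBytes; // 1,050,000
      long newNumFiles = previousNumFiles + numAddFiles;                     // 105
      System.out.println(newNumFiles + " files, " + newTableSizeBytes + " bytes");
    }
  }
}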
@@ -0,0 +1,164 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.checksum;

import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.internal.util.InternalUtils;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CRCInfo {
private static final Logger logger = LoggerFactory.getLogger(CRCInfo.class);

// Constants for schema field names
private static final String TABLE_SIZE_BYTES = "tableSizeBytes";
private static final String NUM_FILES = "numFiles";
private static final String NUM_METADATA = "numMetadata";
private static final String NUM_PROTOCOL = "numProtocol";
private static final String METADATA = "metadata";
private static final String PROTOCOL = "protocol";
private static final String TXN_ID = "txnId";

public static final StructType CRC_FILE_SCHEMA =
new StructType()
.add(TABLE_SIZE_BYTES, LongType.LONG)
.add(NUM_FILES, LongType.LONG)
.add(NUM_METADATA, LongType.LONG)
.add(NUM_PROTOCOL, LongType.LONG)
.add(METADATA, Metadata.FULL_SCHEMA)
.add(PROTOCOL, Protocol.FULL_SCHEMA)
.add(TXN_ID, StringType.STRING, /*nullable*/ true);

public static Optional<CRCInfo> fromColumnarBatch(
long version, ColumnarBatch batch, int rowId, String crcFilePath) {
// Read required fields.
Protocol protocol =
Protocol.fromColumnVector(batch.getColumnVector(getSchemaIndex(PROTOCOL)), rowId);
Metadata metadata =
Metadata.fromColumnVector(batch.getColumnVector(getSchemaIndex(METADATA)), rowId);
long tableSizeBytes =
InternalUtils.requireNonNull(
batch.getColumnVector(getSchemaIndex(TABLE_SIZE_BYTES)), rowId, TABLE_SIZE_BYTES)
.getLong(rowId);
long numFiles =
InternalUtils.requireNonNull(
batch.getColumnVector(getSchemaIndex(NUM_FILES)), rowId, NUM_FILES)
.getLong(rowId);

// Read optional fields
ColumnVector txnIdColumnVector = batch.getColumnVector(getSchemaIndex(TXN_ID));
Optional<String> txnId =
txnIdColumnVector.isNullAt(rowId)
? Optional.empty()
: Optional.of(txnIdColumnVector.getString(rowId));

// protocol and metadata are nullable per fromColumnVector's implementation.
if (protocol == null || metadata == null) {
logger.warn("Invalid checksum file missing protocol and/or metadata: {}", crcFilePath);
return Optional.empty();
}
return Optional.of(new CRCInfo(version, metadata, protocol, tableSizeBytes, numFiles, txnId));
}

private final long version;
private final Metadata metadata;
private final Protocol protocol;
private final long tableSizeBytes;
private final long numFiles;
private final Optional<String> txnId;

public CRCInfo(
long version,
Metadata metadata,
Protocol protocol,
long tableSizeBytes,
long numFiles,
Optional<String> txnId) {
checkArgument(tableSizeBytes >= 0);
checkArgument(numFiles >= 0);
this.version = version;
this.metadata = requireNonNull(metadata);
this.protocol = requireNonNull(protocol);
this.tableSizeBytes = tableSizeBytes;
this.numFiles = numFiles;
this.txnId = requireNonNull(txnId);
}

/** The version of the Delta table that this CRCInfo represents. */
public long getVersion() {
return version;
}

/** The {@link Metadata} stored in this CRCInfo. */
public Metadata getMetadata() {
return metadata;
}

/** The {@link Protocol} stored in this CRCInfo. */
public Protocol getProtocol() {
return protocol;
}

public long getNumFiles() {
return numFiles;
}

public long getTableSizeBytes() {
return tableSizeBytes;
}

public Optional<String> getTxnId() {
return txnId;
}

/**
* Encode as a {@link Row} object with the schema {@link CRCInfo#CRC_FILE_SCHEMA}.
*
* @return {@link Row} object with the schema {@link CRCInfo#CRC_FILE_SCHEMA}
*/
public Row toRow() {
Map<Integer, Object> values = new HashMap<>();
// Add required fields
values.put(getSchemaIndex(TABLE_SIZE_BYTES), tableSizeBytes);
values.put(getSchemaIndex(NUM_FILES), numFiles);
values.put(getSchemaIndex(NUM_METADATA), 1L);
values.put(getSchemaIndex(NUM_PROTOCOL), 1L);
values.put(getSchemaIndex(METADATA), metadata.toRow());
values.put(getSchemaIndex(PROTOCOL), protocol.toRow());

// Add optional fields
txnId.ifPresent(txn -> values.put(getSchemaIndex(TXN_ID), txn));
return new GenericRow(CRC_FILE_SCHEMA, values);
}

private static int getSchemaIndex(String fieldName) {
return CRC_FILE_SCHEMA.indexOf(fieldName);
}
}
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.replay;
package io.delta.kernel.internal.checksum;

import static io.delta.kernel.internal.util.FileNames.*;
import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
@@ -96,7 +96,7 @@ private static Optional<CRCInfo> readChecksumFile(Engine engine, Path filePath)
.getJsonHandler()
.readJsonFiles(
singletonCloseableIterator(FileStatus.of(filePath.toString())),
CRCInfo.FULL_SCHEMA,
CRCInfo.CRC_FILE_SCHEMA,
Optional.empty())) {
// We do this instead of iterating through the rows or using `getSingularRow` so we
// can use the existing fromColumnVector methods in Protocol, Metadata, Format etc
@@ -0,0 +1,58 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.checksum;

import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO;
import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.util.FileNames;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Writer for writing checksum files from a snapshot. */
public class ChecksumWriter {

private static final Logger logger = LoggerFactory.getLogger(ChecksumWriter.class);

private final Path logPath;

public ChecksumWriter(Path logPath) {
this.logPath = requireNonNull(logPath);
}

/** Writes a checksum file */
public void writeCheckSum(Engine engine, CRCInfo crcInfo) throws IOException {
Path newChecksumPath = FileNames.checksumFile(logPath, crcInfo.getVersion());

Collaborator: Should we add some validations to make sure we are not writing some incorrect values (e.g. numMetadata is not 1)?

Collaborator: If we add validations, make sure to add tests for it.

Collaborator: we could also instead add this validation as part of the CRCInfo constructor?

logger.info("Writing checksum file to path: {}", newChecksumPath);
wrapEngineExceptionThrowsIO(
() -> {
engine
.getJsonHandler()
.writeJsonFileAtomically(
newChecksumPath.toString(),
singletonCloseableIterator(crcInfo.toRow()),
false /* overwrite */);
logger.info("Write checksum file `{}` succeeds", newChecksumPath);
return null;
},
"Write checksum file `%s`",
newChecksumPath);
}
}
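
Note (illustrative, not part of this diff): given an Engine, the table's current Metadata and Protocol, and the _delta_log path, writing a checksum file with the classes added in this PR reduces to constructing a CRCInfo and handing it to ChecksumWriter. The version, size, file count, and txnId below are made up.

import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.checksum.CRCInfo;
import io.delta.kernel.internal.checksum.ChecksumWriter;
import io.delta.kernel.internal.fs.Path;
import java.io.IOException;
import java.util.Optional;

/** Hypothetical sketch: build a CRCInfo by hand and write it with ChecksumWriter. */
final class ChecksumWriteExample {
  static void writeCrc(Engine engine, Path logPath, Metadata metadata, Protocol protocol)
      throws IOException {
    CRCInfo crcInfo =
        new CRCInfo(
            10L, // table version the checksum describes
            metadata,
            protocol,
            1_050_000L, // tableSizeBytes
            105L, // numFiles
            Optional.of("00000000-0000-0000-0000-000000000000")); // txnId
    // ChecksumWriter resolves the path via FileNames.checksumFile(logPath, version) and
    // writes the row from crcInfo.toRow() atomically through the engine's JSON handler.
    new ChecksumWriter(logPath).writeCheckSum(engine, crcInfo);
  }
}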
@@ -0,0 +1,53 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.hook;

import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.engine.Engine;
import io.delta.kernel.hook.PostCommitHook;
import io.delta.kernel.internal.checksum.CRCInfo;
import io.delta.kernel.internal.checksum.ChecksumWriter;
import io.delta.kernel.internal.fs.Path;
import java.io.IOException;

/**
* A post-commit hook that writes a new checksum file at the version committed by the transaction.
* This hook performs a simple checksum operation without requiring previous checkpoint or log
* reading.
*/
public class ChecksumSimpleHook implements PostCommitHook {

private final CRCInfo crcInfo;
private final Path logPath;

public ChecksumSimpleHook(CRCInfo crcInfo, Path logPath) {
this.crcInfo = requireNonNull(crcInfo);
this.logPath = requireNonNull(logPath);
}

@Override
public void threadSafeInvoke(Engine engine) throws IOException {
checkArgument(engine != null);
new ChecksumWriter(logPath).writeCheckSum(engine, crcInfo);
}

@Override
public PostCommitHookType getType() {
return PostCommitHookType.CHECKSUM_SIMPLE;
}
}