Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel]Add simple crc post commit for incremental crc writing. #4134

Merged
merged 78 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
1c05213
basic crc simple writer
huan233usc Feb 7, 2025
ca06a90
first test
huan233usc Feb 7, 2025
3caed70
add tests
huan233usc Feb 7, 2025
3b7dc2c
add tests
huan233usc Feb 10, 2025
2f58dfa
fix scala format
huan233usc Feb 10, 2025
e7f4beb
add unit test
huan233usc Feb 10, 2025
52bda11
add doc
huan233usc Feb 10, 2025
d1626dc
format java
huan233usc Feb 10, 2025
2a1092d
format scala
huan233usc Feb 10, 2025
6c8069f
fix test
huan233usc Feb 10, 2025
471e172
fix test
huan233usc Feb 10, 2025
8c3c33b
refactor 1
huan233usc Feb 10, 2025
5d0b5a7
refactor 2
huan233usc Feb 10, 2025
6796fa5
remove redundant doc
huan233usc Feb 10, 2025
2a93c49
fix doc
huan233usc Feb 10, 2025
826f623
fix scala
huan233usc Feb 10, 2025
ebaf2c8
resolve comment
huan233usc Feb 12, 2025
000a36d
fix indent
huan233usc Feb 12, 2025
0c425be
fix test
huan233usc Feb 12, 2025
e972c31
fix indent
huan233usc Feb 12, 2025
73df627
fix indent
huan233usc Feb 12, 2025
1df8af5
rename method
huan233usc Feb 12, 2025
a3ba086
fix indent
huan233usc Feb 12, 2025
ee72c53
fix build
huan233usc Feb 12, 2025
c8cf5b1
fix test
huan233usc Feb 12, 2025
d922524
fix test naming
huan233usc Feb 14, 2025
d2fc815
add doc to tests, simply the test code
huan233usc Feb 14, 2025
d1504f0
rename params
huan233usc Feb 14, 2025
78477d1
fix comment
huan233usc Feb 14, 2025
7f7e9e7
fix comment
huan233usc Feb 14, 2025
bbd9ea3
add doc
huan233usc Feb 14, 2025
76a5c38
move to row to crc
huan233usc Feb 18, 2025
b723b7d
Merge branch 'master' into crc_simple
huan233usc Feb 18, 2025
f85c845
fix java
huan233usc Feb 18, 2025
97f36fc
fix indent
huan233usc Feb 18, 2025
56b2e7e
format scala
huan233usc Feb 18, 2025
a06bf98
format scala
huan233usc Feb 18, 2025
1e7828b
share utils code
huan233usc Feb 18, 2025
4cddb67
share utils code
huan233usc Feb 18, 2025
dc4c94c
refactor test
huan233usc Feb 18, 2025
bf6613b
refactor test
huan233usc Feb 18, 2025
6b81868
Merge branch 'master' into crc_simple
huan233usc Feb 18, 2025
daa03fd
add checks
huan233usc Feb 19, 2025
0ac1005
resolve review
huan233usc Feb 19, 2025
63f2f46
Refactor testcase to subclassing
huan233usc Feb 19, 2025
622d28d
revert unnecessary code
huan233usc Feb 19, 2025
13bb0a7
revert unnecessary code 2/n
huan233usc Feb 19, 2025
972109a
revert unnecessary code 3/n
huan233usc Feb 19, 2025
c52c58e
revert unnecessary code 4/n
huan233usc Feb 19, 2025
bba00bd
add null check
huan233usc Feb 19, 2025
93b9aeb
fixing doc
huan233usc Feb 19, 2025
a11badd
fixing doc
huan233usc Feb 19, 2025
cf33624
fixing doc
huan233usc Feb 19, 2025
a6189d7
fixing header
huan233usc Feb 19, 2025
26cd8b8
fixing java
huan233usc Feb 19, 2025
c0178ec
group method
huan233usc Feb 19, 2025
cff4b84
merge head
huan233usc Feb 19, 2025
658f770
revert
huan233usc Feb 19, 2025
6c70ba5
revert
huan233usc Feb 19, 2025
abc2bd9
revert
huan233usc Feb 19, 2025
305997e
revert
huan233usc Feb 19, 2025
e7710cb
typo
huan233usc Feb 19, 2025
0bec898
grouping
huan233usc Feb 19, 2025
d392927
todo
huan233usc Feb 19, 2025
f60b9c6
fix doc
huan233usc Feb 19, 2025
c4ae5e5
fix check
huan233usc Feb 19, 2025
a6be52b
fix test
huan233usc Feb 20, 2025
eb1bca1
fix test
huan233usc Feb 20, 2025
941d9c8
fix test
huan233usc Feb 20, 2025
a14d42e
utils method
huan233usc Feb 20, 2025
c4462a7
fix scala
huan233usc Feb 20, 2025
744beaf
fix test
huan233usc Feb 20, 2025
562cfe4
revert
huan233usc Feb 20, 2025
b639110
add docs
huan233usc Feb 20, 2025
5a34d7e
patch https://github.com/delta-io/delta/commit/aae9d649fd6a9ca4b87c74…
huan233usc Feb 20, 2025
b9b4aac
remove patch file
huan233usc Feb 20, 2025
7900389
Merge branch 'master' into crc_simple
huan233usc Feb 20, 2025
a2e2883
resolve commit
huan233usc Feb 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,14 @@ enum PostCommitHookType {
* perform this operation, reading previous checkpoint + logs is required to construct a new
* checkpoint, with latency scaling based on log size (typically seconds to minutes).
*/
CHECKPOINT
CHECKPOINT,
/**
* Writes a checksum file at the version committed by the transaction. This hook is present when
* all required table statistics(e.g. table size) for checksum file are known when a transaction
* commits. This operation has a minimal latency with no requirement of reading previous
* checkpoint or logs.
*/
CHECKSUM_SIMPLE
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yay I find these docs super clear!

}

/** Invokes the post commit operation whose implementation must be thread safe. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import io.delta.kernel.internal.actions.DomainMetadata;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.checksum.CRCInfo;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
import io.delta.kernel.internal.metrics.SnapshotReportImpl;
import io.delta.kernel.internal.replay.CRCInfo;
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.snapshot.LogSegment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@
import io.delta.kernel.expressions.Column;
import io.delta.kernel.hook.PostCommitHook;
import io.delta.kernel.internal.actions.*;
import io.delta.kernel.internal.checksum.CRCInfo;
import io.delta.kernel.internal.data.TransactionStateRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.hook.CheckpointHook;
import io.delta.kernel.internal.hook.ChecksumSimpleHook;
import io.delta.kernel.internal.metrics.TransactionMetrics;
import io.delta.kernel.internal.metrics.TransactionReportImpl;
import io.delta.kernel.internal.replay.ConflictChecker;
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
import io.delta.kernel.internal.rowtracking.RowTracking;
import io.delta.kernel.internal.util.*;
import io.delta.kernel.metrics.TransactionMetricsResult;
import io.delta.kernel.metrics.TransactionReport;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterable;
Expand Down Expand Up @@ -361,6 +364,9 @@ private TransactionCommitResult doCommit(
postCommitHooks.add(new CheckpointHook(dataPath, commitAsVersion));
}

buildPostCommitCrcInfo(commitAsVersion, transactionMetrics.captureTransactionMetricsResult())
.ifPresent(crcInfo -> postCommitHooks.add(new ChecksumSimpleHook(crcInfo, logPath)));

return new TransactionCommitResult(commitAsVersion, postCommitHooks);
} catch (FileAlreadyExistsException e) {
throw e;
Expand Down Expand Up @@ -436,6 +442,53 @@ private void recordTransactionReport(
engine.getMetricsReporters().forEach(reporter -> reporter.report(transactionReport));
}

private Optional<CRCInfo> buildPostCommitCrcInfo(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add simple to method name? or some other reference that this is only if previous crc is avail

long commitAtVersion, TransactionMetricsResult metricsResult) {
// Create table
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: superfluous comment

if (isNewTable) {
return Optional.of(
new CRCInfo(
commitAtVersion,
metadata,
protocol,
metricsResult.getTotalAddFilesSizeInBytes(),
metricsResult.getNumAddFiles(),
Optional.of(txnId.toString())));
}
// We cannot compute the table statistic if the crc info of commitAtVersion is missing
if (!readSnapshot.getCurrentCrcInfo().isPresent()
|| commitAtVersion != readSnapshot.getCurrentCrcInfo().get().getVersion() + 1) {
return Optional.empty();
}

return calculateNewCrcInfo(commitAtVersion, metricsResult);
}

private Optional<CRCInfo> calculateNewCrcInfo(
long commitAtVersion, TransactionMetricsResult metricsResult) {
CRCInfo lastCrcInfo =
readSnapshot
.getCurrentCrcInfo()
.orElseThrow(() -> new IllegalStateException("CRC info must be present at this point"));

return Optional.of(
new CRCInfo(
commitAtVersion,
metadata,
protocol,
calculateNewTableSize(lastCrcInfo, metricsResult),
calculateNewFileCount(lastCrcInfo, metricsResult),
Optional.of(txnId.toString())));
}

private long calculateNewTableSize(CRCInfo lastCrcInfo, TransactionMetricsResult metricsResult) {
return lastCrcInfo.getTableSizeBytes() + metricsResult.getTotalAddFilesSizeInBytes();
}

private long calculateNewFileCount(CRCInfo lastCrcInfo, TransactionMetricsResult metricsResult) {
return lastCrcInfo.getNumFiles() + metricsResult.getNumAddFiles();
}

/**
* Get the part of the schema of the table that needs the statistics to be collected per file.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.replay;
package io.delta.kernel.internal.checksum;

import static io.delta.kernel.internal.checksum.ChecksumUtils.CRC_FILE_SCHEMA;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.types.StructType;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -30,31 +30,46 @@ public class CRCInfo {

public static Optional<CRCInfo> fromColumnarBatch(
long version, ColumnarBatch batch, int rowId, String crcFilePath) {
Protocol protocol = Protocol.fromColumnVector(batch.getColumnVector(PROTOCOL_ORDINAL), rowId);
Metadata metadata = Metadata.fromColumnVector(batch.getColumnVector(METADATA_ORDINAL), rowId);
Protocol protocol =
Protocol.fromColumnVector(
batch.getColumnVector(CRC_FILE_SCHEMA.indexOf("protocol")), rowId);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if you define PROTOCOL then you might as well use it here, eh?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, my bad, I forgot to update them.

Metadata metadata =
Metadata.fromColumnVector(
batch.getColumnVector(CRC_FILE_SCHEMA.indexOf("metadata")), rowId);
long tableSizeBytes =
batch.getColumnVector(CRC_FILE_SCHEMA.indexOf("tableSizeBytes")).getLong(rowId);
long numFiles = batch.getColumnVector(CRC_FILE_SCHEMA.indexOf("numFiles")).getLong(rowId);
Optional<String> txnId =
Optional.ofNullable(
batch.getColumnVector(CRC_FILE_SCHEMA.indexOf("txnId")).getString(rowId));
// protocol and metadata are nullable per fromColumnVector's implementation.
if (protocol == null || metadata == null) {
logger.warn("Invalid checksum file missing protocol and/or metadata: {}", crcFilePath);
return Optional.empty();
}
return Optional.of(new CRCInfo(version, metadata, protocol));
return Optional.of(new CRCInfo(version, metadata, protocol, tableSizeBytes, numFiles, txnId));
}

// We can add additional fields later
public static final StructType FULL_SCHEMA =
new StructType().add("protocol", Protocol.FULL_SCHEMA).add("metadata", Metadata.FULL_SCHEMA);

private static final int PROTOCOL_ORDINAL = 0;
private static final int METADATA_ORDINAL = 1;

private final long version;
private final Metadata metadata;
private final Protocol protocol;
private final long tableSizeBytes;
private final long numFiles;
private final Optional<String> txnId;

protected CRCInfo(long version, Metadata metadata, Protocol protocol) {
public CRCInfo(
long version,
Metadata metadata,
Protocol protocol,
long tableSizeBytes,
long numFiles,
Optional<String> txnId) {
this.version = version;
this.metadata = requireNonNull(metadata);
this.protocol = requireNonNull(protocol);
this.tableSizeBytes = tableSizeBytes;
this.numFiles = numFiles;
this.txnId = txnId;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we still requireNoneNull here

}

/** The version of the Delta table that this CRCInfo represents. */
Expand All @@ -71,4 +86,16 @@ public Metadata getMetadata() {
public Protocol getProtocol() {
return protocol;
}

public long getNumFiles() {
return numFiles;
}

public long getTableSizeBytes() {
return tableSizeBytes;
}

public Optional<String> getTxnId() {
return txnId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.replay;
package io.delta.kernel.internal.checksum;

import static io.delta.kernel.internal.util.FileNames.*;
import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
Expand Down Expand Up @@ -96,7 +96,7 @@ private static Optional<CRCInfo> readChecksumFile(Engine engine, Path filePath)
.getJsonHandler()
.readJsonFiles(
singletonCloseableIterator(FileStatus.of(filePath.toString())),
CRCInfo.FULL_SCHEMA,
ChecksumUtils.CRC_FILE_SCHEMA,
Optional.empty())) {
// We do this instead of iterating through the rows or using `getSingularRow` so we
// can use the existing fromColumnVector methods in Protocol, Metadata, Format etc
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.checksum;

import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;

/** Helper class for shared components in checksum file read and write */
public final class ChecksumUtils {

public static StructType CRC_FILE_SCHEMA =
new StructType()
.add("tableSizeBytes", LongType.LONG)
.add("numFiles", LongType.LONG)
.add("numMetadata", LongType.LONG)
.add("numProtocol", LongType.LONG)
.add("metadata", Metadata.FULL_SCHEMA)
.add("protocol", Protocol.FULL_SCHEMA)
.add("txnId", StringType.STRING, /*nullable*/ true);

private ChecksumUtils() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.checksum;

import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO;
import static io.delta.kernel.internal.checksum.ChecksumUtils.CRC_FILE_SCHEMA;
import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.util.FileNames;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Writers for writing checksum files from a snapshot */
public class ChecksumWriter {

private static final Logger logger = LoggerFactory.getLogger(ChecksumWriter.class);

private final Path logPath;
// Constants for schema field names
private static final String TABLE_SIZE_BYTES = "tableSizeBytes";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static fields should come before member fields

private static final String NUM_FILES = "numFiles";
private static final String NUM_METADATA = "numMetadata";
private static final String NUM_PROTOCOL = "numProtocol";
private static final String METADATA = "metadata";
private static final String PROTOCOL = "protocol";
private static final String TXN_ID = "txnId";

public ChecksumWriter(Path logPath) {
this.logPath = requireNonNull(logPath);
}

/** Writes a checksum file */
public void writeCheckSum(Engine engine, CRCInfo crcInfo) throws IOException {
// No sufficient information to write checksum file.
Path newChecksumPath = FileNames.checksumFile(logPath, crcInfo.getVersion());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add some validations to make sure we are not writing some incorrect values (e.g. numMetadata is not 1)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we add validations, make sure to add tests for it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could also instead add this validation as part of the CRCInfo constructor?

logger.debug("Writing checksum file to path: {}", newChecksumPath);
wrapEngineExceptionThrowsIO(
() -> {
engine
.getJsonHandler()
.writeJsonFileAtomically(
newChecksumPath.toString(),
singletonCloseableIterator(buildCheckSumRow(crcInfo)),
false /* overwrite */);
return null;
},
"Write checksum file `%s`",
newChecksumPath);
}

private Row buildCheckSumRow(CRCInfo crcInfo) {
Copy link
Collaborator

@scottsand-db scottsand-db Feb 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just have toRow on CRCInfo? I think that's a better spot for this than inside of ChecksumWriter. i.e. lower coupling, higher cohesion

Map<Integer, Object> values = new HashMap<>();
// Add required fields
values.put(getSchemaIndex(TABLE_SIZE_BYTES), crcInfo.getTableSizeBytes());
values.put(getSchemaIndex(NUM_FILES), crcInfo.getNumFiles());
values.put(getSchemaIndex(NUM_METADATA), 1L);
values.put(getSchemaIndex(NUM_PROTOCOL), 1L);
values.put(getSchemaIndex(METADATA), crcInfo.getMetadata().toRow());
values.put(getSchemaIndex(PROTOCOL), crcInfo.getProtocol().toRow());

// Add optional fields
crcInfo.getTxnId().ifPresent(txn -> values.put(getSchemaIndex(TXN_ID), txn));
return new GenericRow(CRC_FILE_SCHEMA, values);
}

private int getSchemaIndex(String fieldName) {
return CRC_FILE_SCHEMA.indexOf(fieldName);
}
}
Loading
Loading