
[Kernel]Add simple crc post commit for incremental crc writing. #4134

Merged: 78 commits into delta-io:master, Feb 20, 2025

Conversation

@huan233usc (Collaborator) commented Feb 8, 2025

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

This PR introduces a new post-commit hook, ChecksumSimple, for writing the CRC file after a transaction commits.
The CRC file is written only when the snapshot at commit version - 1 read a CRC file during its state construction.

Other cases will be handled in a separate PR.
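A rough sketch (Scala; not code from this PR) of how a connector would pick up the new hook after a commit. Assumptions: the hooks are returned on the commit result and run via PostCommitHook#threadSafeInvoke, and the new hook type is exposed as CHECKSUM_SIMPLE.

import io.delta.kernel.{Transaction, TransactionCommitResult}
import io.delta.kernel.data.Row
import io.delta.kernel.engine.Engine
import io.delta.kernel.utils.CloseableIterable

def commitAndWriteCrc(
    txn: Transaction,
    engine: Engine,
    dataActions: CloseableIterable[Row]): TransactionCommitResult = {
  val result = txn.commit(engine, dataActions)
  // The commit result carries the post-commit hooks the connector may run; when the
  // previous version's CRC was read during state construction, this list includes the
  // CHECKSUM_SIMPLE hook, which writes <version>.crc for the just-committed version.
  result.getPostCommitHooks.forEach(hook => hook.threadSafeInvoke(engine))
  result
}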

How was this patch tested?

E2e test

Does this PR introduce any user-facing changes?

No

@huan233usc self-assigned this Feb 10, 2025
@huan233usc marked this pull request as ready for review February 10, 2025 23:18
@scottsand-db (Collaborator) left a comment

Looks good! Left some feedback on some of the code comments + organization. Thanks!

Also -- please update the PR description.


private final Path logPath;
// Constants for schema field names
private static final String TABLE_SIZE_BYTES = "tableSizeBytes";
Collaborator

static fields should come before member fields

}
}

test("create table as select and verify checksum") {
Collaborator

Sorry - what's the benefit of testing "create table as select" vs. just "create table and insert into"?

are you expecting the CRC to be/look different between the two cases?

Collaborator Author

The only difference is in 00000.crc: CTAS has an add file while a plain create table doesn't, so I added this test for completeness.

Collaborator Author

Removed the CTAS case; we believe it is a bit redundant here.

.withSchema(engine, new StructType().add("id", INTEGER))
.build(engine)

copyAndCommitTxnForUnpartitionedTable(txn, engine, sparkTablePath, versionAtCommit = 0)
Collaborator

I couldn't quite understand what this method is doing -- can you explain to me (on this PR, not in the code)?

Collaborator Author

I was trying to make the test logic like:

1. Commit to the Spark table using CTAS/DML, which produces a log with add files.
2. Read the log from step 1 and commit it to the Kernel table (making sure both tables' logs point to the same set of parquet files, to validate that the table size in the CRC is correct).
3. Verify the written CRC stats are aligned.

This method is mainly for step 2 (rough sketch below).
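A rough sketch of those steps (Scala; the spark/engine/path variables come from the test fixture, and the final assertion helper is hypothetical):

// Step 1: Spark commits version 0 with an add file (CTAS/DML-style write).
spark.range(1).toDF("id").write.format("delta").save(sparkTablePath)

// Step 2: replay the same file actions into the Kernel table so both logs point at
// the identical parquet files; this is what the helper under discussion does.
val txn = Table.forPath(engine, kernelTablePath)
  .createTransactionBuilder(engine, "test-engine", Operation.CREATE_TABLE)
  .withSchema(engine, new StructType().add("id", INTEGER))
  .build(engine)
copyAndCommitTxnForUnpartitionedTable(txn, engine, sparkTablePath, versionAtCommit = 0)

// Step 3: compare the stats written to the two 00000000000000000000.crc files
// (hypothetical assertion helper).
assertCrcFilesMatch(sparkTablePath, kernelTablePath, version = 0)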

Does "copyDeltaLogFromSparkToKernel" a better name?

Collaborator Author

Added some more documentation to the test class and method as a follow-up.


return readSnapshot
.getCurrentCrcInfo()
.filter(lastCrcInfo -> commitAtVersion == lastCrcInfo.getVersion() + 1)
Collaborator

nit: want to add a comment like // in the case of a conflicting txn and successful retry the readSnapshot may not be commitVersion - 1

When I first saw this it wasn't immediately intuitive to me why we needed the filter.

Collaborator

Side note: in those cases, is it possible for us to see if there was a CRC written and use that for the simple CRC write?

This can be a P2+ but maybe we should track it somewhere (e.g. create a GitHub issue?)

Collaborator Author

This could be a good optimization; filed #4177 to track it.

Collaborator

This should all just be a part of the rebase process. When we rebase, we need to list + read all the JSONs and do various conflict checking. We might as well list + read the latest CRC file, too.

Comment on lines 75 to 76
checkArgument(!batch.getColumnVector(getSchemaIndex(TABLE_SIZE_BYTES)).isNullAt(rowId));
checkArgument(!batch.getColumnVector(getSchemaIndex(NUM_FILES)).isNullAt(rowId));
Collaborator

I think technically we want to do the isNullAt check before we access the value. Basically we should prefix any CV access with a null check

Collaborator

Ah I found the util function, you should just use InternalUtils.requireNonNull for all the required fields

See Metadata.fromColumnVector for examples on how to use it
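A sketch of that pattern (the real file is Java; shown in Scala here to match the test snippets, and the exact requireNonNull signature is my assumption):

import io.delta.kernel.data.ColumnarBatch
import io.delta.kernel.internal.util.InternalUtils.requireNonNull

// requireNonNull throws if the vector is null at rowId and otherwise returns the
// vector, so the null check always happens before the value is read.
def readTableSizeBytes(batch: ColumnarBatch, rowId: Int, tableSizeIdx: Int): Long =
  requireNonNull(batch.getColumnVector(tableSizeIdx), rowId, "tableSizeBytes").getLong(rowId)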

)
}

private def checkMetadata(expectedMetadata: Metadata, actualMetadataRow: Row): Unit = {
Collaborator

As long as we fix these checkMetadata and checkProtocol methods in another PR I am good with this for now

}
}

/**
Collaborator

I'm a little concerned that this method of testing isn't going to scale well, especially as we add support for other operations besides append for example.

Is there maybe an easier way we can do this or maybe this is overkill?

How do we test CRC writes within delta-spark? Can we match that level of testing?

Off the top of my head I think I would (1) want to test the crc content is correct and (2) check that delta spark can read/use the crc generated by Kernel. I'm not sure we need to assert they are exactly the same.

Open to differing opinions but would like to avoid overcomplicating our tests if possible.

Collaborator

Otherwise maybe there's at least some way to simplify these tests? For example, I think we can insert values into Spark using the DF writer instead of building a SQL query. I also wonder whether there is any major difference between partitioned and unpartitioned tables for CRC specifically?
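For instance (a minimal sketch), appending rows with the DataFrame writer rather than building an INSERT statement, where sparkTablePath is the test's Spark-side table path:

// Append ten rows to the Spark-side Delta table.
spark.range(0, 10).toDF("id")
  .write.format("delta").mode("append").save(sparkTablePath)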

Collaborator

Also if we need this, I think maybe there are simpler ways to do the conversion from Spark table to Delta actions. Can you try using TableImpl.getRawChanges maybe? You can possibly just query the file actions and then directly commit them to the Kernel table.

I'm thinking it would be much simpler and better to basically

  • In the test, make the changes directly to the Spark table (self defining)
  • Call fx to convert to Kernel commit (this doesn't require things like partition info, as that can be gathered from the add files I think. it should just take the version to convert as input)
  • Check CRC is the same

Collaborator Author

Spark does the a/b comparison (not at the file level though):

assert(checksum.copy(txnId = None) === computedChecksum)

I think the value of this test is to ensure our behavior (especially file size collection) is the same as Spark's.

"Can you try using TableImpl.getRawChanges maybe?"

This is an excellent suggestion! It simplifies the code a lot.

Collaborator

aae9d64 is what I was trying to outline earlier but found hard to explain.

assert(columnarBatches.hasNext, "Empty checksum file")
val crcRow = columnarBatches.next()
assert(crcRow.getSize === 1, s"Expected single row, found ${crcRow.getSize}")

Collaborator

How did you decide what to test content-wise? Can you maybe explain what we check vs. don't check, and why?

@@ -460,4 +462,11 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
.stream()
.anyMatch(hook => hook.getType == PostCommitHookType.CHECKPOINT) === isReadyForCheckpoint)
}

Collaborator

please document that all child suites should use this instead of .commit directly, and explain why this is!! also maybe put it at the top of the file in all caps (in DeltaTableWriteSuite)

checkChecksum(path, expSchema)
}

def checkChecksum(
Collaborator

please document what this does and doesn't check

Collaborator Author

Updated the method name to verifyChecksumValid and added docs.

import io.delta.kernel.types.StructType
import io.delta.kernel.utils.{CloseableIterable, FileStatus}

class DeltaTableWriteWithCrcSuite extends DeltaTableWritesSuite {
Collaborator

Class docs might also be useful here (i.e. we basically just check that a CRC is generated for every txn, right? Nothing more?)
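For example, a sketch of the kind of class doc plus the override being discussed (the commitTransaction signature and the hook invocation are assumptions, not this PR's exact code):

import io.delta.kernel.{Transaction, TransactionCommitResult}
import io.delta.kernel.data.Row
import io.delta.kernel.engine.Engine
import io.delta.kernel.utils.CloseableIterable

/**
 * Runs the tests in DeltaTableWritesSuite, but every commit goes through an overridden
 * commitTransaction that also executes the returned post-commit hooks, so the
 * CHECKSUM_SIMPLE hook writes the CRC for each committed version; CRC contents are
 * verified separately via the suite's checksum helper. Nothing more is checked here.
 */
class DeltaTableWriteWithCrcSuite extends DeltaTableWritesSuite {
  override protected def commitTransaction(
      txn: Transaction,
      engine: Engine,
      dataActions: CloseableIterable[Row]): TransactionCommitResult = {
    val result = txn.commit(engine, dataActions)
    // Run the post-commit hooks so every test exercises the CRC write path.
    result.getPostCommitHooks.forEach(hook => hook.threadSafeInvoke(engine))
    result
  }
}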

@vkorukanti (Collaborator) left a comment

LGTM; some follow-up test work can be done in follow-up PRs.

@@ -794,6 +797,8 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa

test("insert into table - idempotent writes") {
withTempDirAndEngine { (tblPath, engine) =>
// TODO: re-enable when CRC_FULL post commit hook is added, txn2 requires CRC_FULL
assume(this.suiteName != ("DeltaTableWriteWithCrcSuite"))
Collaborator

Can we add this to the ignore list in the DeltaTableWriteWithCrcSuite? This can be a separate PR. I think we may need support for an ignoreList similar to delta-spark tests.

Collaborator

we can make IgnoreTests a mixin trait
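A rough sketch of that mixin (trait and member names hypothetical; it overrides ScalaTest's protected test registration method, whose signature is assumed from ScalaTest 3.x):

import org.scalactic.source.Position
import org.scalatest.Tag
import org.scalatest.funsuite.AnyFunSuite

trait IgnoreTests extends AnyFunSuite {
  // Test names a concrete suite cannot run yet (e.g. tests needing the CRC_FULL hook).
  protected def ignoredTests: Set[String] = Set.empty

  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(
      implicit pos: Position): Unit = {
    if (ignoredTests.contains(testName)) {
      // Register the test as ignored instead of skipping it with assume() inside the body.
      ignore(testName, testTags: _*)(testFun)
    } else {
      super.test(testName, testTags: _*)(testFun)
    }
  }
}

// Hypothetical usage:
// class DeltaTableWriteWithCrcSuite extends DeltaTableWritesSuite with IgnoreTests {
//   override protected def ignoredTests: Set[String] =
//     Set("insert into table - idempotent writes")
// }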

Collaborator Author

Will address it separately.

@@ -460,4 +462,11 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
.stream()
.anyMatch(hook => hook.getType == PostCommitHookType.CHECKPOINT) === isReadyForCheckpoint)
}

protected def commitTransaction(
Collaborator

Please add a one-liner doc to this base class on the need to commit through this method.
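Something along these lines inside DeltaTableWriteSuiteBase (a sketch; the method signature is assumed, the doc comment is the point):

/**
 * All child suites must commit through this method rather than calling
 * Transaction#commit directly, so that suites like DeltaTableWriteWithCrcSuite can
 * layer post-commit behavior (e.g. running the CRC hook) by overriding it.
 */
protected def commitTransaction(
    txn: Transaction,
    engine: Engine,
    dataActions: CloseableIterable[Row]): TransactionCommitResult =
  txn.commit(engine, dataActions)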

@allisonport-db (Collaborator) left a comment

LGTM

@scottsand-db merged commit 7e85686 into delta-io:master on Feb 20, 2025
21 checks passed
assert(crc1.getMetadata.getPartitionColNames === crc2.getMetadata.getPartitionColNames)
}

// TODO docs
Collaborator

Can you add these docs in one of the follow-up PRs?
