Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Config option to allow checkpointing with unsupported reader/writer versions #17

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,18 @@ type CheckpointConfiguration struct {
// Maximum numbers of rows to include in each multi-part checkpoint part
// Current default 50k
MaxRowsPerPart int
// Allow checkpointing even if the table reader version or writer version is greater than supported
// by this client. Defaults to false.
// **WARNING** If you set this to true and the table being checkpointed uses features that are not supported by this
// client, the resulting checkpoint might fail unpredictably and silently; this could cause data loss or corruption
UnsafeIgnoreUnsupportedReaderWriterVersionErrors bool
}

func NewCheckpointConfiguration() *CheckpointConfiguration {
checkpointConfiguration := new(CheckpointConfiguration)
// TODO try to find what Spark uses
checkpointConfiguration.MaxRowsPerPart = 50000
checkpointConfiguration.UnsafeIgnoreUnsupportedReaderWriterVersionErrors = false
return checkpointConfiguration
}

Expand Down
52 changes: 52 additions & 0 deletions checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1127,3 +1127,55 @@ func TestCheckpointUnlockFailure(t *testing.T) {
t.Fatal("did not create checkpoint")
}
}

func TestCheckpointInvalidVersion(t *testing.T) {
store, stateStore, lock, checkpointLock := setupCheckpointTest(t, "", false)

table := NewDeltaTable[tombstonesTestData, emptyTestStruct](store, lock, stateStore)

metadata := NewDeltaTableMetaData("", "", Format{}, GetSchema(new(tombstonesTestData)), make([]string, 0), map[string]string{string(DeletedFileRetentionDurationDeltaConfigKey): "interval 1 minute"})
protocol := Protocol{MinReaderVersion: 3, MinWriterVersion: 7}
err := table.Create(*metadata, protocol, CommitInfo{}, make([]AddPartitioned[tombstonesTestData, emptyTestStruct], 0))
if !errors.Is(err, ErrorUnsupportedReaderVersion) || !errors.Is(err, ErrorUnsupportedWriterVersion) {
t.Error("should return unsupported reader/writer version errors")
if err != nil {
t.Error(err)
}
}
add1 := getTestAdd[tombstonesTestData, emptyTestStruct](3 * 60 * 1000) // 3 mins ago
add2 := getTestAdd[tombstonesTestData, emptyTestStruct](2 * 60 * 1000) // 2 mins ago
v, err := testDoCommit(t, table, []Action{add1})
if err != nil {
t.Fatal(err)
}
if v != 1 {
t.Errorf("Version is %d, expected 1", v)
}
v, err = testDoCommit(t, table, []Action{add2})
if err != nil {
t.Fatal(err)
}
if v != 2 {
t.Errorf("Version is %d, expected 2", v)
}

// Create a checkpoint with default configuration - should fail
configuration := NewCheckpointConfiguration()
checkpointed, err := CreateCheckpoint[tombstonesTestData, emptyTestStruct](store, checkpointLock, configuration, 2)
if !errors.Is(err, ErrorUnsupportedReaderVersion) || !errors.Is(err, ErrorUnsupportedWriterVersion) {
t.Error("should return unsupported reader/writer version errors")
}
if checkpointed {
t.Error("should not create checkpoint with default configuration")
}

// Create a checkpoint with unsafe ignore option set in configuration - should succeed
configuration.UnsafeIgnoreUnsupportedReaderWriterVersionErrors = true
checkpointed, err = CreateCheckpoint[tombstonesTestData, emptyTestStruct](store, checkpointLock, configuration, 2)
if err != nil {
t.Error("should not return an error")
}
if !checkpointed {
t.Error("should create checkpoint with modified configuration")
}
}
12 changes: 8 additions & 4 deletions delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,13 @@ func (table *DeltaTable[RowType, PartitionType]) Create(metadata DeltaTableMetaD

// If either version is too high, we return an error, but we still create the table first
if protocol.MinReaderVersion > MAX_READER_VERSION_SUPPORTED {
return ErrorUnsupportedReaderVersion
err = ErrorUnsupportedReaderVersion
}
if protocol.MinWriterVersion > MAX_WRITER_VERSION_SUPPORTED {
return ErrorUnsupportedWriterVersion
err = errors.Join(err, ErrorUnsupportedWriterVersion)
}

return nil
return err
}

// / Exists checks if a DeltaTable with version 0 exists in the object store.
Expand Down Expand Up @@ -468,7 +468,11 @@ func CreateCheckpoint[RowType any, PartitionType any](store storage.ObjectStore,
// The table doesn't need a commit lock or state store as we are not going to perform any commits
table, err := OpenTableWithVersion[RowType, PartitionType](store, nil, nil, version)
if err != nil {
return false, err
// If the UnsafeIgnoreUnsupportedReaderWriterVersionErrors option is true, we can ignore unsupported version errors
isUnsupportedVersionError := errors.Is(err, ErrorUnsupportedReaderVersion) || errors.Is(err, ErrorUnsupportedWriterVersion)
if !(isUnsupportedVersionError && checkpointConfiguration.UnsafeIgnoreUnsupportedReaderWriterVersionErrors) {
return false, err
}
}
locked, err := checkpointLock.TryLock()
if err != nil {
Expand Down