Skip to content

Commit

Permalink
feat: support using a log store to manage concurrent commits (#35)
Browse files Browse the repository at this point in the history
* Rebase on master

* Clean up API design by moving fields to object store level

* Run formatter

* Update some comments

* Fix error message typo

* Remove unused parameter

* Fix calls to NewDeltaTableWithLogStore()

* Rename AddCommitInfo to AddCommitInfoIfNotPresent

* Rename Read to ReadActions

* Remove GitHub checks for Codecov

* Fix Codecov error

* Update comments for Client interfaces

---------

Co-authored-by: Rahul Madnawat <rahulmadnawat@rivian.com>
  • Loading branch information
rahulmadnawat and rahulmadnawat authored Sep 25, 2023
1 parent 99d71da commit abb80c1
Show file tree
Hide file tree
Showing 24 changed files with 2,080 additions and 515 deletions.
22 changes: 11 additions & 11 deletions checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,15 +978,15 @@ func TestCheckpointCleanupExpiredLogs(t *testing.T) {
}
now := time.Now()
// With cleanup enabled, 25 and 15 minutes ago should be deleted, 5 should not
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(0).Raw), now.Add(-25*time.Minute), now.Add(-25*time.Minute))
err = os.Chtimes(filepath.Join(store.BaseURI().Raw, CommitUriFromVersion(0).Raw), now.Add(-25*time.Minute), now.Add(-25*time.Minute))
if err != nil {
t.Fatal(err)
}
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(1).Raw), now.Add(-15*time.Minute), now.Add(-15*time.Minute))
err = os.Chtimes(filepath.Join(store.BaseURI().Raw, CommitUriFromVersion(1).Raw), now.Add(-15*time.Minute), now.Add(-15*time.Minute))
if err != nil {
t.Fatal(err)
}
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(2).Raw), now.Add(-5*time.Minute), now.Add(-5*time.Minute))
err = os.Chtimes(filepath.Join(store.BaseURI().Raw, CommitUriFromVersion(2).Raw), now.Add(-5*time.Minute), now.Add(-5*time.Minute))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1105,27 +1105,27 @@ func TestCheckpointCleanupTimeAdjustment(t *testing.T) {
// 3: 13 min ago
// 4: 12 min ago
// 5: 6 min ago
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(0).Raw), now.Add(-20*time.Minute), now.Add(-20*time.Minute))
err = os.Chtimes(filepath.Join(store.BaseURI().Raw, CommitUriFromVersion(0).Raw), now.Add(-20*time.Minute), now.Add(-20*time.Minute))
if err != nil {
t.Fatal(err)
}
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(1).Raw), now.Add(-15*time.Minute), now.Add(-15*time.Minute))
err = os.Chtimes(filepath.Join(store.BaseURI().Raw, CommitUriFromVersion(1).Raw), now.Add(-15*time.Minute), now.Add(-15*time.Minute))
if err != nil {
t.Fatal(err)
}
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(2).Raw), now.Add(-10*time.Minute), now.Add(-10*time.Minute))
err = os.Chtimes(filepath.Join(store.BaseURI().Raw, CommitUriFromVersion(2).Raw), now.Add(-10*time.Minute), now.Add(-10*time.Minute))
if err != nil {
t.Fatal(err)
}
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(3).Raw), now.Add(-13*time.Minute), now.Add(-13*time.Minute))
err = os.Chtimes(filepath.Join(store.BaseURI().Raw, CommitUriFromVersion(3).Raw), now.Add(-13*time.Minute), now.Add(-13*time.Minute))
if err != nil {
t.Fatal(err)
}
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(3).Raw), now.Add(-12*time.Minute), now.Add(-12*time.Minute))
err = os.Chtimes(filepath.Join(store.BaseURI().Raw, CommitUriFromVersion(3).Raw), now.Add(-12*time.Minute), now.Add(-12*time.Minute))
if err != nil {
t.Fatal(err)
}
err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(3).Raw), now.Add(-6*time.Minute), now.Add(-6*time.Minute))
err = os.Chtimes(filepath.Join(store.BaseURI().Raw, CommitUriFromVersion(3).Raw), now.Add(-6*time.Minute), now.Add(-6*time.Minute))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1194,7 +1194,7 @@ func TestCheckpointLocked(t *testing.T) {
t.Fatal("unable to obtain lock")
}

localLock := filelock.New(store.BaseURI, "_delta_log/_checkpoint.lock", filelock.Options{})
localLock := filelock.New(store.BaseURI(), "_delta_log/_checkpoint.lock", filelock.Options{})

checkpointed, err := CreateCheckpoint(store, localLock, NewCheckpointConfiguration(), 5)
if !errors.Is(err, lock.ErrLockNotObtained) {
Expand All @@ -1220,7 +1220,7 @@ func TestCheckpointLocked(t *testing.T) {

func TestCheckpointUnlockFailure(t *testing.T) {
store, _, _, _ := setupCheckpointTest(t, "testdata/checkpoints/simple")
brokenLock := testBrokenUnlockLocker{*filelock.New(store.BaseURI, "_delta_log/_commit.lock", filelock.Options{TTL: 60 * time.Second})}
brokenLock := testBrokenUnlockLocker{*filelock.New(store.BaseURI(), "_delta_log/_commit.lock", filelock.Options{TTL: 60 * time.Second})}

checkpointed, err := CreateCheckpoint(store, &brokenLock, NewCheckpointConfiguration(), 5)
if !errors.Is(err, lock.ErrUnableToUnlock) {
Expand Down
15 changes: 15 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
coverage:
status:
project:
default:
informational: true
target: auto
threshold: "0.5%"
patch:
default:
informational: true
target: auto
threshold: "0.5%"

github_checks:
annotations: false
Loading

0 comments on commit abb80c1

Please sign in to comment.