From cadc12a5ebdf0ed736955e58b1dbbeb12271ca39 Mon Sep 17 00:00:00 2001 From: Rahul Madnawat Date: Fri, 8 Sep 2023 14:29:53 -0700 Subject: [PATCH 1/9] Fix typo --- logstore/logstore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logstore/logstore.go b/logstore/logstore.go index 37ea7bfe..ddfb4a1f 100644 --- a/logstore/logstore.go +++ b/logstore/logstore.go @@ -15,7 +15,7 @@ package logstore import "github.com/rivian/delta-go/storage" type LogStore interface { - // Puts an entry into a log store in an exlusive way + // Puts an entry into a log store in an exclusive way Put(entry *CommitEntry, overwrite bool) error // Gets an entry corresponding to the Delta log file with given `tablePath` and `fileName` From b081a8ebaaf2c03f400ea5cc3009f354736169a9 Mon Sep 17 00:00:00 2001 From: Rahul Madnawat Date: Fri, 8 Sep 2023 15:39:37 -0700 Subject: [PATCH 2/9] Fix Dependabot error --- .github/dependabot.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/dependabot.yml b/.github/dependabot.yml index a1da9b63..459b7f17 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -4,7 +4,7 @@ updates: directory: "/" schedule: interval: daily - - package-ecosystem: GitHub-actions + - package-ecosystem: github-actions directory: "/" schedule: interval: daily \ No newline at end of file From 5321340a5eb39d755d4f99b56f56b1005a7b7f80 Mon Sep 17 00:00:00 2001 From: Rahul Madnawat Date: Mon, 11 Sep 2023 11:39:28 -0700 Subject: [PATCH 3/9] Remove method from being method of struct --- checkpoint_test.go | 32 ++++++++++++++++---------------- delta.go | 12 ++++++------ delta_test.go | 20 +++++++++----------- 3 files changed, 31 insertions(+), 33 deletions(-) diff --git a/checkpoint_test.go b/checkpoint_test.go index 7b220e1e..a8bfc1e4 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -136,7 +136,7 @@ func TestSimpleCheckpoint(t *testing.T) { } // Remove the previous log to make sure we use the checkpoint when loading - err = store.Delete(table.CommitUriFromVersion(4)) + err = store.Delete(CommitUriFromVersion(4)) if err != nil { t.Error(err) } @@ -181,7 +181,7 @@ func TestSimpleCheckpoint(t *testing.T) { } } // Remove the previous log to make sure we use the checkpoint when loading - err = store.Delete(table.CommitUriFromVersion(9)) + err = store.Delete(CommitUriFromVersion(9)) if err != nil { t.Error(err) } @@ -295,7 +295,7 @@ func TestTombstones(t *testing.T) { // Load the checkpoint // Remove the previous log to make sure we use the checkpoint when loading - err = store.Delete(table.CommitUriFromVersion(1)) + err = store.Delete(CommitUriFromVersion(1)) if err != nil { t.Error(err) } @@ -615,7 +615,7 @@ func TestMultiPartCheckpoint(t *testing.T) { } // Remove the previous commit to make sure we load the checkpoint files - err = store.Delete(table.CommitUriFromVersion(11)) + err = store.Delete(CommitUriFromVersion(11)) if err != nil { t.Error(err) } @@ -925,15 +925,15 @@ func TestCheckpointCleanupExpiredLogs(t *testing.T) { } now := time.Now() // With cleanup enabled, 25 and 15 minutes ago should be deleted, 5 should not - err = os.Chtimes(filepath.Join(store.BaseURI.Raw, table.CommitUriFromVersion(0).Raw), now.Add(-25*time.Minute), now.Add(-25*time.Minute)) + err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(0).Raw), now.Add(-25*time.Minute), now.Add(-25*time.Minute)) if err != nil { t.Fatal(err) } - err = os.Chtimes(filepath.Join(store.BaseURI.Raw, table.CommitUriFromVersion(1).Raw), now.Add(-15*time.Minute), now.Add(-15*time.Minute)) + err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(1).Raw), now.Add(-15*time.Minute), now.Add(-15*time.Minute)) if err != nil { t.Fatal(err) } - err = os.Chtimes(filepath.Join(store.BaseURI.Raw, table.CommitUriFromVersion(2).Raw), now.Add(-5*time.Minute), now.Add(-5*time.Minute)) + err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(2).Raw), now.Add(-5*time.Minute), now.Add(-5*time.Minute)) if err != nil { t.Fatal(err) } @@ -1052,27 +1052,27 @@ func TestCheckpointCleanupTimeAdjustment(t *testing.T) { // 3: 13 min ago // 4: 12 min ago // 5: 6 min ago - err = os.Chtimes(filepath.Join(store.BaseURI.Raw, table.CommitUriFromVersion(0).Raw), now.Add(-20*time.Minute), now.Add(-20*time.Minute)) + err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(0).Raw), now.Add(-20*time.Minute), now.Add(-20*time.Minute)) if err != nil { t.Fatal(err) } - err = os.Chtimes(filepath.Join(store.BaseURI.Raw, table.CommitUriFromVersion(1).Raw), now.Add(-15*time.Minute), now.Add(-15*time.Minute)) + err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(1).Raw), now.Add(-15*time.Minute), now.Add(-15*time.Minute)) if err != nil { t.Fatal(err) } - err = os.Chtimes(filepath.Join(store.BaseURI.Raw, table.CommitUriFromVersion(2).Raw), now.Add(-10*time.Minute), now.Add(-10*time.Minute)) + err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(2).Raw), now.Add(-10*time.Minute), now.Add(-10*time.Minute)) if err != nil { t.Fatal(err) } - err = os.Chtimes(filepath.Join(store.BaseURI.Raw, table.CommitUriFromVersion(3).Raw), now.Add(-13*time.Minute), now.Add(-13*time.Minute)) + err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(3).Raw), now.Add(-13*time.Minute), now.Add(-13*time.Minute)) if err != nil { t.Fatal(err) } - err = os.Chtimes(filepath.Join(store.BaseURI.Raw, table.CommitUriFromVersion(3).Raw), now.Add(-12*time.Minute), now.Add(-12*time.Minute)) + err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(3).Raw), now.Add(-12*time.Minute), now.Add(-12*time.Minute)) if err != nil { t.Fatal(err) } - err = os.Chtimes(filepath.Join(store.BaseURI.Raw, table.CommitUriFromVersion(3).Raw), now.Add(-6*time.Minute), now.Add(-6*time.Minute)) + err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(3).Raw), now.Add(-6*time.Minute), now.Add(-6*time.Minute)) if err != nil { t.Fatal(err) } @@ -1098,21 +1098,21 @@ func TestCheckpointCleanupTimeAdjustment(t *testing.T) { t.Fatal("did not remove version 1") } // We can't load versions 2 and 3 but the logs should persist - _, err = store.Head(table.CommitUriFromVersion(2)) + _, err = store.Head(CommitUriFromVersion(2)) if errors.Is(err, storage.ErrorObjectDoesNotExist) { t.Fatal("should not remove version 2") } if err != nil { t.Fatal(err) } - _, err = store.Head(table.CommitUriFromVersion(3)) + _, err = store.Head(CommitUriFromVersion(3)) if errors.Is(err, storage.ErrorObjectDoesNotExist) { t.Fatal("should not remove version 3") } if err != nil { t.Fatal(err) } - _, err = store.Head(table.CommitUriFromVersion(4)) + _, err = store.Head(CommitUriFromVersion(4)) if errors.Is(err, storage.ErrorObjectDoesNotExist) { t.Fatal("should not remove version 4") } diff --git a/delta.go b/delta.go index d3352864..b7217661 100644 --- a/delta.go +++ b/delta.go @@ -93,7 +93,7 @@ func (table *DeltaTable) CreateTransaction(options *DeltaTransactionOptions) *De } // / Return the uri of commit version. -func (table *DeltaTable) CommitUriFromVersion(version int64) *storage.Path { +func CommitUriFromVersion(version int64) *storage.Path { str := fmt.Sprintf("%020d.json", version) path := storage.PathFromIter([]string{"_delta_log", str}) return &path @@ -192,7 +192,7 @@ func (table *DeltaTable) Create(metadata DeltaTableMetaData, protocol Protocol, // / Exists checks if a DeltaTable with version 0 exists in the object store. func (table *DeltaTable) Exists() (bool, error) { - path := table.CommitUriFromVersion(0) + path := CommitUriFromVersion(0) meta, err := table.Store.Head(path) if errors.Is(err, storage.ErrorObjectDoesNotExist) { @@ -231,7 +231,7 @@ func (table *DeltaTable) Exists() (bool, error) { // / Read a commit log and return the actions from the log func (table *DeltaTable) ReadCommitVersion(version int64) ([]Action, error) { - path := table.CommitUriFromVersion(version) + path := CommitUriFromVersion(version) return ReadCommitLog(table.Store, path) } @@ -248,7 +248,7 @@ func (table *DeltaTable) LoadVersion(version *int64) error { var err error var checkpointLoadError error if version != nil { - commitURI := table.CommitUriFromVersion(*version) + commitURI := CommitUriFromVersion(*version) _, err := table.Store.Head(commitURI) if errors.Is(err, storage.ErrorObjectDoesNotExist) { return ErrorInvalidVersion @@ -460,7 +460,7 @@ func (table *DeltaTable) updateIncremental(maxVersion *int64) error { // / If the next commit doesn't exist, returns false in the third return parameter func (table *DeltaTable) nextCommitDetails() (int64, []Action, bool, error) { nextVersion := table.State.Version + 1 - nextCommitURI := table.CommitUriFromVersion(nextVersion) + nextCommitURI := CommitUriFromVersion(nextVersion) noMoreCommits := false actions, err := ReadCommitLog(table.Store, nextCommitURI) if errors.Is(err, storage.ErrorObjectDoesNotExist) { @@ -822,7 +822,7 @@ func (transaction *DeltaTransaction) TryCommit(commit *PreparedCommit) (err erro // 3) Try to Rename the file from := storage.NewPath(commit.URI.Raw) - to := transaction.DeltaTable.CommitUriFromVersion(version) + to := CommitUriFromVersion(version) err = transaction.DeltaTable.Store.RenameIfNotExists(from, to) if err != nil { log.Debugf("delta-go: RenameIfNotExists(from=%s, to=%s) attempt failed. %v", from.Raw, to.Raw, err) diff --git a/delta_test.go b/delta_test.go index 13fdac39..7a50218b 100644 --- a/delta_test.go +++ b/delta_test.go @@ -432,7 +432,7 @@ func TestDeltaTableExists(t *testing.T) { t.Error(err) } // Delete original version file - commitPath := filepath.Join(tmpDir, table.CommitUriFromVersion(0).Raw) + commitPath := filepath.Join(tmpDir, CommitUriFromVersion(0).Raw) err = os.Remove(commitPath) if err != nil { t.Error(err) @@ -449,7 +449,7 @@ func TestDeltaTableExists(t *testing.T) { } // Move the new version file to a backup folder that starts with _delta_log - commitPath = filepath.Join(tmpDir, table.CommitUriFromVersion(1).Raw) + commitPath = filepath.Join(tmpDir, CommitUriFromVersion(1).Raw) os.MkdirAll(filepath.Join(tmpDir, "_delta_log.bak"), 0700) fakeCommitPath := filepath.Join(tmpDir, "_delta_log.bak/00000000000000000000.json") err = os.Rename(commitPath, fakeCommitPath) @@ -508,11 +508,11 @@ func TestDeltaTableTryCommitLoopWithCommitExists(t *testing.T) { } //Some other process writes commit 0002.json - fakeCommit2 := filepath.Join(tmpDir, table.CommitUriFromVersion(2).Raw) + fakeCommit2 := filepath.Join(tmpDir, CommitUriFromVersion(2).Raw) os.WriteFile(fakeCommit2, []byte("temp commit data"), 0700) //Some other process writes commit 0003.json - fakeCommit3 := filepath.Join(tmpDir, table.CommitUriFromVersion(3).Raw) + fakeCommit3 := filepath.Join(tmpDir, CommitUriFromVersion(3).Raw) os.WriteFile(fakeCommit3, []byte("temp commit data"), 0700) //create the next commit, should be 004.json after trying 003.json @@ -536,8 +536,8 @@ func TestDeltaTableTryCommitLoopWithCommitExists(t *testing.T) { t.Errorf("want table.State.Version=4, has %d", table.State.Version) } - if !fileExists(filepath.Join(tmpDir, table.CommitUriFromVersion(4).Raw)) { - t.Errorf("File %s should exist", table.CommitUriFromVersion(4).Raw) + if !fileExists(filepath.Join(tmpDir, CommitUriFromVersion(4).Raw)) { + t.Errorf("File %s should exist", CommitUriFromVersion(4).Raw) } } @@ -597,7 +597,7 @@ func TestCommitConcurrent(t *testing.T) { t.Errorf("Final Version in lock should be 100") } - lastCommitFile := filepath.Join(tmpDir, table.CommitUriFromVersion(100).Raw) + lastCommitFile := filepath.Join(tmpDir, CommitUriFromVersion(100).Raw) if !fileExists(lastCommitFile) { t.Errorf("File should exist") } @@ -707,7 +707,7 @@ func TestCommitConcurrentWithParquet(t *testing.T) { t.Errorf("Final Version in lock should be 100") } - lastCommitFile := filepath.Join(tmpDir, table.CommitUriFromVersion(100).Raw) + lastCommitFile := filepath.Join(tmpDir, CommitUriFromVersion(100).Raw) if !fileExists(lastCommitFile) { t.Errorf("File should exist") } @@ -931,10 +931,8 @@ func TestCommitUriFromVersion(t *testing.T) { {input: 1234567890123456789, want: "_delta_log/01234567890123456789.json"}, } - table, _, _ := setupTest(t) - for _, tc := range tests { - got := table.CommitUriFromVersion(tc.input) + got := CommitUriFromVersion(tc.input) if got.Raw != tc.want { t.Errorf("expected %s, got %s", tc.want, got) } From 81d22001b089706751e74ff78930c81e69a735a9 Mon Sep 17 00:00:00 2001 From: Rahul Madnawat Date: Mon, 11 Sep 2023 14:16:12 -0700 Subject: [PATCH 4/9] Remove unnecessary pointers --- checkpoint.go | 18 +++---- checkpoint_test.go | 2 +- delta.go | 20 ++++---- delta_test.go | 4 +- internal/s3mock/s3mock.go | 16 +++--- lock/filelock/filelock.go | 4 +- logstore/dynamodblogstore/dynamodblogstore.go | 10 ++-- .../dynamodblogstore/dynamodblogstore_test.go | 12 ++--- logstore/logstore.go | 4 +- state/filestate/filestate.go | 4 +- storage/filestore/filestore.go | 28 +++++------ storage/filestore/filestore_test.go | 2 +- storage/s3store/s3store.go | 28 +++++------ storage/s3store/s3store_test.go | 8 +-- storage/storage.go | 50 +++++++++---------- tablestate.go | 2 +- 16 files changed, 105 insertions(+), 107 deletions(-) diff --git a/checkpoint.go b/checkpoint.go index e90bc807..6817d589 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -87,15 +87,15 @@ func checkpointFromBytes(bytes []byte) (*CheckPoint, error) { return checkpoint, nil } -func lastCheckpointPath() *storage.Path { +func lastCheckpointPath() storage.Path { path := storage.PathFromIter([]string{"_delta_log", "_last_checkpoint"}) - return &path + return path } // / Return the checkpoint version and total parts, and the current part index if the URI is a valid checkpoint filename // / If the checkpoint is single-part then part and checkpoint.Parts will both be zero // / If the URI is not a valid checkpoint filename then checkpoint will be nil -func checkpointInfoFromURI(path *storage.Path) (checkpoint *CheckPoint, part int32, parseErr error) { +func checkpointInfoFromURI(path storage.Path) (checkpoint *CheckPoint, part int32, parseErr error) { // Check for a single-part checkpoint groups := checkpointRegex.FindStringSubmatch(path.Base()) if len(groups) == 2 { @@ -145,7 +145,7 @@ func doesCheckpointVersionExist(store storage.ObjectStore, version int64, valida // List all files starting with the version prefix. This will also find commit logs and possible crc files str := fmt.Sprintf("%020d", version) path := storage.PathFromIter([]string{"_delta_log", str}) - possibleCheckpointFiles, err := store.List(&path, nil) + possibleCheckpointFiles, err := store.List(path, nil) if err != nil { return false, err } @@ -155,7 +155,7 @@ func doesCheckpointVersionExist(store storage.ObjectStore, version int64, valida totalParts := int32(0) for _, possibleCheckpointFile := range possibleCheckpointFiles.Objects { - checkpoint, currentPart, err := checkpointInfoFromURI(&possibleCheckpointFile.Location) + checkpoint, currentPart, err := checkpointInfoFromURI(possibleCheckpointFile.Location) if err != nil { return false, err } @@ -231,11 +231,11 @@ func createCheckpointFor(tableState *DeltaTableState, store storage.ObjectStore, checkpointFileName = fmt.Sprintf("%020d.checkpoint.%010d.%010d.parquet", tableState.Version, part+1, numParts) } checkpointPath := storage.PathFromIter([]string{"_delta_log", checkpointFileName}) - _, err = store.Head(&checkpointPath) + _, err = store.Head(checkpointPath) if !errors.Is(err, storage.ErrorObjectDoesNotExist) { return ErrorCheckpointAlreadyExists } - err = store.Put(&checkpointPath, parquetBytes) + err = store.Put(checkpointPath, parquetBytes) if err != nil { return err } @@ -302,7 +302,7 @@ func flushDeleteFiles(store storage.ObjectStore, maybeToDelete []DeletionCandida lastMaybeToDelete := maybeToDelete[len(maybeToDelete)-1] if lastMaybeToDelete.Version < beforeVersion && lastMaybeToDelete.Meta.LastModified.UnixMilli() <= maxTimestamp.UnixMilli() { for _, deleteFile := range maybeToDelete { - err := store.Delete(&deleteFile.Meta.Location) + err := store.Delete(deleteFile.Meta.Location) if err != nil { return deleted, err } @@ -334,7 +334,7 @@ func removeExpiredLogsAndCheckpoints(beforeVersion int64, maxTimestamp time.Time if errors.Is(err, storage.ErrorObjectDoesNotExist) { break } - isValid, version := CommitOrCheckpointVersionFromUri(&meta.Location) + isValid, version := CommitOrCheckpointVersionFromUri(meta.Location) // Spark and Rust clients also use the file's last updated timestamp rather than opening the commit and using internal state if isValid && version < beforeVersion && meta.LastModified.Before(maxTimestamp) { candidatesForDeletion = append(candidatesForDeletion, DeletionCandidate{Version: version, Meta: *meta}) diff --git a/checkpoint_test.go b/checkpoint_test.go index a8bfc1e4..bd07eeac 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -34,7 +34,7 @@ import ( ) // / Helper function to set up test state -func setupCheckpointTest(t *testing.T, inputFolder string, overrideStore bool) (store *filestore.FileObjectStore, state state.StateStore, lock lock.Locker, checkpointLock lock.Locker) { +func setupCheckpointTest(t *testing.T, inputFolder string, overrideStore bool) (store filestore.FileObjectStore, state state.StateStore, lock lock.Locker, checkpointLock lock.Locker) { t.Helper() tmpDir := t.TempDir() diff --git a/delta.go b/delta.go index b7217661..02680204 100644 --- a/delta.go +++ b/delta.go @@ -93,25 +93,25 @@ func (table *DeltaTable) CreateTransaction(options *DeltaTransactionOptions) *De } // / Return the uri of commit version. -func CommitUriFromVersion(version int64) *storage.Path { +func CommitUriFromVersion(version int64) storage.Path { str := fmt.Sprintf("%020d.json", version) path := storage.PathFromIter([]string{"_delta_log", str}) - return &path + return path } // / The base path of commit uri's -func BaseCommitUri() *storage.Path { +func BaseCommitUri() storage.Path { return storage.NewPath("_delta_log/") } // / Return true if URI is a valid commit filename (not a checkpoint file, and not a temp commit) -func IsValidCommitUri(path *storage.Path) bool { +func IsValidCommitUri(path storage.Path) bool { match := commitFileRegex.MatchString(path.Base()) return match } // / Return true plus the version if the URI is a valid commit filename -func CommitVersionFromUri(path *storage.Path) (bool, int64) { +func CommitVersionFromUri(path storage.Path) (bool, int64) { groups := commitFileRegex.FindStringSubmatch(path.Base()) if len(groups) == 2 { version, err := strconv.ParseInt(groups[1], 10, 64) @@ -123,7 +123,7 @@ func CommitVersionFromUri(path *storage.Path) (bool, int64) { } // / Return true plus the version if the URI is a valid commit or checkpoint filename -func CommitOrCheckpointVersionFromUri(path *storage.Path) (bool, int64) { +func CommitOrCheckpointVersionFromUri(path storage.Path) (bool, int64) { groups := commitOrCheckpointRegex.FindStringSubmatch(path.Base()) if len(groups) == 5 { version, err := strconv.ParseInt(groups[1], 10, 64) @@ -206,7 +206,7 @@ func (table *DeltaTable) Exists() (bool, error) { } for _, result := range results.Objects { // Check each result to see if it is a version file - isValidCommitUri := IsValidCommitUri(&result.Location) + isValidCommitUri := IsValidCommitUri(result.Location) if isValidCommitUri { return true, nil @@ -377,7 +377,7 @@ func (table *DeltaTable) findLatestCheckpointsForVersion(version *int64) (checkp // Finally, if list results are ordered, check if this is a regular commit and the version is greater // than the max version if listResultsAreOrdered && version != nil { - isCommit, checkpointVersion := CommitVersionFromUri(&meta.Location) + isCommit, checkpointVersion := CommitVersionFromUri(meta.Location) if isCommit && checkpointVersion > *version { break } @@ -545,7 +545,7 @@ func validateCheckpointAndCleanup(table *DeltaTable, store storage.ObjectStore, } // / Read a commit log and return the actions inside it -func ReadCommitLog(store storage.ObjectStore, location *storage.Path) ([]Action, error) { +func ReadCommitLog(store storage.ObjectStore, location storage.Path) ([]Action, error) { commitData, err := store.Get(location) if err != nil { return nil, err @@ -738,7 +738,7 @@ func (transaction *DeltaTransaction) PrepareCommit(operation DeltaOperation, app path := storage.Path{Raw: filepath.Join("_delta_log", fileName)} commit := PreparedCommit{URI: path} - err = transaction.DeltaTable.Store.Put(&path, logEntry) + err = transaction.DeltaTable.Store.Put(path, logEntry) if err != nil { return commit, err } diff --git a/delta_test.go b/delta_test.go index 7a50218b..60e8989f 100644 --- a/delta_test.go +++ b/delta_test.go @@ -39,8 +39,8 @@ import ( ) func TestDeltaTransactionPrepareCommit(t *testing.T) { - store := filestore.FileObjectStore{BaseURI: &storage.Path{Raw: "tmp/"}} - l := filelock.New(nil, "tmp/_delta_log/_commit.lock", filelock.Options{}) + store := filestore.FileObjectStore{BaseURI: storage.Path{Raw: "tmp/"}} + l := filelock.New(storage.NewPath(""), "tmp/_delta_log/_commit.lock", filelock.Options{}) deltaTable := DeltaTable{Store: &store, LockClient: l} options := DeltaTransactionOptions{MaxRetryCommitAttempts: 3} os.MkdirAll("tmp/_delta_log/", 0700) diff --git a/internal/s3mock/s3mock.go b/internal/s3mock/s3mock.go index 7046ee4c..9ddad671 100644 --- a/internal/s3mock/s3mock.go +++ b/internal/s3mock/s3mock.go @@ -38,7 +38,7 @@ type S3MockClient struct { // newS3MockClient creates a mock S3 client that uses a filestore in a temporary directory to // store, retrieve, and manipulate files -func NewS3MockClient(t *testing.T, baseURI *storage.Path) (*S3MockClient, error) { +func NewS3MockClient(t *testing.T, baseURI storage.Path) (*S3MockClient, error) { tmpDir := t.TempDir() tmpPath := storage.NewPath(tmpDir) fileStore := filestore.FileObjectStore{BaseURI: tmpPath} @@ -58,10 +58,10 @@ func NewS3MockClient(t *testing.T, baseURI *storage.Path) (*S3MockClient, error) } // getFilePathFromS3Input generates the local file path from the S3 bucket and key -func getFilePathFromS3Input(bucket string, key string) (*storage.Path, error) { +func getFilePathFromS3Input(bucket string, key string) (storage.Path, error) { filePath, err := url.JoinPath(bucket, key) if err != nil { - return nil, err + return storage.NewPath(""), err } return storage.NewPath(filePath), nil } @@ -199,17 +199,17 @@ func (m *S3MockClient) ListObjectsV2(ctx context.Context, input *s3.ListObjectsV } // getFilePath returns the path of the location on the baseURI, ignoring the URI scheme -func getFilePath(baseURI *storage.Path, location *storage.Path) (*storage.Path, error) { +func getFilePath(baseURI storage.Path, location storage.Path) (storage.Path, error) { baseURL, err := baseURI.ParseURL() if err != nil { - return nil, err + return storage.NewPath(""), err } path, err := url.JoinPath(baseURL.Host, baseURL.Path, location.Raw) return storage.NewPath(path), err } // getFile returns a file from the underlying filestore, for use in unit tests -func (m *S3MockClient) GetFile(baseURI *storage.Path, location *storage.Path) ([]byte, error) { +func (m *S3MockClient) GetFile(baseURI storage.Path, location storage.Path) ([]byte, error) { filePath, err := getFilePath(baseURI, location) if err != nil { return nil, err @@ -218,7 +218,7 @@ func (m *S3MockClient) GetFile(baseURI *storage.Path, location *storage.Path) ([ } // putFile writes data to a file in the underlying filestore for use in unit tests -func (m *S3MockClient) PutFile(baseURI *storage.Path, location *storage.Path, data []byte) error { +func (m *S3MockClient) PutFile(baseURI storage.Path, location storage.Path, data []byte) error { filePath, err := getFilePath(baseURI, location) if err != nil { return err @@ -227,7 +227,7 @@ func (m *S3MockClient) PutFile(baseURI *storage.Path, location *storage.Path, da } // fileExists checks if a file exists in the underlying filestore for use in unit tests -func (m *S3MockClient) FileExists(baseURI *storage.Path, location *storage.Path) (bool, error) { +func (m *S3MockClient) FileExists(baseURI storage.Path, location storage.Path) (bool, error) { filePath, err := getFilePath(baseURI, location) if err != nil { return false, err diff --git a/lock/filelock/filelock.go b/lock/filelock/filelock.go index d4f94afe..d92daa91 100644 --- a/lock/filelock/filelock.go +++ b/lock/filelock/filelock.go @@ -24,7 +24,7 @@ import ( ) type FileLock struct { - baseURI *storage.Path + baseURI storage.Path key string lock *flock.Flock opts Options @@ -55,7 +55,7 @@ func (opts *Options) setOptionsDefaults() { } // Creates a new FileLock instance -func New(baseURI *storage.Path, key string, opts Options) *FileLock { +func New(baseURI storage.Path, key string, opts Options) *FileLock { opts.setOptionsDefaults() l := new(FileLock) diff --git a/logstore/dynamodblogstore/dynamodblogstore.go b/logstore/dynamodblogstore/dynamodblogstore.go index d0cd655f..5a2b4fe0 100644 --- a/logstore/dynamodblogstore/dynamodblogstore.go +++ b/logstore/dynamodblogstore/dynamodblogstore.go @@ -221,7 +221,7 @@ func (ls *DynamoDBLogStore) Put(entry *logstore.CommitEntry, overwrite bool) err } // Gets an entry corresponding to the Delta log file with given `tablePath` and `fileName` from a DynamoDBLogStore instance -func (ls *DynamoDBLogStore) Get(tablePath *storage.Path, fileName *storage.Path) (*logstore.CommitEntry, error) { +func (ls *DynamoDBLogStore) Get(tablePath storage.Path, fileName storage.Path) (*logstore.CommitEntry, error) { attributes := map[string]types.AttributeValue{AttrTablePath: &types.AttributeValueMemberS{Value: tablePath.Raw}, AttrFileName: &types.AttributeValueMemberS{Value: fileName.Raw}} gii := dynamodb.GetItemInput{Key: attributes, TableName: aws.String(ls.tableName), ConsistentRead: aws.Bool(true)} @@ -241,7 +241,7 @@ func (ls *DynamoDBLogStore) Get(tablePath *storage.Path, fileName *storage.Path) } // Gets the latest entry corresponding to the Delta log file for given `tablePath` from a DynamoDBLogStore instance -func (ls *DynamoDBLogStore) GetLatest(tablePath *storage.Path) (*logstore.CommitEntry, error) { +func (ls *DynamoDBLogStore) GetLatest(tablePath storage.Path) (*logstore.CommitEntry, error) { qi := dynamodb.QueryInput{TableName: &ls.tableName, ConsistentRead: aws.Bool(true), ScanIndexForward: aws.Bool(false), Limit: aws.Int32(1), ExpressionAttributeValues: map[string]types.AttributeValue{ ":partitionKey": &types.AttributeValueMemberS{Value: tablePath.Raw}, }, KeyConditionExpression: aws.String(fmt.Sprintf("%s = :partitionKey", AttrTablePath))} @@ -277,9 +277,9 @@ func (ls *DynamoDBLogStore) dbResultToCommitEntry(item map[string]types.Attribut } return logstore.NewCommitEntry( - *storage.NewPath(item[AttrTablePath].(*types.AttributeValueMemberS).Value), - *storage.NewPath(item[AttrFileName].(*types.AttributeValueMemberS).Value), - *storage.NewPath(item[AttrTempPath].(*types.AttributeValueMemberS).Value), + storage.NewPath(item[AttrTablePath].(*types.AttributeValueMemberS).Value), + storage.NewPath(item[AttrFileName].(*types.AttributeValueMemberS).Value), + storage.NewPath(item[AttrTempPath].(*types.AttributeValueMemberS).Value), item[AttrComplete].(*types.AttributeValueMemberS).Value == "true", expireTimeAttr, ) diff --git a/logstore/dynamodblogstore/dynamodblogstore_test.go b/logstore/dynamodblogstore/dynamodblogstore_test.go index be55875c..515f337f 100644 --- a/logstore/dynamodblogstore/dynamodblogstore_test.go +++ b/logstore/dynamodblogstore/dynamodblogstore_test.go @@ -27,7 +27,7 @@ func TestGet(t *testing.T) { t.Error("failed to create DynamoDB log store") } - ece, err := logstore.NewCommitEntry(*storage.NewPath("usr/local/"), *storage.NewPath("01.json"), *storage.NewPath("01.tmp"), false, uint64(0)) + ece, err := logstore.NewCommitEntry(storage.NewPath("usr/local/"), storage.NewPath("01.json"), storage.NewPath("01.tmp"), false, uint64(0)) if err != nil { t.Error("failed to create commit entry") } @@ -36,7 +36,7 @@ func TestGet(t *testing.T) { t.Error("failed to put commit entry") } - ece, err = logstore.NewCommitEntry(*storage.NewPath("usr/local/"), *storage.NewPath("01.json"), *storage.NewPath("01.tmp"), false, uint64(0)) + ece, err = logstore.NewCommitEntry(storage.NewPath("usr/local/"), storage.NewPath("01.json"), storage.NewPath("01.tmp"), false, uint64(0)) if err != nil { t.Error("failed to create commit entry") } @@ -68,7 +68,7 @@ func TestGetLatest(t *testing.T) { t.Error("failed to create DynamoDB log store") } - eceFirst, err := logstore.NewCommitEntry(*storage.NewPath("usr/local/"), *storage.NewPath("01.json"), *storage.NewPath("01.tmp"), false, uint64(0)) + eceFirst, err := logstore.NewCommitEntry(storage.NewPath("usr/local/"), storage.NewPath("01.json"), storage.NewPath("01.tmp"), false, uint64(0)) if err != nil { t.Error("failed to create commit entry") } @@ -77,7 +77,7 @@ func TestGetLatest(t *testing.T) { t.Error("failed to put commit entry") } - eceSecond, err := logstore.NewCommitEntry(*storage.NewPath("usr/local/"), *storage.NewPath("02.json"), *storage.NewPath("02.tmp"), false, uint64(0)) + eceSecond, err := logstore.NewCommitEntry(storage.NewPath("usr/local/"), storage.NewPath("02.json"), storage.NewPath("02.tmp"), false, uint64(0)) if err != nil { t.Error("failed to create commit entry") } @@ -107,7 +107,7 @@ func TestPutOverwrite(t *testing.T) { t.Error("failed to create DynamoDB log store") } - ece, err := logstore.NewCommitEntry(*storage.NewPath("usr/local/"), *storage.NewPath("01.json"), *storage.NewPath("01.tmp"), false, uint64(0)) + ece, err := logstore.NewCommitEntry(storage.NewPath("usr/local/"), storage.NewPath("01.json"), storage.NewPath("01.tmp"), false, uint64(0)) if err != nil { t.Error("failed to create commit entry") } @@ -116,7 +116,7 @@ func TestPutOverwrite(t *testing.T) { t.Error("failed to put commit entry") } - ece, err = logstore.NewCommitEntry(*storage.NewPath("usr/local/"), *storage.NewPath("01.json"), *storage.NewPath("01.tmp"), true, uint64(0)) + ece, err = logstore.NewCommitEntry(storage.NewPath("usr/local/"), storage.NewPath("01.json"), storage.NewPath("01.tmp"), true, uint64(0)) if err != nil { t.Error("failed to create commit entry") } diff --git a/logstore/logstore.go b/logstore/logstore.go index ddfb4a1f..847ce3e9 100644 --- a/logstore/logstore.go +++ b/logstore/logstore.go @@ -19,8 +19,8 @@ type LogStore interface { Put(entry *CommitEntry, overwrite bool) error // Gets an entry corresponding to the Delta log file with given `tablePath` and `fileName` - Get(tablePath *storage.Path, fileName *storage.Path) (*CommitEntry, error) + Get(tablePath storage.Path, fileName storage.Path) (*CommitEntry, error) // Gets the latest entry corresponding to the Delta log file for given `tablePath` - GetLatest(tablePath *storage.Path) (*CommitEntry, error) + GetLatest(tablePath storage.Path) (*CommitEntry, error) } diff --git a/state/filestate/filestate.go b/state/filestate/filestate.go index 64b7d8c8..ed02c8be 100644 --- a/state/filestate/filestate.go +++ b/state/filestate/filestate.go @@ -23,14 +23,14 @@ import ( ) type FileStateStore struct { - BaseURI *storage.Path + BaseURI storage.Path Key string } // Compile time check that FileStateStore implements state.StateStore var _ state.StateStore = (*FileStateStore)(nil) -func New(baseURI *storage.Path, key string) *FileStateStore { +func New(baseURI storage.Path, key string) *FileStateStore { fs := new(FileStateStore) fs.BaseURI = baseURI fs.Key = key diff --git a/storage/filestore/filestore.go b/storage/filestore/filestore.go index 11bc99f4..854e8ed5 100644 --- a/storage/filestore/filestore.go +++ b/storage/filestore/filestore.go @@ -25,19 +25,17 @@ import ( // FileObjectStore provides local file storage type FileObjectStore struct { - BaseURI *storage.Path + BaseURI storage.Path } // Compile time check that FileObjectStore implements storage.ObjectStore var _ storage.ObjectStore = (*FileObjectStore)(nil) -func New(baseURI *storage.Path) *FileObjectStore { - fs := new(FileObjectStore) - fs.BaseURI = baseURI - return fs +func New(baseURI storage.Path) FileObjectStore { + return FileObjectStore{BaseURI: baseURI} } -func (s *FileObjectStore) Put(location *storage.Path, bytes []byte) error { +func (s FileObjectStore) Put(location storage.Path, bytes []byte) error { writePath := filepath.Join(s.BaseURI.Raw, location.Raw) err := os.MkdirAll(filepath.Dir(writePath), 0700) if err != nil { @@ -50,7 +48,7 @@ func (s *FileObjectStore) Put(location *storage.Path, bytes []byte) error { return nil } -func (s *FileObjectStore) RenameIfNotExists(from *storage.Path, to *storage.Path) error { +func (s FileObjectStore) RenameIfNotExists(from storage.Path, to storage.Path) error { // return ErrorObjectAlreadyExists if the destination file exists _, err := s.Head(to) if !errors.Is(err, storage.ErrorObjectDoesNotExist) { @@ -60,7 +58,7 @@ func (s *FileObjectStore) RenameIfNotExists(from *storage.Path, to *storage.Path return s.Rename(from, to) } -func (s *FileObjectStore) Get(location *storage.Path) ([]byte, error) { +func (s FileObjectStore) Get(location storage.Path) ([]byte, error) { filePath := filepath.Join(s.BaseURI.Raw, location.Raw) data, err := os.ReadFile(filePath) if os.IsNotExist(err) { @@ -72,7 +70,7 @@ func (s *FileObjectStore) Get(location *storage.Path) ([]byte, error) { return data, nil } -func (s *FileObjectStore) Head(location *storage.Path) (storage.ObjectMeta, error) { +func (s FileObjectStore) Head(location storage.Path) (storage.ObjectMeta, error) { filePath := filepath.Join(s.BaseURI.Raw, location.Raw) var meta storage.ObjectMeta info, err := os.Stat(filePath) @@ -93,7 +91,7 @@ func (s *FileObjectStore) Head(location *storage.Path) (storage.ObjectMeta, erro return meta, nil } -func (s *FileObjectStore) Rename(from *storage.Path, to *storage.Path) error { +func (s FileObjectStore) Rename(from storage.Path, to storage.Path) error { // rename source to destination f := s.BaseURI.Join(from) t := s.BaseURI.Join(to) @@ -104,7 +102,7 @@ func (s *FileObjectStore) Rename(from *storage.Path, to *storage.Path) error { return nil } -func (s *FileObjectStore) Delete(location *storage.Path) error { +func (s FileObjectStore) Delete(location storage.Path) error { filePath := filepath.Join(s.BaseURI.Raw, location.Raw) err := os.Remove(filePath) if err != nil { @@ -128,7 +126,7 @@ func objectMetaFromFileInfo(info fs.FileInfo, name string, isDir bool, parentDir } else { meta.Size = info.Size() } - meta.Location = *storage.NewPath(location) + meta.Location = storage.NewPath(location) return meta } @@ -175,7 +173,7 @@ func listFilesInDirRecursively(baseURI string, dir string, prefix string) ([]sto return out, nil } -func (s *FileObjectStore) ListAll(prefix *storage.Path) (storage.ListResult, error) { +func (s FileObjectStore) ListAll(prefix storage.Path) (storage.ListResult, error) { var listResult storage.ListResult dir, filePrefix := filepath.Split(prefix.Raw) @@ -217,10 +215,10 @@ func (s *FileObjectStore) ListAll(prefix *storage.Path) (storage.ListResult, err return listResult, nil } -func (s *FileObjectStore) List(prefix *storage.Path, previousResult *storage.ListResult) (storage.ListResult, error) { +func (s FileObjectStore) List(prefix storage.Path, previousResult *storage.ListResult) (storage.ListResult, error) { return s.ListAll(prefix) } -func (s *FileObjectStore) IsListOrdered() bool { +func (s FileObjectStore) IsListOrdered() bool { return true } diff --git a/storage/filestore/filestore_test.go b/storage/filestore/filestore_test.go index fc19ad05..41caae47 100644 --- a/storage/filestore/filestore_test.go +++ b/storage/filestore/filestore_test.go @@ -201,7 +201,7 @@ func TestList(t *testing.T) { } type args struct { - prefix *storage.Path + prefix storage.Path } tests := []struct { name string diff --git a/storage/s3store/s3store.go b/storage/s3store/s3store.go index 44a392f8..febd93c2 100644 --- a/storage/s3store/s3store.go +++ b/storage/s3store/s3store.go @@ -41,7 +41,7 @@ type S3ClientAPI interface { type S3ObjectStore struct { // Source object key Client S3ClientAPI - BaseURI *storage.Path + BaseURI storage.Path baseURL *url.URL bucket string path string @@ -52,7 +52,7 @@ type S3ObjectStore struct { // Compile time check that S3ObjectStore implements storage.ObjectStore var _ storage.ObjectStore = (*S3ObjectStore)(nil) -func New(client S3ClientAPI, baseURI *storage.Path) (*S3ObjectStore, error) { +func New(client S3ClientAPI, baseURI storage.Path) (*S3ObjectStore, error) { store := new(S3ObjectStore) store.Client = client store.BaseURI = baseURI @@ -70,7 +70,7 @@ func New(client S3ClientAPI, baseURI *storage.Path) (*S3ObjectStore, error) { return store, nil } -func (s *S3ObjectStore) Put(location *storage.Path, data []byte) error { +func (s *S3ObjectStore) Put(location storage.Path, data []byte) error { key, err := url.JoinPath(s.path, location.Raw) if err != nil { return errors.Join(storage.ErrorURLJoinPath, err) @@ -88,7 +88,7 @@ func (s *S3ObjectStore) Put(location *storage.Path, data []byte) error { } -func (s *S3ObjectStore) Get(location *storage.Path) ([]byte, error) { +func (s *S3ObjectStore) Get(location storage.Path) ([]byte, error) { key, err := url.JoinPath(s.path, location.Raw) if err != nil { return nil, errors.Join(storage.ErrorURLJoinPath, err) @@ -114,7 +114,7 @@ func (s *S3ObjectStore) Get(location *storage.Path) ([]byte, error) { return bodyBytes, nil } -func (s *S3ObjectStore) Delete(location *storage.Path) error { +func (s *S3ObjectStore) Delete(location storage.Path) error { key, err := url.JoinPath(s.path, location.Raw) if err != nil { return errors.Join(storage.ErrorURLJoinPath, err) @@ -130,7 +130,7 @@ func (s *S3ObjectStore) Delete(location *storage.Path) error { return nil } -func (s *S3ObjectStore) RenameIfNotExists(from *storage.Path, to *storage.Path) error { +func (s *S3ObjectStore) RenameIfNotExists(from storage.Path, to storage.Path) error { // return ErrorObjectAlreadyExists if the destination file exists _, err := s.Head(to) if !errors.Is(err, storage.ErrorObjectDoesNotExist) { @@ -144,7 +144,7 @@ func (s *S3ObjectStore) RenameIfNotExists(from *storage.Path, to *storage.Path) return nil } -func (s *S3ObjectStore) Rename(from *storage.Path, to *storage.Path) error { +func (s *S3ObjectStore) Rename(from storage.Path, to storage.Path) error { srcKey, err := url.JoinPath(s.path, from.Raw) if err != nil { return errors.Join(storage.ErrorURLJoinPath, err) @@ -173,7 +173,7 @@ func (s *S3ObjectStore) Rename(from *storage.Path, to *storage.Path) error { return nil } -func (s *S3ObjectStore) Head(location *storage.Path) (storage.ObjectMeta, error) { +func (s *S3ObjectStore) Head(location storage.Path) (storage.ObjectMeta, error) { var m storage.ObjectMeta key, err := url.JoinPath(s.path, location.Raw) if err != nil { @@ -193,14 +193,14 @@ func (s *S3ObjectStore) Head(location *storage.Path) (storage.ObjectMeta, error) return m, errors.Join(storage.ErrorHeadObject, err) } - m.Location = *location + m.Location = location m.LastModified = *result.LastModified m.Size = result.ContentLength return m, nil } -func getListInputAndTrimPrefix(s *S3ObjectStore, prefix *storage.Path, previousResult *storage.ListResult) (s3.ListObjectsV2Input, string, error) { +func getListInputAndTrimPrefix(s *S3ObjectStore, prefix storage.Path, previousResult *storage.ListResult) (s3.ListObjectsV2Input, string, error) { // We will need the store path with the trailing / for trimming results pathWithSeparators := s.path if !strings.HasSuffix(pathWithSeparators, "/") { @@ -236,7 +236,7 @@ func getListInputAndTrimPrefix(s *S3ObjectStore, prefix *storage.Path, previousR return listInput, pathWithSeparators, nil } -func (s *S3ObjectStore) List(prefix *storage.Path, previousResult *storage.ListResult) (storage.ListResult, error) { +func (s *S3ObjectStore) List(prefix storage.Path, previousResult *storage.ListResult) (storage.ListResult, error) { listInput, resultsTrimPrefix, err := getListInputAndTrimPrefix(s, prefix, previousResult) if err != nil { return storage.ListResult{}, err @@ -251,7 +251,7 @@ func (s *S3ObjectStore) List(prefix *storage.Path, previousResult *storage.ListR for _, result := range results.Contents { location := strings.TrimPrefix(*result.Key, resultsTrimPrefix) listResult.Objects = append(listResult.Objects, storage.ObjectMeta{ - Location: *storage.NewPath(location), + Location: storage.NewPath(location), LastModified: *result.LastModified, Size: result.Size, }) @@ -262,7 +262,7 @@ func (s *S3ObjectStore) List(prefix *storage.Path, previousResult *storage.ListR return listResult, nil } -func (s *S3ObjectStore) ListAll(prefix *storage.Path) (storage.ListResult, error) { +func (s *S3ObjectStore) ListAll(prefix storage.Path) (storage.ListResult, error) { var listResult storage.ListResult listInput, resultsTrimPrefix, err := getListInputAndTrimPrefix(s, prefix, nil) if err != nil { @@ -279,7 +279,7 @@ func (s *S3ObjectStore) ListAll(prefix *storage.Path) (storage.ListResult, error for _, result := range page.Contents { location := strings.TrimPrefix(*result.Key, resultsTrimPrefix) listResult.Objects = append(listResult.Objects, storage.ObjectMeta{ - Location: *storage.NewPath(location), + Location: storage.NewPath(location), LastModified: *result.LastModified, Size: result.Size, }) diff --git a/storage/s3store/s3store_test.go b/storage/s3store/s3store_test.go index 75d963e1..c605fe11 100644 --- a/storage/s3store/s3store_test.go +++ b/storage/s3store/s3store_test.go @@ -27,7 +27,7 @@ import ( ) // Test helper: setupTest does common setup for our tests, creating a mock S3 client and an S3ObjectStore -func setupTest(t *testing.T) (baseURI *storage.Path, mockClient *s3mock.S3MockClient, s3Store *S3ObjectStore) { +func setupTest(t *testing.T) (baseURI storage.Path, mockClient *s3mock.S3MockClient, s3Store *S3ObjectStore) { t.Helper() baseURI = storage.NewPath("s3://test-bucket/test-delta-table") mockClient, err := s3mock.NewS3MockClient(t, baseURI) @@ -42,7 +42,7 @@ func setupTest(t *testing.T) (baseURI *storage.Path, mockClient *s3mock.S3MockCl } // Test helper: verify the file exists and has the expected contents -func verifyFileContents(t *testing.T, baseURI *storage.Path, path *storage.Path, mockClient *s3mock.S3MockClient, data []byte, errorMessage string) { +func verifyFileContents(t *testing.T, baseURI storage.Path, path storage.Path, mockClient *s3mock.S3MockClient, data []byte, errorMessage string) { t.Helper() results, err := mockClient.GetFile(baseURI, path) if err != nil { @@ -54,7 +54,7 @@ func verifyFileContents(t *testing.T, baseURI *storage.Path, path *storage.Path, } // Test helper: verify the file does not exist -func verifyFileDoesNotExist(t *testing.T, baseURI *storage.Path, path *storage.Path, mockClient *s3mock.S3MockClient, errorMessage string) { +func verifyFileDoesNotExist(t *testing.T, baseURI storage.Path, path storage.Path, mockClient *s3mock.S3MockClient, errorMessage string) { t.Helper() fileExists, err := mockClient.FileExists(baseURI, path) if fileExists { @@ -443,7 +443,7 @@ func TestList(t *testing.T) { } type args struct { - prefix *storage.Path + prefix storage.Path } tests := []struct { name string diff --git a/storage/storage.go b/storage/storage.go index d1f21253..a311adeb 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -42,27 +42,27 @@ type Path struct { Raw string } -func NewPath(raw string) *Path { +func NewPath(raw string) Path { p := new(Path) p.Raw = raw - return p + return *p } -func (p *Path) CommitPathForVersion(version int64) string { +func (p Path) CommitPathForVersion(version int64) string { s := fmt.Sprintf("%020d.json", version) return filepath.Join(p.Raw, s) } // Calls url.Parse on Path.Raw -func (p *Path) ParseURL() (*url.URL, error) { +func (p Path) ParseURL() (*url.URL, error) { return url.Parse(p.Raw) } -func (p *Path) Base() string { +func (p Path) Base() string { return filepath.Base(p.Raw) } -func (p *Path) Ext() string { +func (p Path) Ext() string { return filepath.Ext(p.Raw) } @@ -71,7 +71,7 @@ func PathFromIter(elem []string) Path { return Path{Raw: s} } -func (p *Path) Join(path *Path) Path { +func (p Path) Join(path Path) Path { return Path{Raw: filepath.Join(p.Raw, path.Raw)} } @@ -136,7 +136,7 @@ func (ld *LockData) Json() []byte { // ObjectStore Universal API to multiple object store services. type ObjectStore interface { /// Save the provided bytes to the specified location. - Put(location *Path, bytes []byte) error + Put(location Path, bytes []byte) error // /// Get a multi-part upload that allows writing data in chunks // /// @@ -148,29 +148,29 @@ type ObjectStore interface { // /// For some object stores (S3, GCS, and local in particular), if the // /// writer fails or panics, you must call [ObjectStore::abort_multipart] // /// to clean up partially written data. - // PutMultipart(location *Path) error + // PutMultipart(location Path) error // /// Cleanup an aborted upload. // /// // /// See documentation for individual stores for exact behavior, as capabilities // /// vary by object store. - // AbortMultipart(location *Path, multipart_id *MultipartId) error + // AbortMultipart(location Path, multipart_id *MultipartId) error /// Return the bytes that are stored at the specified location. - Get(location *Path) ([]byte, error) + Get(location Path) ([]byte, error) // /// Return the bytes that are stored at the specified location // /// in the given byte range - // GetRange(location *Path, r Range) error + // GetRange(location Path, r Range) error // /// Return the bytes that are stored at the specified location // /// in the given byte ranges - // GetRanges(location *Path, ranges []Range) ([]byte, error) + // GetRanges(location Path, ranges []Range) ([]byte, error) /// Return the metadata for the specified location - Head(location *Path) (ObjectMeta, error) + Head(location Path) (ObjectMeta, error) // /// Delete the object at the specified location. - Delete(location *Path) error + Delete(location Path) error /// List the objects with the given prefix. This may be limited to a certain number of objects (e.g. 1000) /// based on the underlying object storage's limitations. @@ -178,14 +178,14 @@ type ObjectStore interface { /// /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of /// `foo/bar_baz/x`. - List(prefix *Path, previousResult *ListResult) (ListResult, error) + List(prefix Path, previousResult *ListResult) (ListResult, error) /// List all objects with the given prefix. If the underlying object storage returns a limited number of objects, /// this will perform paging as required to return all results /// /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of /// `foo/bar_baz/x`. - ListAll(prefix *Path) (ListResult, error) + ListAll(prefix Path) (ListResult, error) /// Returns true if this store returns list results sorted IsListOrdered() bool @@ -194,7 +194,7 @@ type ObjectStore interface { // /// // /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of // /// `foo/bar_baz/x`. - // List(prefix *Path) (bufio.Scanner, ObjectMeta) + // List(prefix Path) (bufio.Scanner, ObjectMeta) // /// List objects with the given prefix and an implementation specific // /// delimiter. Returns common prefixes (directories) in addition to object @@ -202,12 +202,12 @@ type ObjectStore interface { // /// // /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of // /// `foo/bar_baz/x`. - // ListWithDelimiter(prefix *Path) ListResult + // ListWithDelimiter(prefix Path) ListResult // /// Copy an object from one path to another in the same object store. // /// // /// If there exists an object at the destination, it will be overwritten. - // Copy(from *Path, to *Path) error + // Copy(from Path, to Path) error // /// Move an object from one path to another in the same object store. // /// @@ -215,7 +215,7 @@ type ObjectStore interface { // /// check when deleting source that it was the same object that was originally copied. // /// /// If there exists an object at the destination, it will be overwritten. - Rename(from *Path, to *Path) error + Rename(from Path, to Path) error // /// Copy an object from one path to another, only if destination is empty. // /// @@ -224,23 +224,23 @@ type ObjectStore interface { // /// Performs an atomic operation if the underlying object storage supports it. // /// If atomic operations are not supported by the underlying object storage (like S3) // /// it will return an error. - // CopyIfNotExists(from *Path, to *Path) error + // CopyIfNotExists(from Path, to Path) error // Move an object from one path to another in the same object store. // Will return an error if the destination already has an object. - RenameIfNotExists(from *Path, to *Path) error + RenameIfNotExists(from Path, to Path) error } // / Wrapper around List that will perform paging if required type ListIterator struct { store ObjectStore - prefix *Path + prefix Path listResult *ListResult nextIndex int } -func NewListIterator(prefix *Path, store ObjectStore) *ListIterator { +func NewListIterator(prefix Path, store ObjectStore) *ListIterator { iterator := new(ListIterator) iterator.listResult = nil iterator.prefix = prefix diff --git a/tablestate.go b/tablestate.go index 3d79aea5..5318800c 100644 --- a/tablestate.go +++ b/tablestate.go @@ -208,7 +208,7 @@ func stateFromCheckpoint(table *DeltaTable, checkpoint *CheckPoint) (*DeltaTable newState := NewDeltaTableState(checkpoint.Version) checkpointDataPaths := table.GetCheckpointDataPaths(checkpoint) for _, location := range checkpointDataPaths { - checkpointBytes, err := table.Store.Get(&location) + checkpointBytes, err := table.Store.Get(location) if err != nil { return nil, err } From 0f43dc1eb76438efab03f9d9e605e6788fcb06cd Mon Sep 17 00:00:00 2001 From: Rahul Madnawat Date: Mon, 11 Sep 2023 15:17:07 -0700 Subject: [PATCH 5/9] Remove superfluous empty lines --- README.md | 2 -- action.go | 2 -- action_test.go | 9 --------- checkpoint_test.go | 2 -- delta.go | 1 - delta_test.go | 3 --- schema.go | 3 --- schema_test.go | 2 -- state/filestate/filestate_test.go | 1 - state/redisstate/rediststate.go | 1 - state/redisstate/rediststate_test.go | 1 - storage/filestore/filestore_test.go | 4 ---- storage/storage_test.go | 13 ------------- 13 files changed, 44 deletions(-) delete mode 100644 storage/storage_test.go diff --git a/README.md b/README.md index cb7c4f93..24243ac6 100644 --- a/README.md +++ b/README.md @@ -136,7 +136,6 @@ type testData struct { } func (data *testData) getSchema() delta.SchemaTypeStruct { - // schema := GetSchema(data) schema := delta.SchemaTypeStruct{ Fields: []delta.SchemaField{ @@ -198,7 +197,6 @@ type payload struct { } func writeParquet[T any](data []T, filename string) (*payload, error) { - p := new(payload) // if err := parquet.WriteFile(filename, data); err != nil { diff --git a/action.go b/action.go index af652ba5..7e0c4142 100644 --- a/action.go +++ b/action.go @@ -485,7 +485,6 @@ func StatsFromJson(b []byte) (*Stats, error) { // the struct property is passed in as a pointer to ensure that it can be evaluated as nil[NULL] // TODO Handle struct types func UpdateStats[T constraints.Ordered](s *Stats, k string, vpt *T) { - var v T if s.NullCount == nil { s.NullCount = make(map[string]int64) @@ -524,5 +523,4 @@ func UpdateStats[T constraints.Ordered](s *Stats, k string, vpt *T) { s.MaxValues[k] = v } - } diff --git a/action_test.go b/action_test.go index a1d27547..96ffbfc3 100644 --- a/action_test.go +++ b/action_test.go @@ -75,7 +75,6 @@ func TestLogEntryFromActions(t *testing.T) { } func TestLogEntryFromAction(t *testing.T) { - commit := make(CommitInfo) commit["path"] = "part-1.snappy.parquet" commit["size"] = 1 @@ -91,7 +90,6 @@ func TestLogEntryFromAction(t *testing.T) { // Test from example at https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-metadata func TestLogEntryFromActionChangeMetaData(t *testing.T) { - expectedStr := strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(` { "metaData":{ @@ -140,7 +138,6 @@ func TestLogEntryFromActionChangeMetaData(t *testing.T) { // TestUpdateStats tests gathering stats over a data set that includes pointers func TestUpdateStats(t *testing.T) { - type rowType struct { Id int `parquet:"id,snappy"` Label string `parquet:"label,dict,snappy"` @@ -161,7 +158,6 @@ func TestUpdateStats(t *testing.T) { stats := Stats{} for _, row := range data { - stats.NumRecords++ UpdateStats(&stats, "id", &row.Id) UpdateStats(&stats, "label", &row.Label) @@ -174,7 +170,6 @@ func TestUpdateStats(t *testing.T) { if statsString != expectedStr { t.Errorf("has:\n%s\nwant:\n%s", statsString, expectedStr) } - } func TestStatsFromJSON(t *testing.T) { @@ -224,18 +219,15 @@ func TestStatsFromJSON(t *testing.T) { } func TestFormatDefault(t *testing.T) { - format := new(Format).Default() b, _ := json.Marshal(format) expectedStr := `{"provider":"parquet","options":{}}` if string(b) != expectedStr { t.Errorf("has:\n%s\nwant:\n%s", string(b), expectedStr) } - } func TestWriteOperationParameters(t *testing.T) { - write := Write{Mode: Append, PartitionBy: []string{"date"}} commit := write.GetCommitInfo() commit["timestamp"] = 1675020556534 @@ -253,7 +245,6 @@ func TestWriteOperationParameters(t *testing.T) { if !reflect.DeepEqual(expectedStr, string(logs)) { t.Errorf("expected %s, but got %s", expectedStr, string(logs)) } - } func TestWrite_GetCommitInfo(t *testing.T) { diff --git a/checkpoint_test.go b/checkpoint_test.go index bd07eeac..6de901d9 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -898,9 +898,7 @@ func TestCheckpointCleanupExpiredLogs(t *testing.T) { } for _, enableCleanupInTableConfig := range tests { - for _, disableCleanupInCheckpointConfig := range tests { - store, stateStore, lock, checkpointLock := setupCheckpointTest(t, "", false) table := NewDeltaTable(store, lock, stateStore) diff --git a/delta.go b/delta.go index 02680204..12e10d80 100644 --- a/delta.go +++ b/delta.go @@ -702,7 +702,6 @@ func (transaction *DeltaTransaction) Commit(operation DeltaOperation, appMetadat // / the transaction object could be dropped and the actual commit could be executed // / with `DeltaTable.try_commit_transaction`. func (transaction *DeltaTransaction) PrepareCommit(operation DeltaOperation, appMetadata map[string]any) (PreparedCommit, error) { - anyCommitInfo := false for _, action := range transaction.Actions { switch action.(type) { diff --git a/delta_test.go b/delta_test.go index 60e8989f..a8902652 100644 --- a/delta_test.go +++ b/delta_test.go @@ -234,7 +234,6 @@ func TestDeltaTableTryCommitTransaction(t *testing.T) { } func TestTryCommitWithExistingLock(t *testing.T) { - tmpDir := t.TempDir() fileLockKey := filepath.Join(tmpDir, "_delta_log/_commit.lock") os.MkdirAll(filepath.Dir(fileLockKey), 0700) @@ -790,7 +789,6 @@ func (t *testData) UnmarshalJSON(data []byte) error { } func (data *testData) getSchema() SchemaTypeStruct { - // schema := GetSchema(data) schema := SchemaTypeStruct{ Fields: []SchemaField{ @@ -852,7 +850,6 @@ type payload struct { } func writeParquet[T any](data []T, filename string) (*payload, error) { - p := new(payload) file, err := os.Create(filename) diff --git a/schema.go b/schema.go index 82ddb200..190f3127 100644 --- a/schema.go +++ b/schema.go @@ -41,7 +41,6 @@ type SchemaTypeStruct struct { // TODO this does not handle nested maps, arrays, or structs correctly // See TestMetadataGetSchema in action_test.go for example outputs for those cases func (s *SchemaTypeStruct) Json() []byte { - type constructorSchemaField struct { // will always be struct Type SchemaDataType `json:"type"` @@ -97,12 +96,10 @@ const ( // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#schema-serialization-format // i.e. Value int is not currently readable with spark.read.format("delta").load("...") func GetSchema(i any) SchemaTypeStruct { - t := reflect.TypeOf(i).Elem() numFields := t.NumField() var fields []SchemaField for index := 0; index < numFields; index++ { - structField := t.FieldByIndex([]int{index}) typeTag := structField.Tag.Get("type") diff --git a/schema_test.go b/schema_test.go index 829f1dbc..dbdfe3de 100644 --- a/schema_test.go +++ b/schema_test.go @@ -18,7 +18,6 @@ import ( ) func TestGetSchema(t *testing.T) { - type SubStruct struct { Data string `parquet:"data"` } @@ -73,7 +72,6 @@ func TestGetSchema(t *testing.T) { } func TestGetSchemaWithWeirdTypes(t *testing.T) { - type RowType struct { Bin []byte `parquet:"bin,snappy"` Timestamp int64 `parquet:"timestamp,snappy" type:"timestamp"` diff --git a/state/filestate/filestate_test.go b/state/filestate/filestate_test.go index f0e9e5d8..f7684625 100644 --- a/state/filestate/filestate_test.go +++ b/state/filestate/filestate_test.go @@ -21,7 +21,6 @@ import ( ) func TestGetPutData(t *testing.T) { - tmpDir := t.TempDir() // fileLockKey := filepath.Join(tmpDir, "_commit.state") path := storage.NewPath(tmpDir) diff --git a/state/redisstate/rediststate.go b/state/redisstate/rediststate.go index 059dee9e..6beacad2 100644 --- a/state/redisstate/rediststate.go +++ b/state/redisstate/rediststate.go @@ -58,7 +58,6 @@ func (s *RedisStateStore) Get() (state.CommitState, error) { } func (s *RedisStateStore) Put(commitState state.CommitState) error { - data, _ := json.Marshal(commitState) err := s.RedisClient.Set(s.ctx, s.Key, data, 0).Err() if err != nil { diff --git a/state/redisstate/rediststate_test.go b/state/redisstate/rediststate_test.go index 49860510..05eca24c 100644 --- a/state/redisstate/rediststate_test.go +++ b/state/redisstate/rediststate_test.go @@ -44,7 +44,6 @@ func TestMain(m *testing.M) { } func TestGetPutData(t *testing.T) { - var redisOpts = &redis.Options{ Network: "unix", Addr: servers[0].Socket(), diff --git a/storage/filestore/filestore_test.go b/storage/filestore/filestore_test.go index 41caae47..4957d537 100644 --- a/storage/filestore/filestore_test.go +++ b/storage/filestore/filestore_test.go @@ -24,7 +24,6 @@ import ( ) func TestPut(t *testing.T) { - tmpDir := t.TempDir() // fileLockKey := filepath.Join(tmpDir, "_delta_log/_commit.lock") @@ -52,7 +51,6 @@ func TestPut(t *testing.T) { } func TestHead(t *testing.T) { - tmpDir := t.TempDir() // fileLockKey := filepath.Join(tmpDir, "_delta_log/_commit.lock") @@ -91,7 +89,6 @@ func TestHead(t *testing.T) { } func TestRenameIfNotExists(t *testing.T) { - tmpDir := t.TempDir() tmpPath := storage.NewPath(tmpDir) @@ -127,7 +124,6 @@ func TestRenameIfNotExists(t *testing.T) { } func TestDelete(t *testing.T) { - tmpDir := t.TempDir() tmpPath := storage.NewPath(tmpDir) diff --git a/storage/storage_test.go b/storage/storage_test.go deleted file mode 100644 index 27ad4faf..00000000 --- a/storage/storage_test.go +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright 2023 Rivian Automotive, Inc. -// Licensed under the Apache License, Version 2.0 (the “License”); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an “AS IS” BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package storage From c1beb29aec5a548e660d81c9e4258ab21eb60d11 Mon Sep 17 00:00:00 2001 From: Rahul Madnawat Date: Mon, 11 Sep 2023 15:56:36 -0700 Subject: [PATCH 6/9] Modernize dynamostate package --- lock/dynamolock/dynamolock.go | 23 +++-- lock/filelock/filelock.go | 8 +- lock/redislock/redislock.go | 10 +-- logstore/dynamodblogstore/dynamodblogstore.go | 40 ++++----- state/dynamostate/dynamostate.go | 88 ++++++++++++------- state/dynamostate/dynamostate_test.go | 37 ++------ state/filestate/filestate.go | 4 +- state/localstate/localstate.go | 2 +- state/redisstate/rediststate.go | 4 +- state/state_test.go | 13 --- 10 files changed, 114 insertions(+), 115 deletions(-) delete mode 100644 state/state_test.go diff --git a/lock/dynamolock/dynamolock.go b/lock/dynamolock/dynamolock.go index 55c68cba..26d1e2ab 100644 --- a/lock/dynamolock/dynamolock.go +++ b/lock/dynamolock/dynamolock.go @@ -24,6 +24,15 @@ import ( "github.com/rivian/delta-go/lock" ) +const ( + KeyAttr string = "key" + DefaultTTL time.Duration = 60 * time.Second + DefaultHeartbeat time.Duration = 1 * time.Second + DefaultMaxRetryTableCreateAttempts uint16 = 20 + DefaultRCU int64 = 5 + DefaultWCU int64 = 5 +) + type DynamoLock struct { tableName string lockClient *dynamolock.Client @@ -47,12 +56,6 @@ type Options struct { WCU int64 } -const ( - DefaultTTL time.Duration = 60 * time.Second - DefaultHeartbeat time.Duration = 1 * time.Second - DefaultMaxRetryTableCreateAttempts uint16 = 20 -) - // Sets the default options func (opts *Options) setOptionsDefaults() { if opts.TTL == 0 { @@ -61,6 +64,12 @@ func (opts *Options) setOptionsDefaults() { if opts.MaxRetryTableCreateAttempts == 0 { opts.MaxRetryTableCreateAttempts = DefaultMaxRetryTableCreateAttempts } + if opts.RCU == 0 { + opts.RCU = DefaultRCU + } + if opts.WCU == 0 { + opts.WCU = DefaultWCU + } } // Creates a new DynamoLock instance @@ -79,7 +88,7 @@ func New(client dynamodbutils.DynamoDBClient, tableName string, key string, opts createTableInput := dynamodb.CreateTableInput{ KeySchema: []types.KeySchemaElement{ { - AttributeName: aws.String("key"), + AttributeName: aws.String(KeyAttr), KeyType: types.KeyTypeHash, }, }, diff --git a/lock/filelock/filelock.go b/lock/filelock/filelock.go index d92daa91..66afa546 100644 --- a/lock/filelock/filelock.go +++ b/lock/filelock/filelock.go @@ -23,6 +23,10 @@ import ( "github.com/rivian/delta-go/storage" ) +const ( + DefaultTTL time.Duration = 60 * time.Second +) + type FileLock struct { baseURI storage.Path key string @@ -43,10 +47,6 @@ type Options struct { DeleteOnRelease bool } -const ( - DefaultTTL time.Duration = 60 * time.Second -) - // Sets the default options func (opts *Options) setOptionsDefaults() { if opts.TTL == 0 { diff --git a/lock/redislock/redislock.go b/lock/redislock/redislock.go index bdcbcc62..e0df56f9 100644 --- a/lock/redislock/redislock.go +++ b/lock/redislock/redislock.go @@ -22,6 +22,11 @@ import ( "github.com/rivian/delta-go/lock" ) +const ( + DefaultTTL time.Duration = 60 * time.Second + DefaultMaxTries int = 20 +) + type RedisLock struct { key string redsyncInstance *redsync.Redsync @@ -39,11 +44,6 @@ type Options struct { // Compile time check that MutexWrapper implements lock.Locker var _ lock.Locker = (*RedisLock)(nil) -const ( - DefaultTTL time.Duration = 60 * time.Second - DefaultMaxTries int = 20 -) - // Sets the default options func (opts *Options) setOptionsDefaults() { if opts.TTL == 0 { diff --git a/logstore/dynamodblogstore/dynamodblogstore.go b/logstore/dynamodblogstore/dynamodblogstore.go index 5a2b4fe0..36ec9a94 100644 --- a/logstore/dynamodblogstore/dynamodblogstore.go +++ b/logstore/dynamodblogstore/dynamodblogstore.go @@ -35,11 +35,11 @@ var ( const ( // DynamoDB table attribute keys - AttrTablePath string = "tablePath" - AttrFileName string = "fileName" - AttrTempPath string = "tempPath" - AttrComplete string = "complete" - AttrExpireTime string = "expireTime" + TablePathAttr string = "tablePath" + FileNameAttr string = "fileName" + TempPathAttr string = "tempPath" + CompleteAttr string = "complete" + ExpireTimeAttr string = "expireTime" // The delay, in seconds, after a commit entry has been committed to the delta log at which // point it is safe to be deleted from the log store. @@ -176,21 +176,21 @@ func NewDynamoDBLogStore(lso DynamoDBLogStoreOptions) (*DynamoDBLogStore, error) createTableInput := dynamodb.CreateTableInput{ AttributeDefinitions: []types.AttributeDefinition{ { - AttributeName: aws.String(AttrTablePath), + AttributeName: aws.String(TablePathAttr), AttributeType: types.ScalarAttributeTypeS, }, { - AttributeName: aws.String(AttrFileName), + AttributeName: aws.String(FileNameAttr), AttributeType: types.ScalarAttributeTypeS, }, }, KeySchema: []types.KeySchemaElement{ { - AttributeName: aws.String(AttrTablePath), + AttributeName: aws.String(TablePathAttr), KeyType: types.KeyTypeHash, }, { - AttributeName: aws.String(AttrFileName), + AttributeName: aws.String(FileNameAttr), KeyType: types.KeyTypeRange, }, }, @@ -222,7 +222,7 @@ func (ls *DynamoDBLogStore) Put(entry *logstore.CommitEntry, overwrite bool) err // Gets an entry corresponding to the Delta log file with given `tablePath` and `fileName` from a DynamoDBLogStore instance func (ls *DynamoDBLogStore) Get(tablePath storage.Path, fileName storage.Path) (*logstore.CommitEntry, error) { - attributes := map[string]types.AttributeValue{AttrTablePath: &types.AttributeValueMemberS{Value: tablePath.Raw}, AttrFileName: &types.AttributeValueMemberS{Value: fileName.Raw}} + attributes := map[string]types.AttributeValue{TablePathAttr: &types.AttributeValueMemberS{Value: tablePath.Raw}, FileNameAttr: &types.AttributeValueMemberS{Value: fileName.Raw}} gii := dynamodb.GetItemInput{Key: attributes, TableName: aws.String(ls.tableName), ConsistentRead: aws.Bool(true)} gio, err := ls.client.GetItem(context.TODO(), &gii) @@ -244,7 +244,7 @@ func (ls *DynamoDBLogStore) Get(tablePath storage.Path, fileName storage.Path) ( func (ls *DynamoDBLogStore) GetLatest(tablePath storage.Path) (*logstore.CommitEntry, error) { qi := dynamodb.QueryInput{TableName: &ls.tableName, ConsistentRead: aws.Bool(true), ScanIndexForward: aws.Bool(false), Limit: aws.Int32(1), ExpressionAttributeValues: map[string]types.AttributeValue{ ":partitionKey": &types.AttributeValueMemberS{Value: tablePath.Raw}, - }, KeyConditionExpression: aws.String(fmt.Sprintf("%s = :partitionKey", AttrTablePath))} + }, KeyConditionExpression: aws.String(fmt.Sprintf("%s = :partitionKey", TablePathAttr))} qo, err := ls.client.Query(context.TODO(), &qi) if err != nil { log.Debugf("delta-go: Failed Query. %v", err) @@ -265,11 +265,11 @@ func (ls *DynamoDBLogStore) dbResultToCommitEntry(item map[string]types.Attribut var expireTimeAttr uint64 var err error - _, ok := item[AttrExpireTime] + _, ok := item[ExpireTimeAttr] if !ok { expireTimeAttr = 0 } else { - expireTimeAttr, err = strconv.ParseUint(item[AttrExpireTime].(*types.AttributeValueMemberN).Value, 10, 64) + expireTimeAttr, err = strconv.ParseUint(item[ExpireTimeAttr].(*types.AttributeValueMemberN).Value, 10, 64) if err != nil { log.Debugf("delta-go: Failed to interpet expire time attribute as uint64. %v", err) return nil, err @@ -277,20 +277,20 @@ func (ls *DynamoDBLogStore) dbResultToCommitEntry(item map[string]types.Attribut } return logstore.NewCommitEntry( - storage.NewPath(item[AttrTablePath].(*types.AttributeValueMemberS).Value), - storage.NewPath(item[AttrFileName].(*types.AttributeValueMemberS).Value), - storage.NewPath(item[AttrTempPath].(*types.AttributeValueMemberS).Value), - item[AttrComplete].(*types.AttributeValueMemberS).Value == "true", + storage.NewPath(item[TablePathAttr].(*types.AttributeValueMemberS).Value), + storage.NewPath(item[FileNameAttr].(*types.AttributeValueMemberS).Value), + storage.NewPath(item[TempPathAttr].(*types.AttributeValueMemberS).Value), + item[CompleteAttr].(*types.AttributeValueMemberS).Value == "true", expireTimeAttr, ) } // Creates a put item request for an item to be inserted into a DynamoDBLogStore instance func (ls *DynamoDBLogStore) createPutItemRequest(entry *logstore.CommitEntry, overwrite bool) (*dynamodb.PutItemInput, error) { - attributes := map[string]types.AttributeValue{AttrTablePath: &types.AttributeValueMemberS{Value: entry.TablePath.Raw}, AttrFileName: &types.AttributeValueMemberS{Value: entry.FileName.Raw}, AttrTempPath: &types.AttributeValueMemberS{Value: entry.TempPath.Raw}, AttrComplete: &types.AttributeValueMemberS{Value: *aws.String(strconv.FormatBool(entry.Complete))}} + attributes := map[string]types.AttributeValue{TablePathAttr: &types.AttributeValueMemberS{Value: entry.TablePath.Raw}, FileNameAttr: &types.AttributeValueMemberS{Value: entry.FileName.Raw}, TempPathAttr: &types.AttributeValueMemberS{Value: entry.TempPath.Raw}, CompleteAttr: &types.AttributeValueMemberS{Value: *aws.String(strconv.FormatBool(entry.Complete))}} if entry.ExpireTime != 0 { - attributes[AttrExpireTime] = &types.AttributeValueMemberN{Value: *aws.String(fmt.Sprint(entry.ExpireTime))} + attributes[ExpireTimeAttr] = &types.AttributeValueMemberN{Value: *aws.String(fmt.Sprint(entry.ExpireTime))} } pir := &dynamodb.PutItemInput{ @@ -298,7 +298,7 @@ func (ls *DynamoDBLogStore) createPutItemRequest(entry *logstore.CommitEntry, ov Item: attributes} if !overwrite { - pir.ConditionExpression = aws.String(fmt.Sprintf("attribute_not_exists(%s)", AttrFileName)) + pir.ConditionExpression = aws.String(fmt.Sprintf("attribute_not_exists(%s)", FileNameAttr)) } return pir, nil diff --git a/state/dynamostate/dynamostate.go b/state/dynamostate/dynamostate.go index d1116a0d..541f8165 100644 --- a/state/dynamostate/dynamostate.go +++ b/state/dynamostate/dynamostate.go @@ -13,56 +13,88 @@ package dynamostate import ( + "context" "fmt" "strconv" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/dynamodb" - "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/rivian/delta-go/internal/dynamodbutils" "github.com/rivian/delta-go/state" ) -const KEY string = "key" -const VERSION string = "version" +const ( + KeyAttr string = "key" + VersionAttr string = "version" + DefaultMaxRetryTableCreateAttempts uint16 = 20 + DefaultRCU int64 = 5 + DefaultWCU int64 = 5 +) type DynamoState struct { Table string Key string - Client dynamodbiface.DynamoDBAPI + Client dynamodbutils.DynamoDBClient +} + +type Options struct { + MaxRetryTableCreateAttempts uint16 + RCU int64 + WCU int64 +} + +// Sets the default options +func (opts *Options) setOptionsDefaults() { + if opts.MaxRetryTableCreateAttempts == 0 { + opts.MaxRetryTableCreateAttempts = DefaultMaxRetryTableCreateAttempts + } + if opts.RCU == 0 { + opts.RCU = DefaultRCU + } + if opts.WCU == 0 { + opts.WCU = DefaultWCU + } } // Compile time check that DynamoState implements state.StateStore var _ state.StateStore = (*DynamoState)(nil) -func New(client dynamodbiface.DynamoDBAPI, tableName string, key string) (*DynamoState, error) { +func New(client dynamodbutils.DynamoDBClient, tableName string, key string, opts Options) (*DynamoState, error) { + opts.setOptionsDefaults() + + createTableInput := dynamodb.CreateTableInput{ + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String(KeyAttr), + KeyType: types.KeyTypeHash, + }, + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(opts.RCU), + WriteCapacityUnits: aws.Int64(opts.WCU), + }, + TableName: aws.String(tableName), + } + dynamodbutils.TryEnsureDynamoDBTableExists(client, tableName, createTableInput, opts.MaxRetryTableCreateAttempts) + tb := new(DynamoState) tb.Table = tableName tb.Key = key tb.Client = client - //TODO Check if table exists and handle exception - // describeTableInput := &dynamodb.DescribeTableInput{TableName: aws.String(tableName)} - // desciribeTableOutput, err := client.DescribeTable(describeTableInput) - // if desciribeTableOutput.Table.TableStatus != aws.String(dynamodb.TableStatusActive) { - // return nil, errors.New("dynamodb table is not active") - // } - // if err != nil { - // return nil, errors.New("dynamodb table is not active") - // } return tb, nil } func (l *DynamoState) Get() (state.CommitState, error) { input := &dynamodb.GetItemInput{ TableName: aws.String(l.Table), - Key: map[string]*dynamodb.AttributeValue{ - KEY: { - S: aws.String(l.Key), - }, + Key: map[string]types.AttributeValue{ + KeyAttr: &types.AttributeValueMemberS{Value: l.Key}, }, } // Call the GetItem operation to retrieve the item with the specified key value. - result, err := l.Client.GetItem(input) + result, err := l.Client.GetItem(context.TODO(), input) if err != nil { //TODO wrap error rather than printing fmt.Println("Error retrieving item.", err) @@ -74,7 +106,7 @@ func (l *DynamoState) Get() (state.CommitState, error) { return state.CommitState{Version: -1}, err } - versionValue := *result.Item["version"].S + versionValue := result.Item[VersionAttr].(*types.AttributeValueMemberS).Value version, err := strconv.Atoi(versionValue) if err != nil { //TODO wrap error rather than printing @@ -92,18 +124,14 @@ func (l *DynamoState) Put(commitS state.CommitState) error { // Create a PutItemInput object with the item data input := &dynamodb.PutItemInput{ TableName: aws.String(l.Table), - Item: map[string]*dynamodb.AttributeValue{ - KEY: { - S: aws.String(l.Key), - }, - VERSION: { - S: aws.String(versionString), - }, + Item: map[string]types.AttributeValue{ + KeyAttr: &types.AttributeValueMemberS{Value: l.Key}, + VersionAttr: &types.AttributeValueMemberS{Value: versionString}, }, } // Call PutItem to insert the new item into the table - _, err := l.Client.PutItem(input) + _, err := l.Client.PutItem(context.TODO(), input) if err != nil { //TODO wrap error rather than printing fmt.Println("Error inserting item.", err) diff --git a/state/dynamostate/dynamostate_test.go b/state/dynamostate/dynamostate_test.go index 4f332ac5..c80783b1 100644 --- a/state/dynamostate/dynamostate_test.go +++ b/state/dynamostate/dynamostate_test.go @@ -16,47 +16,22 @@ import ( "fmt" "testing" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/dynamodb" - "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" + "github.com/rivian/delta-go/internal/dynamodbutils" "github.com/rivian/delta-go/state" ) -type mockDynamoDBClient struct { - dynamodbiface.DynamoDBAPI -} - -func (m *mockDynamoDBClient) PutItem(input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) { - return nil, nil -} - -func (m *mockDynamoDBClient) GetItem(input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) { - return &dynamodb.GetItemOutput{ - Item: map[string]*dynamodb.AttributeValue{ - "version": { - S: aws.String("0"), - }, - }, - }, nil -} - -// Create a function to return the mock client -func createMockDynamoDBClient() *mockDynamoDBClient { - return &mockDynamoDBClient{} -} - func TestGet(t *testing.T) { // svc, err := dynamolock.GetDynamoDBClient() // if err != nil { // log.Fatal(err) // } - dynamoState, err := New(createMockDynamoDBClient(), "storage-table", "_commit.state") + dynamoState, err := New(dynamodbutils.NewMockClient(), "storage-table", "_commit.state", Options{}) if err != nil { - t.Errorf("Error occurred in retriving version.") + t.Errorf("Error occurred in retrieving version.") } commitS, err := dynamoState.Get() if err != nil { - t.Errorf("Error occurred in retriving version.") + t.Errorf("Error occurred in retrieving version.") } versionString := fmt.Sprintf("%v", commitS.Version) if len(string(versionString)) < 1 { @@ -65,9 +40,9 @@ func TestGet(t *testing.T) { } func TestPut(t *testing.T) { - dynamoState, err := New(createMockDynamoDBClient(), "storage-table", "_commit.state") + dynamoState, err := New(dynamodbutils.NewMockClient(), "storage-table", "_commit.state", Options{}) if err != nil { - t.Errorf("Error occurred in retriving version.") + t.Errorf("Error occurred in retrieving version.") } commitState := state.CommitState{Version: 0} err = dynamoState.Put(commitState) diff --git a/state/filestate/filestate.go b/state/filestate/filestate.go index ed02c8be..c412f84c 100644 --- a/state/filestate/filestate.go +++ b/state/filestate/filestate.go @@ -37,7 +37,7 @@ func New(baseURI storage.Path, key string) *FileStateStore { return fs } -func (s *FileStateStore) Get() (state.CommitState, error) { +func (s FileStateStore) Get() (state.CommitState, error) { getPath := filepath.Join(s.BaseURI.Raw, s.Key) var commitState state.CommitState data, err := os.ReadFile(getPath) @@ -56,7 +56,7 @@ func (s *FileStateStore) Get() (state.CommitState, error) { return commitState, nil } -func (s *FileStateStore) Put(commitState state.CommitState) error { +func (s FileStateStore) Put(commitState state.CommitState) error { putPath := filepath.Join(s.BaseURI.Raw, s.Key) err := os.MkdirAll(filepath.Dir(putPath), 0755) if err != nil { diff --git a/state/localstate/localstate.go b/state/localstate/localstate.go index 44f40f73..9a969d88 100644 --- a/state/localstate/localstate.go +++ b/state/localstate/localstate.go @@ -35,7 +35,7 @@ func New(currentTableVersion int64) *LocalStateStore { // Compile time check that LocalStateStore implements state.StateStore var _ state.StateStore = (*LocalStateStore)(nil) -func (stateStore *LocalStateStore) Get() (state.CommitState, error) { +func (stateStore LocalStateStore) Get() (state.CommitState, error) { return state.CommitState{Version: stateStore.version}, nil } diff --git a/state/redisstate/rediststate.go b/state/redisstate/rediststate.go index 6beacad2..6ba81952 100644 --- a/state/redisstate/rediststate.go +++ b/state/redisstate/rediststate.go @@ -38,7 +38,7 @@ func New(client redis.UniversalClient, key string) *RedisStateStore { return s } -func (s *RedisStateStore) Get() (state.CommitState, error) { +func (s RedisStateStore) Get() (state.CommitState, error) { var commitState state.CommitState data, err := s.RedisClient.Get(s.ctx, s.Key).Result() @@ -57,7 +57,7 @@ func (s *RedisStateStore) Get() (state.CommitState, error) { return commitState, nil } -func (s *RedisStateStore) Put(commitState state.CommitState) error { +func (s RedisStateStore) Put(commitState state.CommitState) error { data, _ := json.Marshal(commitState) err := s.RedisClient.Set(s.ctx, s.Key, data, 0).Err() if err != nil { diff --git a/state/state_test.go b/state/state_test.go deleted file mode 100644 index 01d99ae7..00000000 --- a/state/state_test.go +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright 2023 Rivian Automotive, Inc. -// Licensed under the Apache License, Version 2.0 (the “License”); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an “AS IS” BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package state From 4831d0847d47e2d158b0c972cab6795c58405ccf Mon Sep 17 00:00:00 2001 From: Rahul Madnawat Date: Wed, 13 Sep 2023 15:20:00 -0700 Subject: [PATCH 7/9] Stop specifying method receivers as pointers --- checkpoint_test.go | 2 +- state/filestate/filestate.go | 4 ++-- state/localstate/localstate.go | 2 +- state/redisstate/rediststate.go | 4 ++-- storage/filestore/filestore.go | 24 +++++++++++++----------- 5 files changed, 19 insertions(+), 17 deletions(-) diff --git a/checkpoint_test.go b/checkpoint_test.go index 6de901d9..0c924e95 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -34,7 +34,7 @@ import ( ) // / Helper function to set up test state -func setupCheckpointTest(t *testing.T, inputFolder string, overrideStore bool) (store filestore.FileObjectStore, state state.StateStore, lock lock.Locker, checkpointLock lock.Locker) { +func setupCheckpointTest(t *testing.T, inputFolder string, overrideStore bool) (store *filestore.FileObjectStore, state state.StateStore, lock lock.Locker, checkpointLock lock.Locker) { t.Helper() tmpDir := t.TempDir() diff --git a/state/filestate/filestate.go b/state/filestate/filestate.go index c412f84c..ed02c8be 100644 --- a/state/filestate/filestate.go +++ b/state/filestate/filestate.go @@ -37,7 +37,7 @@ func New(baseURI storage.Path, key string) *FileStateStore { return fs } -func (s FileStateStore) Get() (state.CommitState, error) { +func (s *FileStateStore) Get() (state.CommitState, error) { getPath := filepath.Join(s.BaseURI.Raw, s.Key) var commitState state.CommitState data, err := os.ReadFile(getPath) @@ -56,7 +56,7 @@ func (s FileStateStore) Get() (state.CommitState, error) { return commitState, nil } -func (s FileStateStore) Put(commitState state.CommitState) error { +func (s *FileStateStore) Put(commitState state.CommitState) error { putPath := filepath.Join(s.BaseURI.Raw, s.Key) err := os.MkdirAll(filepath.Dir(putPath), 0755) if err != nil { diff --git a/state/localstate/localstate.go b/state/localstate/localstate.go index 9a969d88..44f40f73 100644 --- a/state/localstate/localstate.go +++ b/state/localstate/localstate.go @@ -35,7 +35,7 @@ func New(currentTableVersion int64) *LocalStateStore { // Compile time check that LocalStateStore implements state.StateStore var _ state.StateStore = (*LocalStateStore)(nil) -func (stateStore LocalStateStore) Get() (state.CommitState, error) { +func (stateStore *LocalStateStore) Get() (state.CommitState, error) { return state.CommitState{Version: stateStore.version}, nil } diff --git a/state/redisstate/rediststate.go b/state/redisstate/rediststate.go index 6ba81952..6beacad2 100644 --- a/state/redisstate/rediststate.go +++ b/state/redisstate/rediststate.go @@ -38,7 +38,7 @@ func New(client redis.UniversalClient, key string) *RedisStateStore { return s } -func (s RedisStateStore) Get() (state.CommitState, error) { +func (s *RedisStateStore) Get() (state.CommitState, error) { var commitState state.CommitState data, err := s.RedisClient.Get(s.ctx, s.Key).Result() @@ -57,7 +57,7 @@ func (s RedisStateStore) Get() (state.CommitState, error) { return commitState, nil } -func (s RedisStateStore) Put(commitState state.CommitState) error { +func (s *RedisStateStore) Put(commitState state.CommitState) error { data, _ := json.Marshal(commitState) err := s.RedisClient.Set(s.ctx, s.Key, data, 0).Err() if err != nil { diff --git a/storage/filestore/filestore.go b/storage/filestore/filestore.go index 854e8ed5..70dd3272 100644 --- a/storage/filestore/filestore.go +++ b/storage/filestore/filestore.go @@ -31,11 +31,13 @@ type FileObjectStore struct { // Compile time check that FileObjectStore implements storage.ObjectStore var _ storage.ObjectStore = (*FileObjectStore)(nil) -func New(baseURI storage.Path) FileObjectStore { - return FileObjectStore{BaseURI: baseURI} +func New(baseURI storage.Path) *FileObjectStore { + fs := new(FileObjectStore) + fs.BaseURI = baseURI + return fs } -func (s FileObjectStore) Put(location storage.Path, bytes []byte) error { +func (s *FileObjectStore) Put(location storage.Path, bytes []byte) error { writePath := filepath.Join(s.BaseURI.Raw, location.Raw) err := os.MkdirAll(filepath.Dir(writePath), 0700) if err != nil { @@ -48,7 +50,7 @@ func (s FileObjectStore) Put(location storage.Path, bytes []byte) error { return nil } -func (s FileObjectStore) RenameIfNotExists(from storage.Path, to storage.Path) error { +func (s *FileObjectStore) RenameIfNotExists(from storage.Path, to storage.Path) error { // return ErrorObjectAlreadyExists if the destination file exists _, err := s.Head(to) if !errors.Is(err, storage.ErrorObjectDoesNotExist) { @@ -58,7 +60,7 @@ func (s FileObjectStore) RenameIfNotExists(from storage.Path, to storage.Path) e return s.Rename(from, to) } -func (s FileObjectStore) Get(location storage.Path) ([]byte, error) { +func (s *FileObjectStore) Get(location storage.Path) ([]byte, error) { filePath := filepath.Join(s.BaseURI.Raw, location.Raw) data, err := os.ReadFile(filePath) if os.IsNotExist(err) { @@ -70,7 +72,7 @@ func (s FileObjectStore) Get(location storage.Path) ([]byte, error) { return data, nil } -func (s FileObjectStore) Head(location storage.Path) (storage.ObjectMeta, error) { +func (s *FileObjectStore) Head(location storage.Path) (storage.ObjectMeta, error) { filePath := filepath.Join(s.BaseURI.Raw, location.Raw) var meta storage.ObjectMeta info, err := os.Stat(filePath) @@ -91,7 +93,7 @@ func (s FileObjectStore) Head(location storage.Path) (storage.ObjectMeta, error) return meta, nil } -func (s FileObjectStore) Rename(from storage.Path, to storage.Path) error { +func (s *FileObjectStore) Rename(from storage.Path, to storage.Path) error { // rename source to destination f := s.BaseURI.Join(from) t := s.BaseURI.Join(to) @@ -102,7 +104,7 @@ func (s FileObjectStore) Rename(from storage.Path, to storage.Path) error { return nil } -func (s FileObjectStore) Delete(location storage.Path) error { +func (s *FileObjectStore) Delete(location storage.Path) error { filePath := filepath.Join(s.BaseURI.Raw, location.Raw) err := os.Remove(filePath) if err != nil { @@ -173,7 +175,7 @@ func listFilesInDirRecursively(baseURI string, dir string, prefix string) ([]sto return out, nil } -func (s FileObjectStore) ListAll(prefix storage.Path) (storage.ListResult, error) { +func (s *FileObjectStore) ListAll(prefix storage.Path) (storage.ListResult, error) { var listResult storage.ListResult dir, filePrefix := filepath.Split(prefix.Raw) @@ -215,10 +217,10 @@ func (s FileObjectStore) ListAll(prefix storage.Path) (storage.ListResult, error return listResult, nil } -func (s FileObjectStore) List(prefix storage.Path, previousResult *storage.ListResult) (storage.ListResult, error) { +func (s *FileObjectStore) List(prefix storage.Path, previousResult *storage.ListResult) (storage.ListResult, error) { return s.ListAll(prefix) } -func (s FileObjectStore) IsListOrdered() bool { +func (s *FileObjectStore) IsListOrdered() bool { return true } From 7934033f32f71a0d8d6176c318cfd72ced97b403 Mon Sep 17 00:00:00 2001 From: Rahul Madnawat Date: Wed, 13 Sep 2023 15:23:56 -0700 Subject: [PATCH 8/9] Use Attribute type --- lock/dynamolock/dynamolock.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lock/dynamolock/dynamolock.go b/lock/dynamolock/dynamolock.go index 26d1e2ab..b46b851f 100644 --- a/lock/dynamolock/dynamolock.go +++ b/lock/dynamolock/dynamolock.go @@ -24,8 +24,11 @@ import ( "github.com/rivian/delta-go/lock" ) +// Represents attribute names in DynamoDB items +type Attribute string + const ( - KeyAttr string = "key" + Key Attribute = "key" DefaultTTL time.Duration = 60 * time.Second DefaultHeartbeat time.Duration = 1 * time.Second DefaultMaxRetryTableCreateAttempts uint16 = 20 @@ -88,7 +91,7 @@ func New(client dynamodbutils.DynamoDBClient, tableName string, key string, opts createTableInput := dynamodb.CreateTableInput{ KeySchema: []types.KeySchemaElement{ { - AttributeName: aws.String(KeyAttr), + AttributeName: aws.String(string(Key)), KeyType: types.KeyTypeHash, }, }, From a38a0d3814b4c5796cc8c4ed0545ec6d6b733efd Mon Sep 17 00:00:00 2001 From: Rahul Madnawat Date: Wed, 13 Sep 2023 16:08:39 -0700 Subject: [PATCH 9/9] Add comments for RCU and WCU --- lock/dynamolock/dynamolock.go | 6 ++++-- logstore/dynamodblogstore/dynamodblogstore.go | 6 ++++-- state/dynamostate/dynamostate.go | 6 ++++-- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/lock/dynamolock/dynamolock.go b/lock/dynamolock/dynamolock.go index b46b851f..3e92e356 100644 --- a/lock/dynamolock/dynamolock.go +++ b/lock/dynamolock/dynamolock.go @@ -55,8 +55,10 @@ type Options struct { HeartBeat time.Duration DeleteOnRelease bool MaxRetryTableCreateAttempts uint16 - RCU int64 - WCU int64 + // The number of read capacity units which can be consumed per second (https://aws.amazon.com/dynamodb/pricing/provisioned/) + RCU int64 + // The number of write capacity units which can be consumed per second (https://aws.amazon.com/dynamodb/pricing/provisioned/) + WCU int64 } // Sets the default options diff --git a/logstore/dynamodblogstore/dynamodblogstore.go b/logstore/dynamodblogstore/dynamodblogstore.go index db946755..351eab07 100644 --- a/logstore/dynamodblogstore/dynamodblogstore.go +++ b/logstore/dynamodblogstore/dynamodblogstore.go @@ -109,8 +109,10 @@ type DynamoDBLogStoreOptions struct { TableName string ExpirationDelaySeconds uint64 MaxRetryTableCreateAttempts uint16 - RCU int64 - WCU int64 + // The number of read capacity units which can be consumed per second (https://aws.amazon.com/dynamodb/pricing/provisioned/) + RCU int64 + // The number of write capacity units which can be consumed per second (https://aws.amazon.com/dynamodb/pricing/provisioned/) + WCU int64 } // Gets the client from a DynamoDBLogStore instance diff --git a/state/dynamostate/dynamostate.go b/state/dynamostate/dynamostate.go index 97ed6f1b..63644d46 100644 --- a/state/dynamostate/dynamostate.go +++ b/state/dynamostate/dynamostate.go @@ -43,8 +43,10 @@ type DynamoState struct { type Options struct { MaxRetryTableCreateAttempts uint16 - RCU int64 - WCU int64 + // The number of read capacity units which can be consumed per second (https://aws.amazon.com/dynamodb/pricing/provisioned/) + RCU int64 + // The number of write capacity units which can be consumed per second (https://aws.amazon.com/dynamodb/pricing/provisioned/) + WCU int64 } // Sets the default options