From abb80c1799e617dd6781c6867a6f3294e0d8766c Mon Sep 17 00:00:00 2001 From: Rahul Madnawat <59585630+rahulmadnawat@users.noreply.github.com> Date: Mon, 25 Sep 2023 10:57:39 -0700 Subject: [PATCH] feat: support using a log store to manage concurrent commits (#35) * Rebase on master * Clean up API design by moving fields to object store level * Run formatter * Update some comments * Fix error message typo * Remove unused parameter * Fix calls to NewDeltaTableWithLogStore() * Rename AddCommitInfo to AddCommitInfoIfNotPresent * Rename Read to ReadActions * Remove GitHub checks for Codecov * Fix Codecov error * Update comments for Client interfaces --------- Co-authored-by: Rahul Madnawat --- checkpoint_test.go | 22 +- codecov.yml | 15 + delta.go | 422 ++++++- delta_test.go | 1100 ++++++++++++++++- go.mod | 1 + go.sum | 12 +- internal/dynamodbutils/dynamodbclient.go | 52 +- internal/dynamodbutils/dynamodbmock.go | 240 ++-- internal/s3utils/s3client.go | 4 +- internal/s3utils/s3mock.go | 41 +- lock/dynamolock/dynamolock.go | 18 +- lock/dynamolock/dynamolock_test.go | 12 +- lock/lock.go | 4 +- logstore/commitentry.go | 85 ++ logstore/dynamodblogstore/dynamodblogstore.go | 266 ++-- .../dynamodblogstore/dynamodblogstore_test.go | 135 +- logstore/externalcommitentry.go | 60 - logstore/logstore.go | 18 +- state/dynamostate/dynamostate.go | 6 +- storage/filestore/filestore.go | 41 +- storage/filestore/filestore_test.go | 10 +- storage/s3store/s3store.go | 18 +- storage/s3store/s3store_test.go | 6 +- storage/storage.go | 7 + 24 files changed, 2080 insertions(+), 515 deletions(-) create mode 100644 codecov.yml create mode 100644 logstore/commitentry.go delete mode 100644 logstore/externalcommitentry.go diff --git a/checkpoint_test.go b/checkpoint_test.go index 5a25f1d4..b4cb17fe 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -978,15 +978,15 @@ func TestCheckpointCleanupExpiredLogs(t *testing.T) { } now := time.Now() // With cleanup enabled, 25 and 15 minutes ago should be deleted, 5 should not - err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(0).Raw), now.Add(-25*time.Minute), now.Add(-25*time.Minute)) + err = os.Chtimes(filepath.Join(store.BaseURI().Raw, CommitUriFromVersion(0).Raw), now.Add(-25*time.Minute), now.Add(-25*time.Minute)) if err != nil { t.Fatal(err) } - err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(1).Raw), now.Add(-15*time.Minute), now.Add(-15*time.Minute)) + err = os.Chtimes(filepath.Join(store.BaseURI().Raw, CommitUriFromVersion(1).Raw), now.Add(-15*time.Minute), now.Add(-15*time.Minute)) if err != nil { t.Fatal(err) } - err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(2).Raw), now.Add(-5*time.Minute), now.Add(-5*time.Minute)) + err = os.Chtimes(filepath.Join(store.BaseURI().Raw, CommitUriFromVersion(2).Raw), now.Add(-5*time.Minute), now.Add(-5*time.Minute)) if err != nil { t.Fatal(err) } @@ -1105,27 +1105,27 @@ func TestCheckpointCleanupTimeAdjustment(t *testing.T) { // 3: 13 min ago // 4: 12 min ago // 5: 6 min ago - err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(0).Raw), now.Add(-20*time.Minute), now.Add(-20*time.Minute)) + err = os.Chtimes(filepath.Join(store.BaseURI().Raw, CommitUriFromVersion(0).Raw), now.Add(-20*time.Minute), now.Add(-20*time.Minute)) if err != nil { t.Fatal(err) } - err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(1).Raw), now.Add(-15*time.Minute), now.Add(-15*time.Minute)) + err = os.Chtimes(filepath.Join(store.BaseURI().Raw, 
CommitUriFromVersion(1).Raw), now.Add(-15*time.Minute), now.Add(-15*time.Minute)) if err != nil { t.Fatal(err) } - err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(2).Raw), now.Add(-10*time.Minute), now.Add(-10*time.Minute)) + err = os.Chtimes(filepath.Join(store.BaseURI().Raw, CommitUriFromVersion(2).Raw), now.Add(-10*time.Minute), now.Add(-10*time.Minute)) if err != nil { t.Fatal(err) } - err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(3).Raw), now.Add(-13*time.Minute), now.Add(-13*time.Minute)) + err = os.Chtimes(filepath.Join(store.BaseURI().Raw, CommitUriFromVersion(3).Raw), now.Add(-13*time.Minute), now.Add(-13*time.Minute)) if err != nil { t.Fatal(err) } - err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(3).Raw), now.Add(-12*time.Minute), now.Add(-12*time.Minute)) + err = os.Chtimes(filepath.Join(store.BaseURI().Raw, CommitUriFromVersion(3).Raw), now.Add(-12*time.Minute), now.Add(-12*time.Minute)) if err != nil { t.Fatal(err) } - err = os.Chtimes(filepath.Join(store.BaseURI.Raw, CommitUriFromVersion(3).Raw), now.Add(-6*time.Minute), now.Add(-6*time.Minute)) + err = os.Chtimes(filepath.Join(store.BaseURI().Raw, CommitUriFromVersion(3).Raw), now.Add(-6*time.Minute), now.Add(-6*time.Minute)) if err != nil { t.Fatal(err) } @@ -1194,7 +1194,7 @@ func TestCheckpointLocked(t *testing.T) { t.Fatal("unable to obtain lock") } - localLock := filelock.New(store.BaseURI, "_delta_log/_checkpoint.lock", filelock.Options{}) + localLock := filelock.New(store.BaseURI(), "_delta_log/_checkpoint.lock", filelock.Options{}) checkpointed, err := CreateCheckpoint(store, localLock, NewCheckpointConfiguration(), 5) if !errors.Is(err, lock.ErrLockNotObtained) { @@ -1220,7 +1220,7 @@ func TestCheckpointLocked(t *testing.T) { func TestCheckpointUnlockFailure(t *testing.T) { store, _, _, _ := setupCheckpointTest(t, "testdata/checkpoints/simple") - brokenLock := testBrokenUnlockLocker{*filelock.New(store.BaseURI, "_delta_log/_commit.lock", filelock.Options{TTL: 60 * time.Second})} + brokenLock := testBrokenUnlockLocker{*filelock.New(store.BaseURI(), "_delta_log/_commit.lock", filelock.Options{TTL: 60 * time.Second})} checkpointed, err := CreateCheckpoint(store, &brokenLock, NewCheckpointConfiguration(), 5) if !errors.Is(err, lock.ErrUnableToUnlock) { diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 00000000..24096bbc --- /dev/null +++ b/codecov.yml @@ -0,0 +1,15 @@ +coverage: + status: + project: + default: + informational: true + target: auto + threshold: "0.5%" + patch: + default: + informational: true + target: auto + threshold: "0.5%" + +github_checks: + annotations: false \ No newline at end of file diff --git a/delta.go b/delta.go index 2cdf286e..1d4680a3 100644 --- a/delta.go +++ b/delta.go @@ -20,10 +20,12 @@ import ( "regexp" "sort" "strconv" + "strings" "time" "github.com/google/uuid" "github.com/rivian/delta-go/lock" + "github.com/rivian/delta-go/logstore" "github.com/rivian/delta-go/state" "github.com/rivian/delta-go/storage" log "github.com/sirupsen/logrus" @@ -37,11 +39,8 @@ const maxReaderVersionSupported = 1 const maxWriterVersionSupported = 1 var ( - ErrDeltaTable error = errors.New("failed to apply transaction log") - ErrRetrieveLockBytes error = errors.New("failed to retrieve bytes from lock") - ErrLockDataEmpty error = errors.New("lock data is empty") ErrExceededCommitRetryAttempts error = errors.New("exceeded commit retry attempts") - ErrNotATable error = errors.New("not a Delta table") + ErrNotATable error = 
errors.New("not a table") ErrInvalidVersion error = errors.New("invalid version") ErrUnableToLoadVersion error = errors.New("unable to load specified version") ErrLockFailed error = errors.New("lock failed unexpectedly without an error") @@ -66,10 +65,12 @@ type DeltaTable struct { Store storage.ObjectStore // Locking client to ensure optimistic locked commits from distributed workers LockClient lock.Locker - // // file metadata for latest checkpoint + // file metadata for latest checkpoint LastCheckPoint *CheckPoint // table versions associated with timestamps VersionTimestamp map[int64]time.Time + // Log store which provides multi-cluster write support + LogStore logstore.LogStore } // OptimizeCheckpointConfiguration holds settings for optimizing checkpoint read and write operations @@ -97,6 +98,17 @@ func NewDeltaTable(store storage.ObjectStore, lock lock.Locker, stateStore state return table } +func NewDeltaTableWithLogStore(store storage.ObjectStore, lock lock.Locker, logStore logstore.LogStore) *DeltaTable { + t := new(DeltaTable) + t.State = *NewTableState(-1) + t.Store = store + t.LockClient = lock + t.LastCheckPoint = nil + t.LogStore = logStore + + return t +} + // Creates a new DeltaTransaction for the DeltaTable. // The transaction holds a mutable reference to the DeltaTable, preventing other references // until the transaction is dropped. @@ -173,25 +185,37 @@ func (table *DeltaTable) Create(metadata DeltaTableMetaData, protocol Protocol, actions = append(actions, add) } - transaction := table.CreateTransaction(nil) + var version int64 + var err error + + transaction := table.CreateTransaction(NewDeltaTransactionOptions()) transaction.AddActions(actions) - preparedCommit, err := transaction.PrepareCommit(nil, nil) - if err != nil { - return err - } - //Set StateStore Version=-1 synced with the table State Version - zeroState := state.CommitState{ - Version: table.State.Version, - } - transaction.DeltaTable.StateStore.Put(zeroState) - err = transaction.TryCommit(&preparedCommit) - if err != nil { - return err + if table.LogStore != nil { + version, err = transaction.CommitLogStore() + if err != nil { + return err + } + } else { + preparedCommit, err := transaction.PrepareCommit(nil, nil) + if err != nil { + return err + } + //Set StateStore Version=-1 synced with the table State Version + zeroState := state.CommitState{ + Version: table.State.Version, + } + transaction.DeltaTable.StateStore.Put(zeroState) + err = transaction.TryCommit(&preparedCommit) + if err != nil { + return err + } + + version = table.State.Version } // Merge state from new commit version - newState, err := NewTableStateFromCommit(table, table.State.Version) + newState, err := NewTableStateFromCommit(table, version) if err != nil { return err } @@ -790,9 +814,306 @@ func (dtmd *DeltaTableMetaData) GetPartitionColDataTypes() map[string]SchemaData // / Please not that in case of non-retryable error the temporary commit file such as // / `_delta_log/_commit_.json` will orphaned in storage. type DeltaTransaction struct { - DeltaTable *DeltaTable - Actions []Action - Options *DeltaTransactionOptions + DeltaTable *DeltaTable + Actions []Action + Operation DeltaOperation + AppMetadata map[string]any + Options *DeltaTransactionOptions +} + +// ReadActions gets actions from a file. +// +// With many concurrent readers/writers, there's a chance that concurrent recovery +// operations occur on the same file, i.e. the same temp file T(N) is copied into the +// target N.json file more than once. 
Though data loss will *NOT* occur, readers of N.json +// may receive an error from S3 as the ETag of N.json was changed. This is safe to +// retry, so we do so here. +func (transaction *DeltaTransaction) ReadActions(path storage.Path) ([]Action, error) { + attempt := 0 + for { + if attempt >= int(transaction.Options.MaxRetryReadAttempts) { + return nil, fmt.Errorf("failed to get actions after %d attempts", transaction.Options.MaxRetryReadAttempts) + } + + entry, err := transaction.DeltaTable.Store.Get(path) + if err != nil { + attempt++ + log.Debugf("delta-go: Failed to get log entry. Incrementing attempt number to %d and retrying. %v", attempt, err) + continue + } + + actions, err := ActionsFromLogEntries(entry) + if err != nil { + attempt++ + log.Debugf("delta-go: Failed to get actions from log entry. Incrementing attempt number to %d and retrying. %v", + attempt, err) + continue + } + + return actions, nil + } +} + +// CommitLogStore writes actions to a file with or without overwrite as indicated. +// An error is thrown if the file already exists and overwriting is disabled. +// +// If overwriting is enabled, then write normally without any interaction with a log store. +// Otherwise, to commit for Delta version N: +// - Step 0: Fail if N.json already exists in the file system. +// - Step 1: Ensure that N-1.json exists. If not, perform a recovery. +// - Step 2: PREPARE the commit. +// - Write the actions into temp file T(N). +// - Write uncompleted commit entry E(N, T(N)) with mutual exclusion to the log store. +// +// - Step 3: COMMIT the commit to the Delta log. +// - Copy T(N) into N.json. +// +// - Step 4: ACKNOWLEDGE the commit. +// - Overwrite and complete commit entry E in the log store. +func (transaction *DeltaTransaction) CommitLogStore() (int64, error) { + attempt := 0 + for { + if attempt >= int(transaction.Options.MaxRetryWriteAttempts) { + return -1, fmt.Errorf("failed to commit with log store after %d attempts", transaction.Options.MaxRetryWriteAttempts) + } + + version, err := transaction.tryCommitLogStore() + if err != nil { + attempt++ + log.Debugf("delta-go: Incrementing log store commit attempt number to %d and retrying: %v", attempt, err) + continue + } + + return version, nil + } +} + +func (t *DeltaTransaction) tryCommitLogStore() (version int64, err error) { + var currURI storage.Path + prevURI, err := t.DeltaTable.LogStore.Latest(t.DeltaTable.Store.BaseURI()) + + if prevURI != nil { + parsed, prevVersion := CommitVersionFromUri(prevURI.FileName()) + if !parsed { + return -1, fmt.Errorf("failed to parse previous version from %s", prevURI.FileName().Raw) + } + + currURI = CommitUriFromVersion(prevVersion + 1) + } else { + if version, err := t.DeltaTable.LatestVersion(); err != nil { + currURI = CommitUriFromVersion(0) + } else { + uri := CommitUriFromVersion(version).Raw + fileName := storage.NewPath(strings.Split(uri, "_delta_log/")[1]) + seconds := t.DeltaTable.LogStore.ExpirationDelaySeconds() + + entry, err := logstore.New(t.DeltaTable.Store.BaseURI(), fileName, + storage.NewPath("") /* tempPath */, true /* isComplete */, uint64(time.Now().Unix())+seconds) + if err != nil { + return -1, fmt.Errorf("create first commit entry: %v", err) + } + + if err := t.DeltaTable.LogStore.Put(entry, t.DeltaTable.Store.SupportsAtomicPutIfAbsent()); err != nil { + return -1, fmt.Errorf("put first commit entry: %v", err) + } + log.Debugf("delta-go: Put completed commit entry for table path %s and file name %s in the empty log store.", + t.DeltaTable.Store.BaseURI(), fileName) + 
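+			// The log store was empty for this table, so a completed entry for the latest version found in storage was seeded above; this commit targets the next version.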
+		currURI = CommitUriFromVersion(version + 1) + } + } + + t.AddCommitInfoIfNotPresent() + + // Prevent concurrent writers from either + // a) concurrently overwriting N.json if overwriting is enabled + // b) both checking if N-1.json exists and performing a recovery where they both + // copy T(N-1) into N-1.json + // + // Note that the mutual exclusion on writing into N.json with overwriting disabled from + // different machines is provided by the log store, not by this lock. + // + // Also note that this lock is for N.json, while the lock used during a recovery is for + // N-1.json. Thus, there is no deadlock. + lock, err := t.DeltaTable.LockClient.NewLock(t.DeltaTable.Store.BaseURI().Join(currURI).Raw) + if err != nil { + return -1, fmt.Errorf("create lock: %v", err) + } + if _, err = lock.TryLock(); err != nil { + return -1, fmt.Errorf("acquire lock: %v", err) + } + defer func() { + // Defer the unlock and overwrite any errors if the unlock fails. + if unlockErr := lock.Unlock(); unlockErr != nil { + err = unlockErr + } + }() + + fileName := storage.NewPath(strings.Split(currURI.Raw, "_delta_log/")[1]) + + parsed, currVersion := CommitVersionFromUri(currURI) + if !parsed { + return -1, fmt.Errorf("failed to parse current version from %s", currURI.Raw) + } + + if t.DeltaTable.Store.SupportsAtomicPutIfAbsent() { + if err := t.writeActions(currURI, t.Actions); err != nil { + return -1, fmt.Errorf("write actions: %v", err) + } + + return currVersion, nil + } else { + // Step 0: Fail if N.json already exists in the file system and overwriting is disabled. + if _, err = t.DeltaTable.Store.Head(currURI); err == nil { + return -1, errors.New("current version already exists") + } + } + + // Step 1: Ensure that N-1.json exists. + if currVersion > 0 { + prevFileName := storage.NewPath(strings.Split(CommitUriFromVersion(currVersion-1).Raw, "_delta_log/")[1]) + + if prevEntry, err := t.DeltaTable.LogStore.Get(t.DeltaTable.Store.BaseURI(), prevFileName); err != nil { + return -1, fmt.Errorf("get previous commit: %v", err) + } else if !prevEntry.IsComplete() { + if err := t.fixDeltaLog(prevEntry); err != nil { + return -1, fmt.Errorf("fix Delta log: %v", err) + } + } + } else { + if entry, err := t.DeltaTable.LogStore.Get(t.DeltaTable.Store.BaseURI(), fileName); err == nil { + if _, err := t.DeltaTable.Store.Head(currURI); entry.IsComplete() && err != nil { + return -1, fmt.Errorf("old entries for table %s still in log store", t.DeltaTable.Store.BaseURI()) + } + } + } + + // Step 2: PREPARE the commit. + tempPath, err := t.createTempPath(fileName) + if err != nil { + return -1, fmt.Errorf("create temp path: %v", err) + } + relativeTempPath := storage.NewPath("_delta_log").Join(tempPath) + + entry, err := logstore.New(t.DeltaTable.Store.BaseURI(), fileName, tempPath, false /* isComplete */, 0 /* expirationTime */) + if err != nil { + return -1, fmt.Errorf("create commit entry: %v", err) + } + + if err := t.writeActions(relativeTempPath, t.Actions); err != nil { + return -1, fmt.Errorf("write actions: %v", err) + } + + // Step 2.2: Create uncompleted commit entry E(N, T(N)). + if err := t.DeltaTable.LogStore.Put(entry, t.DeltaTable.Store.SupportsAtomicPutIfAbsent()); err != nil { + return -1, fmt.Errorf("put uncompleted commit entry: %v", err) + } + + // Step 3: COMMIT the commit to the Delta log. + // Copy T(N) -> N.json with overwriting disabled. + if err := t.copyTempFile(relativeTempPath, currURI); err != nil { + return -1, fmt.Errorf("copy temp file to N.json: %v", err) + } + + // Step 4: ACKNOWLEDGE the commit. 
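+	// Overwrite E(N, T(N)) in the log store with a completed entry carrying an expiration time.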
+	if err := t.complete(entry); err != nil { + return -1, fmt.Errorf("acknowledge commit: %v", err) + } + + return currVersion, nil +} + +func (t *DeltaTransaction) complete(entry *logstore.CommitEntry) error { + seconds := t.DeltaTable.LogStore.ExpirationDelaySeconds() + entry, err := entry.Complete(seconds) + if err != nil { + return fmt.Errorf("complete commit entry: %v", err) + } + + if err := t.DeltaTable.LogStore.Put(entry, true); err != nil { + return fmt.Errorf("put completed commit entry: %v", err) + } + + return nil +} + +func (t *DeltaTransaction) copyTempFile(src storage.Path, dst storage.Path) error { + return t.DeltaTable.Store.RenameIfNotExists(src, dst) +} + +func (t *DeltaTransaction) createTempPath(path storage.Path) (storage.Path, error) { + return storage.NewPath(fmt.Sprintf(".tmp/%s.%s", path.Raw, uuid.New().String())), nil +} + +func (t *DeltaTransaction) fixDeltaLog(entry *logstore.CommitEntry) (err error) { + if entry.IsComplete() { + return nil + } + + filePath, err := entry.AbsoluteFilePath() + if err != nil { + return fmt.Errorf("get absolute file path: %v", err) + } + + lock, err := t.DeltaTable.LockClient.NewLock(filePath.Raw) + if err != nil { + return fmt.Errorf("create lock: %v", err) + } + if _, err = lock.TryLock(); err != nil { + return fmt.Errorf("acquire lock: %v", err) + } + defer func() { + // Defer the unlock and overwrite any errors if the unlock fails. + if unlockErr := lock.Unlock(); unlockErr != nil { + err = unlockErr + } + }() + + attempt, copied := 0, false + for { + if attempt >= int(t.Options.MaxRetryDeltaLogFixAttempts) { + return fmt.Errorf("failed to fix Delta log after %d attempts", t.Options.MaxRetryDeltaLogFixAttempts) + } + + log.Infof("delta-go: Trying to fix %s.", entry.FileName().Raw) + + if _, err = t.DeltaTable.Store.Head(filePath); !copied && err != nil { + tempPath, err := entry.AbsoluteTempPath() + if err != nil { + attempt++ + log.Debugf("delta-go: Failed to get absolute temp path. Incrementing attempt number to %d and retrying. %v", + attempt, err) + continue + } + + if err := t.copyTempFile(tempPath, filePath); err != nil { + attempt++ + log.Debugf("delta-go: File %s already copied. Incrementing attempt number to %d and retrying. %v", + entry.FileName().Raw, attempt, err) + copied = true + continue + } + + copied = true + } + + if err := t.complete(entry); err != nil { + attempt++ + log.Debugf("delta-go: Failed to complete commit entry. Incrementing attempt number to %d and retrying. %v", attempt, err) + continue + } + + log.Infof("delta-go: Fixed file %s.", entry.FileName().Raw) + return nil + } +} + +func (transaction *DeltaTransaction) writeActions(path storage.Path, actions []Action) error { + // Serialize all actions that are part of this log entry. + entry, err := LogEntryFromActions(actions) + if err != nil { + return errors.New("failed to serialize actions") + } + + return transaction.DeltaTable.Store.Put(path, entry) } // / Creates a new delta transaction. @@ -815,6 +1136,42 @@ func (transaction *DeltaTransaction) AddActions(actions []Action) { transaction.Actions = append(transaction.Actions, actions...) } +// AddCommitInfoIfNotPresent adds a `commitInfo` action to a transaction's actions if not already present. 
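+// The generated action records the commit timestamp, the delta-go client version, and any operation parameters and app metadata set on the transaction.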
+func (t *DeltaTransaction) AddCommitInfoIfNotPresent() { + found := false + for _, action := range t.Actions { + switch action.(type) { + case CommitInfo: + found = true + } + } + + if !found { + commitInfo := make(CommitInfo) + commitInfo["timestamp"] = time.Now().UnixMilli() + commitInfo["clientVersion"] = fmt.Sprintf("delta-go.%s", deltaClientVersion) + + if t.Operation != nil { + maps.Copy(commitInfo, t.Operation.GetCommitInfo()) + } + if t.AppMetadata != nil { + maps.Copy(commitInfo, t.AppMetadata) + } + + t.AddAction(commitInfo) + } +} + +// SetOperation sets the Delta operation for this transaction. +func (transaction *DeltaTransaction) SetOperation(operation DeltaOperation) { + transaction.Operation = operation +} + +// SetAppMetadata sets the app metadata for this transaction. +func (transaction *DeltaTransaction) SetAppMetadata(appMetadata map[string]any) { + transaction.AppMetadata = appMetadata +} + // Commits the given actions to the delta log. // This method will retry the transaction commit based on the value of `max_retry_commit_attempts` set in `DeltaTransactionOptions`. func (transaction *DeltaTransaction) Commit(operation DeltaOperation, appMetadata map[string]any) (int64, error) { @@ -1005,8 +1362,12 @@ type PreparedCommit struct { URI storage.Path } -const defaultDeltaMaxRetryCommitAttempts uint32 = 10000000 -const defaultRetryCommitAttemptsBeforeLoadingTable uint32 = 100 +const ( + defaultDeltaMaxRetryCommitAttempts uint32 = 10000000 + defaultDeltaMaxWriteCommitAttempts uint32 = 10000000 + defaultRetryCommitAttemptsBeforeLoadingTable uint32 = 100 + defaultMaxRetryDeltaLogFixAttempts uint16 = 3 +) // Options for customizing behavior of a `DeltaTransaction` type DeltaTransactionOptions struct { @@ -1014,17 +1375,20 @@ type DeltaTransactionOptions struct { MaxRetryCommitAttempts uint32 // RetryWaitDuration sets the amount of time to wait between retries of the transaction RetryWaitDuration time.Duration + // Number of retry attempts allowed when reading actions from a log entry + MaxRetryReadAttempts uint32 + // Number of retry attempts allowed when writing actions to a log entry + MaxRetryWriteAttempts uint32 // number of retry commit attempts before loading the latest version from the table rather // than using the state store RetryCommitAttemptsBeforeLoadingTable uint32 + // Number of retry attempts allowed when fixing the Delta log + MaxRetryDeltaLogFixAttempts uint16 } -// NewDeltaTransactionOptions Sets the default MaxRetryCommitAttempts to DEFAULT_DELTA_MAX_RETRY_COMMIT_ATTEMPTS = 10000000 +// NewDeltaTransactionOptions sets the default transaction options. 
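+// Individual fields on the returned options can be overridden before passing them to CreateTransaction.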
func NewDeltaTransactionOptions() *DeltaTransactionOptions { - return &DeltaTransactionOptions{ - MaxRetryCommitAttempts: defaultDeltaMaxRetryCommitAttempts, - RetryCommitAttemptsBeforeLoadingTable: defaultRetryCommitAttemptsBeforeLoadingTable, - } + return &DeltaTransactionOptions{MaxRetryCommitAttempts: defaultDeltaMaxRetryCommitAttempts, MaxRetryWriteAttempts: defaultDeltaMaxWriteCommitAttempts, MaxRetryDeltaLogFixAttempts: defaultMaxRetryDeltaLogFixAttempts, RetryCommitAttemptsBeforeLoadingTable: defaultRetryCommitAttemptsBeforeLoadingTable} } // OpenTableWithVersion loads the table at this specific version diff --git a/delta_test.go b/delta_test.go index f81ada14..a19089fd 100644 --- a/delta_test.go +++ b/delta_test.go @@ -30,12 +30,16 @@ import ( "github.com/apache/arrow/go/v13/parquet" "github.com/apache/arrow/go/v13/parquet/compress" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/chelseajonesr/rfarrow" "github.com/google/uuid" + "github.com/rivian/delta-go/internal/dynamodbutils" "github.com/rivian/delta-go/internal/s3utils" "github.com/rivian/delta-go/lock" + "github.com/rivian/delta-go/lock/dynamolock" "github.com/rivian/delta-go/lock/filelock" "github.com/rivian/delta-go/lock/nillock" + "github.com/rivian/delta-go/logstore/dynamodblogstore" "github.com/rivian/delta-go/state/filestate" "github.com/rivian/delta-go/state/localstate" @@ -45,7 +49,8 @@ import ( ) func TestDeltaTransactionPrepareCommit(t *testing.T) { - store := filestore.FileObjectStore{BaseURI: storage.Path{Raw: "tmp/"}} + var store filestore.FileObjectStore + store.SetBaseURI(storage.NewPath("tmp/")) l := filelock.New(storage.NewPath(""), "tmp/_delta_log/_commit.lock", filelock.Options{}) deltaTable := DeltaTable{Store: &store, LockClient: l} options := DeltaTransactionOptions{MaxRetryCommitAttempts: 3} @@ -70,7 +75,7 @@ func TestDeltaTransactionPrepareCommit(t *testing.T) { t.Errorf("extension should be .tmp, has %s", commit.URI.Ext()) } - commitFullPath := filepath.Join(store.BaseURI.Base(), commit.URI.Raw) + commitFullPath := filepath.Join(store.BaseURI().Base(), commit.URI.Raw) exists := fileExists(commitFullPath) if !exists { t.Error("commit file does not exist") @@ -1317,15 +1322,16 @@ func BenchmarkLatestVersion(b *testing.B) { var ( dir = b.TempDir() path = storage.NewPath(dir) - fileStore = filestore.FileObjectStore{BaseURI: path} - client = new(s3utils.MockS3Client) + fileStore = new(filestore.FileObjectStore) + client = new(s3utils.MockClient) ) + fileStore.SetBaseURI(path) client.SetFileStore(fileStore) baseURL, err := uri.ParseURL() if err != nil { - b.Fatalf("Failed to parse URL: %v", err) + b.Fatalf("Failed to parse URL from %s: %v", uri, err) } if strings.HasSuffix(baseURL.Path, "/") { @@ -1444,3 +1450,1087 @@ func TestLoadVersion(t *testing.T) { t.Errorf("expected version %d, found %d", 12, table.State.Version) } } + +// Performs common setup for the log store tests, creating a Delta table backed by mock DynamoDB and S3 clients +func setUpSingleClusterLogStoreTest(t *testing.T) (logStoreTableName string, table *DeltaTable, transaction *DeltaTransaction) { + t.Helper() + + logStoreTableName = "version_log_store" + logStore, err := dynamodblogstore.New(dynamodblogstore.Options{Client: dynamodbutils.NewMockClient(), TableName: logStoreTableName}) + if err != nil { + t.Errorf("Failed to create log store: %v", err) + } + + path := storage.NewPath("s3://test-bucket/test-delta-table/") + client, err := s3utils.NewMockClient(t, path) + if err != nil { + t.Errorf("Failed to create client: 
%v", err) + } + store, err := s3store.New(client, path) + if err != nil { + t.Errorf("Failed to create store: %v", err) + } + + lock, err := dynamolock.New(dynamodbutils.NewMockClient(), "version_lock_store", "", + dynamolock.Options{TTL: 1 * time.Second, HeartBeat: 100 * time.Millisecond}) + if err != nil { + t.Errorf("Failed to create lock: %v", err) + } + + table = NewDeltaTableWithLogStore(store, lock, logStore) + + schema := SchemaTypeStruct{ + Fields: []SchemaField{ + {Name: "foo", Type: String, Nullable: false, Metadata: make(map[string]any)}, + {Name: "bar", Type: String, Nullable: false, Metadata: make(map[string]any)}, + }} + + config := make(map[string]string) + metadata := NewDeltaTableMetaData( + "test", + "This is a test table.", + new(Format).Default(), + schema, []string{"date"}, + config) + protocol := new(Protocol).Default() + + if err := table.Create(*metadata, protocol, make(map[string]any), []Add{}); err != nil { + t.Errorf("Failed to create table: %v", err) + } + + actions := []Action{Add{ + Path: "part-00000-b08cb562-b392-441d-a090-494a47da752b-c000.snappy.parquet", + Size: 807, + ModificationTime: time.Now().UnixMilli(), + }, + Add{ + Path: "part-00001-f9c7792d-57bc-4c56-8b9b-5cd7899ee9a2-c000.snappy.parquet", + Size: 807, + ModificationTime: time.Now().UnixMilli(), + }} + + operation := Write{Mode: Append} + appMetaData := make(map[string]any) + appMetaData["isBlindAppend"] = true + + transaction = table.CreateTransaction(NewDeltaTransactionOptions()) + transaction.AddActions(actions) + transaction.SetOperation(operation) + transaction.SetAppMetadata(appMetaData) + + return +} + +func TestCommitLogStore_Sequential(t *testing.T) { + logStoreTableName, table, transaction := setUpSingleClusterLogStoreTest(t) + + transactions := 101 + for i := 1; i < transactions; i++ { + version, err := transaction.CommitLogStore() + if err != nil { + t.Errorf("Failed to commit with log store: %v", err) + } + t.Logf("Committed version %d", version) + } + + items, ok := table.LogStore.Client().(*dynamodbutils.MockClient).TablesToItems().Get(logStoreTableName) + if !ok { + t.Error("Failed to get table items") + } + + if len(items) != transactions { + t.Errorf("len(items) = %v, want %v", len(items), transactions) + } + + for entry := 0; entry < len(items); entry++ { + parsed, version := CommitVersionFromUri( + storage.NewPath(items[entry][string(dynamodblogstore.FileName)].(*types.AttributeValueMemberS).Value)) + if !parsed { + t.Errorf("Failed to parse version from %s", items[entry][string(dynamodblogstore.FileName)].(*types.AttributeValueMemberS).Value) + } + + if version != int64(entry) { + t.Errorf("version = %v, want %v", version, int64(entry)) + } + } + + url, err := table.Store.(*s3store.S3ObjectStore).BaseURI().ParseURL() + if err != nil { + t.Errorf("Failed to parse URL from %s: %v", table.Store.(*s3store.S3ObjectStore).BaseURI().Raw, err) + } + prefix := url.Host + url.Path + "_delta_log/" + objects, err := table.Store.(*s3store.S3ObjectStore).Client.(*s3utils.MockClient).FileStore().ListAll(storage.NewPath(prefix)) + if err != nil { + t.Errorf("Failed to list all Delta log objects: %v", err) + } + + if len(objects.Objects) != transactions+2 { + t.Errorf("len(objects.Objects) = %v, want %v", len(objects.Objects), transactions+2) + } + + if objects.Objects[0].Location.Raw != prefix+".tmp/" { + t.Errorf("objects.Objects[0].Location.Raw = %v, want %v", objects.Objects[0].Location.Raw, prefix+".tmp/") + } + + if objects.Objects[len(objects.Objects)-1].Location.Raw != prefix { + 
t.Errorf("objects.Objects[len(objects.Objects)-1].Location.Raw = %v, want %v", objects.Objects[len(objects.Objects)-1].Location.Raw, + prefix) + } + + logs := objects.Objects[1 : len(objects.Objects)-1] + + for log := 0; log < len(logs); log++ { + parsed, version := CommitVersionFromUri(logs[log].Location) + if !parsed { + t.Errorf("Failed to parse version from %s", logs[log].Location.Raw) + } + + if version != int64(log) { + t.Errorf("version = %v, want %v", version, int64(log)) + } + + bytes, err := table.Store.(*s3store.S3ObjectStore).Client.(*s3utils.MockClient).FileStore().Get(logs[log].Location) + if err != nil { + t.Errorf("Failed to get bytes: %v", err) + } + + actions, err := ActionsFromLogEntries(bytes) + if err != nil { + t.Errorf("Failed to get actions from bytes: %v", err) + } + + if log == 0 { + t.Log("Verifying the exact contents of the first log is not possible") + continue + } + + if len(actions) != len(transaction.Actions) { + t.Errorf("len(actions) = %v, want %v", len(actions), len(transaction.Actions)) + } + + for action := 0; action < len(transaction.Actions)-1; action++ { + parsed, ok := actions[action].(*Add) + if !ok { + t.Error("Failed to get parsed add action") + } + + expected, ok := transaction.Actions[action].(Add) + if !ok { + t.Error("Failed to get expected add action") + } + + if parsed.Path != expected.Path { + t.Errorf("parsed.Path = %v, want %v", parsed.Path, expected.Path) + } + + if !reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues) { + t.Errorf("reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues) = %v, want %v", + reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues), false) + } + + if parsed.Size != expected.Size { + t.Errorf("parsed.Size = %v, want %v", parsed.Size, expected.Size) + } + + if parsed.ModificationTime != expected.ModificationTime { + t.Errorf("parsed.ModificationTime = %v, want %v", parsed.ModificationTime, expected.ModificationTime) + } + + if parsed.DataChange != expected.DataChange { + t.Errorf("parsed.DataChange = %v, want %v", parsed.DataChange, expected.DataChange) + } + + if !reflect.DeepEqual(parsed.Tags, expected.Tags) { + t.Errorf("reflect.DeepEqual(parsed.Tags, expected.Tags) = %v, want %v", reflect.DeepEqual(parsed.Tags, expected.Tags), false) + } + + if parsed.Stats != expected.Stats { + t.Errorf("parsed.Stats = %v, want %v", parsed.Stats, expected.Stats) + } + } + } +} + +func TestCommitLogStore_LimitedConcurrent(t *testing.T) { + logStoreTableName, table, transaction := setUpSingleClusterLogStoreTest(t) + + var ( + wg sync.WaitGroup + maxGoroutines = 5 + guard = make(chan struct{}, maxGoroutines) + transactions = 101 + ) + for entry := 1; entry < transactions; entry++ { + guard <- struct{}{} + wg.Add(1) + + go func() { + version, err := transaction.CommitLogStore() + if err != nil { + t.Errorf("Failed to commit with log store: %v", err) + } + t.Logf("Committed version %d", version) + + <-guard + wg.Done() + }() + } + wg.Wait() + + items, ok := table.LogStore.Client().(*dynamodbutils.MockClient).TablesToItems().Get(logStoreTableName) + if !ok { + t.Error("Failed to get table items") + } + + if len(items) != transactions { + t.Errorf("len(items) = %v, want %v", len(items), transactions) + } + + for entry := 0; entry < len(items); entry++ { + parsed, version := CommitVersionFromUri( + storage.NewPath(items[entry][string(dynamodblogstore.FileName)].(*types.AttributeValueMemberS).Value)) + if !parsed { + t.Errorf("Failed to parse version from %s", 
items[entry][string(dynamodblogstore.FileName)].(*types.AttributeValueMemberS).Value) + } + + if version != int64(entry) { + t.Errorf("version = %v, want %v", version, int64(entry)) + } + } + + url, err := table.Store.(*s3store.S3ObjectStore).BaseURI().ParseURL() + if err != nil { + t.Errorf("Failed to parse URL from %s: %v", table.Store.(*s3store.S3ObjectStore).BaseURI().Raw, err) + } + prefix := url.Host + url.Path + "_delta_log/" + objects, err := table.Store.(*s3store.S3ObjectStore).Client.(*s3utils.MockClient).FileStore().ListAll(storage.NewPath(prefix)) + if err != nil { + t.Errorf("Failed to list all Delta log objects: %v", err) + } + + if objects.Objects[0].Location.Raw != prefix+".tmp/" { + t.Errorf("objects.Objects[0].Location.Raw = %v, want %v", objects.Objects[0].Location.Raw, prefix+".tmp/") + } + + if objects.Objects[len(objects.Objects)-1].Location.Raw != prefix { + t.Errorf("objects.Objects[len(objects.Objects)-1].Location.Raw = %v, want %v", objects.Objects[len(objects.Objects)-1].Location.Raw, + prefix) + } + + logs := objects.Objects[len(objects.Objects)-transactions-1 : len(objects.Objects)-1] + + for log := 0; log < len(logs); log++ { + parsed, version := CommitVersionFromUri(logs[log].Location) + if !parsed { + t.Errorf("Failed to parse version from %s", logs[log].Location.Raw) + } + + if version != int64(log) { + t.Errorf("version = %v, want %v", version, int64(log)) + } + + bytes, err := table.Store.(*s3store.S3ObjectStore).Client.(*s3utils.MockClient).FileStore().Get(logs[log].Location) + if err != nil { + t.Errorf("Failed to get bytes: %v", err) + } + + actions, err := ActionsFromLogEntries(bytes) + if err != nil { + t.Errorf("Failed to get actions from bytes: %v", err) + } + + if log == 0 { + t.Log("Verifying the exact contents of the first log is not possible") + continue + } + + if len(actions) != len(transaction.Actions) { + t.Errorf("len(actions) = %v, want %v", len(actions), len(transaction.Actions)) + } + + for action := 0; action < len(transaction.Actions)-1; action++ { + parsed, ok := actions[action].(*Add) + if !ok { + t.Error("Failed to get parsed add action") + } + + expected, ok := transaction.Actions[action].(Add) + if !ok { + t.Error("Failed to get expected add action") + } + + if parsed.Path != expected.Path { + t.Errorf("parsed.Path = %v, want %v", parsed.Path, expected.Path) + } + + if !reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues) { + t.Errorf("reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues) = %v, want %v", + reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues), false) + } + + if parsed.Size != expected.Size { + t.Errorf("parsed.Size = %v, want %v", parsed.Size, expected.Size) + } + + if parsed.ModificationTime != expected.ModificationTime { + t.Errorf("parsed.ModificationTime = %v, want %v", parsed.ModificationTime, expected.ModificationTime) + } + + if parsed.DataChange != expected.DataChange { + t.Errorf("parsed.DataChange = %v, want %v", parsed.DataChange, expected.DataChange) + } + + if !reflect.DeepEqual(parsed.Tags, expected.Tags) { + t.Errorf("reflect.DeepEqual(parsed.Tags, expected.Tags) = %v, want %v", reflect.DeepEqual(parsed.Tags, expected.Tags), false) + } + + if parsed.Stats != expected.Stats { + t.Errorf("parsed.Stats = %v, want %v", parsed.Stats, expected.Stats) + } + } + } +} + +func TestCommitLogStore_UnlimitedConcurrent(t *testing.T) { + logStoreTableName, table, transaction := setUpSingleClusterLogStoreTest(t) + + var ( + wg sync.WaitGroup + transactions = 101 + ) + for 
i := 1; i < transactions; i++ { + wg.Add(1) + + go func() { + version, err := transaction.CommitLogStore() + if err != nil { + t.Errorf("Failed to commit with log store: %v", err) + } + t.Logf("Committed version %d", version) + + wg.Done() + }() + } + wg.Wait() + + items, ok := table.LogStore.Client().(*dynamodbutils.MockClient).TablesToItems().Get(logStoreTableName) + if !ok { + t.Error("Failed to get table items") + } + + if len(items) != transactions { + t.Errorf("len(items) = %v, want %v", len(items), transactions) + } + + for entry := 0; entry < len(items); entry++ { + parsed, version := CommitVersionFromUri( + storage.NewPath(items[entry][string(dynamodblogstore.FileName)].(*types.AttributeValueMemberS).Value)) + if !parsed { + t.Errorf("Failed to parse version from %s", items[entry][string(dynamodblogstore.FileName)].(*types.AttributeValueMemberS).Value) + } + + if version != int64(entry) { + t.Errorf("version = %v, want %v", version, int64(entry)) + } + } + + url, err := table.Store.(*s3store.S3ObjectStore).BaseURI().ParseURL() + if err != nil { + t.Errorf("Failed to parse URL from %s: %v", table.Store.(*s3store.S3ObjectStore).BaseURI().Raw, err) + } + prefix := url.Host + url.Path + "_delta_log/" + objects, err := table.Store.(*s3store.S3ObjectStore).Client.(*s3utils.MockClient).FileStore().ListAll(storage.NewPath(prefix)) + if err != nil { + t.Errorf("Failed to list all Delta log objects: %v", err) + } + + if objects.Objects[0].Location.Raw != prefix+".tmp/" { + t.Errorf("objects.Objects[0].Location.Raw = %v, want %v", objects.Objects[0].Location.Raw, prefix+".tmp/") + } + + if objects.Objects[len(objects.Objects)-1].Location.Raw != prefix { + t.Errorf("objects.Objects[len(objects.Objects)-1].Location.Raw = %v, want %v", objects.Objects[len(objects.Objects)-1].Location.Raw, + prefix) + } + + logs := objects.Objects[len(objects.Objects)-transactions-1 : len(objects.Objects)-1] + + for log := 0; log < len(logs); log++ { + parsed, version := CommitVersionFromUri(logs[log].Location) + if !parsed { + t.Errorf("Failed to parse version from %s", logs[log].Location.Raw) + } + + if version != int64(log) { + t.Errorf("version = %v, want %v", version, int64(log)) + } + + bytes, err := table.Store.(*s3store.S3ObjectStore).Client.(*s3utils.MockClient).FileStore().Get(logs[log].Location) + if err != nil { + t.Errorf("Failed to get bytes: %v", err) + } + + actions, err := ActionsFromLogEntries(bytes) + if err != nil { + t.Errorf("Failed to get actions from bytes: %v", err) + } + + if log == 0 { + t.Log("Verifying the exact contents of the first log is not possible") + continue + } + + if len(actions) != len(transaction.Actions) { + t.Errorf("len(actions) = %v, want %v", len(actions), len(transaction.Actions)) + } + + for action := 0; action < len(transaction.Actions)-1; action++ { + parsed, ok := actions[action].(*Add) + if !ok { + t.Error("Failed to get parsed add action") + } + + expected, ok := transaction.Actions[action].(Add) + if !ok { + t.Error("Failed to get expected add action") + } + + if parsed.Path != expected.Path { + t.Errorf("parsed.Path = %v, want %v", parsed.Path, expected.Path) + } + + if !reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues) { + t.Errorf("reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues) = %v, want %v", + reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues), false) + } + + if parsed.Size != expected.Size { + t.Errorf("parsed.Size = %v, want %v", parsed.Size, expected.Size) + } + + if parsed.ModificationTime != 
expected.ModificationTime { + t.Errorf("parsed.ModificationTime = %v, want %v", parsed.ModificationTime, expected.ModificationTime) + } + + if parsed.DataChange != expected.DataChange { + t.Errorf("parsed.DataChange = %v, want %v", parsed.DataChange, expected.DataChange) + } + + if !reflect.DeepEqual(parsed.Tags, expected.Tags) { + t.Errorf("reflect.DeepEqual(parsed.Tags, expected.Tags) = %v, want %v", reflect.DeepEqual(parsed.Tags, expected.Tags), false) + } + + if parsed.Stats != expected.Stats { + t.Errorf("parsed.Stats = %v, want %v", parsed.Stats, expected.Stats) + } + } + } +} + +// Performs common setup for the log store tests, creating a Delta table backed by mock DynamoDB and S3 clients +func setUpMultiClusterLogStoreTest(t *testing.T) (logStoreTableName string, firstTable *DeltaTable, secondTable *DeltaTable, actions []Action, operation Write, appMetadata map[string]any) { + t.Helper() + + logStoreTableName = "version_log_store" + logStore, err := dynamodblogstore.New(dynamodblogstore.Options{Client: dynamodbutils.NewMockClient(), TableName: logStoreTableName}) + if err != nil { + t.Errorf("Failed to create log store: %v", err) + } + + path := storage.NewPath("s3://test-bucket/test-delta-table/") + client, err := s3utils.NewMockClient(t, path) + if err != nil { + t.Errorf("Failed to create client: %v", err) + } + store, err := s3store.New(client, path) + if err != nil { + t.Errorf("Failed to create store: %v", err) + } + + firstLock := nillock.New() + secondLock := nillock.New() + + firstTable = NewDeltaTableWithLogStore(store, firstLock, logStore) + secondTable = NewDeltaTableWithLogStore(store, secondLock, logStore) + + actions = []Action{Add{ + Path: "part-00000-b08cb562-b392-441d-a090-494a47da752b-c000.snappy.parquet", + Size: 807, + ModificationTime: time.Now().UnixMilli(), + }, + Add{ + Path: "part-00001-f9c7792d-57bc-4c56-8b9b-5cd7899ee9a2-c000.snappy.parquet", + Size: 807, + ModificationTime: time.Now().UnixMilli(), + }} + + operation = Write{Mode: Append, PartitionBy: []string{"date"}} + appMetadata = make(map[string]any) + appMetadata["isBlindAppend"] = true + + return +} + +func TestCommitLogStore_DifferentClients(t *testing.T) { + logStoreTableName, firstTable, secondTable, actions, operation, appMetadata := setUpMultiClusterLogStoreTest(t) + + var ( + wg sync.WaitGroup + transactions = 500 + firstTransaction *DeltaTransaction + secondTransaction *DeltaTransaction + ) + for i := 1; i < transactions/2+1; i++ { + wg.Add(1) + + go func() { + firstTransaction = firstTable.CreateTransaction(NewDeltaTransactionOptions()) + firstTransaction.AddActions(actions) + firstTransaction.SetOperation(operation) + firstTransaction.SetAppMetadata(appMetadata) + + version, err := firstTransaction.CommitLogStore() + if err != nil { + t.Errorf("Failed to commit first transaction with log store: %v", err) + } + t.Logf("Committed version %d", version) + + secondTransaction = secondTable.CreateTransaction(NewDeltaTransactionOptions()) + secondTransaction.AddActions(actions) + secondTransaction.SetOperation(operation) + secondTransaction.SetAppMetadata(appMetadata) + + version, err = secondTransaction.CommitLogStore() + if err != nil { + t.Errorf("Failed to commit second transaction with log store: %v", err) + } + t.Logf("Committed version %d", version) + + wg.Done() + }() + } + wg.Wait() + + firstItems, ok := firstTable.LogStore.Client().(*dynamodbutils.MockClient).TablesToItems().Get(logStoreTableName) + if !ok { + t.Error("Failed to get first table's items") + } + + if len(firstItems) != 
transactions { + t.Errorf("len(firstItems) = %v, want %v", len(firstItems), transactions) + } + + for entry := 0; entry < len(firstItems); entry++ { + parsed, version := CommitVersionFromUri( + storage.NewPath(firstItems[entry][string(dynamodblogstore.FileName)].(*types.AttributeValueMemberS).Value)) + if !parsed { + t.Errorf("Failed to parse version from %s", storage.NewPath(firstItems[entry][string(dynamodblogstore.FileName)].(*types.AttributeValueMemberS).Value).Raw) + } + + if version != int64(entry) { + t.Errorf("First table: version = %v, want %v", version, int64(entry)) + } + } + + url, err := firstTable.Store.(*s3store.S3ObjectStore).BaseURI().ParseURL() + if err != nil { + t.Errorf("First table: failed to parse URL from %s: %v", firstTable.Store.(*s3store.S3ObjectStore).BaseURI().Raw, err) + } + prefix := url.Host + url.Path + "_delta_log/" + firstObjects, err := firstTable.Store.(*s3store.S3ObjectStore).Client.(*s3utils.MockClient).FileStore().ListAll(storage.NewPath(prefix)) + if err != nil { + t.Errorf("Failed to list all Delta log objects for first table: %v", err) + } + + if firstObjects.Objects[0].Location.Raw != prefix+".tmp/" { + t.Errorf("firstObjects.Objects[0].Location.Raw = %v, want %v", firstObjects.Objects[0].Location.Raw, prefix+".tmp/") + } + + if firstObjects.Objects[len(firstObjects.Objects)-1].Location.Raw != prefix { + t.Errorf("firstObjects.Objects[len(firstObjects.Objects)-1].Location.Raw = %v, want %v", + firstObjects.Objects[len(firstObjects.Objects)-1].Location.Raw, prefix) + } + + logs := firstObjects.Objects[len(firstObjects.Objects)-transactions-1 : len(firstObjects.Objects)-1] + + for log := 0; log < len(logs); log++ { + parsed, version := CommitVersionFromUri(logs[log].Location) + if !parsed { + t.Errorf("First table: failed to parse version from %s", logs[log].Location.Raw) + } + + if version != int64(log) { + t.Errorf("First table: version = %v, want %v", version, int64(log)) + } + + bytes, err := firstTable.Store.(*s3store.S3ObjectStore).Client.(*s3utils.MockClient).FileStore().Get(logs[log].Location) + if err != nil { + t.Errorf("Failed to get bytes for first table: %v", err) + } + + actions, err := ActionsFromLogEntries(bytes) + if err != nil { + t.Errorf("Failed to get actions from bytes for first table: %v", err) + } + + if log == 0 { + t.Log("Verifying the exact contents of the first table's first log is not possible") + continue + } + + if len(actions) != len(firstTransaction.Actions) { + t.Errorf("First table: len(actions) = %v, want %v", len(actions), len(firstTransaction.Actions)) + } + + for action := 0; action < len(firstTransaction.Actions)-1; action++ { + parsed, ok := actions[action].(*Add) + if !ok { + t.Error("Failed to get parsed add action") + } + + expected, ok := firstTransaction.Actions[action].(Add) + if !ok { + t.Error("Failed to get expected add action") + } + + if parsed.Path != expected.Path { + t.Errorf("First table: parsed.Path = %v, want %v", parsed.Path, expected.Path) + } + + if !reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues) { + t.Errorf("First table: reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues) = %v, want %v", + reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues), false) + } + + if parsed.Size != expected.Size { + t.Errorf("First table: parsed.Size = %v, want %v", parsed.Size, expected.Size) + } + + if parsed.ModificationTime != expected.ModificationTime { + t.Errorf("First table: parsed.ModificationTime = %v, want %v", parsed.ModificationTime, 
expected.ModificationTime) + } + + if parsed.DataChange != expected.DataChange { + t.Errorf("First table: parsed.DataChange = %v, want %v", parsed.DataChange, expected.DataChange) + } + + if !reflect.DeepEqual(parsed.Tags, expected.Tags) { + t.Errorf("First table: reflect.DeepEqual(parsed.Tags, expected.Tags) = %v, want %v", + reflect.DeepEqual(parsed.Tags, expected.Tags), false) + } + + if parsed.Stats != expected.Stats { + t.Errorf("First table: parsed.Stats = %v, want %v", parsed.Stats, expected.Stats) + } + } + } + + secondItems, ok := secondTable.LogStore.Client().(*dynamodbutils.MockClient).TablesToItems().Get(logStoreTableName) + if !ok { + t.Error("Failed to get second table's items") + } + + if len(secondItems) != transactions { + t.Errorf("len(secondItems) = %v, want %v", len(secondItems), transactions) + } + + if len(secondItems) != len(firstItems) { + t.Errorf("len(secondItems) = %v, want %v", len(secondItems), len(firstItems)) + } + + for entry := 0; entry < len(secondItems); entry++ { + parsed, version := CommitVersionFromUri( + storage.NewPath(secondItems[entry][string(dynamodblogstore.FileName)].(*types.AttributeValueMemberS).Value)) + if !parsed { + t.Errorf("Second table: failed to parse version from %s", secondItems[entry][string(dynamodblogstore.FileName)].(*types.AttributeValueMemberS).Value) + } + + if version != int64(entry) { + t.Errorf("Second table: version = %v, want %v", version, int64(entry)) + } + } + + url, err = secondTable.Store.(*s3store.S3ObjectStore).BaseURI().ParseURL() + if err != nil { + t.Errorf("Second table: failed to parse URL from %s: %v", secondTable.Store.(*s3store.S3ObjectStore).BaseURI().Raw, err) + } + prefix = url.Host + url.Path + "_delta_log/" + secondObjects, err := secondTable.Store.(*s3store.S3ObjectStore).Client.(*s3utils.MockClient).FileStore().ListAll(storage.NewPath(prefix)) + if err != nil { + t.Errorf("Failed to list all Delta log objects for second table: %v", err) + } + + if len(secondObjects.Objects) != len(firstObjects.Objects) { + t.Errorf("len(secondObjects.Objects) = %v, want %v", len(secondObjects.Objects), len(firstObjects.Objects)) + } + + if secondObjects.Objects[0].Location.Raw != prefix+".tmp/" { + t.Errorf("secondObjects.Objects[0].Location.Raw = %v, want %v", secondObjects.Objects[0].Location.Raw, prefix+".tmp/") + } + + if secondObjects.Objects[len(secondObjects.Objects)-1].Location.Raw != prefix { + t.Errorf("secondObjects.Objects[len(secondObjects.Objects)-1].Location.Raw = %v, want %v", + secondObjects.Objects[len(secondObjects.Objects)-1].Location.Raw, prefix) + } + + logs = secondObjects.Objects[len(secondObjects.Objects)-transactions-1 : len(secondObjects.Objects)-1] + + for log := 0; log < len(logs); log++ { + parsed, version := CommitVersionFromUri(logs[log].Location) + if !parsed { + t.Errorf("Second table: failed to parse version from %s", logs[log].Location.Raw) + } + + if version != int64(log) { + t.Errorf("Second table: version = %v, want %v", version, int64(log)) + } + + bytes, err := secondTable.Store.(*s3store.S3ObjectStore).Client.(*s3utils.MockClient).FileStore().Get(logs[log].Location) + if err != nil { + t.Errorf("Failed to get bytes for second table: %v", err) + } + + actions, err := ActionsFromLogEntries(bytes) + if err != nil { + t.Errorf("Failed to get actions from bytes for second table: %v", err) + } + + if log == 0 { + t.Log("Verifying the exact contents of the second table's first log is not possible") + continue + } + + if len(actions) != len(secondTransaction.Actions) { + 
t.Errorf("First table: len(actions) = %v, want %v", len(actions), len(secondTransaction.Actions)) + } + + for action := 0; action < len(secondTransaction.Actions)-1; action++ { + parsed, ok := actions[action].(*Add) + if !ok { + t.Error("Failed to get parsed add action") + } + + expected, ok := secondTransaction.Actions[action].(Add) + if !ok { + t.Error("Failed to get expected add action") + } + + if parsed.Path != expected.Path { + t.Errorf("Second table: parsed.Path = %v, want %v", parsed.Path, expected.Path) + } + + if !reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues) { + t.Errorf("Second table: reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues) = %v, want %v", + reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues), false) + } + + if parsed.Size != expected.Size { + t.Errorf("Second table: parsed.Size = %v, want %v", parsed.Size, expected.Size) + } + + if parsed.ModificationTime != expected.ModificationTime { + t.Errorf("Second table: parsed.ModificationTime = %v, want %v", parsed.ModificationTime, expected.ModificationTime) + } + + if parsed.DataChange != expected.DataChange { + t.Errorf("Second table: parsed.DataChange = %v, want %v", parsed.DataChange, expected.DataChange) + } + + if !reflect.DeepEqual(parsed.Tags, expected.Tags) { + t.Errorf("Second table: reflect.DeepEqual(parsed.Tags, expected.Tags) = %v, want %v", + reflect.DeepEqual(parsed.Tags, expected.Tags), false) + } + + if parsed.Stats != expected.Stats { + t.Errorf("Second table: parsed.Stats = %v, want %v", parsed.Stats, expected.Stats) + } + } + } +} + +func TestCommitLogStore_EmptyLogStoreTableExists(t *testing.T) { + logStoreTableName, firstTable, secondTable, actions, operation, appMetadata := setUpMultiClusterLogStoreTest(t) + + versions := 1000 + for version := 1; version < versions; version++ { + filePath := CommitUriFromVersion(int64(version)) + + err := firstTable.Store.Put(filePath, nil) + if err != nil { + t.Errorf("Failed to put log: %v", err) + } + } + + var ( + wg sync.WaitGroup + transactions = 500 + firstTransaction *DeltaTransaction + secondTransaction *DeltaTransaction + ) + for i := 1; i < transactions/2+1; i++ { + wg.Add(1) + + go func() { + firstTransaction = firstTable.CreateTransaction(NewDeltaTransactionOptions()) + firstTransaction.AddActions(actions) + firstTransaction.SetOperation(operation) + firstTransaction.SetAppMetadata(appMetadata) + + version, err := firstTransaction.CommitLogStore() + if err != nil { + t.Errorf("Failed to commit first transaction with log store: %v", err) + } + t.Logf("Committed version %d", version) + + secondTransaction = secondTable.CreateTransaction(NewDeltaTransactionOptions()) + secondTransaction.AddActions(actions) + secondTransaction.SetOperation(operation) + secondTransaction.SetAppMetadata(appMetadata) + + version, err = secondTransaction.CommitLogStore() + if err != nil { + t.Errorf("Failed to commit second transaction with log store: %v", err) + } + t.Logf("Committed version %d", version) + + wg.Done() + }() + } + wg.Wait() + + firstItems, ok := firstTable.LogStore.Client().(*dynamodbutils.MockClient).TablesToItems().Get(logStoreTableName) + if !ok { + t.Error("Failed to get first table's items") + } + + if len(firstItems) != transactions+1 { + t.Errorf("len(firstItems) = %v, want %v", len(firstItems), transactions+1) + } + + for entry := 0; entry < len(firstItems); entry++ { + parsed, version := CommitVersionFromUri( + 
storage.NewPath(firstItems[entry][string(dynamodblogstore.FileName)].(*types.AttributeValueMemberS).Value)) + if !parsed { + t.Errorf("First table: failed to parse version from %s", firstItems[entry][string(dynamodblogstore.FileName)].(*types.AttributeValueMemberS).Value) + } + + if version != int64(entry+versions-1) { + t.Errorf("First table: version = %v, want %v", version, int64(entry+versions-1)) + } + } + + url, err := firstTable.Store.(*s3store.S3ObjectStore).BaseURI().ParseURL() + if err != nil { + t.Errorf("First table: failed to parse URL from %s: %v", firstTable.Store.(*s3store.S3ObjectStore).BaseURI().Raw, err) + } + prefix := url.Host + url.Path + "_delta_log/" + firstObjects, err := firstTable.Store.(*s3store.S3ObjectStore).Client.(*s3utils.MockClient).FileStore().ListAll(storage.NewPath(prefix)) + if err != nil { + t.Errorf("Failed to list all Delta log objects for first table: %v", err) + } + + if firstObjects.Objects[0].Location.Raw != prefix+".tmp/" { + t.Errorf("firstObjects.Objects[0].Location.Raw = %v, want %v", firstObjects.Objects[0].Location.Raw, prefix+".tmp/") + } + + if firstObjects.Objects[len(firstObjects.Objects)-1].Location.Raw != prefix { + t.Errorf("firstObjects.Objects[len(firstObjects.Objects)-1].Location.Raw = %v, want %v", + firstObjects.Objects[len(firstObjects.Objects)-1].Location.Raw, prefix) + } + + logs := firstObjects.Objects[len(firstObjects.Objects)-transactions-1 : len(firstObjects.Objects)-1] + + for log := 0; log < len(logs); log++ { + parsed, version := CommitVersionFromUri(logs[log].Location) + if !parsed { + t.Errorf("First table: failed to parse version from %s", logs[log].Location.Raw) + } + + if version != int64(log+versions) { + t.Errorf("First table: version = %v, want %v", version, int64(log+versions)) + } + + bytes, err := firstTable.Store.(*s3store.S3ObjectStore).Client.(*s3utils.MockClient).FileStore().Get(logs[log].Location) + if err != nil { + t.Errorf("Failed to get bytes for first table: %v", err) + } + + actions, err := ActionsFromLogEntries(bytes) + if err != nil { + t.Errorf("Failed to get actions from bytes for first table: %v", err) + } + + if log == 0 { + t.Log("Verifying the exact contents of the first table's first log is not possible") + continue + } + + if len(actions) != len(firstTransaction.Actions) { + t.Errorf("First table: len(actions) = %v, want %v", len(actions), len(firstTransaction.Actions)) + } + + for action := 0; action < len(firstTransaction.Actions)-1; action++ { + parsed, ok := actions[action].(*Add) + if !ok { + t.Error("First table: failed to get parsed add action") + } + + expected, ok := firstTransaction.Actions[action].(Add) + if !ok { + t.Error("First table: failed to get expected add action") + } + + if parsed.Path != expected.Path { + t.Errorf("First table: parsed.Path = %v, want %v", parsed.Path, expected.Path) + } + + if !reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues) { + t.Errorf("First table: reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues) = %v, want %v", + reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues), false) + } + + if parsed.Size != expected.Size { + t.Errorf("First table: parsed.Size = %v, want %v", parsed.Size, expected.Size) + } + + if parsed.ModificationTime != expected.ModificationTime { + t.Errorf("First table: parsed.ModificationTime = %v, want %v", parsed.ModificationTime, expected.ModificationTime) + } + + if parsed.DataChange != expected.DataChange { + t.Errorf("First table: parsed.DataChange = %v, want %v", 
parsed.DataChange, expected.DataChange)
+			}
+
+			if !reflect.DeepEqual(parsed.Tags, expected.Tags) {
+				t.Errorf("First table: reflect.DeepEqual(parsed.Tags, expected.Tags) = %v, want %v",
+					reflect.DeepEqual(parsed.Tags, expected.Tags), false)
+			}
+
+			if parsed.Stats != expected.Stats {
+				t.Errorf("First table: parsed.Stats = %v, want %v", parsed.Stats, expected.Stats)
+			}
+		}
+	}
+
+	secondItems, ok := secondTable.LogStore.Client().(*dynamodbutils.MockClient).TablesToItems().Get(logStoreTableName)
+	if !ok {
+		t.Error("Failed to get second table's items")
+	}
+
+	if len(secondItems) != transactions+1 {
+		t.Errorf("len(secondItems) = %v, want %v", len(secondItems), transactions+1)
+	}
+
+	if len(secondItems) != len(firstItems) {
+		t.Errorf("len(secondItems) = %v, want %v", len(secondItems), len(firstItems))
+	}
+
+	for entry := 0; entry < len(secondItems); entry++ {
+		parsed, version := CommitVersionFromUri(
+			storage.NewPath(secondItems[entry][string(dynamodblogstore.FileName)].(*types.AttributeValueMemberS).Value))
+		if !parsed {
+			t.Errorf("Second table: failed to parse version from %s", secondItems[entry][string(dynamodblogstore.FileName)].(*types.AttributeValueMemberS).Value)
+		}
+
+		if version != int64(entry+versions-1) {
+			t.Errorf("Second table: version = %v, want %v", version, int64(entry+versions-1))
+		}
+	}
+
+	url, err = secondTable.Store.(*s3store.S3ObjectStore).BaseURI().ParseURL()
+	if err != nil {
+		t.Errorf("Second table: failed to parse URL from %s: %v", secondTable.Store.(*s3store.S3ObjectStore).BaseURI().Raw, err)
+	}
+	prefix = url.Host + url.Path + "_delta_log/"
+	secondObjects, err := secondTable.Store.(*s3store.S3ObjectStore).Client.(*s3utils.MockClient).FileStore().ListAll(storage.NewPath(prefix))
+	if err != nil {
+		t.Errorf("Failed to list all Delta log objects for second table: %v", err)
+	}
+
+	if len(secondObjects.Objects) != len(firstObjects.Objects) {
+		t.Errorf("len(secondObjects.Objects) = %v, want %v", len(secondObjects.Objects), len(firstObjects.Objects))
+	}
+
+	if secondObjects.Objects[0].Location.Raw != prefix+".tmp/" {
+		t.Errorf("secondObjects.Objects[0].Location.Raw = %v, want %v", secondObjects.Objects[0].Location.Raw, prefix+".tmp/")
+	}
+
+	if secondObjects.Objects[len(secondObjects.Objects)-1].Location.Raw != prefix {
+		t.Errorf("secondObjects.Objects[len(secondObjects.Objects)-1].Location.Raw = %v, want %v",
+			secondObjects.Objects[len(secondObjects.Objects)-1].Location.Raw, prefix)
+	}
+
+	logs = secondObjects.Objects[len(secondObjects.Objects)-transactions-1 : len(secondObjects.Objects)-1]
+
+	for log := 0; log < len(logs); log++ {
+		parsed, version := CommitVersionFromUri(logs[log].Location)
+		if !parsed {
+			t.Errorf("Second table: failed to parse version from %s", logs[log].Location.Raw)
+		}
+
+		if version != int64(log+versions) {
+			t.Errorf("Second table: version = %v, want %v", version, int64(log+versions))
+		}
+
+		bytes, err := secondTable.Store.(*s3store.S3ObjectStore).Client.(*s3utils.MockClient).FileStore().Get(logs[log].Location)
+		if err != nil {
+			t.Errorf("Failed to get bytes for second table: %v", err)
+		}
+
+		actions, err := ActionsFromLogEntries(bytes)
+		if err != nil {
+			t.Errorf("Failed to get actions from bytes for second table: %v", err)
+		}
+
+		if log == 0 {
+			t.Log("Verifying the exact contents of the second table's first log is not possible")
+			continue
+		}
+
+		if len(actions) != len(secondTransaction.Actions) {
+			t.Errorf("Second table: len(actions) = %v, want %v", len(actions), len(secondTransaction.Actions))
+		}
+
+		
for action := 0; action < len(secondTransaction.Actions)-1; action++ { + parsed, ok := actions[action].(*Add) + if !ok { + t.Error("Second table: failed to get parsed add action") + } + + expected, ok := secondTransaction.Actions[action].(Add) + if !ok { + t.Error("Second table: failed to get expected add action") + } + + if parsed.Path != expected.Path { + t.Errorf("Second table: parsed.Path = %v, want %v", parsed.Path, expected.Path) + } + + if !reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues) { + t.Errorf("Second table: reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues) = %v, want %v", + reflect.DeepEqual(parsed.PartitionValues, expected.PartitionValues), false) + } + + if parsed.Size != expected.Size { + t.Errorf("Second table: parsed.Size = %v, want %v", parsed.Size, expected.Size) + } + + if parsed.ModificationTime != expected.ModificationTime { + t.Errorf("Second table: parsed.ModificationTime = %v, want %v", parsed.ModificationTime, expected.ModificationTime) + } + + if parsed.DataChange != expected.DataChange { + t.Errorf("Second table: parsed.DataChange = %v, want %v", parsed.DataChange, expected.DataChange) + } + + if !reflect.DeepEqual(parsed.Tags, expected.Tags) { + t.Errorf("Second table: reflect.DeepEqual(parsed.Tags, expected.Tags) = %v, want %v", + reflect.DeepEqual(parsed.Tags, expected.Tags), false) + } + + if parsed.Stats != expected.Stats { + t.Errorf("Second table: parsed.Stats = %v, want %v", parsed.Stats, expected.Stats) + } + } + } +} diff --git a/go.mod b/go.mod index addf6c3c..2f23d03b 100644 --- a/go.mod +++ b/go.mod @@ -68,6 +68,7 @@ require ( github.com/google/go-cmp v0.5.9 github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/compress v1.16.7 // indirect + github.com/orcaman/concurrent-map/v2 v2.0.1 github.com/pierrec/lz4/v4 v4.1.18 // indirect golang.org/x/sys v0.12.0 // indirect ) diff --git a/go.sum b/go.sum index fd90aab7..5e0233c9 100644 --- a/go.sum +++ b/go.sum @@ -175,16 +175,8 @@ github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= -github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= -github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro= -github.com/onsi/gomega v1.20.1/go.mod h1:DtrZpjmvpn2mPm4YWQa0/ALMDj9v4YxLgojwPeREyVo= -github.com/onsi/gomega v1.21.1/go.mod h1:iYAIXgPSaDHak0LCMA+AWBpIKBr8WZicMxnE8luStNc= -github.com/onsi/gomega v1.22.1/go.mod h1:x6n7VNe4hw0vkyYUM4mjIXx3JbLiPaBPNgB7PRQ1tuM= -github.com/onsi/gomega v1.24.0/go.mod h1:Z/NWtiqwBrwUt4/2loMmHL63EDLnYHmVbuBpDr2vQAg= -github.com/onsi/gomega v1.24.1/go.mod h1:3AOiACssS3/MajrniINInwbfOOtfZvplPzuRSmvt1jM= -github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= -github.com/onsi/gomega v1.26.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= -github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= +github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c= +github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM= github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ= github.com/pierrec/lz4/v4 
v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
 github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
diff --git a/internal/dynamodbutils/dynamodbclient.go b/internal/dynamodbutils/dynamodbclient.go
index 039cca7e..87809602 100644
--- a/internal/dynamodbutils/dynamodbclient.go
+++ b/internal/dynamodbutils/dynamodbclient.go
@@ -14,7 +14,7 @@ package dynamodbutils
 
 import (
 	"context"
-	"errors"
+	"fmt"
 	"time"
 
 	"github.com/aws/aws-sdk-go-v2/aws"
@@ -22,12 +22,8 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
-var (
-	ErrExceededTableCreateRetryAttempts error = errors.New("failed to create table")
-)
-
-// Defines methods implemented by dynamodb.Client
-type DynamoDBClient interface {
+// Client defines methods implemented by AWS SDK for Go v2's DynamoDB client.
+type Client interface {
 	GetItem(ctx context.Context, params *dynamodb.GetItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error)
 	PutItem(ctx context.Context, params *dynamodb.PutItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error)
 	UpdateItem(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error)
@@ -37,35 +33,34 @@ type DynamoDBClient interface {
 	Query(ctx context.Context, params *dynamodb.QueryInput, optFns ...func(*dynamodb.Options)) (*dynamodb.QueryOutput, error)
 }
 
-// Tries to create a DynamoDB table only if it doesn't exist
-func TryEnsureDynamoDBTableExists(client DynamoDBClient, tableName string, createTableInput dynamodb.CreateTableInput, maxRetryTableCreateAttempts uint16) error {
-	attemptNumber := 0
-	created := false
-
+// CreateTableIfNotExists creates a table if it does not exist.
+func CreateTableIfNotExists(c Client, name string, cti dynamodb.CreateTableInput, maxAttempts uint16) error {
+	var (
+		attemptNumber = 0
+		created       = false
+	)
 	for {
-		if attemptNumber >= int(maxRetryTableCreateAttempts) {
-			log.Debugf("delta-go: Table create attempt failed. Attempts exhausted beyond maxRetryDynamoDbTableCreateAttempts of %d so failing.", maxRetryTableCreateAttempts)
-			return ErrExceededTableCreateRetryAttempts
+		if attemptNumber >= int(maxAttempts) {
+			return fmt.Errorf("failed to create table after %d attempts", maxAttempts)
 		}
 
-		status := "CREATING"
-
-		result, err := client.DescribeTable(context.TODO(), &dynamodb.DescribeTableInput{
-			TableName: aws.String(tableName),
+		result, err := c.DescribeTable(context.TODO(), &dynamodb.DescribeTableInput{
+			TableName: aws.String(name),
 		})
 		if err != nil {
-			log.Infof("delta-go: DynamoDB table %s does not exist. Creating it now with provisioned throughput of %d RCUs and %d WCUs.", tableName, *createTableInput.ProvisionedThroughput.ReadCapacityUnits, *createTableInput.ProvisionedThroughput.ReadCapacityUnits)
-			_, err := client.CreateTable(context.TODO(), &createTableInput)
-			if err != nil {
-				log.Debugf("delta-go: Table %s just created by concurrent process. %v", tableName, err)
+			log.Infof("delta-go: DynamoDB table %s does not exist. Creating it now with provisioned throughput of %d RCUs and %d WCUs.", name, *cti.ProvisionedThroughput.ReadCapacityUnits, *cti.ProvisionedThroughput.WriteCapacityUnits)
+			if _, err := c.CreateTable(context.TODO(), &cti); err != nil {
+				log.Debugf("delta-go: Table %s just created by concurrent process. 
%v", name, err)
 			}
 
 			created = true
 		}
 
+		var status string
+
 		if result == nil || result.Table == nil {
 			attemptNumber++
-			log.Infof("delta-go: Waiting for %s table creation", tableName)
+			log.Infof("delta-go: Waiting for %s table creation", name)
 			time.Sleep(1 * time.Second)
 			continue
 		} else {
@@ -74,17 +69,18 @@ func TryEnsureDynamoDBTableExists(client DynamoDBClient, tableName string, creat
 
 		if status == "ACTIVE" {
 			if created {
-				log.Infof("delta-go: Successfully created DynamoDB table %s", tableName)
+				log.Infof("delta-go: Successfully created DynamoDB table %s", name)
 			} else {
-				log.Infof("delta-go: Table %s already exists", tableName)
+				log.Infof("delta-go: Table %s already exists", name)
 			}
 		} else if status == "CREATING" {
 			attemptNumber++
-			log.Infof("delta-go: Waiting for %s table creation", tableName)
+			log.Infof("delta-go: Waiting for %s table creation", name)
 			time.Sleep(1 * time.Second)
+			continue
 		} else {
 			attemptNumber++
-			log.Debugf("delta-go: Table %s status: %s. Incrementing attempt number to %d and retrying. %v", tableName, status, attemptNumber, err)
+			log.Debugf("delta-go: Table %s status: %s. Incrementing attempt number to %d and retrying. %v", name, status, attemptNumber, err)
 			continue
 		}
diff --git a/internal/dynamodbutils/dynamodbmock.go b/internal/dynamodbutils/dynamodbmock.go
index 9c48684a..4ef04764 100644
--- a/internal/dynamodbutils/dynamodbmock.go
+++ b/internal/dynamodbutils/dynamodbmock.go
@@ -16,78 +16,100 @@ import (
 	"context"
 	"errors"
 	"regexp"
+	"sync"
 
 	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
 	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
 	"github.com/google/go-cmp/cmp"
+	cmap "github.com/orcaman/concurrent-map/v2"
+	"golang.org/x/exp/maps"
 	"golang.org/x/exp/slices"
 )
 
-var (
-	ErrorConditionExpressionNotSatisfied error = errors.New("condition expression not satisfied")
-	ErrorTableDoesNotExist               error = errors.New("table does not exist")
-	ErrorCannotFindItems                 error = errors.New("cannot find items")
-)
-
-// Stores the partition and sort key for a DynamoDB table (a primary key is composed of a partition and sort key)
-type DynamoDBPrimaryKey struct {
+// PrimaryKey stores the partition and sort key for a DynamoDB table (a primary key is composed of a partition and sort key).
+type PrimaryKey struct {
 	partitionKey string
 	sortKey      string
 }
 
-// Stores the data structures used to mock DynamoDB
-type MockDynamoDBClient struct {
-	DynamoDBClient
-	tablesToPrimaryKeys map[string]DynamoDBPrimaryKey
-	tablesToItems       map[string][]map[string]types.AttributeValue
+// MockClient stores the data structures used to mock DynamoDB.
+type MockClient struct {
+	Client
+	tablesToKeys  cmap.ConcurrentMap[string, PrimaryKey]
+	tablesToItems cmap.ConcurrentMap[string, []map[string]types.AttributeValue]
+	mu            sync.Mutex
 }
 
-// Compile time check that MockDynamoDBClient implements DynamoDBClient
-var _ DynamoDBClient = (*MockDynamoDBClient)(nil)
+// Compile time check that MockClient implements Client
+var _ Client = (*MockClient)(nil)
 
+// NewMockClient creates a new MockClient instance.
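+//
+// A minimal usage sketch (the table name and key schema here are
+// illustrative, not part of this change):
+//
+//	client := NewMockClient()
+//	_, err := client.CreateTable(context.TODO(), &dynamodb.CreateTableInput{
+//		TableName: aws.String("log_store"),
+//		KeySchema: []types.KeySchemaElement{
+//			{AttributeName: aws.String("tablePath"), KeyType: types.KeyTypeHash},
+//			{AttributeName: aws.String("fileName"), KeyType: types.KeyTypeRange},
+//		},
+//	})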
+func NewMockClient() *MockClient { + m := new(MockClient) + m.tablesToKeys = cmap.New[PrimaryKey]() + m.tablesToItems = cmap.New[[]map[string]types.AttributeValue]() -// Creates a new MockDynamoDBClient instance -func NewMockClient() *MockDynamoDBClient { - m := new(MockDynamoDBClient) - m.tablesToPrimaryKeys = make(map[string]DynamoDBPrimaryKey) - m.tablesToItems = make(map[string][]map[string]types.AttributeValue) return m } -// Gets the map of DynamoDB tables to primary keys -func (m *MockDynamoDBClient) GetTablesToPrimaryKeys() map[string]DynamoDBPrimaryKey { - return m.tablesToPrimaryKeys +// TablesToKeys gets the map of DynamoDB tables to primary keys. +func (m *MockClient) TablesToKeys() cmap.ConcurrentMap[string, PrimaryKey] { + return m.tablesToKeys } -// Gets the map of DynamoDB tables to DynamoDB items -func (m *MockDynamoDBClient) GetTablesToItems() map[string][]map[string]types.AttributeValue { +// TablesToItems gets the map of DynamoDB tables to DynamoDB items. +func (m *MockClient) TablesToItems() cmap.ConcurrentMap[string, []map[string]types.AttributeValue] { return m.tablesToItems } -// Gets an item from a mock DynamoDB table -func (m *MockDynamoDBClient) GetItem(_ context.Context, input *dynamodb.GetItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) { - _, ok := m.tablesToItems[*input.TableName] +// GetItem gets a shallow clone of an item in a mock DynamoDB table. +func (m *MockClient) GetItem(_ context.Context, input *dynamodb.GetItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) { + m.mu.Lock() + defer m.mu.Unlock() + + items, ok := m.tablesToItems.Get(*input.TableName) if !ok { - return &dynamodb.GetItemOutput{}, ErrorTableDoesNotExist + return &dynamodb.GetItemOutput{}, errors.New("table does not exist") } - for _, item := range m.tablesToItems[*input.TableName] { - if IsMapSubset[string, types.AttributeValue](item, input.Key, cmp.AllowUnexported(types.AttributeValueMemberS{})) { - return &dynamodb.GetItemOutput{Item: item}, nil + for _, item := range items { + if m.isMatch(item, input.Key, cmp.AllowUnexported(types.AttributeValueMemberS{})) { + copiedItem := maps.Clone(item) + + return &dynamodb.GetItemOutput{Item: copiedItem}, nil } } return &dynamodb.GetItemOutput{}, nil } -// Puts an item into a mock DynamoDB table -func (m *MockDynamoDBClient) PutItem(_ context.Context, input *dynamodb.PutItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error) { - _, ok := m.tablesToPrimaryKeys[*input.TableName] +// getNonClonedItem gets a non-cloned item from a mock DynamoDB table. +func (m *MockClient) getNonClonedItem(input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) { + items, ok := m.tablesToItems.Get(*input.TableName) if !ok { - return &dynamodb.PutItemOutput{}, ErrorTableDoesNotExist + return &dynamodb.GetItemOutput{}, errors.New("table does not exist") } - _, ok = m.tablesToItems[*input.TableName] + + for _, item := range items { + if m.isMatch(item, input.Key, cmp.AllowUnexported(types.AttributeValueMemberS{})) { + return &dynamodb.GetItemOutput{Item: item}, nil + } + } + + return &dynamodb.GetItemOutput{}, nil +} + +// PutItem puts an item into a mock DynamoDB table. 
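+//
+// Condition expressions of the form attribute_not_exists(<attr>) are honored:
+// the put is rejected if an existing item already has the same value for
+// <attr>, loosely mirroring DynamoDB's conditional writes. A sketch (the
+// names are illustrative):
+//
+//	pii := &dynamodb.PutItemInput{
+//		TableName:           aws.String("log_store"),
+//		Item:                item,
+//		ConditionExpression: aws.String("attribute_not_exists(fileName)"),
+//	}
+//	_, err := client.PutItem(context.TODO(), pii) // succeeds on the first call
+//	_, err = client.PutItem(context.TODO(), pii)  // fails: condition not satisfied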
+func (m *MockClient) PutItem(_ context.Context, input *dynamodb.PutItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error) { + m.mu.Lock() + defer m.mu.Unlock() + + if _, ok := m.tablesToKeys.Get(*input.TableName); !ok { + return &dynamodb.PutItemOutput{}, errors.New("table does not exist") + } + items, ok := m.tablesToItems.Get(*input.TableName) if !ok { - return &dynamodb.PutItemOutput{}, ErrorTableDoesNotExist + return &dynamodb.PutItemOutput{}, errors.New("table does not exist") } matched := false @@ -95,122 +117,148 @@ func (m *MockDynamoDBClient) PutItem(_ context.Context, input *dynamodb.PutItemI matched, _ = regexp.MatchString(`attribute_not_exists\(([A-z]+)\)`, *input.ConditionExpression) } if matched { - pattern := regexp.MustCompile(`attribute_not_exists\(([A-z]+)\)`) - subStrs := pattern.FindStringSubmatch(*input.ConditionExpression) - gio, _ := m.GetItem(context.TODO(), &dynamodb.GetItemInput{TableName: input.TableName, Key: map[string]types.AttributeValue{subStrs[1]: input.Item[subStrs[1]]}}) - if gio.Item != nil { - return &dynamodb.PutItemOutput{}, ErrorConditionExpressionNotSatisfied + var ( + pattern = regexp.MustCompile(`attribute_not_exists\(([A-z]+)\)`) + subStrs = pattern.FindStringSubmatch(*input.ConditionExpression) + ) + if gio, _ := m.getNonClonedItem(&dynamodb.GetItemInput{TableName: input.TableName, Key: map[string]types.AttributeValue{subStrs[1]: input.Item[subStrs[1]]}}); gio.Item != nil { + return &dynamodb.PutItemOutput{}, errors.New("condition expression not satisfied") } } - gio, _ := m.GetItem(context.TODO(), &dynamodb.GetItemInput{TableName: input.TableName, Key: map[string]types.AttributeValue{m.tablesToPrimaryKeys[*input.TableName].partitionKey: input.Item[m.tablesToPrimaryKeys[*input.TableName].partitionKey], m.tablesToPrimaryKeys[*input.TableName].sortKey: input.Item[m.tablesToPrimaryKeys[*input.TableName].sortKey]}}) - if gio.Item != nil { - posInSlice := slices.IndexFunc(m.tablesToItems[*input.TableName], func(i map[string]types.AttributeValue) bool { - return cmp.Equal(i, gio.Item, cmp.AllowUnexported(types.AttributeValueMemberS{})) + key, ok := m.tablesToKeys.Get(*input.TableName) + if !ok { + return &dynamodb.PutItemOutput{}, errors.New("table does not exist") + } + if gio, _ := m.getNonClonedItem(&dynamodb.GetItemInput{TableName: input.TableName, Key: map[string]types.AttributeValue{key.partitionKey: input.Item[key.partitionKey], key.sortKey: input.Item[key.sortKey]}}); gio.Item != nil { + items, ok := m.tablesToItems.Get(*input.TableName) + if !ok { + return &dynamodb.PutItemOutput{}, errors.New("table does not exist") + } + + pos := slices.IndexFunc(items, func(i map[string]types.AttributeValue) bool { + return cmp.Equal(i, gio.Item, cmp.AllowUnexported(types.AttributeValueMemberS{}), cmp.AllowUnexported(types.AttributeValueMemberN{})) }) - m.tablesToItems[*input.TableName] = slices.Replace[[]map[string]types.AttributeValue](m.tablesToItems[*input.TableName], posInSlice, posInSlice+1, input.Item) + m.tablesToItems.Set(*input.TableName, slices.Replace[[]map[string]types.AttributeValue](items, pos, pos+1, input.Item)) + return &dynamodb.PutItemOutput{}, nil } - m.tablesToItems[*input.TableName] = append(m.tablesToItems[*input.TableName], input.Item) + m.tablesToItems.Set(*input.TableName, append(items, input.Item)) + return &dynamodb.PutItemOutput{}, nil } -// Updates an item in a mock DynamoDB table -func (m *MockDynamoDBClient) UpdateItem(_ context.Context, input *dynamodb.UpdateItemInput, _ ...func(*dynamodb.Options)) 
(*dynamodb.UpdateItemOutput, error) { +// UpdateItem does nothing since it is not required to be implemented. +func (m *MockClient) UpdateItem(_ context.Context, input *dynamodb.UpdateItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error) { return &dynamodb.UpdateItemOutput{}, nil } -// Deletes an item from a mock DynamoDB table -func (m *MockDynamoDBClient) DeleteItem(_ context.Context, input *dynamodb.DeleteItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error) { - _, ok := m.tablesToItems[*input.TableName] +// DeleteItem deletes an item from a mock DynamoDB table. +func (m *MockClient) DeleteItem(_ context.Context, input *dynamodb.DeleteItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error) { + m.mu.Lock() + defer m.mu.Unlock() + + items, ok := m.tablesToItems.Get(*input.TableName) if !ok { - return &dynamodb.DeleteItemOutput{}, ErrorTableDoesNotExist + return &dynamodb.DeleteItemOutput{}, errors.New("table does not exist") } var itemToDelete map[string]types.AttributeValue - for _, item := range m.tablesToItems[*input.TableName] { - if IsMapSubset[string, types.AttributeValue](item, input.Key, cmp.AllowUnexported(types.AttributeValueMemberS{})) { + for _, item := range items { + if m.isMatch(item, input.Key, cmp.AllowUnexported(types.AttributeValueMemberS{})) { itemToDelete = item + break } } - posInSlice := slices.IndexFunc(m.tablesToItems[*input.TableName], func(v map[string]types.AttributeValue) bool { + pos := slices.IndexFunc(items, func(v map[string]types.AttributeValue) bool { return cmp.Equal(v, itemToDelete, cmp.AllowUnexported(types.AttributeValueMemberS{})) }) - m.tablesToItems[*input.TableName] = slices.Delete(m.tablesToItems[*input.TableName], posInSlice, posInSlice+1) + m.tablesToItems.Set(*input.TableName, slices.Delete(items, pos, pos+1)) + return &dynamodb.DeleteItemOutput{}, nil } -// Creates a mock DynamoDB table -func (m *MockDynamoDBClient) CreateTable(_ context.Context, input *dynamodb.CreateTableInput, _ ...func(*dynamodb.Options)) (*dynamodb.CreateTableOutput, error) { - m.tablesToPrimaryKeys[*input.TableName] = DynamoDBPrimaryKey{} +// CreateTable creates a mock DynamoDB table. 
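+// Only the table name and the KeySchema's HASH and RANGE elements are
+// consulted; the mock ignores the input's attribute definitions and
+// provisioned throughput settings.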
+func (m *MockClient) CreateTable(_ context.Context, input *dynamodb.CreateTableInput, _ ...func(*dynamodb.Options)) (*dynamodb.CreateTableOutput, error) { + m.mu.Lock() + defer m.mu.Unlock() - posInSlice := slices.IndexFunc(input.KeySchema, func(kse types.KeySchemaElement) bool { + m.tablesToKeys.Set(*input.TableName, PrimaryKey{}) + + pos := slices.IndexFunc(input.KeySchema, func(kse types.KeySchemaElement) bool { return kse.KeyType == types.KeyTypeHash }) - primaryKey, ok := m.tablesToPrimaryKeys[*input.TableName] - if ok && posInSlice != -1 { - primaryKey.partitionKey = *input.KeySchema[posInSlice].AttributeName - m.tablesToPrimaryKeys[*input.TableName] = primaryKey + key, ok := m.tablesToKeys.Get(*input.TableName) + if ok && pos != -1 { + key.partitionKey = *input.KeySchema[pos].AttributeName + m.tablesToKeys.Set(*input.TableName, key) } - posInSlice = slices.IndexFunc(input.KeySchema, func(kse types.KeySchemaElement) bool { + pos = slices.IndexFunc(input.KeySchema, func(kse types.KeySchemaElement) bool { return kse.KeyType == types.KeyTypeRange }) - primaryKey, ok = m.tablesToPrimaryKeys[*input.TableName] - if ok && posInSlice != -1 { - primaryKey.sortKey = *input.KeySchema[posInSlice].AttributeName - m.tablesToPrimaryKeys[*input.TableName] = primaryKey + key, ok = m.tablesToKeys.Get(*input.TableName) + if ok && pos != -1 { + key.sortKey = *input.KeySchema[pos].AttributeName + m.tablesToKeys.Set(*input.TableName, key) } - m.tablesToItems[*input.TableName] = []map[string]types.AttributeValue{} + m.tablesToItems.Set(*input.TableName, []map[string]types.AttributeValue{}) + return &dynamodb.CreateTableOutput{}, nil } -// Describes a mock DynamoDB table -func (m *MockDynamoDBClient) DescribeTable(_ context.Context, input *dynamodb.DescribeTableInput, _ ...func(*dynamodb.Options)) (*dynamodb.DescribeTableOutput, error) { - _, ok := m.tablesToItems[*input.TableName] - if ok { +// DescribeTable describes a mock DynamoDB table. +func (m *MockClient) DescribeTable(_ context.Context, input *dynamodb.DescribeTableInput, _ ...func(*dynamodb.Options)) (*dynamodb.DescribeTableOutput, error) { + m.mu.Lock() + defer m.mu.Unlock() + + if _, ok := m.tablesToItems.Get(*input.TableName); ok { return &dynamodb.DescribeTableOutput{Table: &types.TableDescription{TableStatus: "ACTIVE"}}, nil } - return &dynamodb.DescribeTableOutput{}, ErrorTableDoesNotExist + return &dynamodb.DescribeTableOutput{}, errors.New("table does not exist") } -// Queries a mock DynamoDB table -func (m *MockDynamoDBClient) Query(_ context.Context, input *dynamodb.QueryInput, optFns ...func(*dynamodb.Options)) (*dynamodb.QueryOutput, error) { - _, ok := m.tablesToItems[*input.TableName] +// Query queries a mock DynamoDB table. 
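+//
+// Only key-condition expressions of the form "<attr> = :<placeholder>" are
+// recognized, and matching items are returned most-recently-inserted first.
+// A sketch (the names are illustrative):
+//
+//	qo, err := client.Query(context.TODO(), &dynamodb.QueryInput{
+//		TableName:              aws.String("log_store"),
+//		KeyConditionExpression: aws.String("tablePath = :partitionKey"),
+//		ExpressionAttributeValues: map[string]types.AttributeValue{
+//			":partitionKey": &types.AttributeValueMemberS{Value: "usr/local/"},
+//		},
+//	})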
+func (m *MockClient) Query(_ context.Context, input *dynamodb.QueryInput, optFns ...func(*dynamodb.Options)) (*dynamodb.QueryOutput, error) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	items, ok := m.tablesToItems.Get(*input.TableName)
 	if !ok {
-		return &dynamodb.QueryOutput{}, ErrorTableDoesNotExist
+		return &dynamodb.QueryOutput{}, errors.New("table does not exist")
 	}
 
 	pattern := regexp.MustCompile("([A-z]+) = (:[A-z]+)")
-	subStrs := pattern.FindStringSubmatch(*input.KeyConditionExpression)
+	submatches := pattern.FindStringSubmatch(*input.KeyConditionExpression)
 
-	items := []map[string]types.AttributeValue{}
-	for _, item := range m.tablesToItems[*input.TableName] {
-		if IsMapSubset[string, types.AttributeValue](item, map[string]types.AttributeValue{subStrs[1]: input.ExpressionAttributeValues[subStrs[2]]}, cmp.AllowUnexported(types.AttributeValueMemberS{})) {
-			items = append(items, item)
+	matchingItems := []map[string]types.AttributeValue{}
+	for _, item := range items {
+		if m.isMatch(item, map[string]types.AttributeValue{submatches[1]: input.ExpressionAttributeValues[submatches[2]]}, cmp.AllowUnexported(types.AttributeValueMemberS{})) {
+			matchingItems = append(matchingItems, item)
 		}
 	}
 
-	if len(items) != 0 {
-		slices.Reverse[[]map[string]types.AttributeValue](items)
-		return &dynamodb.QueryOutput{Items: items}, nil
+	if len(matchingItems) != 0 {
+		slices.Reverse[[]map[string]types.AttributeValue](matchingItems)
+
+		return &dynamodb.QueryOutput{Items: matchingItems}, nil
 	}
 
-	return &dynamodb.QueryOutput{}, ErrorCannotFindItems
+	return &dynamodb.QueryOutput{}, errors.New("cannot find items")
 }
 
-// Checks if a map is a subset of another map
-func IsMapSubset[K, V comparable](m map[K]V, sub map[K]V, opts ...cmp.Option) bool {
-	if len(sub) > len(m) {
+// isMatch checks if an item possesses a certain primary key.
+func (m *MockClient) isMatch(item map[string]types.AttributeValue, key map[string]types.AttributeValue, opts ...cmp.Option) bool {
+	if len(key) > len(item) {
 		return false
 	}
 
-	for k, vsub := range sub {
-		if vm, found := m[k]; !found || !cmp.Equal(vm, vsub, opts...) {
+	for name, want := range key {
+		if have, found := item[name]; !found || !cmp.Equal(have, want, opts...) {
 			return false
 		}
 	}
diff --git a/internal/s3utils/s3client.go b/internal/s3utils/s3client.go
index a3b490d6..fc05b4eb 100644
--- a/internal/s3utils/s3client.go
+++ b/internal/s3utils/s3client.go
@@ -18,8 +18,8 @@ import (
 	"github.com/aws/aws-sdk-go-v2/service/s3"
 )
 
-// Defines methods implemented by s3.Client
-type S3Client interface {
+// Client defines methods implemented by AWS SDK for Go v2's S3 client.
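+//
+// The concrete *s3.Client satisfies it, which can be asserted at compile
+// time:
+//
+//	var _ Client = (*s3.Client)(nil)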
+type Client interface { CopyObject(ctx context.Context, params *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error) DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) diff --git a/internal/s3utils/s3mock.go b/internal/s3utils/s3mock.go index 2ac49d49..1ac479fb 100644 --- a/internal/s3utils/s3mock.go +++ b/internal/s3utils/s3mock.go @@ -29,9 +29,9 @@ import ( "github.com/rivian/delta-go/storage/filestore" ) -type MockS3Client struct { +type MockClient struct { // Use a FileObjectStore to mock S3 storage - fileStore filestore.FileObjectStore + fileStore *filestore.FileObjectStore // The S3 store path s3StorePath string // For testing: if MockError is set, any S3ClientAPI function called will return that error @@ -41,16 +41,17 @@ type MockS3Client struct { } // Compile time check that MockS3Client implements S3Client -var _ S3Client = (*MockS3Client)(nil) +var _ Client = (*MockClient)(nil) // NewMockClient creates a mock S3 client that uses a filestore in a temporary directory to // store, retrieve, and manipulate files -func NewMockClient(t *testing.T, baseURI storage.Path) (*MockS3Client, error) { +func NewMockClient(t *testing.T, baseURI storage.Path) (*MockClient, error) { tmpDir := t.TempDir() tmpPath := storage.NewPath(tmpDir) - fileStore := filestore.FileObjectStore{BaseURI: tmpPath} - client := new(MockS3Client) - client.fileStore = fileStore + var fileStore filestore.FileObjectStore + fileStore.SetBaseURI(tmpPath) + client := new(MockClient) + client.fileStore = &fileStore // The mock client needs information about the S3 store's path to avoid edge cases during List baseURL, err := baseURI.ParseURL() if err != nil { @@ -65,22 +66,22 @@ func NewMockClient(t *testing.T, baseURI storage.Path) (*MockS3Client, error) { } // FileStore gets the file store. -func (m *MockS3Client) FileStore() filestore.FileObjectStore { +func (m *MockClient) FileStore() *filestore.FileObjectStore { return m.fileStore } // S3StorePath gets the S3 store path. -func (m *MockS3Client) S3StorePath() string { +func (m *MockClient) S3StorePath() string { return m.s3StorePath } // SetFileStore sets the file store. -func (m *MockS3Client) SetFileStore(store filestore.FileObjectStore) { +func (m *MockClient) SetFileStore(store *filestore.FileObjectStore) { m.fileStore = store } // SetS3StorePath sets the S3 store path. 
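+//
+// Tests that need to inspect what the mocked bucket contains can reach the
+// underlying file store directly; a sketch (the prefix is illustrative):
+//
+//	objects, err := client.FileStore().ListAll(storage.NewPath("bucket/table/_delta_log/"))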
-func (m *MockS3Client) SetS3StorePath(path string) { +func (m *MockClient) SetS3StorePath(path string) { m.s3StorePath = path } @@ -93,7 +94,7 @@ func getFilePathFromS3Input(bucket string, key string) (storage.Path, error) { return storage.NewPath(filePath), nil } -func (m *MockS3Client) HeadObject(ctx context.Context, input *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { +func (m *MockClient) HeadObject(ctx context.Context, input *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { if m.MockError != nil { return nil, m.MockError } @@ -113,7 +114,7 @@ func (m *MockS3Client) HeadObject(ctx context.Context, input *s3.HeadObjectInput return headObjectOutput, nil } -func (m *MockS3Client) PutObject(ctx context.Context, input *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) { +func (m *MockClient) PutObject(ctx context.Context, input *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) { if m.MockError != nil { return nil, m.MockError } @@ -130,7 +131,7 @@ func (m *MockS3Client) PutObject(ctx context.Context, input *s3.PutObjectInput, return putObjectOutput, err } -func (m *MockS3Client) GetObject(ctx context.Context, input *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { +func (m *MockClient) GetObject(ctx context.Context, input *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { if m.MockError != nil { return nil, m.MockError } @@ -150,7 +151,7 @@ func (m *MockS3Client) GetObject(ctx context.Context, input *s3.GetObjectInput, return getObjectOutput, nil } -func (m *MockS3Client) CopyObject(ctx context.Context, input *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error) { +func (m *MockClient) CopyObject(ctx context.Context, input *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error) { if m.MockError != nil { return nil, m.MockError } @@ -175,7 +176,7 @@ func (m *MockS3Client) CopyObject(ctx context.Context, input *s3.CopyObjectInput return copyObjectOutput, nil } -func (m *MockS3Client) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { +func (m *MockClient) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { if m.MockError != nil { return nil, m.MockError } @@ -193,7 +194,7 @@ func (m *MockS3Client) DeleteObject(ctx context.Context, input *s3.DeleteObjectI return deleteObjectOutput, nil } -func (m *MockS3Client) ListObjectsV2(ctx context.Context, input *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { +func (m *MockClient) ListObjectsV2(ctx context.Context, input *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { if m.MockError != nil { return nil, m.MockError } @@ -266,7 +267,7 @@ func getFilePath(baseURI storage.Path, location storage.Path) (storage.Path, err } // getFile returns a file from the underlying filestore, for use in unit tests -func (m *MockS3Client) GetFile(baseURI storage.Path, location storage.Path) ([]byte, error) { +func (m *MockClient) GetFile(baseURI storage.Path, location storage.Path) ([]byte, error) { filePath, err := getFilePath(baseURI, location) if err != nil { return nil, err @@ -275,7 +276,7 @@ func (m *MockS3Client) GetFile(baseURI storage.Path, location storage.Path) ([]b } // putFile writes data to a file in the 
underlying filestore for use in unit tests -func (m *MockS3Client) PutFile(baseURI storage.Path, location storage.Path, data []byte) error { +func (m *MockClient) PutFile(baseURI storage.Path, location storage.Path, data []byte) error { filePath, err := getFilePath(baseURI, location) if err != nil { return err @@ -284,7 +285,7 @@ func (m *MockS3Client) PutFile(baseURI storage.Path, location storage.Path, data } // fileExists checks if a file exists in the underlying filestore for use in unit tests -func (m *MockS3Client) FileExists(baseURI storage.Path, location storage.Path) (bool, error) { +func (m *MockClient) FileExists(baseURI storage.Path, location storage.Path) (bool, error) { filePath, err := getFilePath(baseURI, location) if err != nil { return false, err diff --git a/lock/dynamolock/dynamolock.go b/lock/dynamolock/dynamolock.go index 1df6d964..1d33c3e3 100644 --- a/lock/dynamolock/dynamolock.go +++ b/lock/dynamolock/dynamolock.go @@ -41,7 +41,7 @@ type DynamoLock struct { lockClient *dynamolock.Client lockedItem *dynamolock.Lock key string - dynamoClient dynamodbutils.DynamoDBClient + dynamoClient dynamodbutils.Client opts Options } @@ -78,7 +78,7 @@ func (opts *Options) setOptionsDefaults() { } // Creates a new DynamoLock instance -func New(client dynamodbutils.DynamoDBClient, tableName string, key string, opts Options) (*DynamoLock, error) { +func New(client dynamodbutils.Client, tableName string, key string, opts Options) (*DynamoLock, error) { opts.setOptionsDefaults() lc, err := dynamolock.New(client, @@ -103,7 +103,7 @@ func New(client dynamodbutils.DynamoDBClient, tableName string, key string, opts }, TableName: aws.String(tableName), } - dynamodbutils.TryEnsureDynamoDBTableExists(client, tableName, createTableInput, opts.MaxRetryTableCreateAttempts) + dynamodbutils.CreateTableIfNotExists(client, tableName, createTableInput, opts.MaxRetryTableCreateAttempts) l := new(DynamoLock) l.tableName = tableName @@ -111,15 +111,25 @@ func New(client dynamodbutils.DynamoDBClient, tableName string, key string, opts l.lockClient = lc l.opts = opts l.dynamoClient = client + return l, nil } // Creates a new DynamoLock instance using an existing DynamoLock instance func (l *DynamoLock) NewLock(key string) (lock.Locker, error) { + lc, err := dynamolock.New(l.dynamoClient, + l.tableName, + dynamolock.WithLeaseDuration(l.opts.TTL), + dynamolock.WithHeartbeatPeriod(l.opts.HeartBeat), + ) + if err != nil { + return nil, err + } + nl := new(DynamoLock) nl.tableName = l.tableName - nl.lockClient = l.lockClient nl.key = key + nl.lockClient = lc nl.dynamoClient = l.dynamoClient nl.opts = l.opts diff --git a/lock/dynamolock/dynamolock_test.go b/lock/dynamolock/dynamolock_test.go index 2fcae870..15724482 100644 --- a/lock/dynamolock/dynamolock_test.go +++ b/lock/dynamolock/dynamolock_test.go @@ -112,7 +112,11 @@ func TestDeleteOnRelease(t *testing.T) { t.Error("Lock should be expired") } - if len(client.GetTablesToItems()["delta_lock_table"]) != 0 { + items, ok := client.TablesToItems().Get("delta_lock_table") + if !ok { + t.Error("Failed to get table items") + } + if len(items) != 0 { t.Error("Lock should be deleted on release") } @@ -140,7 +144,11 @@ func TestDeleteOnRelease(t *testing.T) { t.Error("Lock should be expired") } - if len(client.GetTablesToItems()["delta_lock_table"]) != 1 { + items, ok = client.TablesToItems().Get("delta_lock_table") + if !ok { + t.Error("Failed to get table items") + } + if len(items) != 1 { t.Error("Lock should not be deleted on release") } } diff --git 
a/lock/lock.go b/lock/lock.go
index 89e62d20..b3cca775 100644
--- a/lock/lock.go
+++ b/lock/lock.go
@@ -25,8 +25,8 @@ var (
 // The data can be used to provide information about the application using the lock including
 // the prior lock client version.
 type Locker interface {
-	// Creates a new lock using an existing lock object
-	NewLock(string) (Locker, error)
+	// Creates a new lock using an existing lock instance
+	NewLock(key string) (Locker, error)
 
 	// Releases the lock
 	// Otherwise returns ErrorUnableToUnlock.
diff --git a/logstore/commitentry.go b/logstore/commitentry.go
new file mode 100644
index 00000000..66d4a25b
--- /dev/null
+++ b/logstore/commitentry.go
@@ -0,0 +1,85 @@
+// Copyright 2023 Rivian Automotive, Inc.
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package logstore
+
+import (
+	"time"
+
+	"github.com/rivian/delta-go/storage"
+)
+
+// CommitEntry represents an entry in a log store for a given commit in the Delta log.
+type CommitEntry struct {
+	// Absolute path of a Delta table
+	tablePath storage.Path
+	// File name for a commit, e.g. "000000N.json"
+	fileName storage.Path
+	// Path to the temp file for a commit, relative to the Delta log
+	tempPath storage.Path
+	// True if the temp file has been successfully copied to its destination location, otherwise false
+	isComplete bool
+	// Epoch seconds at which a completed commit entry is safe to be deleted
+	expirationTime uint64
+}
+
+// New creates a new CommitEntry instance.
+func New(tablePath storage.Path, fileName storage.Path, tempPath storage.Path, isComplete bool, expirationTime uint64) (*CommitEntry, error) {
+	ce := new(CommitEntry)
+	ce.tablePath = tablePath
+	ce.fileName = fileName
+	ce.tempPath = tempPath
+	ce.isComplete = isComplete
+	ce.expirationTime = expirationTime
+
+	return ce, nil
+}
+
+// TablePath gets the table path for a commit entry.
+func (ce *CommitEntry) TablePath() storage.Path {
+	return ce.tablePath
+}
+
+// FileName gets the file name for a commit entry.
+func (ce *CommitEntry) FileName() storage.Path {
+	return ce.fileName
+}
+
+// TempPath gets the temp path for a commit entry.
+func (ce *CommitEntry) TempPath() storage.Path {
+	return ce.tempPath
+}
+
+// IsComplete gets the completion status of a commit entry.
+func (ce *CommitEntry) IsComplete() bool {
+	return ce.isComplete
+}
+
+// ExpirationTime gets the expiration time of a commit entry.
+func (ce *CommitEntry) ExpirationTime() uint64 {
+	return ce.expirationTime
+}
+
+// Complete marks a commit entry as complete and stamps its expiration time.
+func (ce *CommitEntry) Complete(expirationDelaySeconds uint64) (*CommitEntry, error) {
+	return New(ce.tablePath, ce.fileName, ce.tempPath, true, uint64(time.Now().Unix())+expirationDelaySeconds)
+}
+
+// AbsoluteFilePath gets the absolute file path for a commit entry.
+func (ce *CommitEntry) AbsoluteFilePath() (storage.Path, error) {
+	return storage.PathFromIter([]string{ce.tablePath.Raw, "_delta_log", ce.fileName.Raw}), nil
+}
+
+// AbsoluteTempPath gets the absolute temp path for a commit entry.
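+//
+// For example, a commit entry with table path "s3://bucket/table" and temp
+// path ".tmp/0000000000000000000N.json" (values illustrative) yields
+// "s3://bucket/table/_delta_log/.tmp/0000000000000000000N.json".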
+func (ce *CommitEntry) AbsoluteTempPath() (storage.Path, error) { + return storage.PathFromIter([]string{ce.tablePath.Raw, "_delta_log", ce.tempPath.Raw}), nil +} diff --git a/logstore/dynamodblogstore/dynamodblogstore.go b/logstore/dynamodblogstore/dynamodblogstore.go index 5619a99b..e31c1351 100644 --- a/logstore/dynamodblogstore/dynamodblogstore.go +++ b/logstore/dynamodblogstore/dynamodblogstore.go @@ -10,11 +10,10 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package logstore +package dynamodblogstore import ( "context" - "errors" "fmt" "strconv" @@ -27,158 +26,145 @@ import ( log "github.com/sirupsen/logrus" ) -var ( - // Compile time check that DynamoDBLogStore implements logstore.LogStore - _ logstore.LogStore = (*DynamoDBLogStore)(nil) - ErrorUnableToGetCommitEntry error = errors.New("unable to get commit entry") -) - -// Represents attribute names in DynamoDB items -type Attribute string +// attribute represents attribute names in DynamoDB items. +type attribute string const ( - // DynamoDB table attribute keys - TablePath Attribute = "tablePath" - FileName Attribute = "fileName" - TempPath Attribute = "tempPath" - Complete Attribute = "complete" - ExpireTime Attribute = "expireTime" + // DynamoDB table attribute names + TablePath attribute = "tablePath" + FileName attribute = "fileName" + TempPath attribute = "tempPath" + Complete attribute = "complete" + ExpireTime attribute = "expireTime" - // The delay, in seconds, after a commit entry has been committed to the delta log at which - // point it is safe to be deleted from the log store. + // The delay, in seconds, after a commit entry has been committed to a Delta log at which + // point it is safe to be deleted from a log store. - // We want a delay long enough such that, after the commit entry has been deleted, another - // write attempt for the SAME delta log commit can FAIL using ONLY the file system's existence - // check (e.g. `Stat(fs, path)`). Recall we assume that the file system does not provide mutual - // exclusion. + // We want a delay long enough such that, after a commit entry has been deleted, another + // write attempt for the SAME Delta log commit can FAIL using ONLY the file system's existence + // check. Recall we assume that the file system does not provide mutual exclusion. // We use a value of 1 day. // If we choose too small of a value, like 0 seconds, then the following scenario is possible: - // - t0: Writers W1 and W2 start writing data files - // - t1: W1 begins to try and write into the `_delta_log` + // - t0: Writers W1 and W2 start writing data files. + // - t1: W1 begins to try and write to the Delta log. // - t2: W1 checks if N.json exists in file system. It doesn't. - // - t3: W1 writes actions into temp file T1(N) - // - t4: W1 writes to log store entry E1(N, complete=false) - // - t5: W1 copies (with overwrite=false) T1(N) into N.json - // - t6: W1 overwrites entry in log store E1(N, complete=true, expireTime=now+0) - // - t7: E1 is safe to be deleted, and some log store TTL mechanism deletes E1 - // - t8: W2 begins to try and write into the `_delta_log` - // - t9: W1 checks if N.json exists in file system, but too little time has transpired between - // t5 and t9 that the file system check (fs.exists(path)) returns FALSE. + // - t3: W1 writes actions into temp file T1(N). + // - t4: W1 puts uncompleted commit entry E1(N). 
+	// - t5: W1 copies, with overwriting disabled, T1(N) into N.json.
+	// - t6: W1 overwrites and completes commit entry E1(N), setting the expiration time to the
+	//   current time.
+	// - t7: E1 is safe to be deleted, and some log store TTL mechanism deletes E1.
+	// - t8: W2 begins to try and write to the Delta log.
+	// - t9: W2 checks if N.json exists in the file system, but so little time has transpired
+	//   between t5 and t9 that the file system existence check returns FALSE.
 	//        Note: This isn't possible on S3 (which provides strong consistency) but could be
 	//        possible on eventually-consistent systems.
-	// - t10: W2 writes actions into temp file T2(N)
-	// - t11: W2 writes to log store entry E2(N, complete=false)
-	// - t12: W2 successfully copies (with overwrite=false) T2(N) into N.json. File system didn't
-	//        provide the necessary mutual exclusion, so the copy succeeded. Thus, DATA LOSS HAS
-	//        OCCURRED.
+	// - t10: W2 writes actions into temp file T2(N).
+	// - t11: W2 puts uncompleted commit entry E2(N).
+	// - t12: W2 successfully copies, with overwriting disabled, T2(N) into N.json. The file system
+	//        didn't provide the necessary mutual exclusion, so the copy succeeded. Thus, DATA LOSS
+	//        HAS OCCURRED.
 	// By using an expiration delay of 1 day, we ensure one of the steps at t9 or t12 will fail.
-	DefaultCommitEntryExpirationDelaySeconds uint64 = 24 * 60 * 60
-	DefaultMaxRetryTableCreateAttempts       uint16 = 20
-	DefaultRCU                               int64  = 5
-	DefaultWCU                               int64  = 5
+	defaultEntryExpirationDelaySeconds uint64 = 24 * 60 * 60
+	defaultMaxTableCreateAttempts      uint16 = 20
+	defaultRCU                         int64  = 5
+	defaultWCU                         int64  = 5
 )
 
 // A concrete implementation of LogStore that uses a DynamoDB table
 // to provide the mutual exclusion during calls to `Put`.
-// DynamoDB entries are of form
+// DynamoDB entries are of the form
 // - key
 // -- tablePath (HASH, STRING)
 // -- fileName (RANGE, STRING)
 // - attributes
-// -- tempPath (STRING, relative to `_delta_log`)
+// -- tempPath (STRING, relative to the Delta log)
 // -- complete (STRING, representing boolean, "true" or "false")
 // -- expireTime (NUMBER, epoch seconds)
-type DynamoDBLogStore struct {
-	client                      dynamodbutils.DynamoDBClient
-	tableName                   string
-	expirationDelaySeconds      uint64
-	maxRetryTableCreateAttempts uint16
-	rcu                         int64
-	wcu                         int64
+type LogStore struct {
+	client                 dynamodbutils.Client
+	tableName              string
+	expirationDelaySeconds uint64
+	maxTableCreateAttempts uint16
+	rcu                    int64
+	wcu                    int64
 }
 
-// Options for a DynamoDBLogStore instance
-type DynamoDBLogStoreOptions struct {
-	Config                      aws.Config
-	Client                      dynamodbutils.DynamoDBClient
-	TableName                   string
-	ExpirationDelaySeconds      uint64
-	MaxRetryTableCreateAttempts uint16
-	// The number of read capacity units which can be consumed per second (https://aws.amazon.com/dynamodb/pricing/provisioned/)
+// Compile time check that LogStore implements logstore.LogStore
+var _ logstore.LogStore = (*LogStore)(nil)
+
+// Options contains settings that can be adjusted to change the behavior of LogStore.
+type Options struct {
+	Client                 dynamodbutils.Client
+	TableName              string
+	ExpirationDelaySeconds uint64
+	MaxTableCreateAttempts uint16
+	// The number of read capacity units which can be consumed per second (https://aws.amazon.com/dynamodb/pricing/provisioned/).
RCU int64 - // The number of write capacity units which can be consumed per second (https://aws.amazon.com/dynamodb/pricing/provisioned/) + // The number of write capacity units which can be consumed per second (https://aws.amazon.com/dynamodb/pricing/provisioned/). WCU int64 } -// Gets the client from a DynamoDBLogStore instance -func (ls DynamoDBLogStore) GetClient() dynamodbutils.DynamoDBClient { +// Client gets the DynamoDB client. +func (ls LogStore) Client() any { return ls.client } -// Gets the table name from a DynamoDBLogStore instance -func (ls DynamoDBLogStore) GetTableName() string { +// TableName gets the DynamoDB table name. +func (ls LogStore) TableName() string { return ls.tableName } -// Gets the number of expiration delay seconds from a DynamoDBLogStore instance -func (ls DynamoDBLogStore) GetExpirationDelaySeconds() uint64 { +// ExpirationDelaySeconds gets the number of seconds until a commit entry expires. +func (ls LogStore) ExpirationDelaySeconds() uint64 { return ls.expirationDelaySeconds } -// Gets the maximum number of table creation retry attempts from a DynamoDBLogStore instance -func (ls DynamoDBLogStore) GetMaxRetryTableCreateAttempts() uint16 { - return ls.maxRetryTableCreateAttempts +// MaxRetryTableCreateAttempts gets the maximum number of table creation attempts. +func (ls LogStore) MaxRetryTableCreateAttempts() uint16 { + return ls.maxTableCreateAttempts } -// Creates a new DynamoDBLogStore instance -func NewDynamoDBLogStore(lso DynamoDBLogStoreOptions) (*DynamoDBLogStore, error) { - ls := new(DynamoDBLogStore) - ls.tableName = lso.TableName +// New creates a new LogStore instance. +func New(o Options) (*LogStore, error) { + ls := new(LogStore) + ls.tableName = o.TableName - if lso.ExpirationDelaySeconds != 0 { - ls.expirationDelaySeconds = lso.ExpirationDelaySeconds + if o.ExpirationDelaySeconds != 0 { + ls.expirationDelaySeconds = o.ExpirationDelaySeconds } else { - ls.expirationDelaySeconds = DefaultCommitEntryExpirationDelaySeconds + ls.expirationDelaySeconds = defaultEntryExpirationDelaySeconds } - if lso.MaxRetryTableCreateAttempts != 0 { - ls.maxRetryTableCreateAttempts = lso.MaxRetryTableCreateAttempts + if o.MaxTableCreateAttempts != 0 { + ls.maxTableCreateAttempts = o.MaxTableCreateAttempts } else { - ls.maxRetryTableCreateAttempts = DefaultMaxRetryTableCreateAttempts + ls.maxTableCreateAttempts = defaultMaxTableCreateAttempts } - if lso.RCU != 0 { - ls.rcu = lso.RCU + if o.RCU != 0 { + ls.rcu = o.RCU } else { - ls.rcu = DefaultRCU + ls.rcu = defaultRCU } - if lso.WCU != 0 { - ls.wcu = lso.WCU + if o.WCU != 0 { + ls.wcu = o.WCU } else { - ls.wcu = DefaultWCU + ls.wcu = defaultWCU } + ls.client = o.Client + log.Infof("delta-go: Using table name %s", ls.tableName) log.Infof("delta-go: Using TTL (seconds) %d", ls.expirationDelaySeconds) - var err error - if lso.Client == nil { - ls.client, err = ls.getClient(lso.Config) - if err != nil { - log.Debugf("delta-go: Failed to get DynamoDB client. 
%v", err) - return nil, err - } - } else { - ls.client = lso.Client - } - - createTableInput := dynamodb.CreateTableInput{ + cti := dynamodb.CreateTableInput{ AttributeDefinitions: []types.AttributeDefinition{ { AttributeName: aws.String(string(TablePath)), @@ -205,97 +191,94 @@ func NewDynamoDBLogStore(lso DynamoDBLogStoreOptions) (*DynamoDBLogStore, error) }, TableName: aws.String(ls.tableName), } - dynamodbutils.TryEnsureDynamoDBTableExists(ls.client, ls.tableName, createTableInput, ls.maxRetryTableCreateAttempts) + dynamodbutils.CreateTableIfNotExists(ls.client, ls.tableName, cti, ls.maxTableCreateAttempts) return ls, nil } -// Puts an entry into a DynamoDBLogStore instance in an exclusive way -func (ls *DynamoDBLogStore) Put(entry *logstore.CommitEntry, overwrite bool) error { - log.Debugf("delta-go: PutItem (tablePath %s, fileName %s, tempPath %s, complete %t, expireTime %d, overwrite %t)", entry.TablePath, entry.FileName, entry.TempPath, entry.Complete, entry.ExpireTime, overwrite) +// Put puts a commit entry into a log store in an exclusive way. +func (ls *LogStore) Put(entry *logstore.CommitEntry, overwrite bool) error { + log.Debugf("delta-go: PutItem (tablePath %s, fileName %s, tempPath %s, complete %t, expireTime %d, overwrite %t)", entry.TablePath(), entry.FileName(), entry.TempPath(), entry.IsComplete(), entry.ExpirationTime(), overwrite) pir, err := ls.createPutItemRequest(entry, overwrite) if err != nil { - log.Debugf("delta-go: Failed to create PutItem request. %v", err) - return err + return fmt.Errorf("create put item request: %v", err) } - _, err = ls.client.PutItem(context.TODO(), pir) + if _, err := ls.client.PutItem(context.TODO(), pir); err != nil { + return fmt.Errorf("put item: %v", err) + } - return err + return nil } -// Gets an entry corresponding to the Delta log file with given `tablePath` and `fileName` from a DynamoDBLogStore instance -func (ls *DynamoDBLogStore) Get(tablePath storage.Path, fileName storage.Path) (*logstore.CommitEntry, error) { +// Get gets a commit entry corresponding to the commit log identified by the given table path and file name. +func (ls *LogStore) Get(tablePath storage.Path, fileName storage.Path) (*logstore.CommitEntry, error) { attributes := map[string]types.AttributeValue{string(TablePath): &types.AttributeValueMemberS{Value: tablePath.Raw}, string(FileName): &types.AttributeValueMemberS{Value: fileName.Raw}} gii := dynamodb.GetItemInput{Key: attributes, TableName: aws.String(ls.tableName), ConsistentRead: aws.Bool(true)} gio, err := ls.client.GetItem(context.TODO(), &gii) if err != nil || gio.Item == nil { - log.Debugf("delta-go: Failed GetItem. %v", err) - return nil, errors.Join(err, ErrorUnableToGetCommitEntry) + return nil, fmt.Errorf("get item: %v", err) } - ece, err := ls.dbResultToCommitEntry(gio.Item) + ce, err := ls.mapItemToEntry(gio.Item) if err != nil { - log.Debugf("delta-go: Failed to map a DBB query result item to a CommitEntry. 
%v", err) - return nil, err + return nil, fmt.Errorf("map item to entry: %v", err) } - return ece, err + return ce, err } -// Gets the latest entry corresponding to the Delta log file for given `tablePath` from a DynamoDBLogStore instance -func (ls *DynamoDBLogStore) GetLatest(tablePath storage.Path) (*logstore.CommitEntry, error) { - qi := dynamodb.QueryInput{TableName: &ls.tableName, ConsistentRead: aws.Bool(true), ScanIndexForward: aws.Bool(false), Limit: aws.Int32(1), ExpressionAttributeValues: map[string]types.AttributeValue{ - ":partitionKey": &types.AttributeValueMemberS{Value: tablePath.Raw}, - }, KeyConditionExpression: aws.String(fmt.Sprintf("%s = :partitionKey", TablePath))} +// Latest gets the commit entry corresponding to the latest commit log for a given table path. +func (ls *LogStore) Latest(tablePath storage.Path) (*logstore.CommitEntry, error) { + qi := dynamodb.QueryInput{TableName: &ls.tableName, ConsistentRead: aws.Bool(true), ScanIndexForward: aws.Bool(false), Limit: aws.Int32(1), + ExpressionAttributeValues: map[string]types.AttributeValue{":partitionKey": &types.AttributeValueMemberS{Value: tablePath.Raw}}, + KeyConditionExpression: aws.String(fmt.Sprintf("%s = :partitionKey", TablePath))} qo, err := ls.client.Query(context.TODO(), &qi) if err != nil { - log.Debugf("delta-go: Failed Query. %v", err) - return nil, err + return nil, fmt.Errorf("query: %v", err) } - ece, err := ls.dbResultToCommitEntry(qo.Items[0]) + ce, err := ls.mapItemToEntry(qo.Items[0]) if err != nil { - log.Debugf("delta-go: Failed to map a DBB query result item to an CommitEntry. %v", err) - return nil, err + return nil, fmt.Errorf("map item to entry: %v", err) } - return ece, nil + return ce, nil } -// Maps a DynamoDB query output item to a CommitEntry instance -func (ls *DynamoDBLogStore) dbResultToCommitEntry(item map[string]types.AttributeValue) (*logstore.CommitEntry, error) { - var expireTimeAttr uint64 +// mapItemToEntry maps an item to a commit entry. +func (ls *LogStore) mapItemToEntry(item map[string]types.AttributeValue) (*logstore.CommitEntry, error) { + var time uint64 var err error - _, ok := item[string(ExpireTime)] - if !ok { - expireTimeAttr = 0 + if _, ok := item[string(ExpireTime)]; !ok { + time = 0 } else { - expireTimeAttr, err = strconv.ParseUint(item[string(ExpireTime)].(*types.AttributeValueMemberN).Value, 10, 64) + time, err = strconv.ParseUint(item[string(ExpireTime)].(*types.AttributeValueMemberN).Value, 10, 64) if err != nil { - log.Debugf("delta-go: Failed to interpet expire time attribute as uint64. 
%v", err) - return nil, err + return nil, fmt.Errorf("failed to interpret expire time as uint64: %v", err) } } - return logstore.NewCommitEntry( + return logstore.New( storage.NewPath(item[string(TablePath)].(*types.AttributeValueMemberS).Value), storage.NewPath(item[string(FileName)].(*types.AttributeValueMemberS).Value), storage.NewPath(item[string(TempPath)].(*types.AttributeValueMemberS).Value), item[string(Complete)].(*types.AttributeValueMemberS).Value == "true", - expireTimeAttr, + time, ) } -// Creates a put item request for an item to be inserted into a DynamoDBLogStore instance -func (ls *DynamoDBLogStore) createPutItemRequest(entry *logstore.CommitEntry, overwrite bool) (*dynamodb.PutItemInput, error) { - attributes := map[string]types.AttributeValue{string(TablePath): &types.AttributeValueMemberS{Value: entry.TablePath.Raw}, string(FileName): &types.AttributeValueMemberS{Value: entry.FileName.Raw}, string(TempPath): &types.AttributeValueMemberS{Value: entry.TempPath.Raw}, string(Complete): &types.AttributeValueMemberS{Value: *aws.String(strconv.FormatBool(entry.Complete))}} +// createPutItemRequest creates a put item request. +func (ls *LogStore) createPutItemRequest(entry *logstore.CommitEntry, overwrite bool) (*dynamodb.PutItemInput, error) { + attributes := map[string]types.AttributeValue{string(TablePath): &types.AttributeValueMemberS{Value: entry.TablePath().Raw}, + string(FileName): &types.AttributeValueMemberS{Value: entry.FileName().Raw}, string(TempPath): &types.AttributeValueMemberS{Value: entry.TempPath().Raw}, + string(Complete): &types.AttributeValueMemberS{Value: *aws.String(strconv.FormatBool(entry.IsComplete()))}} - if entry.ExpireTime != 0 { - attributes[string(ExpireTime)] = &types.AttributeValueMemberN{Value: *aws.String(fmt.Sprint(entry.ExpireTime))} + if entry.ExpirationTime() != 0 { + attributes[string(ExpireTime)] = &types.AttributeValueMemberN{Value: *aws.String(fmt.Sprint(entry.ExpirationTime()))} } pir := &dynamodb.PutItemInput{ @@ -308,8 +291,3 @@ func (ls *DynamoDBLogStore) createPutItemRequest(entry *logstore.CommitEntry, ov return pir, nil } - -// Gets a DynamoDB client for a DynamoDBLogStore instance from an AWS config -func (ls *DynamoDBLogStore) getClient(config aws.Config) (dynamodbutils.DynamoDBClient, error) { - return dynamodb.NewFromConfig(config), nil -} diff --git a/logstore/dynamodblogstore/dynamodblogstore_test.go b/logstore/dynamodblogstore/dynamodblogstore_test.go index 515f337f..113b8060 100644 --- a/logstore/dynamodblogstore/dynamodblogstore_test.go +++ b/logstore/dynamodblogstore/dynamodblogstore_test.go @@ -10,7 +10,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. 
-package logstore +package dynamodblogstore import ( "testing" @@ -20,120 +20,117 @@ import ( "github.com/rivian/delta-go/storage" ) -func TestGet(t *testing.T) { - lso := DynamoDBLogStoreOptions{Client: dynamodbutils.NewMockClient(), TableName: "log_store"} - ls, err := NewDynamoDBLogStore(lso) +func TestPut(t *testing.T) { + o := Options{Client: dynamodbutils.NewMockClient(), TableName: "log_store"} + ls, err := New(o) if err != nil { - t.Error("failed to create DynamoDB log store") + t.Error("Failed to create DynamoDB log store") } - ece, err := logstore.NewCommitEntry(storage.NewPath("usr/local/"), storage.NewPath("01.json"), storage.NewPath("01.tmp"), false, uint64(0)) + ce, err := logstore.New(storage.NewPath("usr/local/"), storage.NewPath("01.json"), storage.NewPath("01.tmp"), false, uint64(0)) if err != nil { - t.Error("failed to create commit entry") + t.Error("Failed to create commit entry") } - err = ls.Put(ece, false) - if err != nil { - t.Error("failed to put commit entry") + if err := ls.Put(ce, true); err != nil { + t.Error("Failed to put commit entry") } - ece, err = logstore.NewCommitEntry(storage.NewPath("usr/local/"), storage.NewPath("01.json"), storage.NewPath("01.tmp"), false, uint64(0)) + ce, err = logstore.New(storage.NewPath("usr/local/"), storage.NewPath("01.json"), storage.NewPath("01.tmp"), true, uint64(0)) if err != nil { - t.Error("failed to create commit entry") + t.Error("Failed to create commit entry") } - err = ls.Put(ece, false) - if err == nil { - t.Error("commit entry already exists") + if err := ls.Put(ce, true); err != nil { + t.Error("Failed to overwrite commit entry") } - ece, err = ls.Get(storage.NewPath("usr/local/"), storage.NewPath("01.json")) - if err != nil || ece == nil { - t.Error("failed to get commit entry") + items, ok := ls.client.(*dynamodbutils.MockClient).TablesToItems().Get("log_store") + if !ok { + t.Error("Failed to get table items") } - - ece, err = ls.Get(storage.NewPath("usr/local/A"), storage.NewPath("01.json")) - if err == nil || ece != nil { - t.Error("no commit entry should be returned") + if len(items) != 1 { + t.Error("Incorrect number of items in table") } - _, err = ls.Get(storage.NewPath("usr/local/"), storage.NewPath("02.json")) - if err == nil || ece != nil { - t.Error("no commit entry should be returned") + ce, err = ls.Latest(storage.NewPath("usr/local/")) + if err != nil { + t.Error("Failed to get latest commit entry") + } + if ce.IsComplete() != true { + t.Error("Commit entry should be complete") } } -func TestGetLatest(t *testing.T) { - lso := DynamoDBLogStoreOptions{Client: dynamodbutils.NewMockClient(), TableName: "log_store"} - ls, err := NewDynamoDBLogStore(lso) +func TestGet(t *testing.T) { + o := Options{Client: dynamodbutils.NewMockClient(), TableName: "log_store"} + ls, err := New(o) if err != nil { - t.Error("failed to create DynamoDB log store") + t.Error("Failed to create new DynamoDB log store") } - eceFirst, err := logstore.NewCommitEntry(storage.NewPath("usr/local/"), storage.NewPath("01.json"), storage.NewPath("01.tmp"), false, uint64(0)) + ce, err := logstore.New(storage.NewPath("usr/local/"), storage.NewPath("01.json"), storage.NewPath("01.tmp"), false, uint64(0)) if err != nil { - t.Error("failed to create commit entry") + t.Error("Failed to create commit entry") } - err = ls.Put(eceFirst, false) - if err != nil { - t.Error("failed to put commit entry") + if err := ls.Put(ce, false); err != nil { + t.Error("Failed to put commit entry") } - eceSecond, err := 
logstore.NewCommitEntry(storage.NewPath("usr/local/"), storage.NewPath("02.json"), storage.NewPath("02.tmp"), false, uint64(0))
+	ce, err = logstore.New(storage.NewPath("usr/local/"), storage.NewPath("01.json"), storage.NewPath("01.tmp"), false, uint64(0))
 	if err != nil {
-		t.Error("failed to create commit entry")
+		t.Error("Failed to create commit entry")
 	}
 
-	err = ls.Put(eceSecond, false)
-	if err != nil {
-		t.Error("failed to put commit entry")
+	if err := ls.Put(ce, false); err == nil {
+		t.Error("Commit entry already exists")
 	}
 
-	eceLatest, err := ls.GetLatest(storage.NewPath("usr/local/"))
-	if err != nil || eceLatest == nil {
-		t.Error("failed to get latest commit entry")
+	ce, err = ls.Get(storage.NewPath("usr/local/"), storage.NewPath("01.json"))
+	if err != nil || ce == nil {
+		t.Error("Failed to get commit entry")
 	}
-	if eceSecond.FileName.Raw != eceLatest.FileName.Raw || eceSecond.TempPath.Raw != eceLatest.TempPath.Raw {
-		t.Error("got incorrect latest commit entry")
+
+	ce, err = ls.Get(storage.NewPath("usr/local/A"), storage.NewPath("01.json"))
+	if err == nil || ce != nil {
+		t.Error("No commit entry should be returned")
 	}
 
-	_, err = ls.GetLatest(storage.NewPath("usr/local/A"))
-	if err == nil {
-		t.Error("no commit entry should be returned")
+	ce, err = ls.Get(storage.NewPath("usr/local/"), storage.NewPath("02.json"))
+	if err == nil || ce != nil {
+		t.Error("No commit entry should be returned")
 	}
 }
 
-func TestPutOverwrite(t *testing.T) {
-	lso := DynamoDBLogStoreOptions{Client: dynamodbutils.NewMockClient(), TableName: "log_store"}
-	ls, err := NewDynamoDBLogStore(lso)
+func TestLatest(t *testing.T) {
+	o := Options{Client: dynamodbutils.NewMockClient(), TableName: "log_store"}
+	ls, err := New(o)
 	if err != nil {
-		t.Error("failed to create DynamoDB log store")
+		t.Error("Failed to create DynamoDB log store")
 	}
 
-	ece, err := logstore.NewCommitEntry(storage.NewPath("usr/local/"), storage.NewPath("01.json"), storage.NewPath("01.tmp"), false, uint64(0))
+	firstEntry, err := logstore.New(storage.NewPath("usr/local/"), storage.NewPath("01.json"), storage.NewPath("01.tmp"), false, uint64(0))
 	if err != nil {
-		t.Error("failed to create commit entry")
+		t.Error("Failed to create commit entry")
 	}
 
-	err = ls.Put(ece, true)
-	if err != nil {
-		t.Error("failed to put commit entry")
+	if err := ls.Put(firstEntry, false); err != nil {
+		t.Error("Failed to put commit entry")
 	}
 
-	ece, err = logstore.NewCommitEntry(storage.NewPath("usr/local/"), storage.NewPath("01.json"), storage.NewPath("01.tmp"), true, uint64(0))
+	secondEntry, err := logstore.New(storage.NewPath("usr/local/"), storage.NewPath("02.json"), storage.NewPath("02.tmp"), false, uint64(0))
 	if err != nil {
-		t.Error("failed to create commit entry")
+		t.Error("Failed to create commit entry")
 	}
 
-	err = ls.Put(ece, true)
-	if err != nil {
-		t.Error("failed to overwrite commit entry")
+	if err := ls.Put(secondEntry, false); err != nil {
+		t.Error("Failed to put commit entry")
 	}
 
-	if len(ls.client.(*dynamodbutils.MockDynamoDBClient).GetTablesToItems()["log_store"]) != 1 {
-		t.Error("incorrect number of items in table")
+	latest, err := ls.Latest(storage.NewPath("usr/local/"))
+	if err != nil || latest == nil {
+		t.Error("Failed to get latest commit entry")
 	}
-
-	ece, err = ls.GetLatest(storage.NewPath("usr/local/"))
-	if err != nil {
-		t.Error("failed to get latest commit entry")
+	if secondEntry.FileName().Raw != latest.FileName().Raw || secondEntry.TempPath().Raw != latest.TempPath().Raw {
+		t.Error("Got incorrect latest commit 
entry") } - if ece.Complete != true { - t.Error("commit entry should be complete") + + if _, err = ls.Latest(storage.NewPath("usr/local/A")); err == nil { + t.Error("No commit entry should be returned") } } diff --git a/logstore/externalcommitentry.go b/logstore/externalcommitentry.go deleted file mode 100644 index 898a6fde..00000000 --- a/logstore/externalcommitentry.go +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2023 Rivian Automotive, Inc. -// Licensed under the Apache License, Version 2.0 (the “License”); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an “AS IS” BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package logstore - -import ( - "time" - - "github.com/rivian/delta-go/storage" -) - -// Wrapper struct representing an entry in a log store for a given commit in the Delta log -// Contains relevant fields and helper methods have been defined -type CommitEntry struct { - // Absolute path for Delta table - TablePath storage.Path - // File name of this commit, e.g. "000000N.json" - FileName storage.Path - // Path to temp file for this commit, relative to the `_delta_log` - TempPath storage.Path - // true if Delta JSON file is successfully copied to its destination location, else false - Complete bool - // Epoch seconds at which this commit entry is safe to be deleted if complete equals true, else 0 - ExpireTime uint64 -} - -// Creates a new CommitEntry instance -func NewCommitEntry(tablePath storage.Path, fileName storage.Path, tempPath storage.Path, complete bool, expireTime uint64) (*CommitEntry, error) { - ece := new(CommitEntry) - ece.TablePath = tablePath - ece.FileName = fileName - ece.TempPath = tempPath - ece.Complete = complete - ece.ExpireTime = expireTime - return ece, nil -} - -// Returns this entry with `complete=true` and a valid `expireTime` -func (ece *CommitEntry) AsComplete(expirationDelaySeconds uint64) (*CommitEntry, error) { - return NewCommitEntry(ece.TablePath, ece.FileName, ece.TempPath, true, uint64(time.Now().Unix())+expirationDelaySeconds) -} - -// Returns the absolute path to the file for this entry -func (ece *CommitEntry) AbsoluteFilePath() (storage.Path, error) { - return storage.PathFromIter([]string{ece.TablePath.Raw, "_delta_log", ece.FileName.Raw}), nil -} - -// Returns the absolute path to the temp file for this entry -func (ece *CommitEntry) AbsoluteTempPath() (storage.Path, error) { - return storage.PathFromIter([]string{ece.TablePath.Raw, "_delta_log", ece.TempPath.Raw}), nil -} diff --git a/logstore/logstore.go b/logstore/logstore.go index 847ce3e9..65f203d5 100644 --- a/logstore/logstore.go +++ b/logstore/logstore.go @@ -12,15 +12,23 @@ // limitations under the License. package logstore -import "github.com/rivian/delta-go/storage" +import ( + "github.com/rivian/delta-go/storage" +) type LogStore interface { - // Puts an entry into a log store in an exclusive way + // Put puts a commit entry into a log store in an exclusive way. 
Put(entry *CommitEntry, overwrite bool) error - // Gets an entry corresponding to the Delta log file with given `tablePath` and `fileName` + // Get gets a commit entry corresponding to the commit log identified by the given table path and file name. Get(tablePath storage.Path, fileName storage.Path) (*CommitEntry, error) - // Gets the latest entry corresponding to the Delta log file for given `tablePath` - GetLatest(tablePath storage.Path) (*CommitEntry, error) + // Latest gets the commit entry corresponding to the latest commit log for a given table path. + Latest(tablePath storage.Path) (*CommitEntry, error) + + // Client gets a log store client. + Client() any + + // ExpirationDelaySeconds gets the number of seconds until a commit entry expires. + ExpirationDelaySeconds() uint64 } diff --git a/state/dynamostate/dynamostate.go b/state/dynamostate/dynamostate.go index 63644d46..b7a7f039 100644 --- a/state/dynamostate/dynamostate.go +++ b/state/dynamostate/dynamostate.go @@ -38,7 +38,7 @@ const ( type DynamoState struct { Table string Key string - Client dynamodbutils.DynamoDBClient + Client dynamodbutils.Client } type Options struct { @@ -65,7 +65,7 @@ func (opts *Options) setOptionsDefaults() { // Compile time check that DynamoState implements state.StateStore var _ state.StateStore = (*DynamoState)(nil) -func New(client dynamodbutils.DynamoDBClient, tableName string, key string, opts Options) (*DynamoState, error) { +func New(client dynamodbutils.Client, tableName string, key string, opts Options) (*DynamoState, error) { opts.setOptionsDefaults() createTableInput := dynamodb.CreateTableInput{ @@ -81,7 +81,7 @@ func New(client dynamodbutils.DynamoDBClient, tableName string, key string, opts }, TableName: aws.String(tableName), } - dynamodbutils.TryEnsureDynamoDBTableExists(client, tableName, createTableInput, opts.MaxRetryTableCreateAttempts) + dynamodbutils.CreateTableIfNotExists(client, tableName, createTableInput, opts.MaxRetryTableCreateAttempts) tb := new(DynamoState) tb.Table = tableName diff --git a/storage/filestore/filestore.go b/storage/filestore/filestore.go index 8bad7a09..1c9d7648 100644 --- a/storage/filestore/filestore.go +++ b/storage/filestore/filestore.go @@ -26,7 +26,7 @@ import ( // FileObjectStore provides local file storage type FileObjectStore struct { - BaseURI storage.Path + baseURI storage.Path } // Compile time check that FileObjectStore implements storage.ObjectStore @@ -34,12 +34,12 @@ var _ storage.ObjectStore = (*FileObjectStore)(nil) func New(baseURI storage.Path) *FileObjectStore { fs := new(FileObjectStore) - fs.BaseURI = baseURI + fs.baseURI = baseURI return fs } func (s *FileObjectStore) Put(location storage.Path, bytes []byte) error { - writePath := filepath.Join(s.BaseURI.Raw, location.Raw) + writePath := filepath.Join(s.baseURI.Raw, location.Raw) err := os.MkdirAll(filepath.Dir(writePath), 0700) if err != nil { return errors.Join(storage.ErrPutObject, err) @@ -62,7 +62,7 @@ func (s *FileObjectStore) RenameIfNotExists(from storage.Path, to storage.Path) } func (s *FileObjectStore) Get(location storage.Path) ([]byte, error) { - filePath := filepath.Join(s.BaseURI.Raw, location.Raw) + filePath := filepath.Join(s.baseURI.Raw, location.Raw) data, err := os.ReadFile(filePath) if os.IsNotExist(err) { return nil, errors.Join(storage.ErrObjectDoesNotExist, err) @@ -74,7 +74,7 @@ func (s *FileObjectStore) Get(location storage.Path) ([]byte, error) { } func (s *FileObjectStore) Head(location storage.Path) (storage.ObjectMeta, error) { - filePath := 
filepath.Join(s.BaseURI.Raw, location.Raw) + filePath := filepath.Join(s.baseURI.Raw, location.Raw) var meta storage.ObjectMeta info, err := os.Stat(filePath) if os.IsNotExist(err) { @@ -96,8 +96,8 @@ func (s *FileObjectStore) Head(location storage.Path) (storage.ObjectMeta, error func (s *FileObjectStore) Rename(from storage.Path, to storage.Path) error { // rename source to destination - f := s.BaseURI.Join(from) - t := s.BaseURI.Join(to) + f := s.baseURI.Join(from) + t := s.baseURI.Join(to) err := os.Rename(f.Raw, t.Raw) if err != nil { return errors.Join(storage.ErrObjectDoesNotExist, err) @@ -106,7 +106,7 @@ func (s *FileObjectStore) Rename(from storage.Path, to storage.Path) error { } func (s *FileObjectStore) Delete(location storage.Path) error { - filePath := filepath.Join(s.BaseURI.Raw, location.Raw) + filePath := filepath.Join(s.baseURI.Raw, location.Raw) err := os.Remove(filePath) if err != nil { return errors.Join(storage.ErrDeleteObject, err) @@ -115,7 +115,7 @@ func (s *FileObjectStore) Delete(location storage.Path) error { } func (s *FileObjectStore) DeleteFolder(location storage.Path) error { - filePath := filepath.Join(s.BaseURI.Raw, location.Raw) + filePath := filepath.Join(s.baseURI.Raw, location.Raw) err := os.RemoveAll(filePath) if err != nil { return errors.Join(storage.ErrDeleteObject, err) @@ -189,7 +189,7 @@ func (s *FileObjectStore) ListAll(prefix storage.Path) (storage.ListResult, erro var listResult storage.ListResult dir, filePrefix := filepath.Split(prefix.Raw) - fullDir := filepath.Join(s.BaseURI.Raw, dir) + fullDir := filepath.Join(s.baseURI.Raw, dir) // If filePrefix was "", make sure fullDir includes a trailing separator. // Otherwise we will return results in the parent directory that start with the same @@ -200,7 +200,7 @@ func (s *FileObjectStore) ListAll(prefix storage.Path) (storage.ListResult, erro // baseURI will be trimmed from the beginning of the results returned. // It must have a trailing separator. - baseURI := s.BaseURI.Raw + baseURI := s.baseURI.Raw if !os.IsPathSeparator(baseURI[len(baseURI)-1]) { baseURI += string(filepath.Separator) } @@ -212,7 +212,7 @@ func (s *FileObjectStore) ListAll(prefix storage.Path) (storage.ListResult, erro // If the prefix passed in was a directory, add the root directory explicitly if dir != "" && filePrefix == "" { - info, err := os.Stat(filepath.Join(s.BaseURI.Raw, dir)) + info, err := os.Stat(filepath.Join(s.baseURI.Raw, dir)) // If we get an error the directory doesn't exist, that's okay if err != nil && !os.IsNotExist(err) { return listResult, errors.Join(storage.ErrListObjects, err) @@ -240,7 +240,7 @@ func (s *FileObjectStore) SupportsWriter() bool { } func (s *FileObjectStore) Writer(location storage.Path, flag int) (io.Writer, func(), error) { - writePath := filepath.Join(s.BaseURI.Raw, location.Raw) + writePath := filepath.Join(s.baseURI.Raw, location.Raw) err := os.MkdirAll(filepath.Dir(writePath), 0700) if err != nil { return nil, nil, errors.Join(storage.ErrWriter, err) @@ -249,3 +249,18 @@ func (s *FileObjectStore) Writer(location storage.Path, flag int) (io.Writer, fu f, err := os.OpenFile(writePath, os.O_WRONLY|flag, 0700) return f, func() { f.Close() }, err } + +// BaseURI gets the base URI. +func (s *FileObjectStore) BaseURI() storage.Path { + return s.baseURI +} + +// SetBaseURI sets the base URI. 
+func (s *FileObjectStore) SetBaseURI(baseURI storage.Path) { + s.baseURI = baseURI +} + +// SupportsAtomicPutIfAbsent returns false because local file storage does not provide a "put-if-absent" API. +func (s *FileObjectStore) SupportsAtomicPutIfAbsent() bool { + return false +} diff --git a/storage/filestore/filestore_test.go b/storage/filestore/filestore_test.go index 0dbad0d0..da1b4531 100644 --- a/storage/filestore/filestore_test.go +++ b/storage/filestore/filestore_test.go @@ -28,7 +28,7 @@ func TestPut(t *testing.T) { // fileLockKey := filepath.Join(tmpDir, "_delta_log/_commit.lock") tmpPath := storage.NewPath(tmpDir) - store := FileObjectStore{BaseURI: tmpPath} + store := FileObjectStore{baseURI: tmpPath} // lock := filelock.FileLock{Key: fileLockKey} putPath := storage.NewPath("test_file.json") @@ -55,7 +55,7 @@ func TestHead(t *testing.T) { // fileLockKey := filepath.Join(tmpDir, "_delta_log/_commit.lock") tmpPath := storage.NewPath(tmpDir) - store := FileObjectStore{BaseURI: tmpPath} + store := FileObjectStore{baseURI: tmpPath} // lock := filelock.FileLock{Key: fileLockKey} putPath := storage.NewPath("test_file.json") @@ -92,7 +92,7 @@ func TestRenameIfNotExists(t *testing.T) { tmpDir := t.TempDir() tmpPath := storage.NewPath(tmpDir) - store := FileObjectStore{BaseURI: tmpPath} + store := FileObjectStore{baseURI: tmpPath} fromPath := storage.NewPath("data.json.tmp") toPath := storage.NewPath("data.json") @@ -127,7 +127,7 @@ func TestDelete(t *testing.T) { tmpDir := t.TempDir() tmpPath := storage.NewPath(tmpDir) - store := FileObjectStore{BaseURI: tmpPath} + store := FileObjectStore{baseURI: tmpPath} filePath := storage.NewPath("data.json") @@ -184,7 +184,7 @@ func TestList(t *testing.T) { tmpDir := t.TempDir() tmpPath := storage.NewPath(tmpDir) - store := FileObjectStore{BaseURI: tmpPath} + store := FileObjectStore{baseURI: tmpPath} // Create some files and directories filePaths := []string{"data.json", "data2.json", "d3.json", "data/more.json", "data/more2.json", "data3/hello.json"} diff --git a/storage/s3store/s3store.go b/storage/s3store/s3store.go index 4590453f..23b8c188 100644 --- a/storage/s3store/s3store.go +++ b/storage/s3store/s3store.go @@ -32,8 +32,8 @@ import ( // type filePutter func(key string, data io.ReadSeeker, creds *credentials.Credentials) error type S3ObjectStore struct { // Source object key - Client s3utils.S3Client - BaseURI storage.Path + Client s3utils.Client + baseURI storage.Path baseURL *url.URL bucket string path string @@ -44,10 +44,10 @@ type S3ObjectStore struct { // Compile time check that S3ObjectStore implements storage.ObjectStore var _ storage.ObjectStore = (*S3ObjectStore)(nil) -func New(client s3utils.S3Client, baseURI storage.Path) (*S3ObjectStore, error) { +func New(client s3utils.Client, baseURI storage.Path) (*S3ObjectStore, error) { store := new(S3ObjectStore) store.Client = client - store.BaseURI = baseURI + store.baseURI = baseURI var err error store.baseURL, err = baseURI.ParseURL() @@ -296,3 +296,13 @@ func (s *S3ObjectStore) Writer(to storage.Path, flag int) (io.Writer, func(), er func (s *S3ObjectStore) DeleteFolder(location storage.Path) error { return storage.ErrOperationNotSupported } + +// BaseURI gets the base URI. +func (s *S3ObjectStore) BaseURI() storage.Path { + return s.baseURI +} + +// SupportsAtomicPutIfAbsent returns false because S3 does not provide a "put-if-absent" API. 
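+// Callers should coordinate S3 commits through an external log store instead.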
+func (s *S3ObjectStore) SupportsAtomicPutIfAbsent() bool { + return false +} diff --git a/storage/s3store/s3store_test.go b/storage/s3store/s3store_test.go index 8a636fd5..44a3eae7 100644 --- a/storage/s3store/s3store_test.go +++ b/storage/s3store/s3store_test.go @@ -28,7 +28,7 @@ import ( ) // Test helper: setupTest does common setup for our tests, creating a mock S3 client and an S3ObjectStore -func setupTest(t *testing.T) (baseURI storage.Path, mockClient *s3utils.MockS3Client, s3Store *S3ObjectStore) { +func setupTest(t *testing.T) (baseURI storage.Path, mockClient *s3utils.MockClient, s3Store *S3ObjectStore) { t.Helper() baseURI = storage.NewPath("s3://test-bucket/test-delta-table") mockClient, err := s3utils.NewMockClient(t, baseURI) @@ -43,7 +43,7 @@ func setupTest(t *testing.T) (baseURI storage.Path, mockClient *s3utils.MockS3Cl } // Test helper: verify the file exists and has the expected contents -func verifyFileContents(t *testing.T, baseURI storage.Path, path storage.Path, mockClient *s3utils.MockS3Client, data []byte, errorMessage string) { +func verifyFileContents(t *testing.T, baseURI storage.Path, path storage.Path, mockClient *s3utils.MockClient, data []byte, errorMessage string) { t.Helper() results, err := mockClient.GetFile(baseURI, path) if err != nil { @@ -55,7 +55,7 @@ func verifyFileContents(t *testing.T, baseURI storage.Path, path storage.Path, m } // Test helper: verify the file does not exist -func verifyFileDoesNotExist(t *testing.T, baseURI storage.Path, path storage.Path, mockClient *s3utils.MockS3Client, errorMessage string) { +func verifyFileDoesNotExist(t *testing.T, baseURI storage.Path, path storage.Path, mockClient *s3utils.MockClient, errorMessage string) { t.Helper() fileExists, err := mockClient.FileExists(baseURI, path) if fileExists { diff --git a/storage/storage.go b/storage/storage.go index f9ea7763..31bbdb57 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -244,6 +244,13 @@ type ObjectStore interface { // If error is nil, then the returned function should be called with a defer to close resources // Writer may not be supported for all store types Writer(to Path, flag int) (io.Writer, func(), error) + + // BaseURI gets a store's base URI. + BaseURI() Path + + // SupportsAtomicPutIfAbsent returns true if a store provides a "put-if-absent" API. + // Otherwise, it returns false. + SupportsAtomicPutIfAbsent() bool } // / Wrapper around List that will perform paging if required
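Taken together, the changes above suggest the following commit flow. The helper below is an illustrative sketch, not code from this patch: commitWithLogStore, its path layout, and the zero expiration time are assumptions, and the actual coordination logic lives in delta.go. It shows how a writer might branch on the new SupportsAtomicPutIfAbsent method and fall back to an exclusive log-store Put when the backing store (such as S3) cannot do a conditional write:

```go
package example // hypothetical package, for illustration only

import (
	"fmt"

	"github.com/rivian/delta-go/logstore"
	"github.com/rivian/delta-go/storage"
)

// commitWithLogStore is a hypothetical helper showing how the new
// ObjectStore methods and the LogStore interface are meant to compose.
// fileName is a commit log name such as "00000000000000000001.json",
// relative to _delta_log.
func commitWithLogStore(store storage.ObjectStore, ls logstore.LogStore, fileName storage.Path, data []byte) error {
	logPath := storage.PathFromIter([]string{"_delta_log", fileName.Raw})

	if store.SupportsAtomicPutIfAbsent() {
		// A store with a conditional-put primitive can write the commit log
		// directly; the put fails if another writer already created it.
		return store.Put(logPath, data)
	}

	// Without put-if-absent, stage the commit under a temp name first.
	tempName := storage.NewPath(".tmp/" + fileName.Raw)
	if err := store.Put(storage.PathFromIter([]string{"_delta_log", tempName.Raw}), data); err != nil {
		return fmt.Errorf("stage temp commit: %w", err)
	}

	// Record the attempt in the log store. With overwrite=false the put is
	// exclusive, so exactly one concurrent writer claims this file name.
	entry, err := logstore.New(store.BaseURI(), fileName, tempName, false, 0)
	if err != nil {
		return err
	}
	if err := ls.Put(entry, false); err != nil {
		return fmt.Errorf("commit %s already claimed: %w", fileName.Raw, err)
	}

	// Publish the staged bytes at their final destination. A follow-up
	// overwriting Put would then mark the entry complete, with an expiration
	// derived from ls.ExpirationDelaySeconds(), so it can later be cleaned up.
	return store.Put(logPath, data)
}
```

Keeping the completion flag and expiration time on the entry, rather than deleting entries once published, presumably lets the next writer detect and finish an incomplete commit left behind by a crashed writer, which would also explain why Latest queries DynamoDB with ConsistentRead enabled and descending key order.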