Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): support commit options in mutation operations. #10668

Merged
merged 7 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,8 @@ type applyOption struct {
// will not be recorded in allowed tracking change streams with DDL option
// allow_txn_exclusion=true.
excludeTxnFromChangeStreams bool
// commitOptions is the commit options to use for the commit operation.
commitOptions *CommitOptions
}

// An ApplyOption is an optional argument to Apply.
Expand Down Expand Up @@ -921,6 +923,13 @@ func ExcludeTxnFromChangeStreams() ApplyOption {
}
}

// ApplyCommitOptions returns an ApplyOption that sets the commit options to use for the commit operation.
func ApplyCommitOptions(co *CommitOptions) ApplyOption {
return func(ao *applyOption) {
ao.commitOptions = co
}
}

// Apply applies a list of mutations atomically to the database.
func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
ao := &applyOption{}
Expand All @@ -937,12 +946,17 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption)
defer func() { trace.EndSpan(ctx, err) }()

if !ao.atLeastOnce {
var commitOptions CommitOptions
if ao.commitOptions != nil {
commitOptions = *ao.commitOptions
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand why this is different depending on whether ao.atLeastOnce is true or not.


resp, err := c.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
return t.BufferWrite(ms)
}, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag, ExcludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams})
}, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag, ExcludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams, CommitOptions: commitOptions})
return resp.CommitTs, err
}
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader, excludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams}
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader, excludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams, commitOptions: ao.commitOptions}
return t.applyAtLeastOnce(ctx, ms...)
}

Expand Down
6 changes: 5 additions & 1 deletion spanner/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1498,7 +1498,11 @@ func TestIntegration_ReadWriteTransaction_StatementBased(t *testing.T) {
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
}
if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
duration, err := time.ParseDuration("100ms")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add mock server tests for both normal read/write transactions and writeAtLeastOnce transactions that verify that the maxCommitDelay that has been set is actually sent to Spanner in both cases? This integration test only shows that setting the option does not break anything, it is not able to determine whether the option was actually sent or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the tests

if err != nil {
t.Fatal(err)
}
if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce(), ApplyCommitOptions(&CommitOptions{ReturnCommitStats: true, MaxCommitDelay: &duration})); err != nil {
t.Fatal(err)
}

Expand Down
9 changes: 9 additions & 0 deletions spanner/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1859,6 +1859,9 @@ type writeOnlyTransaction struct {
// current transaction from the allowed tracking change streams with DDL option
// allow_txn_exclusion=true.
excludeTxnFromChangeStreams bool
// maxCommitDelay is the maximum time that the commit will be delayed by the
// backend before it is acknowledged.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(The field here is not maxCommitDelay, but commitOptions)

Suggested change
// maxCommitDelay is the maximum time that the commit will be delayed by the
// backend before it is acknowledged.
// commitOptions are applied to the Commit request for the writeOnlyTransaction.

commitOptions *CommitOptions
}

// applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once,
Expand All @@ -1883,6 +1886,11 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta
return ts, err
}

var maxCommitDelay *durationpb.Duration
if t.commitOptions != nil && t.commitOptions.MaxCommitDelay != nil {
maxCommitDelay = durationpb.New(*(t.commitOptions.MaxCommitDelay))
}

// Make a retryer for Aborted and certain Internal errors.
retryer := onCodes(DefaultRetryBackoff, codes.Aborted, codes.Internal)
// Apply the mutation and retry if the commit is aborted.
Expand Down Expand Up @@ -1910,6 +1918,7 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta
},
Mutations: mPb,
RequestOptions: createRequestOptions(t.commitPriority, "", t.transactionTag),
MaxCommitDelay: maxCommitDelay,
})
if err != nil && !isAbortedErr(err) {
// should not be the case with multiplexed sessions
Expand Down
Loading