Skip to content

Commit

Permalink
feat(spanner): support commit options in mutation operations. (#10668)
Browse files Browse the repository at this point in the history
* feat(spanner): support commit options in mutation operations.

* fix go vet

* fix testing

* incorporate suggestions

* fix comment
  • Loading branch information
rahul2393 authored Aug 14, 2024
1 parent 2b2114f commit 62a56f9
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 4 deletions.
13 changes: 11 additions & 2 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,8 @@ type applyOption struct {
// will not be recorded in allowed tracking change streams with DDL option
// allow_txn_exclusion=true.
excludeTxnFromChangeStreams bool
// commitOptions is the commit options to use for the commit operation.
commitOptions CommitOptions
}

// An ApplyOption is an optional argument to Apply.
Expand Down Expand Up @@ -921,6 +923,13 @@ func ExcludeTxnFromChangeStreams() ApplyOption {
}
}

// ApplyCommitOptions returns an ApplyOption that sets the commit options to use for the commit operation.
func ApplyCommitOptions(co CommitOptions) ApplyOption {
return func(ao *applyOption) {
ao.commitOptions = co
}
}

// Apply applies a list of mutations atomically to the database.
func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption) (commitTimestamp time.Time, err error) {
ao := &applyOption{}
Expand All @@ -939,10 +948,10 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption)
if !ao.atLeastOnce {
resp, err := c.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
return t.BufferWrite(ms)
}, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag, ExcludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams})
}, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag, ExcludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams, CommitOptions: ao.commitOptions})
return resp.CommitTs, err
}
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader, excludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams}
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag, disableRouteToLeader: c.disableRouteToLeader, excludeTxnFromChangeStreams: ao.excludeTxnFromChangeStreams, commitOptions: ao.commitOptions}
return t.applyAtLeastOnce(ctx, ms...)
}

Expand Down
37 changes: 36 additions & 1 deletion spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
structpb "google.golang.org/protobuf/types/known/structpb"

vkit "cloud.google.com/go/spanner/apiv1"
Expand Down Expand Up @@ -3533,6 +3534,29 @@ func TestClient_ApplyAtLeastOnce(t *testing.T) {
if err != nil {
t.Fatal(err)
}
requests := drainRequestsFromServer(server.TestSpanner)
for _, req := range requests {
if r, ok := req.(*sppb.CommitRequest); ok {
if r.MaxCommitDelay != nil {
t.Fatalf("unexpected MaxCommitDelay: %v", r.MaxCommitDelay)
}
}
}

// Using Max commit delay
duration := 1 * time.Millisecond
_, err = client.Apply(context.Background(), ms, ApplyAtLeastOnce(), ApplyCommitOptions(CommitOptions{MaxCommitDelay: &duration}))
if err != nil {
t.Fatal(err)
}
requests = drainRequestsFromServer(server.TestSpanner)
for _, req := range requests {
if r, ok := req.(*sppb.CommitRequest); ok {
if r.MaxCommitDelay.GetNanos() != durationpb.New(duration).GetNanos() {
t.Fatalf("unexpected MaxCommitDelay: %v", r.MaxCommitDelay)
}
}
}
}

func TestClient_ApplyAtLeastOnceReuseSession(t *testing.T) {
Expand Down Expand Up @@ -5319,8 +5343,16 @@ func TestClient_Apply_Tagging(t *testing.T) {
server, client, teardown := setupMockedTestServer(t)
defer teardown()

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})})
duration := time.Millisecond
client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyCommitOptions(CommitOptions{MaxCommitDelay: &duration}))
checkCommitForExpectedRequestOptions(t, server.TestSpanner, &sppb.RequestOptions{})
for _, req := range drainRequestsFromServer(server.TestSpanner) {
if commitReq, ok := req.(*sppb.CommitRequest); ok {
if commitReq.MaxCommitDelay.GetNanos() != durationpb.New(duration).GetNanos() {
t.Fatalf("Missing MaxCommitDelay in commit request")
}
}
}

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, TransactionTag("tx-tag"))
checkCommitForExpectedRequestOptions(t, server.TestSpanner, &sppb.RequestOptions{TransactionTag: "tx-tag"})
Expand All @@ -5330,6 +5362,9 @@ func TestClient_Apply_Tagging(t *testing.T) {

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce(), TransactionTag("tx-tag"))
checkCommitForExpectedRequestOptions(t, server.TestSpanner, &sppb.RequestOptions{TransactionTag: "tx-tag"})

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce(), TransactionTag("tx-tag"))
checkCommitForExpectedRequestOptions(t, server.TestSpanner, &sppb.RequestOptions{TransactionTag: "tx-tag"})
}

func TestClient_PartitionQuery_RequestOptions(t *testing.T) {
Expand Down
6 changes: 5 additions & 1 deletion spanner/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1499,7 +1499,11 @@ func TestIntegration_ReadWriteTransaction_StatementBased(t *testing.T) {
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}),
Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}),
}
if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce()); err != nil {
duration, err := time.ParseDuration("100ms")
if err != nil {
t.Fatal(err)
}
if _, err := client.Apply(ctx, accounts, ApplyAtLeastOnce(), ApplyCommitOptions(CommitOptions{ReturnCommitStats: true, MaxCommitDelay: &duration})); err != nil {
t.Fatal(err)
}

Expand Down
8 changes: 8 additions & 0 deletions spanner/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1859,6 +1859,8 @@ type writeOnlyTransaction struct {
// current transaction from the allowed tracking change streams with DDL option
// allow_txn_exclusion=true.
excludeTxnFromChangeStreams bool
// commitOptions are applied to the Commit request for the writeOnlyTransaction..
commitOptions CommitOptions
}

// applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once,
Expand All @@ -1883,6 +1885,11 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta
return ts, err
}

var maxCommitDelay *durationpb.Duration
if t.commitOptions.MaxCommitDelay != nil {
maxCommitDelay = durationpb.New(*(t.commitOptions.MaxCommitDelay))
}

// Make a retryer for Aborted and certain Internal errors.
retryer := onCodes(DefaultRetryBackoff, codes.Aborted, codes.Internal)
// Apply the mutation and retry if the commit is aborted.
Expand Down Expand Up @@ -1910,6 +1917,7 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta
},
Mutations: mPb,
RequestOptions: createRequestOptions(t.commitPriority, "", t.transactionTag),
MaxCommitDelay: maxCommitDelay,
})
if err != nil && !isAbortedErr(err) {
// should not be the case with multiplexed sessions
Expand Down

0 comments on commit 62a56f9

Please sign in to comment.