From 2d698b754205888fcd4487ec0793c956f42bbf56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Tue, 5 Oct 2021 17:11:38 +0200 Subject: [PATCH] feat: support mutations (#43) * feat: support mutations * chore: address review comments * chore: address last review comment --- driver.go | 32 ++++++++ driver_with_mockserver_test.go | 139 +++++++++++++++++++++++++++++++++ examples/go.mod | 4 +- examples/go.sum | 20 +++++ examples/mutations/main.go | 107 +++++++++++++++++++++++++ transaction.go | 9 +++ 6 files changed, 309 insertions(+), 2 deletions(-) create mode 100644 examples/mutations/main.go diff --git a/driver.go b/driver.go index 0f892fa5..4480a752 100644 --- a/driver.go +++ b/driver.go @@ -201,6 +201,18 @@ func (c *connector) Driver() driver.Driver { return &Driver{} } +type SpannerConn interface { + // Apply writes an array of mutations to the database. This method may only be called while the connection + // is outside a transaction. Use BufferWrite to write mutations in a transaction. + // See also spanner.Client#Apply + Apply(ctx context.Context, ms []*spanner.Mutation, opts ...spanner.ApplyOption) (commitTimestamp time.Time, err error) + + // BufferWrite writes an array of mutations to the current transaction. This method may only be called while the + // connection is in a read/write transaction. Use Apply to write mutations outside a transaction. + // See also spanner.ReadWriteTransaction#BufferWrite + BufferWrite(ms []*spanner.Mutation) error +} + type conn struct { closed bool client *spanner.Client @@ -210,6 +222,26 @@ type conn struct { retryAborts bool } +func (c *conn) Apply(ctx context.Context, ms []*spanner.Mutation, opts ...spanner.ApplyOption) (commitTimestamp time.Time, err error) { + if c.inTransaction() { + return time.Time{}, spanner.ToSpannerError( + status.Error( + codes.FailedPrecondition, + "Apply may not be called while the connection is in a transaction. Use BufferWrite to write mutations in a transaction.")) + } + return c.client.Apply(ctx, ms, opts...) +} + +func (c *conn) BufferWrite(ms []*spanner.Mutation) error { + if !c.inTransaction() { + return spanner.ToSpannerError( + status.Error( + codes.FailedPrecondition, + "BufferWrite may not be called while the connection is not in a transaction. Use Apply to write mutations outside a transaction.")) + } + return c.tx.BufferWrite(ms) +} + // Ping implements the driver.Pinger interface. // returns ErrBadConn if the connection is no longer valid. func (c *conn) Ping(ctx context.Context) error { diff --git a/driver_with_mockserver_test.go b/driver_with_mockserver_test.go index 6c979576..65054f4f 100644 --- a/driver_with_mockserver_test.go +++ b/driver_with_mockserver_test.go @@ -1020,6 +1020,145 @@ func TestPrepare(t *testing.T) { } } +func TestApplyMutations(t *testing.T) { + t.Parallel() + + ctx := context.Background() + db, server, teardown := setupTestDBConnection(t) + defer teardown() + + conn, err := db.Conn(ctx) + if err != nil { + t.Fatalf("failed to get connection: %v", err) + } + var commitTimestamp time.Time + if err := conn.Raw(func(driverConn interface{}) error { + spannerConn, ok := driverConn.(SpannerConn) + if !ok { + return fmt.Errorf("unexpected driver connection %v, expected SpannerConn", driverConn) + } + commitTimestamp, err = spannerConn.Apply(ctx, []*spanner.Mutation{ + spanner.Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), + spanner.Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), + }) + return err + }); err != nil { + t.Fatalf("failed to apply mutations: %v", err) + } + if commitTimestamp.Equal(time.Time{}) { + t.Fatal("no commit timestamp returned") + } + + // Even though the Apply method is used outside a transaction, the connection will internally start a read/write + // transaction for the mutations. + requests := drainRequestsFromServer(server.TestSpanner) + commitRequests := requestsOfType(requests, reflect.TypeOf(&sppb.CommitRequest{})) + if g, w := len(commitRequests), 1; g != w { + t.Fatalf("commit requests count mismatch\nGot: %v\nWant: %v", g, w) + } + commitRequest := commitRequests[0].(*sppb.CommitRequest) + if g, w := len(commitRequest.Mutations), 2; g != w { + t.Fatalf("mutation count mismatch\nGot: %v\nWant: %v", g, w) + } +} + +func TestApplyMutationsFailure(t *testing.T) { + t.Parallel() + + ctx := context.Background() + db, _, teardown := setupTestDBConnection(t) + defer teardown() + + con, err := db.Conn(ctx) + if err != nil { + t.Fatalf("failed to get connection: %v", err) + } + _, err = con.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + t.Fatalf("failed to begin transaction: %v", err) + } + if g, w := spanner.ErrCode(con.Raw(func(driverConn interface{}) error { + spannerConn, ok := driverConn.(SpannerConn) + if !ok { + return fmt.Errorf("unexpected driver connection %v, expected SpannerConn", driverConn) + } + _, err = spannerConn.Apply(ctx, []*spanner.Mutation{ + spanner.Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), + spanner.Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), + }) + return err + })), codes.FailedPrecondition; g != w { + t.Fatalf("error code mismatch for Apply during transaction\nGot: %v\nWant: %v", g, w) + } +} + +func TestBufferWriteMutations(t *testing.T) { + t.Parallel() + + ctx := context.Background() + db, server, teardown := setupTestDBConnection(t) + defer teardown() + + con, err := db.Conn(ctx) + if err != nil { + t.Fatalf("failed to get connection: %v", err) + } + tx, err := con.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + t.Fatalf("failed to begin transaction: %v", err) + } + if err := con.Raw(func(driverConn interface{}) error { + spannerConn, ok := driverConn.(SpannerConn) + if !ok { + return fmt.Errorf("unexpected driver connection %v, expected SpannerConn", driverConn) + } + return spannerConn.BufferWrite([]*spanner.Mutation{ + spanner.Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), + spanner.Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), + }) + }); err != nil { + t.Fatalf("failed to buffer mutations: %v", err) + } + if err := tx.Commit(); err != nil { + t.Fatalf("failed to commit transaction: %v", err) + } + + requests := drainRequestsFromServer(server.TestSpanner) + commitRequests := requestsOfType(requests, reflect.TypeOf(&sppb.CommitRequest{})) + if g, w := len(commitRequests), 1; g != w { + t.Fatalf("commit requests count mismatch\nGot: %v\nWant: %v", g, w) + } + commitRequest := commitRequests[0].(*sppb.CommitRequest) + if g, w := len(commitRequest.Mutations), 2; g != w { + t.Fatalf("mutation count mismatch\nGot: %v\nWant: %v", g, w) + } +} + +func TestBufferWriteMutationsFails(t *testing.T) { + t.Parallel() + + ctx := context.Background() + db, _, teardown := setupTestDBConnection(t) + defer teardown() + + con, err := db.Conn(ctx) + if err != nil { + t.Fatalf("failed to get connection: %v", err) + } + if g, w := spanner.ErrCode(con.Raw(func(driverConn interface{}) error { + spannerConn, ok := driverConn.(SpannerConn) + if !ok { + return fmt.Errorf("unexpected driver connection %v, expected SpannerConn", driverConn) + } + return spannerConn.BufferWrite([]*spanner.Mutation{ + spanner.Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(1), "Foo", int64(50)}), + spanner.Insert("Accounts", []string{"AccountId", "Nickname", "Balance"}, []interface{}{int64(2), "Bar", int64(1)}), + }) + })), codes.FailedPrecondition; g != w { + t.Fatalf("error code mismatch for BufferWrite outside transaction\nGot: %v\nWant: %v", g, w) + } +} + func TestPing(t *testing.T) { t.Parallel() diff --git a/examples/go.mod b/examples/go.mod index a1248475..357d1462 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -5,7 +5,7 @@ go 1.14 replace github.com/cloudspannerecosystem/go-sql-spanner => ../ require ( - cloud.google.com/go/spanner v1.23.1-0.20210727075241-3d6c6c7873e1 + cloud.google.com/go/spanner v1.25.0 github.com/cloudspannerecosystem/go-sql-spanner v0.0.0-00010101000000-000000000000 github.com/containerd/containerd v1.5.5 // indirect github.com/docker/docker v20.10.8+incompatible @@ -13,5 +13,5 @@ require ( github.com/gorilla/mux v1.8.0 // indirect github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect github.com/morikuni/aec v1.0.0 // indirect - google.golang.org/genproto v0.0.0-20210726143408-b02e89920bf0 + google.golang.org/genproto v0.0.0-20210821163610-241b8fcbd6c8 ) diff --git a/examples/go.sum b/examples/go.sum index 4af1bc1a..91a2ab85 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -24,6 +24,9 @@ cloud.google.com/go v0.84.0/go.mod h1:RazrYuxIK6Kb7YrzzhPoLmCVzl7Sup4NrbKPg8KHSU cloud.google.com/go v0.87.0/go.mod h1:TpDYlFy7vuLzZMMZ+B6iRiELaY7z/gJPaqbMx6mlWcY= cloud.google.com/go v0.88.0 h1:MZ2cf9Elnv1wqccq8ooKO2MqHQLc+ChCp/+QWObCpxg= cloud.google.com/go v0.88.0/go.mod h1:dnKwfYbP9hQhefiUvpbcAyoGSHUrOxR20JVElLiUvEY= +cloud.google.com/go v0.90.0/go.mod h1:kRX0mNRHe0e2rC6oNakvwQqzyDmg57xJ+SZU1eT2aDQ= +cloud.google.com/go v0.93.3 h1:wPBktZFzYBcCZVARvwVKqH1uEj+aLXofJEtrb4oOsio= +cloud.google.com/go v0.93.3/go.mod h1:8utlLll2EF5XMAV15woO4lSbWQlk8rer9aLOfLh7+YI= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= @@ -38,6 +41,8 @@ cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIA cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU= cloud.google.com/go/spanner v1.23.1-0.20210727075241-3d6c6c7873e1 h1:DOK5uvDxxzkTjLIb7xt15zWewNS66DnrOmOb2Hv7C9g= cloud.google.com/go/spanner v1.23.1-0.20210727075241-3d6c6c7873e1/go.mod h1:EZI0yH1D/PrXK0XH9Ba5LGXTXWeqZv0ClOD/19a0Z58= +cloud.google.com/go/spanner v1.25.0 h1:oBLJVlW/v3QMntbpUavhneJEQyPcxbAY5+rI+Jv9hvE= +cloud.google.com/go/spanner v1.25.0/go.mod h1:kQUft3x355hzzaeFbObjsvkzZDgpDkesp3v75WBnI8w= cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw= cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos= cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= @@ -385,6 +390,7 @@ github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210715191844-86eeefc3e471/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -777,6 +783,8 @@ golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914 h1:3B43BWw0xEBsLZ/NO1VALz6fppU3481pik+2Ksv45z8= golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= +golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a h1:4Kd8OPUx1xgUwrHDaviWZO8MsgoZTZYC3g+8m16RBww= +golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -869,6 +877,8 @@ golang.org/x/sys v0.0.0-20210603125802-9665404d3644/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 h1:siQdpVirKtzPhKl3lZWozZraCFObP8S1v6PRp0bLrtU= +golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -976,6 +986,8 @@ google.golang.org/api v0.48.0/go.mod h1:71Pr1vy+TAZRPkPs/xlCf5SsU8WjuAWv1Pfjbtuk google.golang.org/api v0.50.0/go.mod h1:4bNT5pAuq5ji4SRZm+5QIkjny9JAyVD/3gaSihNefaw= google.golang.org/api v0.51.0 h1:SQaA2Cx57B+iPw2MBgyjEkoeMkRK2IenSGoia0U3lCk= google.golang.org/api v0.51.0/go.mod h1:t4HdrdoNgyN5cbEfm7Lum0lcLDLiise1F8qDKX00sOU= +google.golang.org/api v0.54.0 h1:ECJUVngj71QI6XEm7b1sAf8BljU5inEhMbKPR8Lxhhk= +google.golang.org/api v0.54.0/go.mod h1:7C4bFFOvVDGXjfDTAsgGwDgAxRDeQ4X8NvUedIt6z3k= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -1038,6 +1050,11 @@ google.golang.org/genproto v0.0.0-20210716133855-ce7ef5c701ea/go.mod h1:AxrInvYm google.golang.org/genproto v0.0.0-20210721163202-f1cecdd8b78a/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48= google.golang.org/genproto v0.0.0-20210726143408-b02e89920bf0 h1:tcs4DyF9LYv8cynRAbX8JeBpuezJLaK6RfiATAsGwnY= google.golang.org/genproto v0.0.0-20210726143408-b02e89920bf0/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48= +google.golang.org/genproto v0.0.0-20210728212813-7823e685a01f/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48= +google.golang.org/genproto v0.0.0-20210805201207-89edb61ffb67/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48= +google.golang.org/genproto v0.0.0-20210813162853-db860fec028c/go.mod h1:cFeNkxwySK631ADgubI+/XFU/xp8FD5KIVV4rj8UC5w= +google.golang.org/genproto v0.0.0-20210821163610-241b8fcbd6c8 h1:XosVttQUxX8erNhEruTu053/VchgYuksoS9Bj/OITjU= +google.golang.org/genproto v0.0.0-20210821163610-241b8fcbd6c8/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/grpc v0.0.0-20160317175043-d3ddb4469d5a/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= @@ -1066,6 +1083,9 @@ google.golang.org/grpc v1.37.1/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.39.0 h1:Klz8I9kdtkIN6EpHHUOMLCYhTn/2WAe5a0s1hcBkdTI= google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= +google.golang.org/grpc v1.39.1/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= +google.golang.org/grpc v1.40.0 h1:AGJ0Ih4mHjSeibYkFGh1dD9KJ/eOtZ93I6hoHhukQ5Q= +google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= diff --git a/examples/mutations/main.go b/examples/mutations/main.go new file mode 100644 index 00000000..6d264a23 --- /dev/null +++ b/examples/mutations/main.go @@ -0,0 +1,107 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "database/sql" + "fmt" + "time" + + "cloud.google.com/go/spanner" + "github.com/cloudspannerecosystem/go-sql-spanner" + "github.com/cloudspannerecosystem/go-sql-spanner/examples" +) + +var createTableStatement = "CREATE TABLE Singers (SingerId INT64, Name STRING(MAX)) PRIMARY KEY (SingerId)" + +func mutations(projectId, instanceId, databaseId string) error { + ctx := context.Background() + db, err := sql.Open("spanner", fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectId, instanceId, databaseId)) + if err != nil { + return err + } + defer db.Close() + + // Get a connection so that we can get access to the Spanner specific connection interface SpannerConn. + conn, err := db.Conn(ctx) + if err != nil { + return err + } + + // Mutations can be written outside an explicit transaction using SpannerConn#Apply. + var commitTimestamp time.Time + if err := conn.Raw(func(driverConn interface{}) error { + spannerConn, ok := driverConn.(spannerdriver.SpannerConn) + if !ok { + return fmt.Errorf("unexpected driver connection %v, expected SpannerConn", driverConn) + } + commitTimestamp, err = spannerConn.Apply(ctx, []*spanner.Mutation{ + spanner.Insert("Singers", []string{"SingerId", "Name"}, []interface{}{int64(1), "Richard Moore"}), + spanner.Insert("Singers", []string{"SingerId", "Name"}, []interface{}{int64(2), "Alice Henderson"}), + }) + return err + }); err != nil { + return err + } + fmt.Printf("The transaction with two singer mutations was committed at %v\n", commitTimestamp) + + // Mutations can also be executed as part of a read/write transaction. + // Note: The transaction is started using the connection that we had obtained. This is necessary in order to + // ensure that the conn.Raw call below will use the same connection as the one that just started the transaction. + tx, err := conn.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + return err + } + + // Get the name of a singer and update it using a mutation. + id := int64(1) + row := tx.QueryRowContext(ctx, "SELECT Name FROM Singers WHERE SingerId=@id", id) + var name string + if err := row.Scan(&name); err != nil { + return err + } + if err := conn.Raw(func(driverConn interface{}) error { + spannerConn, ok := driverConn.(spannerdriver.SpannerConn) + if !ok { + return fmt.Errorf("unexpected driver connection %v, expected SpannerConn", driverConn) + } + return spannerConn.BufferWrite([]*spanner.Mutation{ + spanner.Update("Singers", []string{"SingerId", "Name"}, []interface{}{id, name + "-Henderson"}), + }) + }); err != nil { + return err + } + if err := tx.Commit(); err != nil { + return err + } + fmt.Print("Updated the name of the first singer\n") + + // Read back the updated row. + row = db.QueryRowContext(ctx, "SELECT SingerId, Name FROM Singers WHERE SingerId = @id", id) + if err := row.Err(); err != nil { + return err + } + if err := row.Scan(&id, &name); err != nil { + return err + } + fmt.Printf("Updated singer: %v %v\n", id, name) + + return nil +} + +func main() { + examples.RunSampleOnEmulator(mutations, createTableStatement) +} diff --git a/transaction.go b/transaction.go index de9dccb5..a01d6b0d 100644 --- a/transaction.go +++ b/transaction.go @@ -32,6 +32,7 @@ type contextTransaction interface { Rollback() error Query(ctx context.Context, stmt spanner.Statement) rowIterator ExecContext(ctx context.Context, stmt spanner.Statement) (int64, error) + BufferWrite(ms []*spanner.Mutation) error } type rowIterator interface { @@ -89,6 +90,10 @@ func (tx *readOnlyTransaction) ExecContext(_ context.Context, stmt spanner.State return 0, spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "read-only transactions cannot write")) } +func (tx *readOnlyTransaction) BufferWrite([]*spanner.Mutation) error { + return spanner.ToSpannerError(status.Errorf(codes.FailedPrecondition, "read-only transactions cannot write")) +} + // ErrAbortedDueToConcurrentModification is returned by a read/write transaction // that was aborted by Cloud Spanner, and where the internal retry attempt // failed because it detected that the results during the retry were different @@ -274,6 +279,10 @@ func (tx *readWriteTransaction) ExecContext(ctx context.Context, stmt spanner.St return res, err } +func (tx *readWriteTransaction) BufferWrite(ms []*spanner.Mutation) error { + return tx.rwTx.BufferWrite(ms) +} + // errorsEqualForRetry returns true if the two errors should be considered equal // when retrying a transaction. This comparison will return true if: // - The errors are the same instances