From f835694bd93dbc9f0f0005cafbda5d10e98488f5 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Wed, 10 Jun 2020 19:46:31 +0200 Subject: [PATCH] Rollback auto-committable transactions when query fails Fixes #6264 Fixes #6285 Signed-off-by: Andres Taylor Signed-off-by: deepthi --- .../rollback/txn_rollback_shutdown_test.go | 59 ++++++++++++------- go/vt/vtgate/executor_dml_test.go | 17 ++++++ go/vt/vtgate/plan_execute.go | 7 +-- 3 files changed, 59 insertions(+), 24 deletions(-) diff --git a/go/test/endtoend/vtgate/transaction/rollback/txn_rollback_shutdown_test.go b/go/test/endtoend/vtgate/transaction/rollback/txn_rollback_shutdown_test.go index e72fe4088b1..8f4b2bd4a7d 100644 --- a/go/test/endtoend/vtgate/transaction/rollback/txn_rollback_shutdown_test.go +++ b/go/test/endtoend/vtgate/transaction/rollback/txn_rollback_shutdown_test.go @@ -23,6 +23,8 @@ import ( "os" "testing" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" @@ -47,7 +49,7 @@ func TestMain(m *testing.M) { defer cluster.PanicHandler(nil) flag.Parse() - exitcode, err := func() (int, error) { + exitCode := func() int { clusterInstance = cluster.NewCluster(cell, hostname) defer clusterInstance.Teardown() @@ -55,8 +57,9 @@ func TestMain(m *testing.M) { clusterInstance.VtgateGrpcPort = clusterInstance.GetAndReservePort() // Start topo server - if err := clusterInstance.StartTopo(); err != nil { - return 1, err + err := clusterInstance.StartTopo() + if err != nil { + panic(err) } // Start keyspace @@ -64,28 +67,25 @@ func TestMain(m *testing.M) { Name: keyspaceName, SchemaSQL: sqlSchema, } - if err := clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false); err != nil { - return 1, err + err = clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false) + if err != nil { + panic(err) } // Set a short onterm timeout so the test goes faster. clusterInstance.VtGateExtraArgs = []string{"-onterm_timeout", "1s"} - if err := clusterInstance.StartVtgate(); err != nil { - return 1, err + err = clusterInstance.StartVtgate() + if err != nil { + panic(err) } vtParams = mysql.ConnParams{ Host: clusterInstance.Hostname, Port: clusterInstance.VtgateMySQLPort, } - return m.Run(), nil + return m.Run() }() - if err != nil { - fmt.Printf("%v\n", err) - os.Exit(1) - } else { - os.Exit(exitcode) - } + os.Exit(exitCode) } func exec(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { @@ -101,9 +101,7 @@ func TestTransactionRollBackWhenShutDown(t *testing.T) { defer cluster.PanicHandler(t) ctx := context.Background() conn, err := mysql.Connect(ctx, &vtParams) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer conn.Close() exec(t, conn, "insert into buffer(id, msg) values(3,'mark')") @@ -126,9 +124,7 @@ func TestTransactionRollBackWhenShutDown(t *testing.T) { Port: clusterInstance.VtgateMySQLPort, } conn2, err := mysql.Connect(ctx, &vtParams) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer conn2.Close() vtParams = mysql.ConnParams{ @@ -142,3 +138,26 @@ func TestTransactionRollBackWhenShutDown(t *testing.T) { want = `[[INT64(3)]]` assert.Equal(t, want, got) } + +func TestErrorInAutocommitSession(t *testing.T) { + defer cluster.PanicHandler(t) + ctx := context.Background() + conn, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn.Close() + + exec(t, conn, "set autocommit=true") + exec(t, conn, "insert into buffer(id, msg) values(1,'foo')") + _, err = conn.ExecuteFetch("insert into buffer(id, msg) values(1,'bar')", 1, true) + require.Error(t, err) // this should fail with duplicate error + exec(t, conn, "insert into buffer(id, msg) values(2,'baz')") + + conn2, err := mysql.Connect(ctx, &vtParams) + require.NoError(t, err) + defer conn2.Close() + result := exec(t, conn2, "select * from buffer order by id") + + // if we have properly working autocommit code, both the successful inserts should be visible to a second + // connection, even if we have not done an explicit commit + assert.Equal(t, `[[INT64(1) VARCHAR("foo")] [INT64(2) VARCHAR("baz")]]`, fmt.Sprintf("%v", result.Rows)) +} diff --git a/go/vt/vtgate/executor_dml_test.go b/go/vt/vtgate/executor_dml_test.go index 23915311361..4c7957bb73d 100644 --- a/go/vt/vtgate/executor_dml_test.go +++ b/go/vt/vtgate/executor_dml_test.go @@ -943,6 +943,23 @@ func TestInsertOnDupKey(t *testing.T) { } } +func TestAutocommitFail(t *testing.T) { + executor, sbc1, _, _ := createExecutorEnv() + + query := "insert into user (id) values (1)" + sbc1.MustFailCodes[vtrpcpb.Code_ALREADY_EXISTS] = 1 + masterSession.Reset() + masterSession.Autocommit = true + defer func() { + masterSession.Autocommit = false + }() + _, err := executorExec(executor, query, nil) + require.Error(t, err) + + // make sure we have closed and rolled back any transactions started + assert.False(t, masterSession.InTransaction, "left with tx open") +} + func TestInsertComments(t *testing.T) { executor, sbc1, sbc2, sbclookup := createExecutorEnv() diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index b1c93d7747d..5d4e271d2d3 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -165,6 +165,9 @@ func (e *planExecute) insideTransaction(ctx context.Context, safeSession *SafeSe if err := e.e.txConn.Begin(ctx, safeSession); err != nil { return nil, err } + // The defer acts as a failsafe. If commit was successful, + // the rollback will be a no-op. + defer e.e.txConn.Rollback(ctx, safeSession) } // The SetAutocommitable flag should be same as mustCommit. @@ -184,10 +187,6 @@ func (e *planExecute) insideTransaction(ctx context.Context, safeSession *SafeSe } if mustCommit { - // The defer acts as a failsafe. If commit was successful, - // the rollback will be a no-op. - defer e.e.txConn.Rollback(ctx, safeSession) - commitStart := time.Now() if err := e.e.txConn.Commit(ctx, safeSession); err != nil { return nil, err