diff --git a/go/vt/vterrors/constants.go b/go/vt/vterrors/constants.go index 2008e1ccbc1..50df5445354 100644 --- a/go/vt/vterrors/constants.go +++ b/go/vt/vterrors/constants.go @@ -41,3 +41,9 @@ const ( // PrimaryVindexNotSet is the error message to be used when there is no primary vindex found on a table PrimaryVindexNotSet = "table '%s' does not have a primary vindex" ) + +// TxKillerRollback purpose when acquire lock on connection for rolling back transaction. +const TxKillerRollback = "in use: for tx killer rollback" + +// TxClosed regex for connection closed +var TxClosed = regexp.MustCompile("transaction ([a-z0-9:]+) (?:ended|not found|in use: for tx killer rollback)") diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index 3d890c77ffb..ee0f6912630 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -20,7 +20,6 @@ import ( "context" "flag" "io" - "regexp" "sync" "time" @@ -709,8 +708,6 @@ func (stc *ScatterConn) ExecuteLock( return qr, err } -var txClosed = regexp.MustCompile("transaction ([a-z0-9:]+) (?:ended|not found)") - func wasConnectionClosed(err error) bool { sqlErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError) message := sqlErr.Error() @@ -719,7 +716,7 @@ func wasConnectionClosed(err error) bool { case mysql.CRServerGone, mysql.CRServerLost: return true case mysql.ERQueryInterrupted: - return txClosed.MatchString(message) + return vterrors.TxClosed.MatchString(message) default: return false } diff --git a/go/vt/vtgate/scatter_conn_test.go b/go/vt/vtgate/scatter_conn_test.go index dd485a2055e..276b4cad405 100644 --- a/go/vt/vtgate/scatter_conn_test.go +++ b/go/vt/vtgate/scatter_conn_test.go @@ -325,6 +325,14 @@ func TestReservedConnFail(t *testing.T) { assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost") oldRId = session.Session.ShardSessions[0].ReservedId + sbc0.Queries = nil + sbc0.EphemeralShardErr = mysql.NewSQLError(mysql.ERQueryInterrupted, mysql.SSUnknownSQLState, "transaction 123 in use: for tx killer rollback") + _ = executeOnShardsReturnsErr(t, res, keyspace, sc, session, destinations) + assert.Equal(t, 2, len(sbc0.Queries), "one for the failed attempt, and one for the retry") + require.Equal(t, 1, len(session.ShardSessions)) + assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost") + oldRId = session.Session.ShardSessions[0].ReservedId + sbc0.Queries = nil sbc0.EphemeralShardErr = vterrors.New(vtrpcpb.Code_CLUSTER_EVENT, "operation not allowed in state NOT_SERVING during query: query1") _ = executeOnShardsReturnsErr(t, res, keyspace, sc, session, destinations) @@ -420,6 +428,10 @@ func TestIsConnClosed(t *testing.T) { "tx not found missing tx id", mysql.NewSQLError(mysql.ERQueryInterrupted, mysql.SSUnknownSQLState, "transaction not found"), false, + }, { + "tx getting killed by tx killer", + mysql.NewSQLError(mysql.ERQueryInterrupted, mysql.SSUnknownSQLState, "transaction 111 in use: for tx killer rollback"), + true, }} for _, tCase := range testCases { diff --git a/go/vt/vttablet/tabletserver/tx_pool.go b/go/vt/vttablet/tabletserver/tx_pool.go index 12e262e6b65..4673bc1432b 100644 --- a/go/vt/vttablet/tabletserver/tx_pool.go +++ b/go/vt/vttablet/tabletserver/tx_pool.go @@ -122,7 +122,7 @@ func (tp *TxPool) Shutdown(ctx context.Context) { func (tp *TxPool) transactionKiller() { defer tp.env.LogError() - for _, conn := range tp.scp.GetOutdated(tp.Timeout(), "for tx killer rollback") { + for _, conn := range tp.scp.GetOutdated(tp.Timeout(), vterrors.TxKillerRollback) { log.Warningf("killing transaction (exceeded timeout: %v): %s", tp.Timeout(), conn.String(tp.env.Config().SanitizeLogMessages)) switch { case conn.IsTainted():