Skip to content

Commit

Permalink
fix: handle rconn reset when tx killer has locked the connection (#10153
Browse files Browse the repository at this point in the history
)

Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal authored Apr 28, 2022
1 parent a0c53c5 commit 31c9780
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 5 deletions.
6 changes: 6 additions & 0 deletions go/vt/vterrors/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,9 @@ const (
// PrimaryVindexNotSet is the error message to be used when there is no primary vindex found on a table
PrimaryVindexNotSet = "table '%s' does not have a primary vindex"
)

// TxKillerRollback purpose when acquire lock on connection for rolling back transaction.
const TxKillerRollback = "in use: for tx killer rollback"

// TxClosed regex for connection closed
var TxClosed = regexp.MustCompile("transaction ([a-z0-9:]+) (?:ended|not found|in use: for tx killer rollback)")
5 changes: 1 addition & 4 deletions go/vt/vtgate/scatter_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"flag"
"io"
"regexp"
"sync"
"time"

Expand Down Expand Up @@ -709,8 +708,6 @@ func (stc *ScatterConn) ExecuteLock(
return qr, err
}

var txClosed = regexp.MustCompile("transaction ([a-z0-9:]+) (?:ended|not found)")

func wasConnectionClosed(err error) bool {
sqlErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError)
message := sqlErr.Error()
Expand All @@ -719,7 +716,7 @@ func wasConnectionClosed(err error) bool {
case mysql.CRServerGone, mysql.CRServerLost:
return true
case mysql.ERQueryInterrupted:
return txClosed.MatchString(message)
return vterrors.TxClosed.MatchString(message)
default:
return false
}
Expand Down
12 changes: 12 additions & 0 deletions go/vt/vtgate/scatter_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,14 @@ func TestReservedConnFail(t *testing.T) {
assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost")
oldRId = session.Session.ShardSessions[0].ReservedId

sbc0.Queries = nil
sbc0.EphemeralShardErr = mysql.NewSQLError(mysql.ERQueryInterrupted, mysql.SSUnknownSQLState, "transaction 123 in use: for tx killer rollback")
_ = executeOnShardsReturnsErr(t, res, keyspace, sc, session, destinations)
assert.Equal(t, 2, len(sbc0.Queries), "one for the failed attempt, and one for the retry")
require.Equal(t, 1, len(session.ShardSessions))
assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost")
oldRId = session.Session.ShardSessions[0].ReservedId

sbc0.Queries = nil
sbc0.EphemeralShardErr = vterrors.New(vtrpcpb.Code_CLUSTER_EVENT, "operation not allowed in state NOT_SERVING during query: query1")
_ = executeOnShardsReturnsErr(t, res, keyspace, sc, session, destinations)
Expand Down Expand Up @@ -420,6 +428,10 @@ func TestIsConnClosed(t *testing.T) {
"tx not found missing tx id",
mysql.NewSQLError(mysql.ERQueryInterrupted, mysql.SSUnknownSQLState, "transaction not found"),
false,
}, {
"tx getting killed by tx killer",
mysql.NewSQLError(mysql.ERQueryInterrupted, mysql.SSUnknownSQLState, "transaction 111 in use: for tx killer rollback"),
true,
}}

for _, tCase := range testCases {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (tp *TxPool) Shutdown(ctx context.Context) {

func (tp *TxPool) transactionKiller() {
defer tp.env.LogError()
for _, conn := range tp.scp.GetOutdated(tp.Timeout(), "for tx killer rollback") {
for _, conn := range tp.scp.GetOutdated(tp.Timeout(), vterrors.TxKillerRollback) {
log.Warningf("killing transaction (exceeded timeout: %v): %s", tp.Timeout(), conn.String(tp.env.Config().SanitizeLogMessages))
switch {
case conn.IsTainted():
Expand Down

0 comments on commit 31c9780

Please sign in to comment.