Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handle reserved connection reset when tx killer has locked the connection #10153

Merged
merged 1 commit into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions go/vt/vterrors/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,9 @@ const (
// PrimaryVindexNotSet is the error message to be used when there is no primary vindex found on a table
PrimaryVindexNotSet = "table '%s' does not have a primary vindex"
)

// TxKillerRollback purpose when acquire lock on connection for rolling back transaction.
const TxKillerRollback = "in use: for tx killer rollback"

// TxClosed regex for connection closed
var TxClosed = regexp.MustCompile("transaction ([a-z0-9:]+) (?:ended|not found|in use: for tx killer rollback)")
5 changes: 1 addition & 4 deletions go/vt/vtgate/scatter_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"flag"
"io"
"regexp"
"sync"
"time"

Expand Down Expand Up @@ -709,8 +708,6 @@ func (stc *ScatterConn) ExecuteLock(
return qr, err
}

var txClosed = regexp.MustCompile("transaction ([a-z0-9:]+) (?:ended|not found)")

func wasConnectionClosed(err error) bool {
sqlErr := mysql.NewSQLErrorFromError(err).(*mysql.SQLError)
message := sqlErr.Error()
Expand All @@ -719,7 +716,7 @@ func wasConnectionClosed(err error) bool {
case mysql.CRServerGone, mysql.CRServerLost:
return true
case mysql.ERQueryInterrupted:
return txClosed.MatchString(message)
return vterrors.TxClosed.MatchString(message)
default:
return false
}
Expand Down
12 changes: 12 additions & 0 deletions go/vt/vtgate/scatter_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,14 @@ func TestReservedConnFail(t *testing.T) {
assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost")
oldRId = session.Session.ShardSessions[0].ReservedId

sbc0.Queries = nil
sbc0.EphemeralShardErr = mysql.NewSQLError(mysql.ERQueryInterrupted, mysql.SSUnknownSQLState, "transaction 123 in use: for tx killer rollback")
_ = executeOnShardsReturnsErr(t, res, keyspace, sc, session, destinations)
assert.Equal(t, 2, len(sbc0.Queries), "one for the failed attempt, and one for the retry")
require.Equal(t, 1, len(session.ShardSessions))
assert.NotEqual(t, oldRId, session.Session.ShardSessions[0].ReservedId, "should have recreated a reserved connection since the last connection was lost")
oldRId = session.Session.ShardSessions[0].ReservedId

sbc0.Queries = nil
sbc0.EphemeralShardErr = vterrors.New(vtrpcpb.Code_CLUSTER_EVENT, "operation not allowed in state NOT_SERVING during query: query1")
_ = executeOnShardsReturnsErr(t, res, keyspace, sc, session, destinations)
Expand Down Expand Up @@ -420,6 +428,10 @@ func TestIsConnClosed(t *testing.T) {
"tx not found missing tx id",
mysql.NewSQLError(mysql.ERQueryInterrupted, mysql.SSUnknownSQLState, "transaction not found"),
false,
}, {
"tx getting killed by tx killer",
mysql.NewSQLError(mysql.ERQueryInterrupted, mysql.SSUnknownSQLState, "transaction 111 in use: for tx killer rollback"),
true,
}}

for _, tCase := range testCases {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (tp *TxPool) Shutdown(ctx context.Context) {

func (tp *TxPool) transactionKiller() {
defer tp.env.LogError()
for _, conn := range tp.scp.GetOutdated(tp.Timeout(), "for tx killer rollback") {
for _, conn := range tp.scp.GetOutdated(tp.Timeout(), vterrors.TxKillerRollback) {
log.Warningf("killing transaction (exceeded timeout: %v): %s", tp.Timeout(), conn.String(tp.env.Config().SanitizeLogMessages))
switch {
case conn.IsTainted():
Expand Down