diff --git a/kv/interface_mock_test.go b/kv/interface_mock_test.go index 6657b9508ef99..283eb0858b716 100644 --- a/kv/interface_mock_test.go +++ b/kv/interface_mock_test.go @@ -176,14 +176,11 @@ func (t *mockTxn) Mem() uint64 { return 0 } -func (t *mockTxn) StartAggressiveLocking() {} -func (t *mockTxn) RetryAggressiveLocking(_ context.Context) {} -func (t *mockTxn) CancelAggressiveLocking(_ context.Context) {} -func (t *mockTxn) DoneAggressiveLocking(_ context.Context) {} - -func (t *mockTxn) IsInAggressiveLockingMode() bool { - return false -} +func (t *mockTxn) StartAggressiveLocking() error { return nil } +func (t *mockTxn) RetryAggressiveLocking(_ context.Context) error { return nil } +func (t *mockTxn) CancelAggressiveLocking(_ context.Context) error { return nil } +func (t *mockTxn) DoneAggressiveLocking(_ context.Context) error { return nil } +func (t *mockTxn) IsInAggressiveLockingMode() bool { return false } // newMockTxn new a mockTxn. func newMockTxn() Transaction { diff --git a/kv/kv.go b/kv/kv.go index e91764635ba67..9d1c48651f1cc 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -284,10 +284,10 @@ type AssertionProto interface { // AggressiveLockingController is the interface that defines aggressive locking related operations. type AggressiveLockingController interface { - StartAggressiveLocking() - RetryAggressiveLocking(ctx context.Context) - CancelAggressiveLocking(ctx context.Context) - DoneAggressiveLocking(ctx context.Context) + StartAggressiveLocking() error + RetryAggressiveLocking(ctx context.Context) error + CancelAggressiveLocking(ctx context.Context) error + DoneAggressiveLocking(ctx context.Context) error IsInAggressiveLockingMode() bool } diff --git a/session/txn.go b/session/txn.go index c7f04c96abcc3..e81dfcc578930 100644 --- a/session/txn.go +++ b/session/txn.go @@ -291,7 +291,10 @@ func (txn *LazyTxn) changePendingToValid(ctx context.Context) error { if txn.enterAggressiveLockingOnValid { txn.enterAggressiveLockingOnValid = false - txn.Transaction.StartAggressiveLocking() + err = txn.Transaction.StartAggressiveLocking() + if err != nil { + return err + } } // The txnInfo may already recorded the first statement (usually "begin") when it's pending, so keep them. @@ -471,64 +474,80 @@ func (txn *LazyTxn) LockKeysFunc(ctx context.Context, lockCtx *kv.LockCtx, fn fu } // StartAggressiveLocking wraps the inner transaction to support using aggressive locking with lazy initialization. -func (txn *LazyTxn) StartAggressiveLocking() { +func (txn *LazyTxn) StartAggressiveLocking() error { if txn.Valid() { - txn.Transaction.StartAggressiveLocking() + return txn.Transaction.StartAggressiveLocking() } else if txn.pending() { txn.enterAggressiveLockingOnValid = true } else { - panic("trying to start aggressive locking on a transaction in invalid state") + err := errors.New("trying to start aggressive locking on a transaction in invalid state") + logutil.BgLogger().Error("unexpected error when starting aggressive locking", zap.Error(err), zap.Stringer("txn", txn)) + return err } + return nil } // RetryAggressiveLocking wraps the inner transaction to support using aggressive locking with lazy initialization. -func (txn *LazyTxn) RetryAggressiveLocking(ctx context.Context) { +func (txn *LazyTxn) RetryAggressiveLocking(ctx context.Context) error { if txn.Valid() { - txn.Transaction.RetryAggressiveLocking(ctx) + return txn.Transaction.RetryAggressiveLocking(ctx) } else if !txn.pending() { - panic("trying to retry aggressive locking on a transaction in invalid state") + err := errors.New("trying to retry aggressive locking on a transaction in invalid state") + logutil.BgLogger().Error("unexpected error when retrying aggressive locking", zap.Error(err), zap.Stringer("txnStartTS", txn)) + return err } + return nil } // CancelAggressiveLocking wraps the inner transaction to support using aggressive locking with lazy initialization. -func (txn *LazyTxn) CancelAggressiveLocking(ctx context.Context) { +func (txn *LazyTxn) CancelAggressiveLocking(ctx context.Context) error { if txn.Valid() { - txn.Transaction.CancelAggressiveLocking(ctx) + return txn.Transaction.CancelAggressiveLocking(ctx) } else if txn.pending() { if txn.enterAggressiveLockingOnValid { txn.enterAggressiveLockingOnValid = false } else { - panic("trying to cancel aggressive locking when it's not started") + err := errors.New("trying to cancel aggressive locking when it's not started") + logutil.BgLogger().Error("unexpected error when cancelling aggressive locking", zap.Error(err), zap.Stringer("txnStartTS", txn)) + return err } } else { - panic("trying to cancel aggressive locking on a transaction in invalid state") + err := errors.New("trying to cancel aggressive locking on a transaction in invalid state") + logutil.BgLogger().Error("unexpected error when cancelling aggressive locking", zap.Error(err), zap.Stringer("txnStartTS", txn)) + return err } + return nil } // DoneAggressiveLocking wraps the inner transaction to support using aggressive locking with lazy initialization. -func (txn *LazyTxn) DoneAggressiveLocking(ctx context.Context) { +func (txn *LazyTxn) DoneAggressiveLocking(ctx context.Context) error { if txn.Valid() { - txn.Transaction.DoneAggressiveLocking(ctx) + return txn.Transaction.DoneAggressiveLocking(ctx) } else if txn.pending() { if txn.enterAggressiveLockingOnValid { txn.enterAggressiveLockingOnValid = false } else { - panic("trying to finish aggressive locking when it's not started") + err := errors.New("trying to finish aggressive locking when it's not started") + logutil.BgLogger().Error("unexpected error when finishing aggressive locking") + return err } } else { - panic("trying to cancel aggressive locking on a transaction in invalid state") + err := errors.New("trying to cancel aggressive locking on a transaction in invalid state") + logutil.BgLogger().Error("unexpected error when finishing aggressive locking") + return err } + return nil } // IsInAggressiveLockingMode wraps the inner transaction to support using aggressive locking with lazy initialization. func (txn *LazyTxn) IsInAggressiveLockingMode() bool { if txn.Valid() { return txn.Transaction.IsInAggressiveLockingMode() - } - if txn.pending() { + } else if txn.pending() { return txn.enterAggressiveLockingOnValid + } else { + return false } - return false } func (txn *LazyTxn) reset() { diff --git a/sessiontxn/isolation/base.go b/sessiontxn/isolation/base.go index 90c665f7526ef..41e0e40846aa3 100644 --- a/sessiontxn/isolation/base.go +++ b/sessiontxn/isolation/base.go @@ -514,7 +514,9 @@ func (p *basePessimisticTxnContextProvider) OnHandlePessimisticStmtStart(ctx con return err } if p.sctx.GetSessionVars().PessimisticTransactionAggressiveLocking && p.txn != nil { - p.txn.StartAggressiveLocking() + if err := p.txn.StartAggressiveLocking(); err != nil { + return err + } } return nil } @@ -525,7 +527,9 @@ func (p *basePessimisticTxnContextProvider) OnStmtRetry(ctx context.Context) err return err } if p.txn != nil && p.txn.IsInAggressiveLockingMode() { - p.txn.RetryAggressiveLocking(ctx) + if err := p.txn.RetryAggressiveLocking(ctx); err != nil { + return err + } } return nil } @@ -536,7 +540,9 @@ func (p *basePessimisticTxnContextProvider) OnStmtCommit(ctx context.Context) er return err } if p.txn != nil && p.txn.IsInAggressiveLockingMode() { - p.txn.DoneAggressiveLocking(ctx) + if err := p.txn.DoneAggressiveLocking(ctx); err != nil { + return err + } } return nil } @@ -547,7 +553,9 @@ func (p *basePessimisticTxnContextProvider) OnStmtRollback(ctx context.Context, return err } if !isForPessimisticRetry && p.txn != nil && p.txn.IsInAggressiveLockingMode() { - p.txn.CancelAggressiveLocking(ctx) + if err := p.txn.CancelAggressiveLocking(ctx); err != nil { + return err + } } return nil } diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index d1a48975ceee8..11820c6773485 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -356,7 +356,7 @@ func (txn *tikvTxn) exitAggressiveLockingIfInapplicable(ctx context.Context, key // Then the previously-locked keys during execution in this statement (if any) will be turned into the state // as if they were locked in normal way. // Note that the issue https://github.com/pingcap/tidb/issues/35682 also exists here. - txn.DoneAggressiveLocking(ctx) + txn.KVTxn.DoneAggressiveLocking(ctx) } } @@ -382,6 +382,31 @@ func (txn *tikvTxn) generateWriteConflictForLockedWithConflict(lockCtx *kv.LockC return nil } +// StartAggressiveLocking adapts the method signature of `KVTxn` to satisfy kv.AggressiveLockingController. +// TODO: Update the methods' signatures in client-go to avoid this adaptor functions. +func (txn *tikvTxn) StartAggressiveLocking() error { + txn.KVTxn.StartAggressiveLocking() + return nil +} + +// RetryAggressiveLocking adapts the method signature of `KVTxn` to satisfy kv.AggressiveLockingController. +func (txn *tikvTxn) RetryAggressiveLocking(ctx context.Context) error { + txn.KVTxn.RetryAggressiveLocking(ctx) + return nil +} + +// CancelAggressiveLocking adapts the method signature of `KVTxn` to satisfy kv.AggressiveLockingController. +func (txn *tikvTxn) CancelAggressiveLocking(ctx context.Context) error { + txn.KVTxn.CancelAggressiveLocking(ctx) + return nil +} + +// DoneAggressiveLocking adapts the method signature of `KVTxn` to satisfy kv.AggressiveLockingController. +func (txn *tikvTxn) DoneAggressiveLocking(ctx context.Context) error { + txn.KVTxn.DoneAggressiveLocking(ctx) + return nil +} + // TiDBKVFilter is the filter specific to TiDB to filter out KV pairs that needn't be committed. type TiDBKVFilter struct{}