From a6ddf489394ff06258f2e02e9ef5071bb5628b6f Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Tue, 27 Dec 2022 21:50:38 +0800 Subject: [PATCH 1/5] Add more tests and fix the bug of insert in aggressive locking Signed-off-by: MyonKeminta --- integration_tests/2pc_test.go | 337 +++++++++++++++++++++++++++++++++- txnkv/transaction/txn.go | 12 +- 2 files changed, 344 insertions(+), 5 deletions(-) diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index 178dcefd04..0a8f60c666 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -39,6 +39,7 @@ package tikv_test import ( "bytes" "context" + stderrs "errors" "fmt" "math" "math/rand" @@ -310,7 +311,7 @@ func (s *testCommitterSuite) TestContextCancel2() { cancel() // Secondary keys should not be canceled. s.Eventually(func() bool { - return !s.isKeyLocked([]byte("b")) + return !s.isKeyOptimisticLocked([]byte("b")) }, 2*time.Second, 20*time.Millisecond, "Secondary locks are not committed after 2 seconds") } @@ -370,7 +371,7 @@ func (s *testCommitterSuite) mustGetRegionID(key []byte) uint64 { return loc.Region.GetID() } -func (s *testCommitterSuite) isKeyLocked(key []byte) bool { +func (s *testCommitterSuite) isKeyOptimisticLocked(key []byte) bool { ver, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) s.Nil(err) bo := tikv.NewBackofferWithVars(context.Background(), 500, nil) @@ -387,6 +388,34 @@ func (s *testCommitterSuite) isKeyLocked(key []byte) bool { return keyErr.GetLocked() != nil } +func (s *testCommitterSuite) checkIsKeyLocked(key []byte, expectedLocked bool) { + // To be aware of the result of async operations (e.g. async pessimistic rollback), retry if the check fails. + for i := 0; i < 5; i++ { + txn := s.begin() + txn.SetPessimistic(true) + + lockCtx := kv.NewLockCtx(txn.StartTS(), kv.LockNoWait, time.Now()) + err := txn.LockKeys(context.Background(), lockCtx, key) + + var isCheckSuccess bool + if err != nil && stderrs.Is(err, tikverr.ErrLockAcquireFailAndNoWaitSet) { + isCheckSuccess = expectedLocked + } else { + s.Nil(err) + isCheckSuccess = !expectedLocked + } + + if isCheckSuccess { + s.Nil(txn.Rollback()) + return + } + + s.Nil(txn.Rollback()) + time.Sleep(time.Millisecond * 50) + } + s.Fail(fmt.Sprintf("expected key %q locked = %v, but the actual result not match", string(key), expectedLocked)) +} + func (s *testCommitterSuite) TestPrewriteCancel() { // Setup region delays for key "b" and "c". delays := map[uint64]time.Duration{ @@ -416,7 +445,7 @@ func (s *testCommitterSuite) TestPrewriteCancel() { s.NotNil(err) // "c" should be cleaned up in reasonable time. s.Eventually(func() bool { - return !s.isKeyLocked([]byte("c")) + return !s.isKeyOptimisticLocked([]byte("c")) }, 500*time.Millisecond, 10*time.Millisecond) } @@ -1112,6 +1141,308 @@ func (s *testCommitterSuite) TestPessimisticLockAllowLockWithConflictError() { } } +func (s *testCommitterSuite) TestAggressiveLocking() { + for _, finalIsDone := range []bool{false, true} { + txn := s.begin() + txn.SetPessimistic(true) + s.False(txn.IsInAggressiveLockingMode()) + + // Lock some keys in normal way. + lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"), []byte("k2"))) + s.checkIsKeyLocked([]byte("k1"), true) + s.checkIsKeyLocked([]byte("k2"), true) + + // Enter aggressive locking mode and lock some keys. + txn.StartAggressiveLocking() + lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + for _, key := range []string{"k2", "k3", "k4"} { + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte(key))) + s.checkIsKeyLocked([]byte(key), true) + } + s.True(!txn.IsInAggressiveLockingStage([]byte("k2"))) + s.True(txn.IsInAggressiveLockingStage([]byte("k3"))) + s.True(txn.IsInAggressiveLockingStage([]byte("k4"))) + + // Retry and change some of the keys to be locked. + txn.RetryAggressiveLocking(context.Background()) + s.checkIsKeyLocked([]byte("k1"), true) + s.checkIsKeyLocked([]byte("k2"), true) + s.checkIsKeyLocked([]byte("k3"), true) + s.checkIsKeyLocked([]byte("k4"), true) + lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k4"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k5"))) + s.checkIsKeyLocked([]byte("k4"), true) + s.checkIsKeyLocked([]byte("k5"), true) + + // Retry again, then the unnecessary locks acquired in the previous stage should be released. + txn.RetryAggressiveLocking(context.Background()) + s.checkIsKeyLocked([]byte("k1"), true) + s.checkIsKeyLocked([]byte("k2"), true) + s.checkIsKeyLocked([]byte("k3"), false) + s.checkIsKeyLocked([]byte("k4"), true) + s.checkIsKeyLocked([]byte("k5"), true) + + // Lock some different keys again and then done or cancel. + lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k5"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k6"))) + + if finalIsDone { + txn.DoneAggressiveLocking(context.Background()) + time.Sleep(time.Millisecond * 50) + s.checkIsKeyLocked([]byte("k1"), true) + s.checkIsKeyLocked([]byte("k2"), true) + s.checkIsKeyLocked([]byte("k3"), false) + s.checkIsKeyLocked([]byte("k4"), false) + s.checkIsKeyLocked([]byte("k5"), true) + s.checkIsKeyLocked([]byte("k6"), true) + } else { + txn.CancelAggressiveLocking(context.Background()) + time.Sleep(time.Millisecond * 50) + s.checkIsKeyLocked([]byte("k1"), true) + s.checkIsKeyLocked([]byte("k2"), true) + s.checkIsKeyLocked([]byte("k3"), false) + s.checkIsKeyLocked([]byte("k4"), false) + s.checkIsKeyLocked([]byte("k5"), false) + s.checkIsKeyLocked([]byte("k6"), false) + } + + s.NoError(txn.Rollback()) + } +} + +func (s *testCommitterSuite) TestAggressiveLockingInsert() { + txn0 := s.begin() + s.NoError(txn0.Set([]byte("k1"), []byte("v1"))) + s.NoError(txn0.Set([]byte("k3"), []byte("v3"))) + s.NoError(txn0.Set([]byte("k6"), []byte("v6"))) + s.NoError(txn0.Set([]byte("k8"), []byte("v8"))) + s.NoError(txn0.Commit(context.Background())) + + txn := s.begin() + txn.SetPessimistic(true) + + lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"), []byte("k2"))) + s.NoError(txn.Set([]byte("k5"), []byte("v5"))) + s.NoError(txn.Delete([]byte("k6"))) + + insert := func(lockCtx *kv.LockCtx, key string) error { + txn.GetMemBuffer().UpdateFlags([]byte(key), kv.SetPresumeKeyNotExists) + if lockCtx == nil { + lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + } + return txn.LockKeys(context.Background(), lockCtx, []byte(key)) + } + + mustAlreadyExist := func(err error) { + if _, ok := errors.Cause(err).(*tikverr.ErrKeyExist); !ok { + s.Fail(fmt.Sprintf("expected KeyExist error, but got: %+q", err)) + } + } + + txn.StartAggressiveLocking() + // Already-locked before aggressive locking. + mustAlreadyExist(insert(nil, "k1")) + s.NoError(insert(nil, "k2")) + // Acquiring new locks normally. + mustAlreadyExist(insert(nil, "k3")) + s.NoError(insert(nil, "k4")) + // Existence buffered in memdb of this transaction. + mustAlreadyExist(insert(nil, "k5")) + s.NoError(insert(nil, "k6")) + + // Locked with conflict and then do pessimistic retry. + txn2 := s.begin() + s.NoError(txn2.Set([]byte("k7"), []byte("v7"))) + s.NoError(txn2.Delete([]byte("k8"))) + s.NoError(txn2.Commit(context.Background())) + lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + s.NoError(insert(lockCtx, "k7")) + s.Equal(txn2.GetCommitTS(), lockCtx.MaxLockedWithConflictTS) + s.Equal(txn2.GetCommitTS(), lockCtx.Values["k7"].LockedWithConflictTS) + lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + s.NoError(insert(lockCtx, "k8")) + s.Equal(txn2.GetCommitTS(), lockCtx.MaxLockedWithConflictTS) + s.Equal(txn2.GetCommitTS(), lockCtx.Values["k8"].LockedWithConflictTS) + // Update forUpdateTS to simulate a pessimistic retry. + newForUpdateTS, err := s.store.CurrentTimestamp(oracle.GlobalTxnScope) + s.Nil(err) + s.GreaterOrEqual(newForUpdateTS, txn2.GetCommitTS()) + lockCtx = &kv.LockCtx{ForUpdateTS: newForUpdateTS, WaitStartTime: time.Now()} + mustAlreadyExist(insert(nil, "k7")) + s.NoError(insert(nil, "k8")) + + txn.CancelAggressiveLocking(context.Background()) + s.NoError(txn.Rollback()) +} + +func (s *testCommitterSuite) TestAggressiveLockingSwitchPrimary() { + txn := s.begin() + checkPrimary := func(key string, expectedPrimary string) { + lockInfo := s.getLockInfo([]byte(key)) + s.Equal(kvrpcpb.Op_PessimisticLock, lockInfo.LockType) + s.Equal(expectedPrimary, string(lockInfo.PrimaryLock)) + } + + forUpdateTS := txn.StartTS() + txn.StartAggressiveLocking() + lockCtx := &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2"))) + checkPrimary("k1", "k1") + checkPrimary("k2", "k1") + + // Primary not changed. + forUpdateTS++ + txn.RetryAggressiveLocking(context.Background()) + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k3"))) + checkPrimary("k1", "k1") + checkPrimary("k3", "k1") + + // Primary changed and is not in the set of previously locked keys. + forUpdateTS++ + txn.RetryAggressiveLocking(context.Background()) + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k4"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k5"))) + checkPrimary("k4", "k4") + checkPrimary("k5", "k4") + // Previously locked keys that are not in the most recent aggressive locking stage will be released. + s.checkIsKeyLocked([]byte("k2"), false) + + // Primary changed and is in the set of previously locked keys. + forUpdateTS++ + txn.RetryAggressiveLocking(context.Background()) + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k5"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k6"))) + checkPrimary("k5", "k5") + checkPrimary("k6", "k5") + s.checkIsKeyLocked([]byte("k1"), false) + s.checkIsKeyLocked([]byte("k3"), false) + + // Primary changed and is locked *before* the previous aggressive locking stage (suppose it's the n-th retry, + // the expected primary is locked during the (n-2)-th retry). + forUpdateTS++ + txn.RetryAggressiveLocking(context.Background()) + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k7"))) + forUpdateTS++ + txn.RetryAggressiveLocking(context.Background()) + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k6"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k5"))) + checkPrimary("k5", "k6") + checkPrimary("k6", "k6") + + txn.CancelAggressiveLocking(context.Background()) + // Check all released. + for i := 0; i < 6; i++ { + key := []byte{byte('k'), byte('1') + byte(i)} + s.checkIsKeyLocked(key, false) + } + s.NoError(txn.Rollback()) + + // Also test the primary-switching logic won't misbehave when the primary is already selected before entering + // aggressive locking. + txn = s.begin() + forUpdateTS = txn.StartTS() + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"), []byte("k2"))) + checkPrimary("k1", "k1") + checkPrimary("k2", "k1") + + txn.StartAggressiveLocking() + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k3"))) + checkPrimary("k2", "k1") + checkPrimary("k3", "k1") + + forUpdateTS++ + txn.RetryAggressiveLocking(context.Background()) + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k3"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k4"))) + checkPrimary("k3", "k1") + checkPrimary("k4", "k1") + + txn.CancelAggressiveLocking(context.Background()) + s.checkIsKeyLocked([]byte("k1"), true) + s.checkIsKeyLocked([]byte("k2"), true) + s.checkIsKeyLocked([]byte("k3"), false) + s.checkIsKeyLocked([]byte("k4"), false) + s.NoError(txn.Rollback()) + s.checkIsKeyLocked([]byte("k1"), false) + s.checkIsKeyLocked([]byte("k2"), false) + +} + +func (s *testCommitterSuite) TestAggressiveLockingLoadValueOptionChanges() { + txn0 := s.begin() + s.NoError(txn0.Set([]byte("k2"), []byte("v2"))) + s.NoError(txn0.Commit(context.Background())) + + txn := s.begin() + + for _, firstAttemptLockedWithConflict := range []bool{false, true} { + forUpdateTS := txn.StartTS() + txn.StartAggressiveLocking() + lockCtx := &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + + var txn2 transaction.TxnProbe + if firstAttemptLockedWithConflict { + txn2 = s.begin() + s.NoError(txn2.Delete([]byte("k1"))) + s.NoError(txn2.Set([]byte("k1"), []byte("v1"))) + s.NoError(txn2.Commit(context.Background())) + } + + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2"))) + + if firstAttemptLockedWithConflict { + s.Equal(txn2.GetCommitTS(), lockCtx.MaxLockedWithConflictTS) + s.Equal(txn2.GetCommitTS(), lockCtx.Values["k1"].LockedWithConflictTS) + s.Equal(txn2.GetCommitTS(), lockCtx.Values["k2"].LockedWithConflictTS) + } + + if firstAttemptLockedWithConflict { + forUpdateTS = txn2.GetCommitTS() + 1 + } else { + forUpdateTS++ + } + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + lockCtx.InitCheckExistence(2) + txn.RetryAggressiveLocking(context.Background()) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2"))) + s.Equal(0, lockCtx.MaxLockedWithConflictTS) + s.Equal(false, lockCtx.Values["k1"].Exists) + s.Equal(true, lockCtx.Values["k2"].Exists) + + forUpdateTS++ + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + lockCtx.InitReturnValues(2) + txn.RetryAggressiveLocking(context.Background()) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"))) + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2"))) + s.Equal(0, lockCtx.MaxLockedWithConflictTS) + s.Equal(false, lockCtx.Values["k1"].Exists) + s.Equal(true, lockCtx.Values["k2"].Exists) + s.Equal([]byte("v2"), lockCtx.Values["k2"].Value) + + txn.CancelAggressiveLocking(context.Background()) + s.NoError(txn.Rollback()) + } +} + // TestElapsedTTL tests that elapsed time is correct even if ts physical time is greater than local time. func (s *testCommitterSuite) TestElapsedTTL() { key := []byte("key") diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index a8bb345dbb..158292d615 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -886,16 +886,24 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput checkKeyExists = flags.HasNeedCheckExists() } // If the key is locked in the current aggressive locking stage, override the information in memBuf. + isInLastAggressiveLockingStage := false if txn.aggressiveLockingContext != nil { if entry, ok := txn.aggressiveLockingContext.currentLockedKeys[string(key)]; ok { locked = true valueExist = entry.Value.Exists + } else if entry, ok := txn.aggressiveLockingContext.lastRetryUnnecessaryLocks[string(key)]; ok { + locked = true + valueExist = entry.Value.Exists + isInLastAggressiveLockingStage = true } } - if !locked { + if !locked || isInLastAggressiveLockingStage { + // Locks acquired in the previous aggressive locking stage might need to be updated later in + // `filterAggressiveLockedKeys`. keys = append(keys, key) - } else if txn.IsPessimistic() { + } + if locked && txn.IsPessimistic() { if checkKeyExists && valueExist { alreadyExist := kvrpcpb.AlreadyExist{Key: key} e := &tikverr.ErrKeyExist{AlreadyExist: &alreadyExist} From 32d6e67fb71db503105821e699b1c78d28ab55ce Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Wed, 28 Dec 2022 22:11:05 +0800 Subject: [PATCH 2/5] Fix tests Signed-off-by: MyonKeminta --- integration_tests/2pc_test.go | 14 ++++--- internal/mockstore/mocktikv/mvcc_leveldb.go | 46 ++++++++++----------- txnkv/transaction/txn.go | 4 +- 3 files changed, 35 insertions(+), 29 deletions(-) diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index 0a8f60c666..c65e24025e 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -1226,6 +1226,7 @@ func (s *testCommitterSuite) TestAggressiveLockingInsert() { txn.SetPessimistic(true) lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + lockCtx.InitReturnValues(2) s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"), []byte("k2"))) s.NoError(txn.Set([]byte("k5"), []byte("v5"))) s.NoError(txn.Delete([]byte("k6"))) @@ -1282,6 +1283,7 @@ func (s *testCommitterSuite) TestAggressiveLockingInsert() { func (s *testCommitterSuite) TestAggressiveLockingSwitchPrimary() { txn := s.begin() + txn.SetPessimistic(true) checkPrimary := func(key string, expectedPrimary string) { lockInfo := s.getLockInfo([]byte(key)) s.Equal(kvrpcpb.Op_PessimisticLock, lockInfo.LockType) @@ -1352,6 +1354,7 @@ func (s *testCommitterSuite) TestAggressiveLockingSwitchPrimary() { // Also test the primary-switching logic won't misbehave when the primary is already selected before entering // aggressive locking. txn = s.begin() + txn.SetPessimistic(true) forUpdateTS = txn.StartTS() lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"), []byte("k2"))) @@ -1389,9 +1392,10 @@ func (s *testCommitterSuite) TestAggressiveLockingLoadValueOptionChanges() { s.NoError(txn0.Set([]byte("k2"), []byte("v2"))) s.NoError(txn0.Commit(context.Background())) - txn := s.begin() - for _, firstAttemptLockedWithConflict := range []bool{false, true} { + txn := s.begin() + txn.SetPessimistic(true) + forUpdateTS := txn.StartTS() txn.StartAggressiveLocking() lockCtx := &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} @@ -1400,7 +1404,7 @@ func (s *testCommitterSuite) TestAggressiveLockingLoadValueOptionChanges() { if firstAttemptLockedWithConflict { txn2 = s.begin() s.NoError(txn2.Delete([]byte("k1"))) - s.NoError(txn2.Set([]byte("k1"), []byte("v1"))) + s.NoError(txn2.Set([]byte("k2"), []byte("v2"))) s.NoError(txn2.Commit(context.Background())) } @@ -1423,7 +1427,7 @@ func (s *testCommitterSuite) TestAggressiveLockingLoadValueOptionChanges() { txn.RetryAggressiveLocking(context.Background()) s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"))) s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2"))) - s.Equal(0, lockCtx.MaxLockedWithConflictTS) + s.Equal(uint64(0), lockCtx.MaxLockedWithConflictTS) s.Equal(false, lockCtx.Values["k1"].Exists) s.Equal(true, lockCtx.Values["k2"].Exists) @@ -1433,7 +1437,7 @@ func (s *testCommitterSuite) TestAggressiveLockingLoadValueOptionChanges() { txn.RetryAggressiveLocking(context.Background()) s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k1"))) s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k2"))) - s.Equal(0, lockCtx.MaxLockedWithConflictTS) + s.Equal(uint64(0), lockCtx.MaxLockedWithConflictTS) s.Equal(false, lockCtx.Values["k1"].Exists) s.Equal(true, lockCtx.Values["k2"].Exists) s.Equal([]byte("v2"), lockCtx.Values["k2"].Value) diff --git a/internal/mockstore/mocktikv/mvcc_leveldb.go b/internal/mockstore/mocktikv/mvcc_leveldb.go index 82ea4a0d6c..b85e9915b6 100644 --- a/internal/mockstore/mocktikv/mvcc_leveldb.go +++ b/internal/mockstore/mocktikv/mvcc_leveldb.go @@ -650,12 +650,13 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation dec := lockDecoder{ expectKey: mutation.Key, } - ok, err := dec.Decode(iter) + alreadyLocked, err := dec.Decode(iter) if err != nil { return err } - if ok { + if alreadyLocked { if dec.lock.startTS != startTS { + // Locked by another transaction. errDeadlock := mvcc.deadlockDetector.Detect(startTS, dec.lock.startTS, farm.Fingerprint64(mutation.Key)) if errDeadlock != nil { return &ErrDeadlock{ @@ -666,11 +667,12 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation } return dec.lock.lockErr(mutation.Key) } - return nil } - // For pessimisticLockMutation, check the correspond rollback record, there may be rollbackLock + // For pessimisticLockMutation, check the corresponding rollback record, there may be rollbackLock // operation between startTS and forUpdateTS + // It's also possible that the key is already locked by the same transaction. Also do the conflict check to + // provide an idempotent result. val, err := checkConflictValue(iter, mutation, forUpdateTS, startTS, true, kvrpcpb.AssertionLevel_Off, lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock) if err != nil { if conflict, ok := err.(*ErrConflict); lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock && ok { @@ -709,21 +711,23 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation return nil } - lock := mvccLock{ - startTS: startTS, - primary: lctx.primary, - op: kvrpcpb.Op_PessimisticLock, - ttl: lctx.ttl, - forUpdateTS: forUpdateTS, - minCommitTS: lctx.minCommitTs, - } - writeKey := mvccEncode(mutation.Key, lockVer) - writeValue, err := lock.MarshalBinary() - if err != nil { - return err + if !alreadyLocked || dec.lock.forUpdateTS < forUpdateTS { + lock := mvccLock{ + startTS: startTS, + primary: lctx.primary, + op: kvrpcpb.Op_PessimisticLock, + ttl: lctx.ttl, + forUpdateTS: forUpdateTS, + minCommitTS: lctx.minCommitTs, + } + writeKey := mvccEncode(mutation.Key, lockVer) + writeValue, err := lock.MarshalBinary() + if err != nil { + return err + } + batch.Put(writeKey, writeValue) } - batch.Put(writeKey, writeValue) return nil } @@ -899,12 +903,8 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64, if dec.value.valueType == typePut || dec.value.valueType == typeLock { if needCheckShouldNotExistForPessimisticLock { - return nil, &ErrAssertionFailed{ - StartTS: startTS, - Key: m.Key, - Assertion: m.Assertion, - ExistingStartTS: dec.value.startTS, - ExistingCommitTS: dec.value.commitTS, + return nil, &ErrKeyAlreadyExist{ + Key: m.Key, } } if needCheckAssertionForPrewerite && m.Assertion == kvrpcpb.Assertion_NotExist { diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 158292d615..3237ddbcaa 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -818,7 +818,9 @@ func (txn *KVTxn) filterAggressiveLockedKeys(lockCtx *tikv.LockCtx, allKeys [][] !txn.mayAggressiveLockingLastLockedKeysExpire() { // We can skip locking it since it's already locked during last attempt to aggressive locking, and // we already have the information that we need. - lockCtx.Values[keyStr] = lastResult.Value + if lockCtx.Values != nil { + lockCtx.Values[keyStr] = lastResult.Value + } txn.aggressiveLockingContext.currentLockedKeys[keyStr] = lastResult continue } From 5b269902669c48f7896552d3bc75444b10ce476f Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Fri, 30 Dec 2022 01:03:20 +0800 Subject: [PATCH 3/5] Fix insert test Signed-off-by: MyonKeminta --- integration_tests/2pc_test.go | 18 ++++++++++-------- internal/mockstore/mocktikv/errors.go | 1 + internal/mockstore/mocktikv/mvcc_leveldb.go | 8 +++++++- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index c65e24025e..5a462af240 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -1252,9 +1252,12 @@ func (s *testCommitterSuite) TestAggressiveLockingInsert() { // Acquiring new locks normally. mustAlreadyExist(insert(nil, "k3")) s.NoError(insert(nil, "k4")) - // Existence buffered in memdb of this transaction. - mustAlreadyExist(insert(nil, "k5")) - s.NoError(insert(nil, "k6")) + // The key added or deleted in the same transaction before entering aggressive locking. + // Since TiDB can detect it before invoking LockKeys, client-go actually didn't handle this case for now (no matter + // if in aggressive locking or not). So skip this test case here, and it can be uncommented if someday client-go + // supports such check. + // mustAlreadyExist(insert(nil, "k5")) + // s.NoError(insert(nil, "k6")) // Locked with conflict and then do pessimistic retry. txn2 := s.begin() @@ -1262,9 +1265,8 @@ func (s *testCommitterSuite) TestAggressiveLockingInsert() { s.NoError(txn2.Delete([]byte("k8"))) s.NoError(txn2.Commit(context.Background())) lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} - s.NoError(insert(lockCtx, "k7")) - s.Equal(txn2.GetCommitTS(), lockCtx.MaxLockedWithConflictTS) - s.Equal(txn2.GetCommitTS(), lockCtx.Values["k7"].LockedWithConflictTS) + err := insert(lockCtx, "k7") + s.IsType(errors.Cause(err), &tikverr.ErrWriteConflict{}) lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} s.NoError(insert(lockCtx, "k8")) s.Equal(txn2.GetCommitTS(), lockCtx.MaxLockedWithConflictTS) @@ -1274,8 +1276,8 @@ func (s *testCommitterSuite) TestAggressiveLockingInsert() { s.Nil(err) s.GreaterOrEqual(newForUpdateTS, txn2.GetCommitTS()) lockCtx = &kv.LockCtx{ForUpdateTS: newForUpdateTS, WaitStartTime: time.Now()} - mustAlreadyExist(insert(nil, "k7")) - s.NoError(insert(nil, "k8")) + mustAlreadyExist(insert(lockCtx, "k7")) + s.NoError(insert(lockCtx, "k8")) txn.CancelAggressiveLocking(context.Background()) s.NoError(txn.Rollback()) diff --git a/internal/mockstore/mocktikv/errors.go b/internal/mockstore/mocktikv/errors.go index 0b290b66e5..8fb6748674 100644 --- a/internal/mockstore/mocktikv/errors.go +++ b/internal/mockstore/mocktikv/errors.go @@ -107,6 +107,7 @@ type ErrConflict struct { ConflictTS uint64 ConflictCommitTS uint64 Key []byte + CanForceLock bool } func (e *ErrConflict) Error() string { diff --git a/internal/mockstore/mocktikv/mvcc_leveldb.go b/internal/mockstore/mocktikv/mvcc_leveldb.go index b85e9915b6..6891809b11 100644 --- a/internal/mockstore/mocktikv/mvcc_leveldb.go +++ b/internal/mockstore/mocktikv/mvcc_leveldb.go @@ -675,7 +675,7 @@ func (mvcc *MVCCLevelDB) pessimisticLockMutation(batch *leveldb.Batch, mutation // provide an idempotent result. val, err := checkConflictValue(iter, mutation, forUpdateTS, startTS, true, kvrpcpb.AssertionLevel_Off, lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock) if err != nil { - if conflict, ok := err.(*ErrConflict); lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock && ok { + if conflict, ok := err.(*ErrConflict); lctx.WakeUpMode == kvrpcpb.PessimisticLockWakeUpMode_WakeUpModeForceLock && ok && conflict.CanForceLock { lctx.results = append(lctx.results, &kvrpcpb.PessimisticLockKeyResult{ Type: kvrpcpb.PessimisticLockKeyResultType_LockResultLockedWithConflict, Value: val, @@ -903,6 +903,9 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64, if dec.value.valueType == typePut || dec.value.valueType == typeLock { if needCheckShouldNotExistForPessimisticLock { + if writeConflictErr != nil { + return nil, writeConflictErr + } return nil, &ErrKeyAlreadyExist{ Key: m.Key, } @@ -947,6 +950,9 @@ func checkConflictValue(iter *Iterator, m *kvrpcpb.Mutation, forUpdateTS uint64, } // writeConflictErr is not nil only when write conflict is found and `allowLockWithConflict is set to true. + if writeConflictErr != nil { + writeConflictErr.(*ErrConflict).CanForceLock = true + } if getVal { return retVal, writeConflictErr } From db97a0e2f49b0a27aa0d0bdef158455b70f9ddac Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Fri, 30 Dec 2022 01:13:02 +0800 Subject: [PATCH 4/5] Avoid primary re-selecting in TestAggressiveLockingLoadValueOptionChanges Signed-off-by: MyonKeminta --- integration_tests/2pc_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index 5a462af240..ed20db558f 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -1398,9 +1398,13 @@ func (s *testCommitterSuite) TestAggressiveLockingLoadValueOptionChanges() { txn := s.begin() txn.SetPessimistic(true) + // Make the primary deterministic to avoid the following test code involves primary re-selecting logic. + lockCtx := &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} + s.NoError(txn.LockKeys(context.Background(), lockCtx, []byte("k0"))) + forUpdateTS := txn.StartTS() txn.StartAggressiveLocking() - lockCtx := &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} + lockCtx = &kv.LockCtx{ForUpdateTS: forUpdateTS, WaitStartTime: time.Now()} var txn2 transaction.TxnProbe if firstAttemptLockedWithConflict { From 887922511dbba83d2ebe17ce56526833d2037921 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Tue, 3 Jan 2023 13:01:42 +0800 Subject: [PATCH 5/5] Address comments Signed-off-by: MyonKeminta --- integration_tests/2pc_test.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index ed20db558f..96e402f3c1 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -1231,7 +1231,7 @@ func (s *testCommitterSuite) TestAggressiveLockingInsert() { s.NoError(txn.Set([]byte("k5"), []byte("v5"))) s.NoError(txn.Delete([]byte("k6"))) - insert := func(lockCtx *kv.LockCtx, key string) error { + insertPessimisticLock := func(lockCtx *kv.LockCtx, key string) error { txn.GetMemBuffer().UpdateFlags([]byte(key), kv.SetPresumeKeyNotExists) if lockCtx == nil { lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} @@ -1247,17 +1247,17 @@ func (s *testCommitterSuite) TestAggressiveLockingInsert() { txn.StartAggressiveLocking() // Already-locked before aggressive locking. - mustAlreadyExist(insert(nil, "k1")) - s.NoError(insert(nil, "k2")) + mustAlreadyExist(insertPessimisticLock(nil, "k1")) + s.NoError(insertPessimisticLock(nil, "k2")) // Acquiring new locks normally. - mustAlreadyExist(insert(nil, "k3")) - s.NoError(insert(nil, "k4")) + mustAlreadyExist(insertPessimisticLock(nil, "k3")) + s.NoError(insertPessimisticLock(nil, "k4")) // The key added or deleted in the same transaction before entering aggressive locking. // Since TiDB can detect it before invoking LockKeys, client-go actually didn't handle this case for now (no matter // if in aggressive locking or not). So skip this test case here, and it can be uncommented if someday client-go // supports such check. - // mustAlreadyExist(insert(nil, "k5")) - // s.NoError(insert(nil, "k6")) + // mustAlreadyExist(insertPessimisticLock(nil, "k5")) + // s.NoError(insertPessimisticLock(nil, "k6")) // Locked with conflict and then do pessimistic retry. txn2 := s.begin() @@ -1265,10 +1265,10 @@ func (s *testCommitterSuite) TestAggressiveLockingInsert() { s.NoError(txn2.Delete([]byte("k8"))) s.NoError(txn2.Commit(context.Background())) lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} - err := insert(lockCtx, "k7") + err := insertPessimisticLock(lockCtx, "k7") s.IsType(errors.Cause(err), &tikverr.ErrWriteConflict{}) lockCtx = &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()} - s.NoError(insert(lockCtx, "k8")) + s.NoError(insertPessimisticLock(lockCtx, "k8")) s.Equal(txn2.GetCommitTS(), lockCtx.MaxLockedWithConflictTS) s.Equal(txn2.GetCommitTS(), lockCtx.Values["k8"].LockedWithConflictTS) // Update forUpdateTS to simulate a pessimistic retry. @@ -1276,8 +1276,8 @@ func (s *testCommitterSuite) TestAggressiveLockingInsert() { s.Nil(err) s.GreaterOrEqual(newForUpdateTS, txn2.GetCommitTS()) lockCtx = &kv.LockCtx{ForUpdateTS: newForUpdateTS, WaitStartTime: time.Now()} - mustAlreadyExist(insert(lockCtx, "k7")) - s.NoError(insert(lockCtx, "k8")) + mustAlreadyExist(insertPessimisticLock(lockCtx, "k7")) + s.NoError(insertPessimisticLock(lockCtx, "k8")) txn.CancelAggressiveLocking(context.Background()) s.NoError(txn.Rollback())