From c014c72d64d09c03e14aa1f9adcfb1a467fefb1b Mon Sep 17 00:00:00 2001 From: zyguan Date: Mon, 27 Mar 2023 23:07:42 +0800 Subject: [PATCH 1/3] txnkv: add new API for lock->put optimization Signed-off-by: zyguan --- integration_tests/2pc_test.go | 90 ++++++++++++++++++++++++++++++++++- txnkv/transaction/2pc.go | 46 ++++++++++++++---- txnkv/transaction/txn.go | 26 +++++++++- 3 files changed, 150 insertions(+), 12 deletions(-) diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index f3e91e0d77..72f9587973 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -42,6 +42,7 @@ import ( "fmt" "math" "math/rand" + "strconv" "sync" "sync/atomic" "testing" @@ -1731,7 +1732,7 @@ func (s *testCommitterSuite) TestFlagsInMemBufferMutations() { forEachCase(func(op kvrpcpb.Op, key []byte, value []byte, i int, isPessimisticLock, assertExist, assertNotExist bool) { handle := db.IterWithFlags(key, nil).Handle() - mutations.Push(op, isPessimisticLock, assertExist, assertNotExist, handle) + mutations.Push(op, isPessimisticLock, assertExist, assertNotExist, handle, nil) }) forEachCase(func(op kvrpcpb.Op, key []byte, value []byte, i int, isPessimisticLock, assertExist, assertNotExist bool) { @@ -1742,3 +1743,90 @@ func (s *testCommitterSuite) TestFlagsInMemBufferMutations() { s.Equal(assertNotExist, mutations.IsAssertNotExist(i)) }) } + +func (s *testCommitterSuite) TestSetLockedKeyValue() { + ctx := context.Background() + k1 := []byte("k1") + v1 := []byte("v1") + v2 := []byte("v2") + + mustLockKey := func(txn transaction.TxnProbe, key []byte) { + s.Require().NoError(txn.LockKeys(ctx, &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}, key)) + } + mutOpVals := func(opVals ...interface{}) func(m transaction.CommitterMutations) { + s.Require().Equal(0, len(opVals)%2) + return func(m transaction.CommitterMutations) { + s.Require().Equal(m.Len(), len(opVals)/2) + for i := 0; i < len(opVals); i += 2 { + 
s.Require().Equal(opVals[i], m.GetOp(0)) + s.Require().Equal(opVals[i+1], m.GetValue(0)) + } + } + } + + for _, tt := range []struct { + name string + actions []func(txn transaction.TxnProbe) + check func(m transaction.CommitterMutations) + }{ + { + "NoLock", + []func(txn transaction.TxnProbe){ + func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k1, v1) }, + }, + mutOpVals(), + }, + { + "LockOnly", + []func(txn transaction.TxnProbe){ + func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k1, v1) }, + func(txn transaction.TxnProbe) { mustLockKey(txn, k1) }, + }, + mutOpVals(kvrpcpb.Op_Put, v1), + }, + { + "LockAndSet", + []func(txn transaction.TxnProbe){ + func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k1, v1) }, + func(txn transaction.TxnProbe) { mustLockKey(txn, k1) }, + func(txn transaction.TxnProbe) { s.Require().NoError(txn.Set(k1, v2)) }, + }, + mutOpVals(kvrpcpb.Op_Put, v2), + }, + { + "LockAndDelete", + []func(txn transaction.TxnProbe){ + func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k1, v1) }, + func(txn transaction.TxnProbe) { mustLockKey(txn, k1) }, + func(txn transaction.TxnProbe) { s.Require().NoError(txn.Delete(k1)) }, + }, + mutOpVals(kvrpcpb.Op_Del, []byte{}), + }, + } { + var testAll func(name string, state []bool, actions []func(txn transaction.TxnProbe)) + testAll = func(name string, state []bool, actions []func(txn transaction.TxnProbe)) { + if len(actions) == len(tt.actions) { + s.Run(name, func() { + txn := s.begin() + for _, action := range actions { + action(txn) + } + c, err := txn.NewCommitter(1) + s.Require().NoError(err) + tt.check(c.GetMutations()) + s.Require().NoError(txn.Rollback()) + }) + return + } + for i, used := range state { + if used { + continue + } + state[i] = true + testAll(name+"-"+strconv.Itoa(i), state, append(actions, tt.actions[i])) + state[i] = false + } + } + testAll(tt.name, make([]bool, len(tt.actions)), nil) + } +} diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 
04231c413e..8713dc9b82 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -185,6 +185,8 @@ type memBufferMutations struct { // MSB LSB // [13 bits: Op][1 bit: assertNotExist][1 bit: assertExist][1 bit: isPessimisticLock] handles []unionstore.MemKeyHandle + // overlay of mutation values + overlay map[unionstore.MemKeyHandle][]byte } func newMemBufferMutations(sizeHint int, storage *unionstore.MemDB) *memBufferMutations { @@ -211,7 +213,13 @@ func (m *memBufferMutations) GetKeys() [][]byte { } func (m *memBufferMutations) GetValue(i int) []byte { - v, _ := m.storage.GetValueByHandle(m.handles[i]) + h := m.handles[i] + if m.overlay != nil { + if v, ok := m.overlay[h]; ok { + return v + } + } + v, _ := m.storage.GetValueByHandle(h) return v } @@ -235,10 +243,11 @@ func (m *memBufferMutations) Slice(from, to int) CommitterMutations { return &memBufferMutations{ handles: m.handles[from:to], storage: m.storage, + overlay: m.overlay, } } -func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock, assertExist, assertNotExist bool, handle unionstore.MemKeyHandle) { +func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock, assertExist, assertNotExist bool, handle unionstore.MemKeyHandle, value []byte) { // See comments of `m.handles` field about the format of the user data `aux`. aux := uint16(op) << 3 if isPessimisticLock { @@ -252,6 +261,12 @@ func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock, assertExist, } handle.UserData = aux m.handles = append(m.handles, handle) + if value != nil { + if m.overlay == nil { + m.overlay = make(map[unionstore.MemKeyHandle][]byte) + } + m.overlay[handle] = value + } } // CommitterMutationFlags represents various bit flags of mutations. 
@@ -493,7 +508,7 @@ func (c *twoPhaseCommitter) checkSchemaOnAssertionFail(ctx context.Context, asse } func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error { - var size, putCnt, delCnt, lockCnt, checkCnt int + var size, putCnt, delCnt, lockCnt, checkCnt, putFromLockCnt int txn := c.txn memBuf := txn.GetMemBuffer() @@ -508,15 +523,25 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error { _ = err key := it.Key() flags := it.Flags() - var value []byte - var op kvrpcpb.Op + var ( + value []byte + cachedValue []byte + op kvrpcpb.Op + ) if !it.HasValue() { if !flags.HasLocked() { continue } - op = kvrpcpb.Op_Lock - lockCnt++ + if val, ok := txn.getValueByLockedKey(key); ok { + // Change the LOCK into PUT if the value of this key has a cached value. + cachedValue = val + op = kvrpcpb.Op_Put + putFromLockCnt++ + } else { + op = kvrpcpb.Op_Lock + lockCnt++ + } } else { value = it.Value() var isUnnecessaryKV bool @@ -581,8 +606,8 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error { if c.txn.schemaAmender != nil || c.txn.assertionLevel == kvrpcpb.AssertionLevel_Off { mustExist, mustNotExist, hasAssertUnknown = false, false, false } - c.mutations.Push(op, isPessimistic, mustExist, mustNotExist, it.Handle()) - size += len(key) + len(value) + c.mutations.Push(op, isPessimistic, mustExist, mustNotExist, it.Handle(), cachedValue) + size += len(key) + len(value) + len(cachedValue) if c.txn.assertionLevel != kvrpcpb.AssertionLevel_Off { // Check mutations for pessimistic-locked keys with the read results of pessimistic lock requests. 
@@ -635,6 +660,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error { zap.Int("dels", delCnt), zap.Int("locks", lockCnt), zap.Int("checks", checkCnt), + zap.Int("putsFromLocks", putFromLockCnt), zap.Uint64("txnStartTS", txn.startTS)) } @@ -1758,7 +1784,7 @@ func (c *twoPhaseCommitter) tryAmendTxn(ctx context.Context, startInfoSchema Sch return false, err } handle := c.txn.GetMemBuffer().IterWithFlags(key, nil).Handle() - c.mutations.Push(op, addMutations.IsPessimisticLock(i), addMutations.IsAssertExists(i), addMutations.IsAssertNotExist(i), handle) + c.mutations.Push(op, addMutations.IsPessimisticLock(i), addMutations.IsAssertExists(i), addMutations.IsAssertNotExist(i), handle, nil) } } return false, nil diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 22da179423..a03ce0214e 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -85,11 +85,14 @@ type KVTxn struct { startTS uint64 startTime time.Time // Monotonic timestamp for recording txn time consuming. commitTS uint64 - mu sync.Mutex // For thread-safe LockKeys function. + mu sync.Mutex // For thread-safe LockKeys, SetLockedKeyValue functions. setCnt int64 vars *tikv.Variables committer *twoPhaseCommitter lockedCnt int + // lockedKVs caches kv pairs whose keys have been locked; the 2pc committer reads this map when initializing + // mutations and converts a lock into a put if needed. + lockedKVs map[string][]byte valid bool @@ -749,6 +752,27 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput return nil } +// SetLockedKeyValue caches a key-value pair whose key has been locked. Those key-value pairs may be turned into PUT +// records if possible. 
+func (txn *KVTxn) SetLockedKeyValue(key []byte, value []byte) { + txn.mu.Lock() + if txn.lockedKVs == nil { + txn.lockedKVs = make(map[string][]byte) + } + txn.lockedKVs[string(key)] = value + txn.mu.Unlock() +} + +// getValueByLockedKey returns the cached value of the given locked key. +func (txn *KVTxn) getValueByLockedKey(key []byte) (value []byte, ok bool) { + txn.mu.Lock() + if txn.lockedKVs != nil { + value, ok = txn.lockedKVs[string(key)] + } + txn.mu.Unlock() + return +} + // deduplicateKeys deduplicate the keys, it use sort instead of map to avoid memory allocation. func deduplicateKeys(keys [][]byte) [][]byte { sort.Slice(keys, func(i, j int) bool { From b3d48569616286fc9c230724ba275926c2485664 Mon Sep 17 00:00:00 2001 From: zyguan Date: Tue, 28 Mar 2023 12:49:49 +0800 Subject: [PATCH 2/3] address the comment about check function name Signed-off-by: zyguan --- integration_tests/2pc_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index 72f9587973..b31be9c9b5 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -1753,7 +1753,7 @@ func (s *testCommitterSuite) TestSetLockedKeyValue() { mustLockKey := func(txn transaction.TxnProbe, key []byte) { s.Require().NoError(txn.LockKeys(ctx, &kv.LockCtx{ForUpdateTS: txn.StartTS(), WaitStartTime: time.Now()}, key)) } - mutOpVals := func(opVals ...interface{}) func(m transaction.CommitterMutations) { + checkByOpVals := func(opVals ...interface{}) func(m transaction.CommitterMutations) { s.Require().Equal(0, len(opVals)%2) return func(m transaction.CommitterMutations) { s.Require().Equal(m.Len(), len(opVals)/2) @@ -1774,7 +1774,7 @@ func (s *testCommitterSuite) TestSetLockedKeyValue() { []func(txn transaction.TxnProbe){ func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k1, v1) }, }, - mutOpVals(), + checkByOpVals(), }, { "LockOnly", @@ -1782,7 +1782,7 @@ func (s *testCommitterSuite) 
TestSetLockedKeyValue() { func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k1, v1) }, func(txn transaction.TxnProbe) { mustLockKey(txn, k1) }, }, - mutOpVals(kvrpcpb.Op_Put, v1), + checkByOpVals(kvrpcpb.Op_Put, v1), }, { "LockAndSet", @@ -1791,7 +1791,7 @@ func (s *testCommitterSuite) TestSetLockedKeyValue() { func(txn transaction.TxnProbe) { mustLockKey(txn, k1) }, func(txn transaction.TxnProbe) { s.Require().NoError(txn.Set(k1, v2)) }, }, - mutOpVals(kvrpcpb.Op_Put, v2), + checkByOpVals(kvrpcpb.Op_Put, v2), }, { "LockAndDelete", @@ -1800,7 +1800,7 @@ func (s *testCommitterSuite) TestSetLockedKeyValue() { func(txn transaction.TxnProbe) { mustLockKey(txn, k1) }, func(txn transaction.TxnProbe) { s.Require().NoError(txn.Delete(k1)) }, }, - mutOpVals(kvrpcpb.Op_Del, []byte{}), + checkByOpVals(kvrpcpb.Op_Del, []byte{}), }, } { var testAll func(name string, state []bool, actions []func(txn transaction.TxnProbe)) From bf68304756968eba2b7b504ecc14fb54163aeaa9 Mon Sep 17 00:00:00 2001 From: zyguan Date: Tue, 28 Mar 2023 15:16:00 +0800 Subject: [PATCH 3/3] address comments Signed-off-by: zyguan --- integration_tests/2pc_test.go | 32 ++++++++++++++++++++++++++------ txnkv/transaction/2pc.go | 12 +++++++++--- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/integration_tests/2pc_test.go b/integration_tests/2pc_test.go index b31be9c9b5..61f105f6ac 100644 --- a/integration_tests/2pc_test.go +++ b/integration_tests/2pc_test.go @@ -1759,15 +1759,20 @@ func (s *testCommitterSuite) TestSetLockedKeyValue() { s.Require().Equal(m.Len(), len(opVals)/2) for i := 0; i < len(opVals); i += 2 { s.Require().Equal(opVals[i], m.GetOp(0)) - s.Require().Equal(opVals[i+1], m.GetValue(0)) + if opVals[i+1] == nil { + s.Require().Nil(m.GetValue(0)) + } else { + s.Require().Equal(opVals[i+1], m.GetValue(0)) + } } } } for _, tt := range []struct { - name string - actions []func(txn transaction.TxnProbe) - check func(m transaction.CommitterMutations) + name string + actions 
[]func(txn transaction.TxnProbe) + checkPessimistic func(m transaction.CommitterMutations) + checkOptimisitc func(m transaction.CommitterMutations) }{ { "NoLock", @@ -1775,6 +1780,7 @@ func (s *testCommitterSuite) TestSetLockedKeyValue() { func(txn transaction.TxnProbe) { txn.SetLockedKeyValue(k1, v1) }, }, checkByOpVals(), + checkByOpVals(), }, { "LockOnly", @@ -1783,6 +1789,7 @@ func (s *testCommitterSuite) TestSetLockedKeyValue() { func(txn transaction.TxnProbe) { mustLockKey(txn, k1) }, }, checkByOpVals(kvrpcpb.Op_Put, v1), + checkByOpVals(kvrpcpb.Op_Lock, nil), }, { "LockAndSet", @@ -1792,6 +1799,7 @@ func (s *testCommitterSuite) TestSetLockedKeyValue() { func(txn transaction.TxnProbe) { s.Require().NoError(txn.Set(k1, v2)) }, }, checkByOpVals(kvrpcpb.Op_Put, v2), + checkByOpVals(kvrpcpb.Op_Put, v2), }, { "LockAndDelete", @@ -1801,19 +1809,31 @@ func (s *testCommitterSuite) TestSetLockedKeyValue() { func(txn transaction.TxnProbe) { s.Require().NoError(txn.Delete(k1)) }, }, checkByOpVals(kvrpcpb.Op_Del, []byte{}), + checkByOpVals(kvrpcpb.Op_Del, []byte{}), }, } { var testAll func(name string, state []bool, actions []func(txn transaction.TxnProbe)) testAll = func(name string, state []bool, actions []func(txn transaction.TxnProbe)) { if len(actions) == len(tt.actions) { - s.Run(name, func() { + s.Run("Pessimistic"+name, func() { + txn := s.begin() + txn.SetPessimistic(true) + for _, action := range actions { + action(txn) + } + c, err := txn.NewCommitter(1) + s.Require().NoError(err) + tt.checkPessimistic(c.GetMutations()) + s.Require().NoError(txn.Rollback()) + }) + s.Run("Optimistic"+name, func() { txn := s.begin() for _, action := range actions { action(txn) } c, err := txn.NewCommitter(1) s.Require().NoError(err) - tt.check(c.GetMutations()) + tt.checkOptimisitc(c.GetMutations()) s.Require().NoError(txn.Rollback()) }) return diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 8713dc9b82..d6108130fa 100644 --- a/txnkv/transaction/2pc.go +++ 
b/txnkv/transaction/2pc.go @@ -261,7 +261,13 @@ func (m *memBufferMutations) Push(op kvrpcpb.Op, isPessimisticLock, assertExist, } handle.UserData = aux m.handles = append(m.handles, handle) - if value != nil { + if len(value) > 0 { + if op != kvrpcpb.Op_Put { + panic("op must be PUT when pushing with value") + } + if !isPessimisticLock { + panic("key must be locked when pushing with value") + } if m.overlay == nil { m.overlay = make(map[unionstore.MemKeyHandle][]byte) } @@ -525,7 +531,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error { flags := it.Flags() var ( value []byte - cachedValue []byte + cachedValue []byte = nil op kvrpcpb.Op ) @@ -533,7 +539,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error { if !flags.HasLocked() { continue } - if val, ok := txn.getValueByLockedKey(key); ok { + if val, ok := txn.getValueByLockedKey(key); ok && len(val) > 0 && c.isPessimistic { // Change the LOCK into PUT if the value of this key has a cached value. cachedValue = val op = kvrpcpb.Op_Put