Skip to content

Commit

Permalink
txn: record resolving locks (#473)
Browse files Browse the repository at this point in the history
* record resolving locks

Signed-off-by: longfangsong <longfangsong@icloud.com>

* rename to ResolvingLock

Signed-off-by: longfangsong <longfangsong@icloud.com>

* Add test probe for resolving

Signed-off-by: longfangsong <longfangsong@icloud.com>

* refactor according to comments

Signed-off-by: longfangsong <longfangsong@icloud.com>

* Add  ResolveLocksDone for other call of ResolveLocks

Signed-off-by: longfangsong <longfangsong@icloud.com>

* Update txnkv/txnlock/lock_resolver.go

Co-authored-by: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com>
Signed-off-by: longfangsong <longfangsong@icloud.com>

* Update to new design

Signed-off-by: longfangsong <longfangsong@icloud.com>

Co-authored-by: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com>
Co-authored-by: Yilin Chen <sticnarf@gmail.com>
  • Loading branch information
3 people authored May 17, 2022
1 parent 79f010d commit 7ba562b
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 3 deletions.
8 changes: 8 additions & 0 deletions txnkv/transaction/pessimistic.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
req.ResourceGroupTag = action.LockCtx.ResourceGroupTagger(req.Req.(*kvrpcpb.PessimisticLockRequest))
}
lockWaitStartTime := action.WaitStartTime
var resolvingRecordToken *int
for {
// if lockWaitTime set, refine the request `WaitTimeout` field based on timeout limit
if action.LockWaitTime() > 0 && action.LockWaitTime() != kv.LockAlwaysWait {
Expand Down Expand Up @@ -226,6 +227,13 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
// Because we already waited on tikv, no need to Backoff here.
// tikv default will wait 3s(also the maximum wait value) when lock error occurs
startTime = time.Now()
if resolvingRecordToken == nil {
token := c.store.GetLockResolver().RecordResolvingLocks(locks, c.startTS)
resolvingRecordToken = &token
defer c.store.GetLockResolver().ResolveLocksDone(c.startTS, *resolvingRecordToken)
} else {
c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *resolvingRecordToken)
}
msBeforeTxnExpired, err := c.store.GetLockResolver().ResolveLocks(bo, 0, locks)
if err != nil {
return err
Expand Down
8 changes: 8 additions & 0 deletions txnkv/transaction/prewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B

req := c.buildPrewriteRequest(batch, txnSize)
sender := locate.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient())
var resolvingRecordToken *int
defer func() {
if err != nil {
// If we fail to receive response for async commit prewrite, it will be undetermined whether this
Expand Down Expand Up @@ -379,6 +380,13 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B
locks = append(locks, lock)
}
start := time.Now()
if resolvingRecordToken == nil {
token := c.store.GetLockResolver().RecordResolvingLocks(locks, c.startTS)
resolvingRecordToken = &token
defer c.store.GetLockResolver().ResolveLocksDone(c.startTS, *resolvingRecordToken)
} else {
c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *resolvingRecordToken)
}
msBeforeExpired, err := c.store.GetLockResolver().ResolveLocks(bo, c.startTS, locks)
if err != nil {
return err
Expand Down
76 changes: 76 additions & 0 deletions txnkv/txnlock/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ type LockResolver struct {
resolveLockLiteThreshold uint64
mu struct {
sync.RWMutex
// These two fields are used to track lock resolving information.
// currentStartTS -> caller token -> resolving locks
resolving map[uint64][][]Lock
// currentStartTS -> number of lock resolving processes currently in progress.
// Keeping a count here speeds up checking
// whether we can free the resources used in `resolving`.
resolvingConcurrency map[uint64]int
// resolved caches resolved txns (FIFO, txn id -> txnStatus).
resolved map[uint64]TxnStatus
recentResolved *list.List
Expand All @@ -76,13 +83,23 @@ type LockResolver struct {
asyncResolveCancel func()
}

// ResolvingLock is a flat description of one lock that some caller is
// currently trying to resolve. LockResolver.Resolving builds these values.
type ResolvingLock struct {
	// TxnID is the start TS of the caller that is resolving the lock;
	// LockTxnID is copied from the TxnID field of the lock being resolved.
	TxnID, LockTxnID uint64
	// Key and Primary are copied from the Key and Primary fields of the lock
	// being resolved.
	Key, Primary []byte
}

// NewLockResolver creates a new LockResolver instance.
func NewLockResolver(store storage) *LockResolver {
r := &LockResolver{
store: store,
resolveLockLiteThreshold: config.GetGlobalConfig().TiKVClient.ResolveLockLiteThreshold,
}
r.mu.resolved = make(map[uint64]TxnStatus)
r.mu.resolving = make(map[uint64][][]Lock)
r.mu.resolvingConcurrency = make(map[uint64]int)
r.mu.recentResolved = list.New()
r.asyncResolveCtx, r.asyncResolveCancel = context.WithCancel(context.Background())
return r
Expand Down Expand Up @@ -322,6 +339,45 @@ func (lr *LockResolver) ResolveLocksForRead(bo *retry.Backoffer, callerStartTS u
return lr.resolveLocks(bo, callerStartTS, locks, true, lite)
}

// RecordResolvingLocks registers that the transaction whose start TS is
// callerStartTS has started trying to resolve locks.
//
// The locks are stored by value, so later changes to the pointed-to Lock
// values do not affect the record. The returned token is the index of the new
// record among those kept for callerStartTS; pass it to UpdateResolvingLocks
// and ResolveLocksDone.
func (lr *LockResolver) RecordResolvingLocks(locks []*Lock, callerStartTS uint64) int {
	// Build the by-value snapshot before taking the mutex.
	snapshot := make([]Lock, len(locks))
	for i, l := range locks {
		snapshot[i] = *l
	}

	lr.mu.Lock()
	lr.mu.resolvingConcurrency[callerStartTS]++
	records := lr.mu.resolving[callerStartTS]
	token := len(records)
	lr.mu.resolving[callerStartTS] = append(records, snapshot)
	lr.mu.Unlock()

	return token
}

// UpdateResolvingLocks replaces the lock resolving information recorded for
// the transaction callerStartTS under the given token with a by-value copy of
// locks.
//
// token is expected to be a value previously returned by RecordResolvingLocks
// for the same callerStartTS; the slot is indexed directly, so an index outside
// the recorded range panics.
func (lr *LockResolver) UpdateResolvingLocks(locks []*Lock, callerStartTS uint64, token int) {
	// Build the by-value snapshot before taking the mutex.
	snapshot := make([]Lock, len(locks))
	for i := range locks {
		snapshot[i] = *locks[i]
	}

	lr.mu.Lock()
	records := lr.mu.resolving[callerStartTS]
	records[token] = snapshot
	lr.mu.Unlock()
}

// ResolveLocksDone marks the resolving attempt identified by token, made on
// behalf of the transaction callerStartTS, as finished.
//
// Only the record stored under token is cleared; records of other callers that
// share the same callerStartTS are left untouched, so their tokens stay valid.
// Once the number of in-progress attempts for callerStartTS drops to zero, all
// bookkeeping for that start TS is released.
//
// token should be the value returned by the matching RecordResolvingLocks
// call. A token outside the recorded range is ignored rather than panicking.
func (lr *LockResolver) ResolveLocksDone(callerStartTS uint64, token int) {
	lr.mu.Lock()
	defer lr.mu.Unlock()

	// Clear this caller's slot only. The slot is set to nil instead of being
	// removed so that the indices (tokens) held by other callers do not shift.
	if records := lr.mu.resolving[callerStartTS]; token >= 0 && token < len(records) {
		records[token] = nil
	}

	lr.mu.resolvingConcurrency[callerStartTS]--
	if lr.mu.resolvingConcurrency[callerStartTS] == 0 {
		// Last attempt for this start TS has finished: free both entries.
		delete(lr.mu.resolving, callerStartTS)
		delete(lr.mu.resolvingConcurrency, callerStartTS)
	}
}

func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock, forRead bool, lite bool) (int64, []uint64 /* canIgnore */, []uint64 /* canAccess */, error) {
if lr.testingKnobs.meetLock != nil {
lr.testingKnobs.meetLock(locks)
Expand Down Expand Up @@ -425,6 +481,26 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64,
return msBeforeTxnExpired.value(), canIgnore, canAccess, nil
}

// Resolving reports every lock that is currently recorded as being resolved,
// flattened into one ResolvingLock per lock.
//
// The result is never nil. Entries are gathered by ranging over a map, so the
// relative order of different caller start TSs is unspecified; locks recorded
// under one start TS keep their recorded order.
func (lr *LockResolver) Resolving() []ResolvingLock {
	lr.mu.RLock()
	defer lr.mu.RUnlock()

	infos := make([]ResolvingLock, 0)
	for callerTS, perToken := range lr.mu.resolving {
		for i := range perToken {
			for j := range perToken[i] {
				held := &perToken[i][j]
				infos = append(infos, ResolvingLock{
					TxnID:     callerTS,
					LockTxnID: held.TxnID,
					Key:       held.Key,
					Primary:   held.Primary,
				})
			}
		}
	}
	return infos
}

type txnExpireTime struct {
initialized bool
txnExpire int64
Expand Down
7 changes: 7 additions & 0 deletions txnkv/txnlock/test_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,10 @@ func (l LockResolverProbe) IsNonAsyncCommitLock(err error) bool {
_, ok := errors.Cause(err).(*nonAsyncCommitLock)
return ok
}

// SetResolving appends locks as one more resolving record for the transaction
// currentStartTS. It writes to the resolving map only; the per-start-TS
// concurrency counter is not touched.
func (l LockResolverProbe) SetResolving(currentStartTS uint64, locks []Lock) {
	l.mu.Lock()
	defer l.mu.Unlock()

	existing := l.mu.resolving[currentStartTS]
	l.mu.resolving[currentStartTS] = append(existing, locks)
}
15 changes: 15 additions & 0 deletions txnkv/txnsnapshot/client_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,21 @@ func (ch *ClientHelper) ResolveLocks(bo *retry.Backoffer, callerStartTS uint64,
return msBeforeTxnExpired, nil
}

// UpdateResolvingLocks forwards to the UpdateResolvingLocks method of the
// helper's lockResolver, passing locks, callerStartTS and token through
// unchanged.
func (ch *ClientHelper) UpdateResolvingLocks(locks []*txnlock.Lock, callerStartTS uint64, token int) {
ch.lockResolver.UpdateResolvingLocks(locks, callerStartTS, token)
}

// RecordResolvingLocks forwards to the RecordResolvingLocks method of the
// helper's lockResolver and returns the token it produces, which callers pass
// to UpdateResolvingLocks and ResolveLocksDone.
func (ch *ClientHelper) RecordResolvingLocks(locks []*txnlock.Lock, callerStartTS uint64) int {
return ch.lockResolver.RecordResolvingLocks(locks, callerStartTS)
}

// ResolveLocksDone forwards to the ResolveLocksDone method of the helper's
// lockResolver, passing callerStartTS and token through unchanged.
func (ch *ClientHelper) ResolveLocksDone(callerStartTS uint64, token int) {
ch.lockResolver.ResolveLocksDone(callerStartTS, token)
}

// SendReqCtx wraps the SendReqCtx function and use the resolved lock result in the kvrpcpb.Context.
func (ch *ClientHelper) SendReqCtx(bo *retry.Backoffer, req *tikvrpc.Request, regionID locate.RegionVerID, timeout time.Duration, et tikvrpc.EndpointType, directStoreAddr string, opts ...locate.StoreSelectorOption) (*tikvrpc.Response, *locate.RPCContext, string, error) {
sender := locate.NewRegionRequestSender(ch.regionCache, ch.client)
Expand Down
11 changes: 10 additions & 1 deletion txnkv/txnsnapshot/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
sender := locate.NewRegionRequestSender(s.snapshot.store.GetRegionCache(), s.snapshot.store.GetTiKVClient())
var reqEndKey, reqStartKey []byte
var loc *locate.KeyLocation
var resolvingRecordToken *int
var err error
for {
if !s.reverse {
Expand Down Expand Up @@ -293,7 +294,15 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
if err != nil {
return err
}
msBeforeExpired, err := txnlock.NewLockResolver(s.snapshot.store).ResolveLocks(bo, s.snapshot.version, []*txnlock.Lock{lock})
locks := []*txnlock.Lock{lock}
if resolvingRecordToken == nil {
token := s.snapshot.store.GetLockResolver().RecordResolvingLocks(locks, s.snapshot.version)
resolvingRecordToken = &token
defer s.snapshot.store.GetLockResolver().ResolveLocksDone(s.snapshot.version, *resolvingRecordToken)
} else {
s.snapshot.store.GetLockResolver().UpdateResolvingLocks(locks, s.snapshot.version, *resolvingRecordToken)
}
msBeforeExpired, err := s.snapshot.store.GetLockResolver().ResolveLocks(bo, s.snapshot.version, locks)
if err != nil {
return err
}
Expand Down
20 changes: 18 additions & 2 deletions txnkv/txnsnapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
s.mu.RUnlock()

pending := batch.keys
var resolvingRecordToken *int
for {
s.mu.RLock()
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdBatchGet, &kvrpcpb.BatchGetRequest{
Expand Down Expand Up @@ -450,6 +451,13 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
s.mergeExecDetail(batchGetResp.ExecDetailsV2)
}
if len(lockedKeys) > 0 {
if resolvingRecordToken == nil {
token := cli.RecordResolvingLocks(locks, s.version)
resolvingRecordToken = &token
defer cli.ResolveLocksDone(s.version, *resolvingRecordToken)
} else {
cli.UpdateResolvingLocks(locks, s.version, *resolvingRecordToken)
}
msBeforeExpired, err := cli.ResolveLocks(bo, s.version, locks)
if err != nil {
return err
Expand Down Expand Up @@ -565,6 +573,7 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
}

var firstLock *txnlock.Lock
var resolvingRecordToken *int
for {
util.EvalFailpoint("beforeSendPointGet")
loc, err := s.store.GetRegionCache().LocateKey(bo, k)
Expand Down Expand Up @@ -617,8 +626,15 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
cli.resolvedLocks.Put(lock.TxnID)
continue
}

msBeforeExpired, err := cli.ResolveLocks(bo, s.version, []*txnlock.Lock{lock})
locks := []*txnlock.Lock{lock}
if resolvingRecordToken == nil {
token := cli.RecordResolvingLocks(locks, s.version)
resolvingRecordToken = &token
defer cli.ResolveLocksDone(s.version, *resolvingRecordToken)
} else {
cli.UpdateResolvingLocks(locks, s.version, *resolvingRecordToken)
}
msBeforeExpired, err := cli.ResolveLocks(bo, s.version, locks)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 7ba562b

Please sign in to comment.