diff --git a/DEPS.bzl b/DEPS.bzl index 186347a3af101..2bf1ae8ba7731 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -3519,8 +3519,8 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sum = "h1:/glZOHs/K2pkCioDVae+aThUHFYRYQkEgY4NUTgfh+s=", - version = "v2.0.3", + sum = "h1:AyFCezjYx4KYXuyPM5o38lYqh5UdR0OM36UEocOx+Hs=", + version = "v2.0.4-0.20230131081004-cd83d1507d70", ) go_repository( name = "com_github_tikv_pd_client", diff --git a/go.mod b/go.mod index 8babb1b852f2a..e3272df4421ad 100644 --- a/go.mod +++ b/go.mod @@ -87,7 +87,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/tdakkota/asciicheck v0.1.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.3 + github.com/tikv/client-go/v2 v2.0.4-0.20230131081004-cd83d1507d70 github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 github.com/twmb/murmur3 v1.1.3 diff --git a/go.sum b/go.sum index 81137a64e2707..2c6fa8d39095d 100644 --- a/go.sum +++ b/go.sum @@ -930,8 +930,8 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= -github.com/tikv/client-go/v2 v2.0.3 h1:/glZOHs/K2pkCioDVae+aThUHFYRYQkEgY4NUTgfh+s= -github.com/tikv/client-go/v2 v2.0.3/go.mod h1:MDT4J9LzgS7Bj1DnEq6Gk/puy6mp8TgUC92zGEVVLLg= +github.com/tikv/client-go/v2 v2.0.4-0.20230131081004-cd83d1507d70 h1:AyFCezjYx4KYXuyPM5o38lYqh5UdR0OM36UEocOx+Hs= +github.com/tikv/client-go/v2 v2.0.4-0.20230131081004-cd83d1507d70/go.mod h1:MDT4J9LzgS7Bj1DnEq6Gk/puy6mp8TgUC92zGEVVLLg= github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc= github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A= github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro= diff --git a/kv/interface_mock_test.go b/kv/interface_mock_test.go index 164e777c6ef4a..561c0aa12baaf 100644 --- a/kv/interface_mock_test.go +++ b/kv/interface_mock_test.go @@ -52,6 +52,13 @@ func (t *mockTxn) LockKeys(_ context.Context, _ *LockCtx, _ ...Key) error { return nil } +func (t *mockTxn) LockKeysFunc(_ context.Context, _ *LockCtx, fn func(), _ ...Key) error { + if fn != nil { + fn() + } + return nil +} + func (t *mockTxn) SetOption(opt int, val interface{}) { t.opts[opt] = val } diff --git a/kv/kv.go b/kv/kv.go index 38243aa13db08..4c855c0938308 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -222,6 +222,10 @@ type Transaction interface { // LockKeys tries to lock the entries with the keys in KV store. // Will block until all keys are locked successfully or an error occurs. LockKeys(ctx context.Context, lockCtx *LockCtx, keys ...Key) error + // LockKeysFunc tries to lock the entries with the keys in KV store. + // Will block until all keys are locked successfully or an error occurs. + // fn is called before LockKeys unlocks the keys. + LockKeysFunc(ctx context.Context, lockCtx *LockCtx, fn func(), keys ...Key) error // SetOption sets an option with a value, when val is nil, uses the default // value of this option. SetOption(opt int, val interface{}) diff --git a/session/txn.go b/session/txn.go index 85f77f8078679..552f81e88fc77 100644 --- a/session/txn.go +++ b/session/txn.go @@ -428,6 +428,11 @@ func (txn *LazyTxn) RollbackMemDBToCheckpoint(savepoint *tikv.MemDBCheckpoint) { // LockKeys Wrap the inner transaction's `LockKeys` to record the status func (txn *LazyTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keys ...kv.Key) error { + return txn.LockKeysFunc(ctx, lockCtx, nil, keys...) +} + +// LockKeysFunc Wrap the inner transaction's `LockKeys` to record the status +func (txn *LazyTxn) LockKeysFunc(ctx context.Context, lockCtx *kv.LockCtx, fn func(), keys ...kv.Key) error { failpoint.Inject("beforeLockKeys", func() {}) t := time.Now() @@ -438,15 +443,17 @@ func (txn *LazyTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keys ...k txn.mu.TxnInfo.BlockStartTime.Valid = true txn.mu.TxnInfo.BlockStartTime.Time = t txn.mu.Unlock() - - err := txn.Transaction.LockKeys(ctx, lockCtx, keys...) - - txn.mu.Lock() - defer txn.mu.Unlock() - txn.updateState(originState) - txn.mu.TxnInfo.BlockStartTime.Valid = false - txn.mu.TxnInfo.EntriesCount = uint64(txn.Transaction.Len()) - return err + lockFunc := func() { + if fn != nil { + fn() + } + txn.mu.Lock() + defer txn.mu.Unlock() + txn.updateState(originState) + txn.mu.TxnInfo.BlockStartTime.Valid = false + txn.mu.TxnInfo.EntriesCount = uint64(txn.Transaction.Len()) + } + return txn.Transaction.LockKeysFunc(ctx, lockCtx, lockFunc, keys...) } func (txn *LazyTxn) reset() { diff --git a/sessionctx/BUILD.bazel b/sessionctx/BUILD.bazel index 6cd2317cf8f01..800001fd426b3 100644 --- a/sessionctx/BUILD.bazel +++ b/sessionctx/BUILD.bazel @@ -33,6 +33,7 @@ go_test( ], embed = [":sessionctx"], flaky = True, + race = "on", deps = [ "//testkit/testsetup", "@com_github_stretchr_testify//require", diff --git a/sessiontxn/BUILD.bazel b/sessiontxn/BUILD.bazel index e484defb5b0c1..a92e5a81dd92e 100644 --- a/sessiontxn/BUILD.bazel +++ b/sessiontxn/BUILD.bazel @@ -27,6 +27,8 @@ go_test( "txn_rc_tso_optimize_test.go", ], flaky = True, + race = "on", + shard_count = 2, deps = [ ":sessiontxn", "//domain", diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index b18b6d0db1f33..8013c4b1cad63 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -76,6 +76,12 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput return txn.extractKeyErr(err) } +func (txn *tikvTxn) LockKeysFunc(ctx context.Context, lockCtx *kv.LockCtx, fn func(), keysInput ...kv.Key) error { + keys := toTiKVKeys(keysInput) + err := txn.KVTxn.LockKeysFunc(ctx, lockCtx, fn, keys...) + return txn.extractKeyErr(err) +} + func (txn *tikvTxn) Commit(ctx context.Context) error { err := txn.KVTxn.Commit(ctx) return txn.extractKeyErr(err)