Skip to content

Commit

Permalink
executor,session: reimplement lock->put by SetLockedKeyValue (pingc…
Browse files Browse the repository at this point in the history
…ap#42642)

ref pingcap#28011

Signed-off-by: zyguan <zhongyangguan@gmail.com>
  • Loading branch information
zyguan committed Mar 29, 2023
1 parent ef9330e commit 483ffd9
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 16 deletions.
19 changes: 10 additions & 9 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,15 +450,16 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
if !e.txn.Valid() {
return kv.ErrInvalidTxn
}
membuf := e.txn.GetMemBuffer()
for _, idxKey := range indexKeys {
handleVal := handleVals[string(idxKey)]
if len(handleVal) == 0 {
continue
}
err = membuf.Set(idxKey, handleVal)
if err != nil {
return err
txn, ok := e.txn.(interface {
ChangeLockIntoPut(context.Context, kv.Key, []byte) bool
})
if ok {
for _, idxKey := range indexKeys {
handleVal := handleVals[string(idxKey)]
if len(handleVal) == 0 {
continue
}
txn.ChangeLockIntoPut(ctx, idxKey, handleVal)
}
}
}
Expand Down
9 changes: 5 additions & 4 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,11 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if !e.txn.Valid() {
return kv.ErrInvalidTxn
}
memBuffer := e.txn.GetMemBuffer()
err = memBuffer.Set(e.idxKey, e.handleVal)
if err != nil {
return err
txn, ok := e.txn.(interface {
ChangeLockIntoPut(context.Context, kv.Key, []byte) bool
})
if ok {
txn.ChangeLockIntoPut(ctx, e.idxKey, e.handleVal)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.2-0.20220504104629-106ec21d14df
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.1-0.20230117081319-35a262e90d9b
github.com/tikv/client-go/v2 v2.0.1-0.20230329072435-fc18f677df02
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710
github.com/twmb/murmur3 v1.1.3
github.com/uber/jaeger-client-go v2.22.1+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -755,8 +755,8 @@ github.com/stretchr/testify v1.7.2-0.20220504104629-106ec21d14df/go.mod h1:6Fq8o
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.1-0.20230117081319-35a262e90d9b h1:IUH/4BrP9BJm7to+XUJslcwaZONuIEwwClBnlrO7zJM=
github.com/tikv/client-go/v2 v2.0.1-0.20230117081319-35a262e90d9b/go.mod h1:VTlli8fRRpcpISj9I2IqroQmcAFfaTyBquiRhofOcDs=
github.com/tikv/client-go/v2 v2.0.1-0.20230329072435-fc18f677df02 h1:5dBj57AfcdDSU6uV2RzGZcZDYOHIi+6aUbwREkVggxg=
github.com/tikv/client-go/v2 v2.0.1-0.20230329072435-fc18f677df02/go.mod h1:VTlli8fRRpcpISj9I2IqroQmcAFfaTyBquiRhofOcDs=
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710 h1:jxgmKOscXSjaFEKQGRyY5qOpK8hLqxs2irb/uDJMtwk=
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710/go.mod h1:AtvppPwkiyUgQlR1W9qSqfTB+OsOIu19jDCOxOsPkmU=
github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo=
Expand Down
18 changes: 18 additions & 0 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,24 @@ func (txn *LazyTxn) LockKeysFunc(ctx context.Context, lockCtx *kv.LockCtx, fn fu
return txn.Transaction.LockKeysFunc(ctx, lockCtx, lockFunc, keys...)
}

// ChangeLockIntoPut tries to cache a locked key-value pair that might be converted to PUT on commit, returns true if
// the key-value pair has been cached.
func (txn *LazyTxn) ChangeLockIntoPut(ctx context.Context, key kv.Key, value []byte) bool {
if len(value) == 0 {
return false
}
cache, ok := txn.Transaction.(interface{ SetLockedKeyValue([]byte, []byte) })
if !ok {
return false
}
_, err := txn.GetMemBuffer().Get(ctx, key)
if !kv.IsErrNotFound(err) {
return false
}
cache.SetLockedKeyValue(key, value)
return true
}

func (txn *LazyTxn) reset() {
txn.cleanup()
txn.changeToInvalid()
Expand Down
34 changes: 34 additions & 0 deletions tests/realtikvtest/pessimistictest/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2913,6 +2913,40 @@ func TestChangeLockToPut(t *testing.T) {
tk.MustExec("admin check table t1")
}

func TestIssue28011(t *testing.T) {
store, clean := realtikvtest.CreateMockStoreAndSetup(t)
defer clean()

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

for _, tt := range []struct {
name string
lockQuery string
finalRows [][]interface{}
}{
{"Update", "update t set b = 'x' where a = 'a'", testkit.Rows("a x", "b y", "c z")},
{"BatchUpdate", "update t set b = 'x' where a in ('a', 'b', 'c')", testkit.Rows("a x", "b y", "c x")},
{"SelectForUpdate", "select a from t where a = 'a' for update", testkit.Rows("a x", "b y", "c z")},
{"BatchSelectForUpdate", "select a from t where a in ('a', 'b', 'c') for update", testkit.Rows("a x", "b y", "c z")},
} {
t.Run(tt.name, func(t *testing.T) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a varchar(10) primary key nonclustered, b varchar(10))")
tk.MustExec("insert into t values ('a', 'x'), ('b', 'x'), ('c', 'z')")
tk.MustExec("begin pessimistic")
tk.MustExec(tt.lockQuery)
tk.MustQuery("select a from t").Check(testkit.Rows("a", "b", "c"))
tk.MustExec("replace into t values ('b', 'y')")
tk.MustQuery("select a from t").Check(testkit.Rows("a", "b", "c"))
tk.MustQuery("select a, b from t order by a").Check(tt.finalRows)
tk.MustExec("commit")
tk.MustQuery("select a, b from t order by a").Check(tt.finalRows)
tk.MustExec("admin check table t")
})
}
}

func createTable(part bool, columnNames []string, columnTypes []string) string {
var str string
str = "create table t("
Expand Down

0 comments on commit 483ffd9

Please sign in to comment.