From 5239c23ac89e0d24a1204cc4650bec75cfa168fd Mon Sep 17 00:00:00 2001 From: you06 Date: Fri, 19 May 2023 15:41:37 +0800 Subject: [PATCH] executor: revert #42285 and #42503 (#44000) --- executor/batch_point_get.go | 35 +++++++++ executor/insert_common.go | 12 --- executor/insert_test.go | 66 ---------------- executor/point_get.go | 12 +++ executor/replace.go | 4 - executor/write.go | 36 ++++----- .../pessimistictest/pessimistic_test.go | 75 ++++++++++++------- 7 files changed, 112 insertions(+), 128 deletions(-) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 965708073a097..ee9808700aaec 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -392,6 +392,23 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { if err != nil { return err } + // Change the unique index LOCK into PUT record. + if len(indexKeys) > 0 { + if !e.txn.Valid() { + return kv.ErrInvalidTxn + } + membuf := e.txn.GetMemBuffer() + for _, idxKey := range indexKeys { + handleVal := handleVals[string(idxKey)] + if len(handleVal) == 0 { + continue + } + err = membuf.Set(idxKey, handleVal) + if err != nil { + return err + } + } + } } // Fetch all values. values, err = batchGetter.BatchGet(ctx, keys) @@ -403,6 +420,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { if e.lock && rc { existKeys = make([]kv.Key, 0, 2*len(values)) } + changeLockToPutIdxKeys := make([]kv.Key, 0, len(indexKeys)) e.values = make([][]byte, 0, len(values)) for i, key := range keys { val := values[string(key)] @@ -437,6 +455,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { // lock primary key for clustered index table is redundant if len(indexKeys) != 0 { existKeys = append(existKeys, indexKeys[i]) + changeLockToPutIdxKeys = append(changeLockToPutIdxKeys, indexKeys[i]) } } } @@ -446,6 +465,22 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { if err != nil { return err } + if len(changeLockToPutIdxKeys) > 0 { + if !e.txn.Valid() { + return kv.ErrInvalidTxn + } + for _, idxKey := range changeLockToPutIdxKeys { + membuf := e.txn.GetMemBuffer() + handleVal := handleVals[string(idxKey)] + if len(handleVal) == 0 { + return kv.ErrNotExist + } + err = membuf.Set(idxKey, handleVal) + if err != nil { + return err + } + } + } } e.handles = handles return nil diff --git a/executor/insert_common.go b/executor/insert_common.go index 751f83c071eda..dbd4a5ae264cd 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -1166,10 +1166,6 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D } } else { e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr) - if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic { - // lock duplicated row key on insert-ignore - txnCtx.AddUnchangedRowKey(r.handleKey.newKey) - } continue } } else if !kv.IsErrNotFound(err) { @@ -1181,10 +1177,6 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D if err == nil { // If duplicate keys were found in BatchGet, mark row = nil. e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr) - if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic { - // lock duplicated unique key on insert-ignore - txnCtx.AddUnchangedRowKey(uk.newKey) - } skip = true break } @@ -1233,10 +1225,6 @@ func (e *InsertValues) removeRow(ctx context.Context, txn kv.Transaction, r toBe return err } if identical { - _, err := appendUnchangedRowForLock(e.ctx, r.t, handle, oldRow) - if err != nil { - return err - } return nil } diff --git a/executor/insert_test.go b/executor/insert_test.go index 293b888629846..b55c3a63765e3 100644 --- a/executor/insert_test.go +++ b/executor/insert_test.go @@ -1481,69 +1481,3 @@ func TestIssue32213(t *testing.T) { tk.MustQuery("select cast(test.t1.c1 as decimal(5, 3)) from test.t1").Check(testkit.Rows("99.999")) tk.MustQuery("select cast(test.t1.c1 as decimal(6, 3)) from test.t1").Check(testkit.Rows("100.000")) } - -func TestInsertLock(t *testing.T) { - store := testkit.CreateMockStore(t) - tk1 := testkit.NewTestKit(t, store) - tk2 := testkit.NewTestKit(t, store) - tk1.MustExec("use test") - tk2.MustExec("use test") - - for _, tt := range []struct { - name string - ddl string - dml string - }{ - { - "replace-pk", - "create table t (c int primary key clustered)", - "replace into t values (1)", - }, - { - "replace-uk", - "create table t (c int unique key)", - "replace into t values (1)", - }, - { - "insert-ingore-pk", - "create table t (c int primary key clustered)", - "insert ignore into t values (1)", - }, - { - "insert-ingore-uk", - "create table t (c int unique key)", - "insert ignore into t values (1)", - }, - { - "insert-update-pk", - "create table t (c int primary key clustered)", - "insert into t values (1) on duplicate key update c = values(c)", - }, - { - "insert-update-uk", - "create table t (c int unique key)", - "insert into t values (1) on duplicate key update c = values(c)", - }, - } { - t.Run(tt.name, func(t *testing.T) { - tk1.MustExec("drop table if exists t") - tk1.MustExec(tt.ddl) - tk1.MustExec("insert into t values (1)") - tk1.MustExec("begin") - tk1.MustExec(tt.dml) - done := make(chan struct{}) - go func() { - tk2.MustExec("delete from t") - done <- struct{}{} - }() - select { - case <-done: - require.Failf(t, "txn2 is not blocked by %q", tt.dml) - case <-time.After(100 * time.Millisecond): - } - tk1.MustExec("commit") - <-done - tk1.MustQuery("select * from t").Check([][]interface{}{}) - }) - } -} diff --git a/executor/point_get.go b/executor/point_get.go index bb958055dedd8..3e3cddb08d9ba 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -282,6 +282,18 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { return nil } + // Change the unique index LOCK into PUT record. + if e.lock { + if !e.txn.Valid() { + return kv.ErrInvalidTxn + } + memBuffer := e.txn.GetMemBuffer() + err = memBuffer.Set(e.idxKey, e.handleVal) + if err != nil { + return err + } + } + var iv kv.Handle iv, err = tablecodec.DecodeHandleInUniqueIndexValue(e.handleVal, e.tblInfo.IsCommonHandle) if err != nil { diff --git a/executor/replace.go b/executor/replace.go index c81b36d6e0abc..4093b0773db8d 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -86,10 +86,6 @@ func (e *ReplaceExec) removeRow(ctx context.Context, txn kv.Transaction, handle } if rowUnchanged { e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1) - _, err := appendUnchangedRowForLock(e.ctx, r.t, handle, oldRow) - if err != nil { - return false, err - } return true, nil } diff --git a/executor/write.go b/executor/write.go index 6277cd1d09941..e32ff1770a77f 100644 --- a/executor/write.go +++ b/executor/write.go @@ -139,8 +139,22 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old if sctx.GetSessionVars().ClientCapability&mysql.ClientFoundRows > 0 { sc.AddAffectedRows(1) } - _, err := appendUnchangedRowForLock(sctx, t, h, oldData) - return false, err + + physicalID := t.Meta().ID + if pt, ok := t.(table.PartitionedTable); ok { + p, err := pt.GetPartitionByRow(sctx, oldData) + if err != nil { + return false, err + } + physicalID = p.GetPhysicalID() + } + + unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h) + txnCtx := sctx.GetSessionVars().TxnCtx + if txnCtx.IsPessimistic { + txnCtx.AddUnchangedRowKey(unchangedRowKey) + } + return false, nil } // Fill values into on-update-now fields, only if they are really changed. @@ -217,24 +231,6 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old return true, nil } -func appendUnchangedRowForLock(sctx sessionctx.Context, t table.Table, h kv.Handle, row []types.Datum) (bool, error) { - txnCtx := sctx.GetSessionVars().TxnCtx - if !txnCtx.IsPessimistic { - return false, nil - } - physicalID := t.Meta().ID - if pt, ok := t.(table.PartitionedTable); ok { - p, err := pt.GetPartitionByRow(sctx, row) - if err != nil { - return false, err - } - physicalID = p.GetPhysicalID() - } - unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h) - txnCtx.AddUnchangedRowKey(unchangedRowKey) - return true, nil -} - func rebaseAutoRandomValue(ctx context.Context, sctx sessionctx.Context, t table.Table, newData *types.Datum, col *table.Column) error { tableInfo := t.Meta() if !tableInfo.ContainsAutoRandomBits() { diff --git a/tests/realtikvtest/pessimistictest/pessimistic_test.go b/tests/realtikvtest/pessimistictest/pessimistic_test.go index 1bb186817ce8a..ae7545e0e91f6 100644 --- a/tests/realtikvtest/pessimistictest/pessimistic_test.go +++ b/tests/realtikvtest/pessimistictest/pessimistic_test.go @@ -534,7 +534,7 @@ func TestOptimisticConflicts(t *testing.T) { tk.MustExec("begin pessimistic") // This SQL use BatchGet and cache data in the txn snapshot. // It can be changed to other SQLs that use BatchGet. - tk.MustExec("select * from conflict where id in (1, 2, 3)") + tk.MustExec("insert ignore into conflict values (1, 2)") tk2.MustExec("update conflict set c = c - 1") @@ -2816,37 +2816,60 @@ func TestAsyncCommitCalTSFail(t *testing.T) { tk2.MustExec("commit") } -func TestIssue28011(t *testing.T) { +func TestChangeLockToPut(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) tk := testkit.NewTestKit(t, store) + tk2 := testkit.NewTestKit(t, store) tk.MustExec("use test") + tk2.MustExec("use test") - for _, tt := range []struct { - name string - lockQuery string - finalRows [][]interface{} - }{ - {"Update", "update t set b = 'x' where a = 'a'", testkit.Rows("a x", "b y", "c z")}, - {"BatchUpdate", "update t set b = 'x' where a in ('a', 'b', 'c')", testkit.Rows("a x", "b y", "c x")}, - {"SelectForUpdate", "select a from t where a = 'a' for update", testkit.Rows("a x", "b y", "c z")}, - {"BatchSelectForUpdate", "select a from t where a in ('a', 'b', 'c') for update", testkit.Rows("a x", "b y", "c z")}, - } { - t.Run(tt.name, func(t *testing.T) { - tk.MustExec("drop table if exists t") - tk.MustExec("create table t (a varchar(10) primary key nonclustered, b varchar(10))") - tk.MustExec("insert into t values ('a', 'x'), ('b', 'x'), ('c', 'z')") - tk.MustExec("begin") - tk.MustExec(tt.lockQuery) - tk.MustQuery("select a from t").Check(testkit.Rows("a", "b", "c")) - tk.MustExec("replace into t values ('b', 'y')") - tk.MustQuery("select a from t").Check(testkit.Rows("a", "b", "c")) - tk.MustQuery("select a, b from t order by a").Check(tt.finalRows) - tk.MustExec("commit") - tk.MustQuery("select a, b from t order by a").Check(tt.finalRows) - tk.MustExec("admin check table t") - }) + tk.MustExec("drop table if exists tk") + tk.MustExec("create table t1(c1 varchar(20) key, c2 int, c3 int, unique key k1(c2), key k2(c3))") + tk.MustExec(`insert into t1 values ("1", 1, 1), ("2", 2, 2), ("3", 3, 3)`) + + // Test point get change lock to put. + for _, mode := range []string{"REPEATABLE-READ", "READ-COMMITTED"} { + tk.MustExec(fmt.Sprintf(`set tx_isolation = "%s"`, mode)) + tk.MustExec("begin pessimistic") + tk.MustQuery(`select * from t1 where c1 = "1" for update`).Check(testkit.Rows("1 1 1")) + tk.MustExec("commit") + tk.MustExec("begin pessimistic") + tk.MustQuery(`select * from t1 where c1 = "1" for update`).Check(testkit.Rows("1 1 1")) + tk.MustExec("commit") + tk.MustExec("admin check table t1") + tk2.MustExec("begin") + tk2.MustQuery(`select * from t1 use index(k1) where c2 = "1" for update`).Check(testkit.Rows("1 1 1")) + tk2.MustQuery(`select * from t1 use index(k1) where c2 = "3" for update`).Check(testkit.Rows("3 3 3")) + tk2.MustExec("commit") + tk2.MustExec("begin") + tk2.MustQuery(`select * from t1 use index(k2) where c3 = 1`).Check(testkit.Rows("1 1 1")) + tk2.MustQuery("select * from t1 use index(k2) where c3 > 1").Check(testkit.Rows("2 2 2", "3 3 3")) + tk2.MustExec("commit") } + + // Test batch point get change lock to put. + for _, mode := range []string{"REPEATABLE-READ", "READ-COMMITTED"} { + tk.MustExec(fmt.Sprintf(`set tx_isolation = "%s"`, mode)) + tk.MustExec("begin pessimistic") + tk.MustQuery(`select * from t1 where c1 in ("1", "5", "3") for update`).Check(testkit.Rows("1 1 1", "3 3 3")) + tk.MustExec("commit") + tk.MustExec("begin pessimistic") + tk.MustQuery(`select * from t1 where c1 in ("1", "2", "8") for update`).Check(testkit.Rows("1 1 1", "2 2 2")) + tk.MustExec("commit") + tk.MustExec("admin check table t1") + tk2.MustExec("begin") + tk2.MustQuery(`select * from t1 use index(k1) where c2 in ("1", "2", "3") for update`).Check(testkit.Rows("1 1 1", "2 2 2", "3 3 3")) + tk2.MustQuery(`select * from t1 use index(k2) where c2 in ("2") for update`).Check(testkit.Rows("2 2 2")) + tk2.MustExec("commit") + tk2.MustExec("begin") + tk2.MustQuery(`select * from t1 use index(k2) where c3 in (5, 8)`).Check(testkit.Rows()) + tk2.MustQuery(`select * from t1 use index(k2) where c3 in (1, 8) for update`).Check(testkit.Rows("1 1 1")) + tk2.MustQuery(`select * from t1 use index(k2) where c3 > 1`).Check(testkit.Rows("2 2 2", "3 3 3")) + tk2.MustExec("commit") + } + + tk.MustExec("admin check table t1") } func createTable(part bool, columnNames []string, columnTypes []string) string {