Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#42210
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
zyguan authored and ti-chi-bot committed Mar 15, 2023
1 parent 25555a8 commit 1e649c4
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 17 deletions.
128 changes: 128 additions & 0 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,22 +1087,76 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
if r.handleKey != nil {
_, err := txn.Get(ctx, r.handleKey.newKey)
if err == nil {
<<<<<<< HEAD
e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr)
continue
}
if !kv.IsErrNotFound(err) {
=======
if replace {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey)
if err != nil {
return err
}
_, err2 := e.removeRow(ctx, txn, handle, r, false)
if err2 != nil {
return err2
}
} else {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(r.handleKey.dupErr)
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic {
// lock duplicated row key on insert-ignore
txnCtx.AddUnchangedRowKey(r.handleKey.newKey)
}
continue
}
} else if !kv.IsErrNotFound(err) {
>>>>>>> 24cd54c7462 (executor: lock duplicated keys on insert-ignore & replace-nothing (#42210))
return err
}
}
for _, uk := range r.uniqueKeys {
_, err := txn.Get(ctx, uk.newKey)
if err == nil {
<<<<<<< HEAD
// If duplicate keys were found in BatchGet, mark row = nil.
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
skip = true
break
}
if !kv.IsErrNotFound(err) {
=======
if replace {
_, handle, err := tables.FetchDuplicatedHandle(
ctx,
uk.newKey,
true,
txn,
e.Table.Meta().ID,
uk.commonHandle,
)
if err != nil {
return err
}
if handle == nil {
continue
}
_, err = e.removeRow(ctx, txn, handle, r, true)
if err != nil {
return err
}
} else {
// If duplicate keys were found in BatchGet, mark row = nil.
e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr)
if txnCtx := e.ctx.GetSessionVars().TxnCtx; txnCtx.IsPessimistic {
// lock duplicated unique key on insert-ignore
txnCtx.AddUnchangedRowKey(uk.newKey)
}
skip = true
break
}
} else if !kv.IsErrNotFound(err) {
>>>>>>> 24cd54c7462 (executor: lock duplicated keys on insert-ignore & replace-nothing (#42210))
return err
}
}
Expand All @@ -1124,6 +1178,80 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
return nil
}

<<<<<<< HEAD
=======
// removeRow removes the duplicate row and cleanup its keys in the key-value map.
// But if the to-be-removed row equals to the to-be-added row, no remove or add
// things to do and return (true, nil).
func (e *InsertValues) removeRow(
ctx context.Context,
txn kv.Transaction,
handle kv.Handle,
r toBeCheckedRow,
inReplace bool,
) (bool, error) {
newRow := r.row
oldRow, err := getOldRow(ctx, e.ctx, txn, r.t, handle, e.GenExprs)
if err != nil {
logutil.BgLogger().Error("get old row failed when replace",
zap.String("handle", handle.String()),
zap.String("toBeInsertedRow", types.DatumsToStrNoErr(r.row)))
if kv.IsErrNotFound(err) {
err = errors.NotFoundf("can not be duplicated row, due to old row not found. handle %s", handle)
}
return false, err
}

identical, err := e.equalDatumsAsBinary(oldRow, newRow)
if err != nil {
return false, err
}
if identical {
if inReplace {
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
}
_, err := appendUnchangedRowForLock(e.ctx, r.t, handle, oldRow)
if err != nil {
return false, err
}
return true, nil
}

err = r.t.RemoveRecord(e.ctx, handle, oldRow)
if err != nil {
return false, err
}
err = onRemoveRowForFK(e.ctx, oldRow, e.fkChecks, e.fkCascades)
if err != nil {
return false, err
}
if inReplace {
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
} else {
e.ctx.GetSessionVars().StmtCtx.AddDeletedRows(1)
}

return false, nil
}

// equalDatumsAsBinary compare if a and b contains the same datum values in binary collation.
func (e *InsertValues) equalDatumsAsBinary(a []types.Datum, b []types.Datum) (bool, error) {
if len(a) != len(b) {
return false, nil
}
for i, ai := range a {
v, err := ai.Compare(e.ctx.GetSessionVars().StmtCtx, &b[i], collate.GetBinaryCollator())
if err != nil {
return false, errors.Trace(err)
}
if v != 0 {
return false, nil
}
}
return true, nil
}

>>>>>>> 24cd54c7462 (executor: lock duplicated keys on insert-ignore & replace-nothing (#42210))
func (e *InsertValues) addRecord(ctx context.Context, row []types.Datum) error {
return e.addRecordWithAutoIDHint(ctx, row, 0)
}
Expand Down
103 changes: 103 additions & 0 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1852,3 +1852,106 @@ func (s *testSuite13) TestReplaceAllocatingAutoID(c *C) {
// Note that this error is different from MySQL's duplicated primary key error.
tk.MustGetErrCode("REPLACE INTO t1 VALUES (0,'newmaxvalue');", errno.ErrAutoincReadFailed)
}
<<<<<<< HEAD
=======

func TestInsertIntoSelectError(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("DROP TABLE IF EXISTS t1;")
tk.MustExec("CREATE TABLE t1(a INT) ENGINE = InnoDB;")
tk.MustExec("INSERT IGNORE into t1(SELECT SLEEP(NULL));")
tk.MustQuery("SHOW WARNINGS;").Check(testkit.Rows("Warning 1210 Incorrect arguments to sleep"))
tk.MustExec("INSERT IGNORE into t1(SELECT SLEEP(-1));")
tk.MustQuery("SHOW WARNINGS;").Check(testkit.Rows("Warning 1210 Incorrect arguments to sleep"))
tk.MustExec("INSERT IGNORE into t1(SELECT SLEEP(1));")
tk.MustQuery("SELECT * FROM t1;").Check(testkit.Rows("0", "0", "0"))
tk.MustExec("DROP TABLE t1;")
}

// https://github.com/pingcap/tidb/issues/32213.
func TestIssue32213(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec(`use test`)

tk.MustExec("create table test.t1(c1 float)")
tk.MustExec("insert into test.t1 values(999.99)")
tk.MustQuery("select cast(test.t1.c1 as decimal(4, 1)) from test.t1").Check(testkit.Rows("999.9"))
tk.MustQuery("select cast(test.t1.c1 as decimal(5, 1)) from test.t1").Check(testkit.Rows("1000.0"))

tk.MustExec("drop table if exists test.t1")
tk.MustExec("create table test.t1(c1 decimal(6, 4))")
tk.MustExec("insert into test.t1 values(99.9999)")
tk.MustQuery("select cast(test.t1.c1 as decimal(5, 3)) from test.t1").Check(testkit.Rows("99.999"))
tk.MustQuery("select cast(test.t1.c1 as decimal(6, 3)) from test.t1").Check(testkit.Rows("100.000"))
}

func TestInsertLock(t *testing.T) {
store := testkit.CreateMockStore(t)
tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk2.MustExec("use test")

for _, tt := range []struct {
name string
ddl string
dml string
}{
{
"replace-pk",
"create table t (c int primary key clustered)",
"replace into t values (1)",
},
{
"replace-uk",
"create table t (c int unique key)",
"replace into t values (1)",
},
{
"insert-ingore-pk",
"create table t (c int primary key clustered)",
"insert ignore into t values (1)",
},
{
"insert-ingore-uk",
"create table t (c int unique key)",
"insert ignore into t values (1)",
},
{
"insert-update-pk",
"create table t (c int primary key clustered)",
"insert into t values (1) on duplicate key update c = values(c)",
},
{
"insert-update-uk",
"create table t (c int unique key)",
"insert into t values (1) on duplicate key update c = values(c)",
},
} {
t.Run(tt.name, func(t *testing.T) {
tk1.MustExec("drop table if exists t")
tk1.MustExec(tt.ddl)
tk1.MustExec("insert into t values (1)")
tk1.MustExec("begin")
tk1.MustExec(tt.dml)
done := make(chan struct{})
go func() {
tk2.MustExec("delete from t")
done <- struct{}{}
}()
select {
case <-done:
require.Failf(t, "txn2 is not blocked by %q", tt.dml)
case <-time.After(100 * time.Millisecond):
}
tk1.MustExec("commit")
<-done
tk1.MustQuery("select * from t").Check([][]interface{}{})
})
}
}
>>>>>>> 24cd54c7462 (executor: lock duplicated keys on insert-ignore & replace-nothing (#42210))
36 changes: 20 additions & 16 deletions executor/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,22 +141,8 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
if sctx.GetSessionVars().ClientCapability&mysql.ClientFoundRows > 0 {
sc.AddAffectedRows(1)
}

physicalID := t.Meta().ID
if pt, ok := t.(table.PartitionedTable); ok {
p, err := pt.GetPartitionByRow(sctx, oldData)
if err != nil {
return false, err
}
physicalID = p.GetPhysicalID()
}

unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h)
txnCtx := sctx.GetSessionVars().TxnCtx
if txnCtx.IsPessimistic {
txnCtx.AddUnchangedRowKey(unchangedRowKey)
}
return false, nil
_, err := appendUnchangedRowForLock(sctx, t, h, oldData)
return false, err
}

// 4. Fill values into on-update-now fields, only if they are really changed.
Expand Down Expand Up @@ -222,6 +208,24 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, old
return true, nil
}

func appendUnchangedRowForLock(sctx sessionctx.Context, t table.Table, h kv.Handle, row []types.Datum) (bool, error) {
txnCtx := sctx.GetSessionVars().TxnCtx
if !txnCtx.IsPessimistic {
return false, nil
}
physicalID := t.Meta().ID
if pt, ok := t.(table.PartitionedTable); ok {
p, err := pt.GetPartitionByRow(sctx, row)
if err != nil {
return false, err
}
physicalID = p.GetPhysicalID()
}
unchangedRowKey := tablecodec.EncodeRowKeyWithHandle(physicalID, h)
txnCtx.AddUnchangedRowKey(unchangedRowKey)
return true, nil
}

func rebaseAutoRandomValue(ctx context.Context, sctx sessionctx.Context, t table.Table, newData *types.Datum, col *table.Column) error {
tableInfo := t.Meta()
if !tableInfo.ContainsAutoRandomBits() {
Expand Down
2 changes: 1 addition & 1 deletion session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ func (s *testPessimisticSuite) TestOptimisticConflicts(c *C) {
tk.MustExec("begin pessimistic")
// This SQL use BatchGet and cache data in the txn snapshot.
// It can be changed to other SQLs that use BatchGet.
tk.MustExec("insert ignore into conflict values (1, 2)")
tk.MustExec("select * from conflict where id in (1, 2, 3)")

tk2.MustExec("update conflict set c = c - 1")

Expand Down

0 comments on commit 1e649c4

Please sign in to comment.