From fe7c3316fb0be8228bc1ac5ae3ac9a507ae5c1c8 Mon Sep 17 00:00:00 2001 From: tangenta Date: Fri, 10 Feb 2023 15:22:00 +0800 Subject: [PATCH 1/3] This is an automated cherry-pick of #40749 Signed-off-by: ti-chi-bot --- ddl/index.go | 30 ++ ddl/index_merge_tmp.go | 72 +++-- ddl/index_merge_tmp_test.go | 334 ++++++++++++++++++++ ddl/indexmergetest/BUILD.bazel | 33 ++ executor/insert.go | 30 +- executor/replace.go | 17 +- table/tables/index.go | 438 ++++++++++++++++++++++---- table/tables/mutation_checker.go | 34 +- table/tables/mutation_checker_test.go | 4 +- tablecodec/tablecodec.go | 245 ++++++++++---- tablecodec/tablecodec_test.go | 102 ++++-- 11 files changed, 1122 insertions(+), 217 deletions(-) create mode 100644 ddl/indexmergetest/BUILD.bazel diff --git a/ddl/index.go b/ddl/index.go index e69e1962c112b..4a0bca7bbc46c 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -890,8 +890,32 @@ func convertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *m func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, indexInfo *model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) { elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}} +<<<<<<< HEAD rh := newReorgHandler(t, w.sess, w.concurrentDDL) reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, elements, mergingTmpIdx) +======= + + failpoint.Inject("mockDMLExecutionStateMerging", func(val failpoint.Value) { + //nolint:forcetypeassert + if val.(bool) && indexInfo.BackfillState == model.BackfillStateMerging && + MockDMLExecutionStateMerging != nil { + MockDMLExecutionStateMerging() + } + }) + + sctx, err1 := w.sessPool.get() + if err1 != nil { + err = err1 + return + } + defer w.sessPool.put(sctx) + rh := newReorgHandler(newSession(sctx)) + dbInfo, err := t.GetDatabase(job.SchemaID) + if err != nil { + return false, ver, errors.Trace(err) + } + reorgInfo, err := getReorgInfo(d.jobContext(job.ID), d, rh, job, dbInfo, tbl, elements, mergingTmpIdx) +>>>>>>> c6e6d621e2 (ddl: check the key existence on original index (#40749)) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version // and then run the reorg next time. @@ -1627,6 +1651,12 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC // MockDMLExecution is only used for test. var MockDMLExecution func() +// MockDMLExecutionMerging is only used for test. +var MockDMLExecutionMerging func() + +// MockDMLExecutionStateMerging is only used for test. +var MockDMLExecutionStateMerging func() + func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { if reorgInfo.mergingTmpIdx { logutil.BgLogger().Info("[ddl] start to merge temp index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) diff --git a/ddl/index_merge_tmp.go b/ddl/index_merge_tmp.go index 2cf3b6d1d106d..b7fca70b8ed34 100644 --- a/ddl/index_merge_tmp.go +++ b/ddl/index_merge_tmp.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" @@ -199,6 +200,12 @@ func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskC return nil }) + failpoint.Inject("mockDMLExecutionMerging", func(val failpoint.Value) { + //nolint:forcetypeassert + if val.(bool) && MockDMLExecutionMerging != nil { + MockDMLExecutionMerging() + } + }) logSlowOperations(time.Since(oprStartTime), "AddIndexMergeDataInTxn", 3000) return } @@ -233,40 +240,49 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor return false, nil } - originVal, handle, isDelete, unique, keyVer := tablecodec.DecodeTempIndexValue(rawValue, isCommonHandle) - if keyVer == tables.TempIndexKeyTypeMerge || keyVer == tables.TempIndexKeyTypeDelete { - // For 'm' version kvs, they are double-written. - // For 'd' version kvs, they are written in the delete-only state and can be dropped safely. - return true, nil + tempIdxVal, err := tablecodec.DecodeTempIndexValue(rawValue, isCommonHandle) + if err != nil { + return false, err } + tempIdxVal = tempIdxVal.FilterOverwritten() + + // Extract the operations on the original index and replay them later. + for _, elem := range tempIdxVal { + if elem.KeyVer == tables.TempIndexKeyTypeMerge || elem.KeyVer == tables.TempIndexKeyTypeDelete { + // For 'm' version kvs, they are double-written. + // For 'd' version kvs, they are written in the delete-only state and can be dropped safely. + continue + } - if handle == nil { - // If the handle is not found in the value of the temp index, it means - // 1) This is not a deletion marker, the handle is in the key or the origin value. - // 2) This is a deletion marker, but the handle is in the key of temp index. - handle, err = tablecodec.DecodeIndexHandle(indexKey, originVal, len(w.index.Meta().Columns)) - if err != nil { - return false, err + if elem.Handle == nil { + // If the handle is not found in the value of the temp index, it means + // 1) This is not a deletion marker, the handle is in the key or the origin value. + // 2) This is a deletion marker, but the handle is in the key of temp index. + elem.Handle, err = tablecodec.DecodeIndexHandle(indexKey, elem.Value, len(w.index.Meta().Columns)) + if err != nil { + return false, err + } } - } - originIdxKey := make([]byte, len(indexKey)) - copy(originIdxKey, indexKey) - tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey) + originIdxKey := make([]byte, len(indexKey)) + copy(originIdxKey, indexKey) + tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey) - idxRecord := &temporaryIndexRecord{ - handle: handle, - delete: isDelete, - unique: unique, - skip: false, - } - if !isDelete { - idxRecord.vals = originVal - idxRecord.distinct = tablecodec.IndexKVIsUnique(originVal) + idxRecord := &temporaryIndexRecord{ + handle: elem.Handle, + delete: elem.Delete, + unique: elem.Distinct, + skip: false, + } + if !elem.Delete { + idxRecord.vals = elem.Value + idxRecord.distinct = tablecodec.IndexKVIsUnique(elem.Value) + } + w.tmpIdxRecords = append(w.tmpIdxRecords, idxRecord) + w.originIdxKeys = append(w.originIdxKeys, originIdxKey) + w.tmpIdxKeys = append(w.tmpIdxKeys, indexKey) } - w.tmpIdxRecords = append(w.tmpIdxRecords, idxRecord) - w.originIdxKeys = append(w.originIdxKeys, originIdxKey) - w.tmpIdxKeys = append(w.tmpIdxKeys, indexKey) + lastKey = indexKey return true, nil }) diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index 563b8614d4311..26c355282c608 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -21,7 +21,13 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/ddl/ingest" +<<<<<<< HEAD:ddl/index_merge_tmp_test.go "github.com/pingcap/tidb/domain" +======= + "github.com/pingcap/tidb/ddl/internal/callback" + "github.com/pingcap/tidb/ddl/testutil" + "github.com/pingcap/tidb/errno" +>>>>>>> c6e6d621e2 (ddl: check the key existence on original index (#40749)):ddl/indexmergetest/merge_test.go "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/tablecodec" @@ -541,3 +547,331 @@ func TestAddIndexMergeConflictWithPessimistic(t *testing.T) { tk.MustExec("admin check table t;") tk.MustQuery("select * from t;").Check(testkit.Rows("1 2")) } + +func TestAddIndexMergeInsertOnMerging(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int default 0, b int default 0)") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() { + return + } + var err error + switch job.SchemaState { + case model.StateDeleteOnly: + _, err = tk1.Exec("insert into t values (5, 5)") + assert.NoError(t, err) + case model.StateWriteOnly: + _, err = tk1.Exec("insert into t values (5, 7)") + assert.NoError(t, err) + _, err = tk1.Exec("delete from t where b = 7") + assert.NoError(t, err) + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + ddl.MockDMLExecutionStateMerging = func() { + _, err := tk1.Exec("insert into t values (5, 8);") + assert.Error(t, err) // [kv:1062]Duplicate entry '5' for key 't.idx' + _, err = tk1.Exec("insert into t values (5, 8) on duplicate key update a = 6;") + assert.NoError(t, err) // The row should be normally updated to (6, 5). + ddl.MockDMLExecutionStateMerging = nil + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionStateMerging", "return(true)")) + tk.MustExec("alter table t add unique index idx(a);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("6 5")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionStateMerging")) +} + +func TestAddIndexMergeReplaceOnMerging(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int default 0, b int default 0);") + tk.MustExec("insert into t values (5, 5);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + ddl.MockDMLExecution = func() { + _, err := tk1.Exec("delete from t where b = 5;") + assert.NoError(t, err) + } + + ddl.MockDMLExecutionStateMerging = func() { + _, err := tk1.Exec("replace into t values (5, 8);") + assert.NoError(t, err) + ddl.MockDMLExecutionStateMerging = nil + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecution", "1*return(true)->return(false)")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionStateMerging", "return(true)")) + tk.MustExec("alter table t add unique index idx(a);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("5 8")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionStateMerging")) +} + +func TestAddIndexMergeInsertToDeletedTempIndex(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int default 0, b int default 0)") + tk.MustExec("insert into t values (5, 5);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() { + return + } + var err error + switch job.SchemaState { + case model.StateWriteOnly: + _, err = tk1.Exec("delete from t where b = 5") + assert.NoError(t, err) + _, err := tk1.Exec("set @@tidb_constraint_check_in_place = true;") + assert.NoError(t, err) + _, err = tk1.Exec("insert into t values (5, 8);") + assert.NoError(t, err) + _, err = tk1.Exec("insert into t values (5, 8);") + assert.Error(t, err) + _, err = tk1.Exec("set @@tidb_constraint_check_in_place = false;") + assert.NoError(t, err) + _, err = tk1.Exec("insert into t values (5, 8);") + assert.Error(t, err) + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + tk.MustExec("alter table t add unique index idx(a);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("5 8")) +} + +func TestAddIndexMergeReplaceDelete(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, a int default 0);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() { + return + } + switch job.SchemaState { + case model.StateDeleteOnly: + _, err := tk1.Exec("insert into t values (1, 1);") + assert.NoError(t, err) + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + ddl.MockDMLExecutionMerging = func() { + _, err := tk1.Exec("replace into t values (2, 1);") + assert.NoError(t, err) + _, err = tk1.Exec("delete from t where id = 2;") + assert.NoError(t, err) + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging", "1*return(true)->return(false)")) + tk.MustExec("alter table t add unique index idx(a);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows()) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecutionMerging")) +} + +func TestAddIndexMergeDeleteDifferentHandle(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, c char(10));") + tk.MustExec("insert into t values (1, 'a');") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + runDML := false + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() || runDML { + return + } + if job.SnapshotVer == 0 { + return + } + switch job.SchemaState { + case model.StateWriteReorganization: + _, err := tk1.Exec("insert into t values (2, 'a');") + assert.NoError(t, err) + _, err = tk1.Exec("replace into t values (3, 'a');") + assert.NoError(t, err) + runDML = true + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + ddl.MockDMLExecution = func() { + // It is too late to remove the duplicated index value. + _, err := tk1.Exec("delete from t where id = 1;") + assert.NoError(t, err) + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecution", "1*return(true)->return(false)")) + tk.MustGetErrCode("alter table t add unique index idx(c);", errno.ErrDupEntry) + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("3 a")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution")) +} + +func TestAddIndexDecodeTempIndexCommonHandle(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id_a bigint, id_b char(20), c char(20), primary key (id_a, id_b));") + tk.MustExec("insert into t values (1, 'id_1', 'char_1');") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + runDML := false + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() || runDML { + return + } + if job.SnapshotVer == 0 { + return + } + switch job.SchemaState { + case model.StateWriteReorganization: + _, err := tk1.Exec("insert into t values (2, 'id_2', 'char_2');") + assert.NoError(t, err) + _, err = tk1.Exec("insert into t values (3, 'id_3', 'char_3');") + assert.NoError(t, err) + runDML = true + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + tk.MustExec("alter table t add unique index idx(c);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 id_1 char_1", "2 id_2 char_2", "3 id_3 char_3")) +} + +func TestAddIndexInsertIgnoreOnBackfill(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, b int);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + runDML := false + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() || runDML { + return + } + switch job.SchemaState { + case model.StateWriteReorganization: + _, err := tk1.Exec("insert ignore into t values (1, 1);") + assert.NoError(t, err) + _, err = tk1.Exec("insert ignore into t values (2, 2);") + assert.NoError(t, err) + _, err = tk1.Exec("update t set b = null where id = 1;") + assert.NoError(t, err) + runDML = true + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + tk.MustExec("alter table t add unique index idx(b);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows("1 ", "2 2")) +} + +func TestAddIndexMultipleDelete(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, b int);") + tk.MustExec("insert into t values (1, 1), (2, 1), (3, 1), (4, 1), (5, 1), (6, 1);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &callback.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() { + return + } + switch job.SchemaState { + case model.StateDeleteOnly: + _, err := tk1.Exec("delete from t where id in (4, 5, 6);") + assert.NoError(t, err) + case model.StateWriteOnly: + _, err := tk1.Exec("delete from t where id in (2, 3);") + assert.NoError(t, err) + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + ddl.MockDMLExecution = func() { + _, err := tk1.Exec("delete from t where id = 1;") + assert.NoError(t, err) + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecution", "1*return(true)->return(false)")) + tk.MustExec("alter table t add unique index idx(b);") + tk.MustExec("admin check table t;") + tk.MustQuery("select * from t;").Check(testkit.Rows()) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution")) +} diff --git a/ddl/indexmergetest/BUILD.bazel b/ddl/indexmergetest/BUILD.bazel new file mode 100644 index 0000000000000..b70146ae8d461 --- /dev/null +++ b/ddl/indexmergetest/BUILD.bazel @@ -0,0 +1,33 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "indexmergetest_test", + timeout = "moderate", + srcs = [ + "main_test.go", + "merge_test.go", + ], + flaky = True, + race = "on", + shard_count = 4, + deps = [ + "//config", + "//ddl", + "//ddl/ingest", + "//ddl/internal/callback", + "//ddl/testutil", + "//domain", + "//errno", + "//kv", + "//meta/autoid", + "//parser/model", + "//tablecodec", + "//testkit", + "//testkit/testsetup", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//tikv", + "@org_uber_go_goleak//:goleak", + ], +) diff --git a/executor/insert.go b/executor/insert.go index d0b09e302860c..b5336b3f1fcd8 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -158,11 +159,11 @@ func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []t for _, r := range rows { for _, uk := range r.uniqueKeys { if val, found := values[string(uk.newKey)]; found { - if tablecodec.IsTempIndexKey(uk.newKey) { - if tablecodec.CheckTempIndexValueIsDelete(val) { - continue - } - val = tablecodec.DecodeTempIndexOriginValue(val) + if isTemp, _ := tablecodec.CheckTempIndexKey(uk.newKey); isTemp { + // If it is a temp index, the value cannot be decoded by DecodeHandleInUniqueIndexValue. + // Since this function is an optimization, we can skip prefetching the rows referenced by + // temp indexes. + continue } handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle) if err != nil { @@ -261,26 +262,13 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D } for _, uk := range r.uniqueKeys { - val, err := txn.Get(ctx, uk.newKey) + _, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID, uk.commonHandle) if err != nil { - if kv.IsErrNotFound(err) { - continue - } return err } - // Since the temp index stores deleted key with marked 'deleteu' for unique key at the end - // of value, So if return a key we check and skip deleted key. - if tablecodec.IsTempIndexKey(uk.newKey) { - if tablecodec.CheckTempIndexValueIsDelete(val) { - continue - } - val = tablecodec.DecodeTempIndexOriginValue(val) - } - handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle) - if err != nil { - return err + if handle == nil { + continue } - err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate) if err != nil { if kv.IsErrNotFound(err) { diff --git a/executor/replace.go b/executor/replace.go index 158a620fb300e..4093b0773db8d 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -172,22 +173,12 @@ func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error { // 3. error: the error. func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r toBeCheckedRow) (bool, bool, error) { for _, uk := range r.uniqueKeys { - val, err := txn.Get(ctx, uk.newKey) + _, handle, err := tables.FetchDuplicatedHandle(ctx, uk.newKey, true, txn, e.Table.Meta().ID, uk.commonHandle) if err != nil { - if kv.IsErrNotFound(err) { - continue - } return false, false, err } - if tablecodec.IsTempIndexKey(uk.newKey) { - if tablecodec.CheckTempIndexValueIsDelete(val) { - continue - } - val = tablecodec.DecodeTempIndexOriginValue(val) - } - handle, err := tablecodec.DecodeHandleInUniqueIndexValue(val, uk.commonHandle) - if err != nil { - return false, true, err + if handle == nil { + continue } rowUnchanged, err := e.removeRow(ctx, txn, handle, r) if err != nil { diff --git a/table/tables/index.go b/table/tables/index.go index 446a2e7288595..4e27da351dff3 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -16,7 +16,6 @@ package tables import ( "context" - "errors" "sync" "github.com/opentracing/opentracing-go" @@ -184,6 +183,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue if err != nil { return nil, err } +<<<<<<< HEAD if len(tempKey) > 0 { if !opt.Untouched { // Untouched key-values never occur in the storage. idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) @@ -195,6 +195,159 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue } if !opt.IgnoreAssertion && (!opt.Untouched) { if sctx.GetSessionVars().LazyCheckKeyNotExists() && !txn.IsPessimistic() { +======= + + var ( + tempKey []byte + keyVer byte + keyIsTempIdxKey bool + ) + if !opt.FromBackFill { + key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key) + if keyVer == TempIndexKeyTypeBackfill || keyVer == TempIndexKeyTypeDelete { + key, tempKey = tempKey, nil + keyIsTempIdxKey = true + } + } + + if opt.Untouched { + txn, err1 := sctx.Txn(true) + if err1 != nil { + return nil, err1 + } + // If the index kv was untouched(unchanged), and the key/value already exists in mem-buffer, + // should not overwrite the key with un-commit flag. + // So if the key exists, just do nothing and return. + v, err := txn.GetMemBuffer().Get(ctx, key) + if err == nil { + if len(v) != 0 { + continue + } + // The key is marked as deleted in the memory buffer, as the existence check is done lazily + // for optimistic transactions by default. The "untouched" key could still exist in the store, + // it's needed to commit this key to do the existence check so unset the untouched flag. + if !txn.IsPessimistic() { + keyFlags, err := txn.GetMemBuffer().GetFlags(key) + if err != nil { + return nil, err + } + if keyFlags.HasPresumeKeyNotExists() { + opt.Untouched = false + } + } + } + } + + // save the key buffer to reuse. + writeBufs.IndexKeyBuf = key + c.initNeedRestoreData.Do(func() { + c.needRestoredData = NeedRestoredData(c.idxInfo.Columns, c.tblInfo.Columns) + }) + idxVal, err := tablecodec.GenIndexValuePortal(sctx.GetSessionVars().StmtCtx, c.tblInfo, c.idxInfo, c.needRestoredData, distinct, opt.Untouched, value, h, c.phyTblID, handleRestoreData) + if err != nil { + return nil, err + } + + opt.IgnoreAssertion = opt.IgnoreAssertion || c.idxInfo.State != model.StatePublic + + if !distinct || skipCheck || opt.Untouched { + val := idxVal + if keyIsTempIdxKey && !opt.Untouched { // Untouched key-values never occur in the storage. + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: distinct} + val = tempVal.Encode(nil) + } + err = txn.GetMemBuffer().Set(key, val) + if err != nil { + return nil, err + } + if len(tempKey) > 0 { + if !opt.Untouched { // Untouched key-values never occur in the storage. + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: distinct} + val = tempVal.Encode(nil) + } + err = txn.GetMemBuffer().Set(tempKey, val) + if err != nil { + return nil, err + } + } + if !opt.IgnoreAssertion && (!opt.Untouched) { + if sctx.GetSessionVars().LazyCheckKeyNotExists() && !txn.IsPessimistic() { + err = txn.SetAssertion(key, kv.SetAssertUnknown) + } else { + err = txn.SetAssertion(key, kv.SetAssertNotExist) + } + } + if err != nil { + return nil, err + } + continue + } + + var value []byte + if c.tblInfo.TempTableType != model.TempTableNone { + // Always check key for temporary table because it does not write to TiKV + value, err = txn.Get(ctx, key) + } else if sctx.GetSessionVars().LazyCheckKeyNotExists() { + value, err = txn.GetMemBuffer().Get(ctx, key) + } else { + value, err = txn.Get(ctx, key) + } + if err != nil && !kv.IsErrNotFound(err) { + return nil, err + } + var tempIdxVal tablecodec.TempIndexValue + if len(value) > 0 && keyIsTempIdxKey { + tempIdxVal, err = tablecodec.DecodeTempIndexValue(value, c.tblInfo.IsCommonHandle) + if err != nil { + return nil, err + } + } + // The index key value is not found or deleted. + if err != nil || len(value) == 0 || (!tempIdxVal.IsEmpty() && tempIdxVal.Current().Delete) { + val := idxVal + lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil + if keyIsTempIdxKey { + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: true} + val = tempVal.Encode(value) + } + needPresumeNotExists, err := needPresumeKeyNotExistsFlag(ctx, txn, key, tempKey, h, + keyIsTempIdxKey, c.tblInfo.IsCommonHandle, c.tblInfo.ID) + if err != nil { + return nil, err + } + if lazyCheck { + var flags []kv.FlagsOp + if needPresumeNotExists { + flags = []kv.FlagsOp{kv.SetPresumeKeyNotExists} + } + if !vars.ConstraintCheckInPlacePessimistic && vars.TxnCtx.IsPessimistic && vars.InTxn() && + !vars.InRestrictedSQL && vars.ConnectionID > 0 { + flags = append(flags, kv.SetNeedConstraintCheckInPrewrite) + } + err = txn.GetMemBuffer().SetWithFlags(key, val, flags...) + } else { + err = txn.GetMemBuffer().Set(key, val) + } + if err != nil { + return nil, err + } + if len(tempKey) > 0 { + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: true} + val = tempVal.Encode(value) + if lazyCheck && needPresumeNotExists { + err = txn.GetMemBuffer().SetWithFlags(tempKey, val, kv.SetPresumeKeyNotExists) + } else { + err = txn.GetMemBuffer().Set(tempKey, val) + } + if err != nil { + return nil, err + } + } + if opt.IgnoreAssertion { + continue + } + if lazyCheck && !txn.IsPessimistic() { +>>>>>>> c6e6d621e2 (ddl: check the key existence on original index (#40749)) err = txn.SetAssertion(key, kv.SetAssertUnknown) } else { err = txn.SetAssertion(key, kv.SetAssertNotExist) @@ -242,6 +395,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue } } } +<<<<<<< HEAD if lazyCheck { var flags []kv.FlagsOp if needPresumeKey != KeyInTempIndexIsDeleted { @@ -254,6 +408,11 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue err = txn.GetMemBuffer().SetWithFlags(key, idxVal, flags...) } else { err = txn.GetMemBuffer().Set(key, idxVal) +======= + + if keyIsTempIdxKey && !tempIdxVal.IsEmpty() { + value = tempIdxVal.Current().Value +>>>>>>> c6e6d621e2 (ddl: check the key existence on original index (#40749)) } if err != nil { return nil, err @@ -290,9 +449,118 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue return handle, kv.ErrKeyExists } +func needPresumeKeyNotExistsFlag(ctx context.Context, txn kv.Transaction, key, tempKey kv.Key, + h kv.Handle, keyIsTempIdxKey bool, isCommon bool, tblID int64) (needFlag bool, err error) { + var uniqueTempKey kv.Key + if keyIsTempIdxKey { + uniqueTempKey = key + } else if len(tempKey) > 0 { + uniqueTempKey = tempKey + } else { + return true, nil + } + foundKey, dupHandle, err := FetchDuplicatedHandle(ctx, uniqueTempKey, true, txn, tblID, isCommon) + if err != nil { + return false, err + } + if foundKey && dupHandle != nil && !dupHandle.Equal(h) { + return false, kv.ErrKeyExists + } + return false, nil +} + // Delete removes the entry for handle h and indexedValues from KV index. +<<<<<<< HEAD func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error { key, distinct, err := c.GenIndexKey(sc, indexedValues, h, nil) +======= +func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle) error { + indexedValues := c.getIndexedValue(indexedValue) + for _, value := range indexedValues { + key, distinct, err := c.GenIndexKey(sc, value, h, nil) + if err != nil { + return err + } + + key, tempKey, tempKeyVer := GenTempIdxKeyByState(c.idxInfo, key) + var originTempVal []byte + if len(tempKey) > 0 && c.idxInfo.Unique { + // Get the origin value of the unique temporary index key. + // Append the new delete operations to the end of the origin value. + originTempVal, err = getKeyInTxn(context.TODO(), txn, tempKey) + if err != nil { + return err + } + } + tempValElem := tablecodec.TempIndexValueElem{Handle: h, KeyVer: tempKeyVer, Delete: true, Distinct: distinct} + + if distinct { + if len(key) > 0 { + err = txn.GetMemBuffer().DeleteWithFlags(key, kv.SetNeedLocked) + if err != nil { + return err + } + } + if len(tempKey) > 0 { + // Append to the end of the origin value for distinct value. + tempVal := tempValElem.Encode(originTempVal) + err = txn.GetMemBuffer().Set(tempKey, tempVal) + if err != nil { + return err + } + } + } else { + if len(key) > 0 { + err = txn.GetMemBuffer().Delete(key) + if err != nil { + return err + } + } + if len(tempKey) > 0 { + tempVal := tempValElem.Encode(nil) + err = txn.GetMemBuffer().Set(tempKey, tempVal) + if err != nil { + return err + } + } + } + if c.idxInfo.State == model.StatePublic { + // If the index is in public state, delete this index means it must exists. + err = txn.SetAssertion(key, kv.SetAssertExist) + } + if err != nil { + return err + } + } + return nil +} + +func (c *index) GenIndexKVIter(sc *stmtctx.StatementContext, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum) table.IndexIter { + indexedValues := c.getIndexedValue(indexedValue) + return &indexGenerator{ + c: c, + sctx: sc, + indexedVals: indexedValues, + h: h, + handleRestoreData: handleRestoreData, + i: 0, + } +} + +type indexGenerator struct { + c *index + sctx *stmtctx.StatementContext + indexedVals [][]types.Datum + h kv.Handle + handleRestoreData []types.Datum + + i int +} + +func (s *indexGenerator) Next(kb []byte) ([]byte, []byte, bool, error) { + val := s.indexedVals[s.i] + key, distinct, err := s.c.GenIndexKey(s.sctx, val, s.h, kb) +>>>>>>> c6e6d621e2 (ddl: check the key existence on original index (#40749)) if err != nil { return err } @@ -389,6 +657,7 @@ func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedV if err1 != nil { return false, nil, err } +<<<<<<< HEAD switch KeyExistInfo { case KeyInTempIndexNotExist, KeyInTempIndexIsDeleted: return false, nil, nil @@ -396,8 +665,67 @@ func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedV return true, h1, kv.ErrKeyExists case KeyInTempIndexIsItself: return true, h, nil +======= + // If index current is in creating status and using ingest mode, we need first + // check key exist status in temp index. + key, tempKey, _ := GenTempIdxKeyByState(c.idxInfo, key) + if len(tempKey) > 0 { + key = tempKey } + foundKey, dupHandle, err := FetchDuplicatedHandle(context.TODO(), key, distinct, txn, c.tblInfo.ID, c.tblInfo.IsCommonHandle) + if err != nil || !foundKey { + return false, nil, err + } + if dupHandle != nil && !dupHandle.Equal(h) { + return false, nil, err + } + continue + } + return true, h, nil +} + +// FetchDuplicatedHandle is used to find the duplicated row's handle for a given unique index key. +func FetchDuplicatedHandle(ctx context.Context, key kv.Key, distinct bool, + txn kv.Transaction, tableID int64, isCommon bool) (foundKey bool, dupHandle kv.Handle, err error) { + if isTemp, originIdxID := tablecodec.CheckTempIndexKey(key); isTemp { + return fetchDuplicatedHandleForTempIndexKey(ctx, key, distinct, txn, tableID, originIdxID, isCommon) + } + // The index key is not from temp index. + val, err := getKeyInTxn(ctx, txn, key) + if err != nil || len(val) == 0 { + return false, nil, err + } + if distinct { + h, err := tablecodec.DecodeHandleInUniqueIndexValue(val, isCommon) + return true, h, err + } + return true, nil, nil +} + +func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, distinct bool, + txn kv.Transaction, tableID, idxID int64, isCommon bool) (foundKey bool, dupHandle kv.Handle, err error) { + tempRawVal, err := getKeyInTxn(ctx, txn, tempKey) + if err != nil { + return false, nil, err + } + if tempRawVal == nil { + originKey := tempKey.Clone() + tablecodec.TempIndexKey2IndexKey(idxID, originKey) + originVal, err := getKeyInTxn(ctx, txn, originKey) + if err != nil || originVal == nil { + return false, nil, err + } + if distinct { + originHandle, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, isCommon) + if err != nil { + return false, nil, err + } + return true, originHandle, err +>>>>>>> c6e6d621e2 (ddl: check the key existence on original index (#40749)) + } + return false, nil, nil } +<<<<<<< HEAD value, err := txn.Get(context.TODO(), key) if kv.IsErrNotFound(err) { @@ -421,6 +749,60 @@ func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedV } return true, h, nil +======= + tempVal, err := tablecodec.DecodeTempIndexValue(tempRawVal, isCommon) + if err != nil { + return false, nil, err + } + curElem := tempVal.Current() + if curElem.Delete { + originKey := tempKey.Clone() + tablecodec.TempIndexKey2IndexKey(idxID, originKey) + originVal, err := getKeyInTxn(ctx, txn, originKey) + if err != nil || originVal == nil { + return false, nil, err + } + if distinct { + originHandle, err := tablecodec.DecodeHandleInUniqueIndexValue(originVal, isCommon) + if err != nil { + return false, nil, err + } + if originHandle.Equal(curElem.Handle) { + // The key has been deleted. This is not a duplicated key. + return false, nil, nil + } + // The inequality means multiple modifications happened in the same key. + // We use the handle in origin index value to check if the row exists. + recPrefix := tablecodec.GenTableRecordPrefix(tableID) + rowKey := tablecodec.EncodeRecordKey(recPrefix, originHandle) + rowVal, err := getKeyInTxn(ctx, txn, rowKey) + if err != nil || rowVal == nil { + return false, nil, err + } + // The row exists. This is the duplicated key. + return true, originHandle, nil + } + return false, nil, nil + } + // The value in temp index is not the delete marker. + if distinct { + h, err := tablecodec.DecodeHandleInUniqueIndexValue(curElem.Value, isCommon) + return true, h, err + } + return true, nil, nil +} + +// getKeyInTxn gets the value of the key in the transaction, and ignore the ErrNotExist error. +func getKeyInTxn(ctx context.Context, txn kv.Transaction, key kv.Key) ([]byte, error) { + val, err := txn.Get(ctx, key) + if err != nil { + if kv.IsErrNotFound(err) { + return nil, nil + } + return nil, err + } + return val, nil +>>>>>>> c6e6d621e2 (ddl: check the key existence on original index (#40749)) } func (c *index) FetchValues(r []types.Datum, vals []types.Datum) ([]types.Datum, error) { @@ -498,57 +880,3 @@ func TryAppendCommonHandleRowcodecColInfos(colInfo []rowcodec.ColInfo, tblInfo * } return colInfo } - -// TempIndexKeyState is the state of the temporary index key. -type TempIndexKeyState byte - -const ( - // KeyInTempIndexUnknown whether the key exists or not in temp index is unknown. - KeyInTempIndexUnknown TempIndexKeyState = iota - // KeyInTempIndexNotExist the key is not exist in temp index. - KeyInTempIndexNotExist - // KeyInTempIndexIsDeleted the key is marked deleted in temp index. - KeyInTempIndexIsDeleted - // KeyInTempIndexIsItself the key is correlated to itself in temp index. - KeyInTempIndexIsItself - // KeyInTempIndexConflict the key is conflict in temp index. - KeyInTempIndexConflict -) - -// KeyExistInTempIndex is used to check the unique key exist status in temp index. -func KeyExistInTempIndex(ctx context.Context, txn kv.Transaction, key kv.Key, distinct bool, h kv.Handle, IsCommonHandle bool) (TempIndexKeyState, kv.Handle, error) { - // Only check temp index key. - if !tablecodec.IsTempIndexKey(key) { - return KeyInTempIndexUnknown, nil, nil - } - value, err := txn.Get(ctx, key) - if kv.IsErrNotFound(err) { - return KeyInTempIndexNotExist, nil, nil - } - if err != nil { - return KeyInTempIndexUnknown, nil, err - } - - // Since KeyExistInTempIndex only accept temp index key, so the value length should great than 1 for key version. - if len(value) < 1 { - return KeyInTempIndexUnknown, nil, errors.New("temp index value length should great than 1") - } - - if tablecodec.CheckTempIndexValueIsDelete(value) { - return KeyInTempIndexIsDeleted, nil, nil - } - - // Check if handle equal. - var handle kv.Handle - if distinct { - originVal := tablecodec.DecodeTempIndexOriginValue(value) - handle, err = tablecodec.DecodeHandleInUniqueIndexValue(originVal, IsCommonHandle) - if err != nil { - return KeyInTempIndexUnknown, nil, err - } - if !handle.Equal(h) { - return KeyInTempIndexConflict, handle, kv.ErrKeyExists - } - } - return KeyInTempIndexIsItself, handle, nil -} diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index e4513b8cae409..2229bfbb9d138 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -106,7 +106,7 @@ func CheckDataConsistency( // } if rowInsertion.key != nil { - if err = checkHandleConsistency(rowInsertion, indexMutations, columnMaps.IndexIDToInfo, t.Meta().Name.O); err != nil { + if err = checkHandleConsistency(rowInsertion, indexMutations, columnMaps.IndexIDToInfo, t.Meta()); err != nil { return errors.Trace(err) } } @@ -123,7 +123,7 @@ func CheckDataConsistency( // in row insertions and index insertions are consistent. // A PUT_index implies a PUT_row with the same handle. // Deletions are not checked since the values of deletions are unknown -func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, indexIDToInfo map[int64]*model.IndexInfo, tableName string) error { +func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, indexIDToInfo map[int64]*model.IndexInfo, tblInfo *model.TableInfo) error { var insertionHandle kv.Handle var err error @@ -154,7 +154,18 @@ func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, in indexHandle kv.Handle ) if idxID != m.indexID { - value = tablecodec.DecodeTempIndexOriginValue(m.value) + if tablecodec.TempIndexValueIsUntouched(m.value) { + // We never commit the untouched key values to the storage. Skip this check. + continue + } + var tempIdxVal tablecodec.TempIndexValue + tempIdxVal, err = tablecodec.DecodeTempIndexValue(m.value, tblInfo.IsCommonHandle) + if err != nil { + return err + } + if !tempIdxVal.IsEmpty() { + value = tempIdxVal.Current().Value + } if len(value) == 0 { // Skip the deleted operation values. continue @@ -170,7 +181,7 @@ func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, in } // NOTE: handle type can be different, see issue 29520 if indexHandle.IsInt() == insertionHandle.IsInt() && indexHandle.Compare(insertionHandle) != 0 { - err = ErrInconsistentHandle.GenWithStackByArgs(tableName, indexInfo.Name.O, indexHandle, insertionHandle, m, rowInsertion) + err = ErrInconsistentHandle.GenWithStackByArgs(tblInfo.Name, indexInfo.Name.O, indexHandle, insertionHandle, m, rowInsertion) logutil.BgLogger().Error("inconsistent handle in index and record insertions", zap.Error(err)) return err } @@ -209,9 +220,20 @@ func checkIndexKeys( return errors.New("index not found") } + var isTmpIdxValAndDeleted bool // If this is temp index data, need remove last byte of index data. if idxID != m.indexID { - value = append(value, m.value[:len(m.value)-1]...) + if tablecodec.TempIndexValueIsUntouched(m.value) { + // We never commit the untouched key values to the storage. Skip this check. + continue + } + tmpVal, err := tablecodec.DecodeTempIndexValue(m.value, t.Meta().IsCommonHandle) + if err != nil { + return err + } + curElem := tmpVal.Current() + isTmpIdxValAndDeleted = curElem.Delete + value = append(value, curElem.Value...) } else { value = append(value, m.value...) } @@ -245,7 +267,7 @@ func checkIndexKeys( } // When it is in add index new backfill state. - if len(value) == 0 || (idxID != m.indexID && (tablecodec.CheckTempIndexValueIsDelete(value))) { + if len(value) == 0 || isTmpIdxValAndDeleted { err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToRemove, indexInfo, t.Meta()) } else { err = compareIndexData(sessVars.StmtCtx, t.Columns, indexData, rowToInsert, indexInfo, t.Meta()) diff --git a/table/tables/mutation_checker_test.go b/table/tables/mutation_checker_test.go index 43fb35c21a5b6..4c44e90a7d244 100644 --- a/table/tables/mutation_checker_test.go +++ b/table/tables/mutation_checker_test.go @@ -310,9 +310,9 @@ func TestCheckIndexKeysAndCheckHandleConsistency(t *testing.T) { require.Nil(t, err) rowMutation := mutation{key: rowKey, value: rowValue} corruptedRowMutation := mutation{key: corruptedRowKey, value: rowValue} - err = checkHandleConsistency(rowMutation, indexMutations, maps.IndexIDToInfo, "t") + err = checkHandleConsistency(rowMutation, indexMutations, maps.IndexIDToInfo, &tableInfo) require.Nil(t, err) - err = checkHandleConsistency(corruptedRowMutation, indexMutations, maps.IndexIDToInfo, "t") + err = checkHandleConsistency(corruptedRowMutation, indexMutations, maps.IndexIDToInfo, &tableInfo) require.NotNil(t, err) } } diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index c757bb4f82aa0..a28db76980cfa 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -1143,8 +1143,8 @@ func TempIndexKey2IndexKey(originIdxID int64, tempIdxKey []byte) { binary.BigEndian.PutUint64(tempIdxKey[prefixLen:], eid) } -// IsTempIndexKey check whether the input key is for a temp index. -func IsTempIndexKey(indexKey []byte) bool { +// CheckTempIndexKey checks whether the input key is for a temp index. +func CheckTempIndexKey(indexKey []byte) (isTemp bool, originIdxID int64) { var ( indexIDKey []byte indexID int64 @@ -1154,101 +1154,206 @@ func IsTempIndexKey(indexKey []byte) bool { indexIDKey = indexKey[prefixLen : prefixLen+8] indexID = codec.DecodeCmpUintToInt(binary.BigEndian.Uint64(indexIDKey)) tempIndexID = int64(TempIndexPrefix) | indexID - return tempIndexID == indexID + return tempIndexID == indexID, indexID & IndexIDMask } // TempIndexValueFlag is the flag of temporary index value. type TempIndexValueFlag byte const ( - // TempIndexValueFlagNormal means the following value is the normal index value. + // TempIndexValueFlagNormal means the following value is a distinct the normal index value. TempIndexValueFlagNormal TempIndexValueFlag = iota - // TempIndexValueFlagDeleted means this is a representation of a "delete" operation. + // TempIndexValueFlagNonDistinctNormal means the following value is the non-distinct normal index value. + TempIndexValueFlagNonDistinctNormal + // TempIndexValueFlagDeleted means the following value is the distinct and deleted index value. TempIndexValueFlagDeleted + // TempIndexValueFlagNonDistinctDeleted means the following value is the non-distinct deleted index value. + TempIndexValueFlagNonDistinctDeleted ) -// EncodeTempIndexValue encodes the value of temporary index. -// Note: this function changes the input value. -func EncodeTempIndexValue(value []byte, keyVer byte) []byte { - value = append(value, 0) - copy(value[1:], value[:len(value)-1]) - value[0] = byte(TempIndexValueFlagNormal) // normal flag + value + tempKeyVer - value = append(value, keyVer) - return value -} +// TempIndexValue is the value of temporary index. +// It contains one or more element, each element represents a history index operations on the original index. +// A temp index value element is encoded as one of: +// - [flag 1 byte][value_length 2 bytes ] [value value_len bytes] [key_version 1 byte] {distinct normal} +// - [flag 1 byte][value value_len bytes] [key_version 1 byte] {non-distinct normal} +// - [flag 1 byte][handle_length 2 bytes] [handle handle_len bytes] [key_version 1 byte] {distinct deleted} +// - [flag 1 byte] [key_version 1 byte] {non-distinct deleted} +// +// The temp index value is encoded as: +// - [element 1][element 2]...[element n] {for distinct values} +// - [element 1] {for non-distinct values} +type TempIndexValue []*TempIndexValueElem -// EncodeTempIndexValueDeletedUnique encodes the value of temporary index for unique index. -func EncodeTempIndexValueDeletedUnique(handle kv.Handle, keyVer byte) []byte { - var hEncoded []byte - var hLen int - if handle.IsInt() { - var data [8]byte - binary.BigEndian.PutUint64(data[:], uint64(handle.IntValue())) - hEncoded = data[:] - hLen = 8 - } else { - hEncoded = handle.Encoded() - hLen = len(hEncoded) - } - val := make([]byte, 0, 1+hLen+1) // deleted flag + handle + tempKeyVer - val = append(val, byte(TempIndexValueFlagDeleted)) - val = append(val, hEncoded...) - val = append(val, keyVer) - return val +// IsEmpty checks whether the value is empty. +func (v TempIndexValue) IsEmpty() bool { + return len(v) == 0 } -// EncodeTempIndexValueDeleted encodes the delete operation on origin index to a value for temporary index. -func EncodeTempIndexValueDeleted(keyVer byte) []byte { - // Handle is not needed because it is already in the key. - val := make([]byte, 0, 2) // deleted flag + tempKeyVer - val = append(val, byte(TempIndexValueFlagDeleted)) - val = append(val, keyVer) - return val +// Current returns the current latest temp index value. +func (v TempIndexValue) Current() *TempIndexValueElem { + return v[len(v)-1] } -// DecodeTempIndexValue decodes the value of temporary index. -func DecodeTempIndexValue(value []byte, isCommonHandle bool) (originVal []byte, handle kv.Handle, isDelete bool, isUnique bool, keyVer byte) { - if len(value) == 0 { - return nil, nil, false, false, 0 +// FilterOverwritten is used by the temp index merge process to remove the overwritten index operations. +// For example, the value {temp_idx_key -> [h2, h2d, h3, h1d]} recorded four operations on the original index. +// Since 'h2d' overwrites 'h2', we can remove 'h2' from the value. +func (v TempIndexValue) FilterOverwritten() TempIndexValue { + if len(v) <= 1 || !v[0].Distinct { + return v } - switch TempIndexValueFlag(value[0]) { - case TempIndexValueFlagNormal: - originVal = value[1 : len(value)-1] - keyVer = value[len(value)-1] - case TempIndexValueFlagDeleted: - isDelete = true - if len(value) == 2 { - keyVer = value[1] + occurred := kv.NewHandleMap() + for i := len(v) - 1; i >= 0; i-- { + if _, ok := occurred.Get(v[i].Handle); !ok { + occurred.Set(v[i].Handle, struct{}{}) } else { - isUnique = true - if isCommonHandle { - handle, _ = kv.NewCommonHandle(value[1 : len(value)-1]) + v[i] = nil + } + } + ret := v[:0] + for _, elem := range v { + if elem != nil { + ret = append(ret, elem) + } + } + return ret +} + +// TempIndexValueElem represents a history index operations on the original index. +// A temp index value element is encoded as one of: +// - [flag 1 byte][value_length 2 bytes ] [value value_len bytes] [key_version 1 byte] {distinct normal} +// - [flag 1 byte][value value_len bytes] [key_version 1 byte] {non-distinct normal} +// - [flag 1 byte][handle_length 2 bytes] [handle handle_len bytes] [key_version 1 byte] {distinct deleted} +// - [flag 1 byte] [key_version 1 byte] {non-distinct deleted} +type TempIndexValueElem struct { + Value []byte + Handle kv.Handle + KeyVer byte + Delete bool + Distinct bool +} + +// Encode encodes the temp index value. +func (v *TempIndexValueElem) Encode(buf []byte) []byte { + if v.Delete { + if v.Distinct { + handle := v.Handle + var hEncoded []byte + var hLen uint16 + if handle.IsInt() { + hEncoded = codec.EncodeUint(hEncoded, uint64(handle.IntValue())) + hLen = 8 } else { - handle = decodeIntHandleInIndexValue(value[1 : len(value)-1]) + hEncoded = handle.Encoded() + hLen = uint16(len(hEncoded)) + } + // flag + handle length + handle + temp key version + if buf == nil { + buf = make([]byte, 0, hLen+4) } - keyVer = value[len(value)-1] + buf = append(buf, byte(TempIndexValueFlagDeleted)) + buf = append(buf, byte(hLen>>8), byte(hLen)) + buf = append(buf, hEncoded...) + buf = append(buf, v.KeyVer) + return buf } - } - return + // flag + temp key version + if buf == nil { + buf = make([]byte, 0, 2) + } + buf = append(buf, byte(TempIndexValueFlagNonDistinctDeleted)) + buf = append(buf, v.KeyVer) + return buf + } + if v.Distinct { + // flag + value length + value + temp key version + if buf == nil { + buf = make([]byte, 0, len(v.Value)+4) + } + buf = append(buf, byte(TempIndexValueFlagNormal)) + vLen := uint16(len(v.Value)) + buf = append(buf, byte(vLen>>8), byte(vLen)) + buf = append(buf, v.Value...) + buf = append(buf, v.KeyVer) + return buf + } + // flag + value + temp key version + if buf == nil { + buf = make([]byte, 0, len(v.Value)+2) + } + buf = append(buf, byte(TempIndexValueFlagNonDistinctNormal)) + buf = append(buf, v.Value...) + buf = append(buf, v.KeyVer) + return buf } -// CheckTempIndexValueIsDelete checks whether the value is a delete operation. -func CheckTempIndexValueIsDelete(value []byte) bool { - if len(value) == 0 { - return false +// DecodeTempIndexValue decodes the temp index value. +func DecodeTempIndexValue(value []byte, isCommonHandle bool) (TempIndexValue, error) { + var ( + values []*TempIndexValueElem + err error + ) + for len(value) > 0 { + v := &TempIndexValueElem{} + value, err = v.DecodeOne(value, isCommonHandle) + if err != nil { + return nil, err + } + values = append(values, v) } - return TempIndexValueFlag(value[0]) == TempIndexValueFlagDeleted + return values, nil } -// DecodeTempIndexOriginValue decodes the value of origin index from a temp index value. -func DecodeTempIndexOriginValue(value []byte) []byte { - if len(value) == 0 { - return nil +// DecodeOne decodes one temp index value element. +func (v *TempIndexValueElem) DecodeOne(b []byte, isCommonHandle bool) (remain []byte, err error) { + flag := TempIndexValueFlag(b[0]) + b = b[1:] + switch flag { + case TempIndexValueFlagNormal: + vLen := (uint16(b[0]) << 8) + uint16(b[1]) + b = b[2:] + v.Value = b[:vLen] + b = b[vLen:] + v.KeyVer = b[0] + b = b[1:] + v.Distinct = true + v.Handle, err = DecodeHandleInUniqueIndexValue(v.Value, isCommonHandle) + return b, err + case TempIndexValueFlagNonDistinctNormal: + v.Value = b[:len(b)-1] + v.KeyVer = b[len(b)-1] + return nil, nil + case TempIndexValueFlagDeleted: + hLen := (uint16(b[0]) << 8) + uint16(b[1]) + b = b[2:] + if isCommonHandle { + v.Handle, _ = kv.NewCommonHandle(b[:hLen]) + } else { + v.Handle = decodeIntHandleInIndexValue(b[:hLen]) + } + b = b[hLen:] + v.KeyVer = b[0] + b = b[1:] + v.Distinct = true + v.Delete = true + return b, nil + case TempIndexValueFlagNonDistinctDeleted: + v.KeyVer = b[0] + b = b[1:] + v.Delete = true + return b, nil + default: + return nil, errors.New("invalid temp index value") } - if TempIndexValueFlag(value[0]) == TempIndexValueFlagNormal { - return value[1 : len(value)-1] +} + +// TempIndexValueIsUntouched returns true if the value is untouched. +// All the temp index value has the suffix of temp key version. +// All the temp key versions differ from the uncommitted KV flag. +func TempIndexValueIsUntouched(b []byte) bool { + if len(b) > 0 && b[len(b)-1] == kv.UnCommitIndexKVFlag { + return true } - return nil + return false } // GenIndexValuePortal is the portal for generating index value. diff --git a/tablecodec/tablecodec_test.go b/tablecodec/tablecodec_test.go index 231d58cf18bd3..adc4ccc78c13b 100644 --- a/tablecodec/tablecodec_test.go +++ b/tablecodec/tablecodec_test.go @@ -617,26 +617,84 @@ func TestTempIndexValueCodec(t *testing.T) { require.NoError(t, err) encodedValueCopy := make([]byte, len(encodedValue)) copy(encodedValueCopy, encodedValue) - tempIdxVal := EncodeTempIndexValue(encodedValue, 'b') - originVal, handle, isDelete, unique, keyVer := DecodeTempIndexValue(tempIdxVal, false) - require.Nil(t, handle) - require.False(t, isDelete || unique) - require.Equal(t, keyVer, byte('b')) - require.EqualValues(t, encodedValueCopy, originVal) - - tempIdxVal = EncodeTempIndexValueDeletedUnique(kv.IntHandle(100), 'm') - originVal, handle, isDelete, unique, keyVer = DecodeTempIndexValue(tempIdxVal, false) - require.Equal(t, handle.IntValue(), int64(100)) - require.True(t, isDelete) - require.True(t, unique) - require.Equal(t, keyVer, byte('m')) - require.Empty(t, originVal) - - tempIdxVal = EncodeTempIndexValueDeleted('b') - originVal, handle, isDelete, unique, keyVer = DecodeTempIndexValue(tempIdxVal, false) - require.Nil(t, handle) - require.True(t, isDelete) - require.False(t, unique) - require.Equal(t, keyVer, byte('b')) - require.Empty(t, originVal) + + tempIdxVal := TempIndexValueElem{ + Value: encodedValue, + KeyVer: 'b', + } + val := tempIdxVal.Encode(nil) + var newTempIdxVal TempIndexValueElem + remain, err := newTempIdxVal.DecodeOne(val, false) + require.NoError(t, err) + require.Equal(t, 0, len(remain)) + require.EqualValues(t, tempIdxVal, newTempIdxVal) + + idxVal := EncodeHandleInUniqueIndexValue(kv.IntHandle(100), false) + tempIdxVal = TempIndexValueElem{ + Value: idxVal, + KeyVer: 'm', + Distinct: true, + } + newTempIdxVal = TempIndexValueElem{} + val = tempIdxVal.Encode(nil) + remain, err = newTempIdxVal.DecodeOne(val, false) + require.NoError(t, err) + require.Equal(t, 0, len(remain)) + require.Equal(t, newTempIdxVal.Handle.IntValue(), int64(100)) + newTempIdxVal.Handle = nil + require.EqualValues(t, tempIdxVal, newTempIdxVal) + + tempIdxVal = TempIndexValueElem{ + Delete: true, + KeyVer: 'b', + } + newTempIdxVal = TempIndexValueElem{} + val = tempIdxVal.Encode(nil) + remain, err = newTempIdxVal.DecodeOne(val, false) + require.NoError(t, err) + require.Equal(t, 0, len(remain)) + require.EqualValues(t, tempIdxVal, newTempIdxVal) + + tempIdxVal = TempIndexValueElem{ + Delete: true, + KeyVer: 'b', + Distinct: true, + Handle: kv.IntHandle(100), + } + newTempIdxVal = TempIndexValueElem{} + val = tempIdxVal.Encode(nil) + remain, err = newTempIdxVal.DecodeOne(val, false) + require.NoError(t, err) + require.Equal(t, 0, len(remain)) + require.EqualValues(t, tempIdxVal, newTempIdxVal) + + // Test multiple temp index value elements. + idxVal = EncodeHandleInUniqueIndexValue(kv.IntHandle(100), false) + tempIdxVal = TempIndexValueElem{ + Value: idxVal, + KeyVer: 'm', + Distinct: true, + } + tempIdxVal2 := TempIndexValueElem{ + Handle: kv.IntHandle(100), + KeyVer: 'm', + Distinct: true, + Delete: true, + } + idxVal3 := EncodeHandleInUniqueIndexValue(kv.IntHandle(101), false) + tempIdxVal3 := TempIndexValueElem{ + Value: idxVal3, + KeyVer: 'm', + Distinct: true, + } + val = tempIdxVal.Encode(nil) + val = tempIdxVal2.Encode(val) + val = tempIdxVal3.Encode(val) + var result TempIndexValue + result, err = DecodeTempIndexValue(val, false) + require.NoError(t, err) + require.Equal(t, 3, len(result)) + require.Equal(t, result[0].Handle.IntValue(), int64(100)) + require.Equal(t, result[1].Handle.IntValue(), int64(100)) + require.Equal(t, result[2].Handle.IntValue(), int64(101)) } From 32f4d3b317739ecf0dbf4e20be12d85e8d1ab461 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 13 Feb 2023 20:08:07 +0800 Subject: [PATCH 2/3] resolve conflict --- ddl/index.go | 19 +- ddl/index_merge_tmp_test.go | 19 +- table/tables/index.go | 396 ++++++------------------------------ 3 files changed, 66 insertions(+), 368 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 4a0bca7bbc46c..863a38c3a1312 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -890,10 +890,6 @@ func convertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *m func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, indexInfo *model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) { elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}} -<<<<<<< HEAD - rh := newReorgHandler(t, w.sess, w.concurrentDDL) - reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, elements, mergingTmpIdx) -======= failpoint.Inject("mockDMLExecutionStateMerging", func(val failpoint.Value) { //nolint:forcetypeassert @@ -903,19 +899,8 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, } }) - sctx, err1 := w.sessPool.get() - if err1 != nil { - err = err1 - return - } - defer w.sessPool.put(sctx) - rh := newReorgHandler(newSession(sctx)) - dbInfo, err := t.GetDatabase(job.SchemaID) - if err != nil { - return false, ver, errors.Trace(err) - } - reorgInfo, err := getReorgInfo(d.jobContext(job.ID), d, rh, job, dbInfo, tbl, elements, mergingTmpIdx) ->>>>>>> c6e6d621e2 (ddl: check the key existence on original index (#40749)) + rh := newReorgHandler(t, w.sess, w.concurrentDDL) + reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, elements, mergingTmpIdx) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version // and then run the reorg next time. diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index 26c355282c608..42cf75bb4a2cd 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -21,13 +21,8 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/ddl/ingest" -<<<<<<< HEAD:ddl/index_merge_tmp_test.go "github.com/pingcap/tidb/domain" -======= - "github.com/pingcap/tidb/ddl/internal/callback" - "github.com/pingcap/tidb/ddl/testutil" "github.com/pingcap/tidb/errno" ->>>>>>> c6e6d621e2 (ddl: check the key existence on original index (#40749)):ddl/indexmergetest/merge_test.go "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/tablecodec" @@ -561,7 +556,7 @@ func TestAddIndexMergeInsertOnMerging(t *testing.T) { d := dom.DDL() originalCallback := d.GetHook() defer d.SetHook(originalCallback) - callback := &callback.TestDDLCallback{} + callback := &ddl.TestDDLCallback{} onJobUpdatedExportedFunc := func(job *model.Job) { if t.Failed() { return @@ -639,7 +634,7 @@ func TestAddIndexMergeInsertToDeletedTempIndex(t *testing.T) { d := dom.DDL() originalCallback := d.GetHook() defer d.SetHook(originalCallback) - callback := &callback.TestDDLCallback{} + callback := &ddl.TestDDLCallback{} onJobUpdatedExportedFunc := func(job *model.Job) { if t.Failed() { return @@ -682,7 +677,7 @@ func TestAddIndexMergeReplaceDelete(t *testing.T) { d := dom.DDL() originalCallback := d.GetHook() defer d.SetHook(originalCallback) - callback := &callback.TestDDLCallback{} + callback := &ddl.TestDDLCallback{} onJobUpdatedExportedFunc := func(job *model.Job) { if t.Failed() { return @@ -723,7 +718,7 @@ func TestAddIndexMergeDeleteDifferentHandle(t *testing.T) { d := dom.DDL() originalCallback := d.GetHook() defer d.SetHook(originalCallback) - callback := &callback.TestDDLCallback{} + callback := &ddl.TestDDLCallback{} runDML := false onJobUpdatedExportedFunc := func(job *model.Job) { if t.Failed() || runDML { @@ -770,7 +765,7 @@ func TestAddIndexDecodeTempIndexCommonHandle(t *testing.T) { d := dom.DDL() originalCallback := d.GetHook() defer d.SetHook(originalCallback) - callback := &callback.TestDDLCallback{} + callback := &ddl.TestDDLCallback{} runDML := false onJobUpdatedExportedFunc := func(job *model.Job) { if t.Failed() || runDML { @@ -809,7 +804,7 @@ func TestAddIndexInsertIgnoreOnBackfill(t *testing.T) { d := dom.DDL() originalCallback := d.GetHook() defer d.SetHook(originalCallback) - callback := &callback.TestDDLCallback{} + callback := &ddl.TestDDLCallback{} runDML := false onJobUpdatedExportedFunc := func(job *model.Job) { if t.Failed() || runDML { @@ -848,7 +843,7 @@ func TestAddIndexMultipleDelete(t *testing.T) { d := dom.DDL() originalCallback := d.GetHook() defer d.SetHook(originalCallback) - callback := &callback.TestDDLCallback{} + callback := &ddl.TestDDLCallback{} onJobUpdatedExportedFunc := func(job *model.Job) { if t.Failed() { return diff --git a/table/tables/index.go b/table/tables/index.go index 4e27da351dff3..0f856447464e0 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -176,178 +176,27 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue opt.IgnoreAssertion = opt.IgnoreAssertion || c.idxInfo.State != model.StatePublic if !distinct || skipCheck || opt.Untouched { + val := idxVal if keyIsTempIdxKey && !opt.Untouched { // Untouched key-values never occur in the storage. - idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: distinct} + val = tempVal.Encode(nil) } - err = txn.GetMemBuffer().Set(key, idxVal) + err = txn.GetMemBuffer().Set(key, val) if err != nil { return nil, err } -<<<<<<< HEAD if len(tempKey) > 0 { if !opt.Untouched { // Untouched key-values never occur in the storage. - idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) - } - err = txn.GetMemBuffer().Set(tempKey, idxVal) - if err != nil { - return nil, err - } - } - if !opt.IgnoreAssertion && (!opt.Untouched) { - if sctx.GetSessionVars().LazyCheckKeyNotExists() && !txn.IsPessimistic() { -======= - - var ( - tempKey []byte - keyVer byte - keyIsTempIdxKey bool - ) - if !opt.FromBackFill { - key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key) - if keyVer == TempIndexKeyTypeBackfill || keyVer == TempIndexKeyTypeDelete { - key, tempKey = tempKey, nil - keyIsTempIdxKey = true - } - } - - if opt.Untouched { - txn, err1 := sctx.Txn(true) - if err1 != nil { - return nil, err1 - } - // If the index kv was untouched(unchanged), and the key/value already exists in mem-buffer, - // should not overwrite the key with un-commit flag. - // So if the key exists, just do nothing and return. - v, err := txn.GetMemBuffer().Get(ctx, key) - if err == nil { - if len(v) != 0 { - continue - } - // The key is marked as deleted in the memory buffer, as the existence check is done lazily - // for optimistic transactions by default. The "untouched" key could still exist in the store, - // it's needed to commit this key to do the existence check so unset the untouched flag. - if !txn.IsPessimistic() { - keyFlags, err := txn.GetMemBuffer().GetFlags(key) - if err != nil { - return nil, err - } - if keyFlags.HasPresumeKeyNotExists() { - opt.Untouched = false - } - } - } - } - - // save the key buffer to reuse. - writeBufs.IndexKeyBuf = key - c.initNeedRestoreData.Do(func() { - c.needRestoredData = NeedRestoredData(c.idxInfo.Columns, c.tblInfo.Columns) - }) - idxVal, err := tablecodec.GenIndexValuePortal(sctx.GetSessionVars().StmtCtx, c.tblInfo, c.idxInfo, c.needRestoredData, distinct, opt.Untouched, value, h, c.phyTblID, handleRestoreData) - if err != nil { - return nil, err - } - - opt.IgnoreAssertion = opt.IgnoreAssertion || c.idxInfo.State != model.StatePublic - - if !distinct || skipCheck || opt.Untouched { - val := idxVal - if keyIsTempIdxKey && !opt.Untouched { // Untouched key-values never occur in the storage. tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: distinct} val = tempVal.Encode(nil) } - err = txn.GetMemBuffer().Set(key, val) + err = txn.GetMemBuffer().Set(tempKey, val) if err != nil { return nil, err } - if len(tempKey) > 0 { - if !opt.Untouched { // Untouched key-values never occur in the storage. - tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: distinct} - val = tempVal.Encode(nil) - } - err = txn.GetMemBuffer().Set(tempKey, val) - if err != nil { - return nil, err - } - } - if !opt.IgnoreAssertion && (!opt.Untouched) { - if sctx.GetSessionVars().LazyCheckKeyNotExists() && !txn.IsPessimistic() { - err = txn.SetAssertion(key, kv.SetAssertUnknown) - } else { - err = txn.SetAssertion(key, kv.SetAssertNotExist) - } - } - if err != nil { - return nil, err - } - continue - } - - var value []byte - if c.tblInfo.TempTableType != model.TempTableNone { - // Always check key for temporary table because it does not write to TiKV - value, err = txn.Get(ctx, key) - } else if sctx.GetSessionVars().LazyCheckKeyNotExists() { - value, err = txn.GetMemBuffer().Get(ctx, key) - } else { - value, err = txn.Get(ctx, key) - } - if err != nil && !kv.IsErrNotFound(err) { - return nil, err } - var tempIdxVal tablecodec.TempIndexValue - if len(value) > 0 && keyIsTempIdxKey { - tempIdxVal, err = tablecodec.DecodeTempIndexValue(value, c.tblInfo.IsCommonHandle) - if err != nil { - return nil, err - } - } - // The index key value is not found or deleted. - if err != nil || len(value) == 0 || (!tempIdxVal.IsEmpty() && tempIdxVal.Current().Delete) { - val := idxVal - lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil - if keyIsTempIdxKey { - tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: true} - val = tempVal.Encode(value) - } - needPresumeNotExists, err := needPresumeKeyNotExistsFlag(ctx, txn, key, tempKey, h, - keyIsTempIdxKey, c.tblInfo.IsCommonHandle, c.tblInfo.ID) - if err != nil { - return nil, err - } - if lazyCheck { - var flags []kv.FlagsOp - if needPresumeNotExists { - flags = []kv.FlagsOp{kv.SetPresumeKeyNotExists} - } - if !vars.ConstraintCheckInPlacePessimistic && vars.TxnCtx.IsPessimistic && vars.InTxn() && - !vars.InRestrictedSQL && vars.ConnectionID > 0 { - flags = append(flags, kv.SetNeedConstraintCheckInPrewrite) - } - err = txn.GetMemBuffer().SetWithFlags(key, val, flags...) - } else { - err = txn.GetMemBuffer().Set(key, val) - } - if err != nil { - return nil, err - } - if len(tempKey) > 0 { - tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: true} - val = tempVal.Encode(value) - if lazyCheck && needPresumeNotExists { - err = txn.GetMemBuffer().SetWithFlags(tempKey, val, kv.SetPresumeKeyNotExists) - } else { - err = txn.GetMemBuffer().Set(tempKey, val) - } - if err != nil { - return nil, err - } - } - if opt.IgnoreAssertion { - continue - } - if lazyCheck && !txn.IsPessimistic() { ->>>>>>> c6e6d621e2 (ddl: check the key existence on original index (#40749)) + if !opt.IgnoreAssertion && (!opt.Untouched) { + if sctx.GetSessionVars().LazyCheckKeyNotExists() && !txn.IsPessimistic() { err = txn.SetAssertion(key, kv.SetAssertUnknown) } else { err = txn.SetAssertion(key, kv.SetAssertNotExist) @@ -378,51 +227,49 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue if err != nil && !kv.IsErrNotFound(err) { return nil, err } - if err != nil || len(value) == 0 || (keyIsTempIdxKey && tablecodec.CheckTempIndexValueIsDelete(value)) { + var tempIdxVal tablecodec.TempIndexValue + if len(value) > 0 && keyIsTempIdxKey { + tempIdxVal, err = tablecodec.DecodeTempIndexValue(value, c.tblInfo.IsCommonHandle) + if err != nil { + return nil, err + } + } + // The index key value is not found or deleted. + if err != nil || len(value) == 0 || (!tempIdxVal.IsEmpty() && tempIdxVal.Current().Delete) { + val := idxVal lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil - var needPresumeKey TempIndexKeyState if keyIsTempIdxKey { - idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) - needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, key, distinct, h, c.tblInfo.IsCommonHandle) - if err != nil { - return nil, err - } - } else { - if len(tempKey) > 0 { - needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) - if err != nil { - return nil, err - } - } + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: true} + val = tempVal.Encode(value) + } + needPresumeNotExists, err := needPresumeKeyNotExistsFlag(ctx, txn, key, tempKey, h, + keyIsTempIdxKey, c.tblInfo.IsCommonHandle, c.tblInfo.ID) + if err != nil { + return nil, err } -<<<<<<< HEAD if lazyCheck { var flags []kv.FlagsOp - if needPresumeKey != KeyInTempIndexIsDeleted { + if needPresumeNotExists { flags = []kv.FlagsOp{kv.SetPresumeKeyNotExists} } if !vars.ConstraintCheckInPlacePessimistic && vars.TxnCtx.IsPessimistic && vars.InTxn() && !vars.InRestrictedSQL && vars.ConnectionID > 0 { flags = append(flags, kv.SetNeedConstraintCheckInPrewrite) } - err = txn.GetMemBuffer().SetWithFlags(key, idxVal, flags...) + err = txn.GetMemBuffer().SetWithFlags(key, val, flags...) } else { - err = txn.GetMemBuffer().Set(key, idxVal) -======= - - if keyIsTempIdxKey && !tempIdxVal.IsEmpty() { - value = tempIdxVal.Current().Value ->>>>>>> c6e6d621e2 (ddl: check the key existence on original index (#40749)) + err = txn.GetMemBuffer().Set(key, val) } if err != nil { return nil, err } if len(tempKey) > 0 { - idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) - if lazyCheck && needPresumeKey != KeyInTempIndexIsDeleted { - err = txn.GetMemBuffer().SetWithFlags(tempKey, idxVal, kv.SetPresumeKeyNotExists) + tempVal := tablecodec.TempIndexValueElem{Value: idxVal, KeyVer: keyVer, Distinct: true} + val = tempVal.Encode(value) + if lazyCheck && needPresumeNotExists { + err = txn.GetMemBuffer().SetWithFlags(tempKey, val, kv.SetPresumeKeyNotExists) } else { - err = txn.GetMemBuffer().Set(tempKey, idxVal) + err = txn.GetMemBuffer().Set(tempKey, val) } if err != nil { return nil, err @@ -439,8 +286,8 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue return nil, err } - if keyIsTempIdxKey { - value = tablecodec.DecodeTempIndexOriginValue(value) + if keyIsTempIdxKey && !tempIdxVal.IsEmpty() { + value = tempIdxVal.Current().Value } handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle) if err != nil { @@ -470,102 +317,23 @@ func needPresumeKeyNotExistsFlag(ctx context.Context, txn kv.Transaction, key, t } // Delete removes the entry for handle h and indexedValues from KV index. -<<<<<<< HEAD func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error { key, distinct, err := c.GenIndexKey(sc, indexedValues, h, nil) -======= -func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle) error { - indexedValues := c.getIndexedValue(indexedValue) - for _, value := range indexedValues { - key, distinct, err := c.GenIndexKey(sc, value, h, nil) - if err != nil { - return err - } - - key, tempKey, tempKeyVer := GenTempIdxKeyByState(c.idxInfo, key) - var originTempVal []byte - if len(tempKey) > 0 && c.idxInfo.Unique { - // Get the origin value of the unique temporary index key. - // Append the new delete operations to the end of the origin value. - originTempVal, err = getKeyInTxn(context.TODO(), txn, tempKey) - if err != nil { - return err - } - } - tempValElem := tablecodec.TempIndexValueElem{Handle: h, KeyVer: tempKeyVer, Delete: true, Distinct: distinct} - - if distinct { - if len(key) > 0 { - err = txn.GetMemBuffer().DeleteWithFlags(key, kv.SetNeedLocked) - if err != nil { - return err - } - } - if len(tempKey) > 0 { - // Append to the end of the origin value for distinct value. - tempVal := tempValElem.Encode(originTempVal) - err = txn.GetMemBuffer().Set(tempKey, tempVal) - if err != nil { - return err - } - } - } else { - if len(key) > 0 { - err = txn.GetMemBuffer().Delete(key) - if err != nil { - return err - } - } - if len(tempKey) > 0 { - tempVal := tempValElem.Encode(nil) - err = txn.GetMemBuffer().Set(tempKey, tempVal) - if err != nil { - return err - } - } - } - if c.idxInfo.State == model.StatePublic { - // If the index is in public state, delete this index means it must exists. - err = txn.SetAssertion(key, kv.SetAssertExist) - } - if err != nil { - return err - } - } - return nil -} - -func (c *index) GenIndexKVIter(sc *stmtctx.StatementContext, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum) table.IndexIter { - indexedValues := c.getIndexedValue(indexedValue) - return &indexGenerator{ - c: c, - sctx: sc, - indexedVals: indexedValues, - h: h, - handleRestoreData: handleRestoreData, - i: 0, - } -} - -type indexGenerator struct { - c *index - sctx *stmtctx.StatementContext - indexedVals [][]types.Datum - h kv.Handle - handleRestoreData []types.Datum - - i int -} - -func (s *indexGenerator) Next(kb []byte) ([]byte, []byte, bool, error) { - val := s.indexedVals[s.i] - key, distinct, err := s.c.GenIndexKey(s.sctx, val, s.h, kb) ->>>>>>> c6e6d621e2 (ddl: check the key existence on original index (#40749)) if err != nil { return err } key, tempKey, tempKeyVer := GenTempIdxKeyByState(c.idxInfo, key) + var originTempVal []byte + if len(tempKey) > 0 && c.idxInfo.Unique { + // Get the origin value of the unique temporary index key. + // Append the new delete operations to the end of the origin value. + originTempVal, err = getKeyInTxn(context.TODO(), txn, tempKey) + if err != nil { + return err + } + } + tempValElem := tablecodec.TempIndexValueElem{Handle: h, KeyVer: tempKeyVer, Delete: true, Distinct: distinct} if distinct { if len(key) > 0 { @@ -575,7 +343,8 @@ func (s *indexGenerator) Next(kb []byte) ([]byte, []byte, bool, error) { } } if len(tempKey) > 0 { - tempVal := tablecodec.EncodeTempIndexValueDeletedUnique(h, tempKeyVer) + // Append to the end of the origin value for distinct value. + tempVal := tempValElem.Encode(originTempVal) err = txn.GetMemBuffer().Set(tempKey, tempVal) if err != nil { return err @@ -589,7 +358,7 @@ func (s *indexGenerator) Next(kb []byte) ([]byte, []byte, bool, error) { } } if len(tempKey) > 0 { - tempVal := tablecodec.EncodeTempIndexValueDeleted(tempKeyVer) + tempVal := tempValElem.Encode(nil) err = txn.GetMemBuffer().Set(tempKey, tempVal) if err != nil { return err @@ -644,42 +413,18 @@ func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedV if err != nil { return false, nil, err } - - var ( - tempKey []byte - keyVer byte - ) // If index current is in creating status and using ingest mode, we need first // check key exist status in temp index. - key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key) - if keyVer != TempIndexKeyTypeNone { - KeyExistInfo, h1, err1 := KeyExistInTempIndex(context.TODO(), txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) - if err1 != nil { - return false, nil, err - } -<<<<<<< HEAD - switch KeyExistInfo { - case KeyInTempIndexNotExist, KeyInTempIndexIsDeleted: - return false, nil, nil - case KeyInTempIndexConflict: - return true, h1, kv.ErrKeyExists - case KeyInTempIndexIsItself: - return true, h, nil -======= - // If index current is in creating status and using ingest mode, we need first - // check key exist status in temp index. - key, tempKey, _ := GenTempIdxKeyByState(c.idxInfo, key) - if len(tempKey) > 0 { - key = tempKey - } - foundKey, dupHandle, err := FetchDuplicatedHandle(context.TODO(), key, distinct, txn, c.tblInfo.ID, c.tblInfo.IsCommonHandle) - if err != nil || !foundKey { - return false, nil, err - } - if dupHandle != nil && !dupHandle.Equal(h) { - return false, nil, err - } - continue + key, tempKey, _ := GenTempIdxKeyByState(c.idxInfo, key) + if len(tempKey) > 0 { + key = tempKey + } + foundKey, dupHandle, err := FetchDuplicatedHandle(context.TODO(), key, distinct, txn, c.tblInfo.ID, c.tblInfo.IsCommonHandle) + if err != nil || !foundKey { + return false, nil, err + } + if dupHandle != nil && !dupHandle.Equal(h) { + return false, nil, err } return true, h, nil } @@ -721,35 +466,9 @@ func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, d return false, nil, err } return true, originHandle, err ->>>>>>> c6e6d621e2 (ddl: check the key existence on original index (#40749)) } return false, nil, nil } -<<<<<<< HEAD - - value, err := txn.Get(context.TODO(), key) - if kv.IsErrNotFound(err) { - return false, nil, nil - } - if err != nil { - return false, nil, err - } - - // For distinct index, the value of key is handle. - if distinct { - var handle kv.Handle - handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle) - if err != nil { - return false, nil, err - } - if !handle.Equal(h) { - return true, handle, kv.ErrKeyExists - } - return true, handle, nil - } - - return true, h, nil -======= tempVal, err := tablecodec.DecodeTempIndexValue(tempRawVal, isCommon) if err != nil { return false, nil, err @@ -802,7 +521,6 @@ func getKeyInTxn(ctx context.Context, txn kv.Transaction, key kv.Key) ([]byte, e return nil, err } return val, nil ->>>>>>> c6e6d621e2 (ddl: check the key existence on original index (#40749)) } func (c *index) FetchValues(r []types.Datum, vals []types.Datum) ([]types.Datum, error) { From 4238b026e4407ba55a2867c41fe0b8b18c2d6873 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 13 Feb 2023 21:05:39 +0800 Subject: [PATCH 3/3] update bazel --- ddl/indexmergetest/BUILD.bazel | 33 --------------------------------- 1 file changed, 33 deletions(-) diff --git a/ddl/indexmergetest/BUILD.bazel b/ddl/indexmergetest/BUILD.bazel index b70146ae8d461..e69de29bb2d1d 100644 --- a/ddl/indexmergetest/BUILD.bazel +++ b/ddl/indexmergetest/BUILD.bazel @@ -1,33 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_test") - -go_test( - name = "indexmergetest_test", - timeout = "moderate", - srcs = [ - "main_test.go", - "merge_test.go", - ], - flaky = True, - race = "on", - shard_count = 4, - deps = [ - "//config", - "//ddl", - "//ddl/ingest", - "//ddl/internal/callback", - "//ddl/testutil", - "//domain", - "//errno", - "//kv", - "//meta/autoid", - "//parser/model", - "//tablecodec", - "//testkit", - "//testkit/testsetup", - "@com_github_pingcap_failpoint//:failpoint", - "@com_github_stretchr_testify//assert", - "@com_github_stretchr_testify//require", - "@com_github_tikv_client_go_v2//tikv", - "@org_uber_go_goleak//:goleak", - ], -)