From d3b952a49a95b194d7f697a66bfa30e8bb20440c Mon Sep 17 00:00:00 2001 From: xiongjiwei Date: Fri, 30 Dec 2022 14:38:17 +0900 Subject: [PATCH] executor: write multi-valued index (#40172) close pingcap/tidb#40207 --- executor/admin.go | 4 +- executor/builder.go | 4 +- expression/multi_valued_index_test.go | 257 +++++++++++++++++ planner/core/logical_plan_builder.go | 3 + planner/core/plan_to_pb.go | 2 +- planner/core/plan_to_pb_test.go | 16 +- table/tables/index.go | 401 +++++++++++++++----------- table/tables/mutation_checker.go | 26 +- table/tables/tables.go | 4 +- types/convert.go | 2 + types/datum.go | 4 + types/json_binary.go | 22 ++ util/misc.go | 13 +- util/misc_test.go | 4 +- 14 files changed, 570 insertions(+), 192 deletions(-) diff --git a/executor/admin.go b/executor/admin.go index a0484ce957b30..21378b21b1677 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -163,7 +163,7 @@ func (e *CheckIndexRangeExec) constructIndexScanPB() *tipb.Executor { idxExec := &tipb.IndexScan{ TableId: e.table.ID, IndexId: e.index.ID, - Columns: util.ColumnsToProto(e.cols, e.table.PKIsHandle), + Columns: util.ColumnsToProto(e.cols, e.table.PKIsHandle, true), } return &tipb.Executor{Tp: tipb.ExecType_TypeIndexScan, IdxScan: idxExec} } @@ -814,7 +814,7 @@ func (e *CleanupIndexExec) constructIndexScanPB() *tipb.Executor { idxExec := &tipb.IndexScan{ TableId: e.physicalID, IndexId: e.index.Meta().ID, - Columns: util.ColumnsToProto(e.columns, e.table.Meta().PKIsHandle), + Columns: util.ColumnsToProto(e.columns, e.table.Meta().PKIsHandle, true), PrimaryColumnIds: tables.TryGetCommonPkColumnIds(e.table.Meta()), } return &tipb.Executor{Tp: tipb.ExecType_TypeIndexScan, IdxScan: idxExec} diff --git a/executor/builder.go b/executor/builder.go index c6baaf0932849..f771c706bfa9b 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -2580,7 +2580,7 @@ func (b *executorBuilder) buildAnalyzeSamplingPushdown(task plannercore.AnalyzeC SampleSize: int64(opts[ast.AnalyzeOptNumSamples]), SampleRate: sampleRate, SketchSize: maxSketchSize, - ColumnsInfo: util.ColumnsToProto(task.ColsInfo, task.TblInfo.PKIsHandle), + ColumnsInfo: util.ColumnsToProto(task.ColsInfo, task.TblInfo.PKIsHandle, false), ColumnGroups: colGroups, } if task.TblInfo != nil { @@ -2741,7 +2741,7 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeCo BucketSize: int64(opts[ast.AnalyzeOptNumBuckets]), SampleSize: MaxRegionSampleSize, SketchSize: maxSketchSize, - ColumnsInfo: util.ColumnsToProto(cols, task.HandleCols != nil && task.HandleCols.IsInt()), + ColumnsInfo: util.ColumnsToProto(cols, task.HandleCols != nil && task.HandleCols.IsInt(), false), CmsketchDepth: &depth, CmsketchWidth: &width, } diff --git a/expression/multi_valued_index_test.go b/expression/multi_valued_index_test.go index 33557726a1073..97e59993e23b4 100644 --- a/expression/multi_valued_index_test.go +++ b/expression/multi_valued_index_test.go @@ -15,11 +15,20 @@ package expression_test import ( + "context" "fmt" "testing" "github.com/pingcap/tidb/errno" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/sessiontxn" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/codec" + "github.com/stretchr/testify/require" ) func TestMultiValuedIndexDDL(t *testing.T) { @@ -225,3 +234,251 @@ func TestMultiValuedIndexDML(t *testing.T) { tk.MustGetErrCode(`insert into t values (json_array(cast('{"a":1}' as json)));`, errno.ErrInvalidJSONValueForFuncIndex) } } + +func TestWriteMultiValuedIndex(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t1(pk int primary key, a json, index idx((cast(a as signed array))))") + tk.MustExec("insert into t1 values (1, '[1,2,2,3]')") + tk.MustExec("insert into t1 values (2, '[1,2,3]')") + tk.MustExec("insert into t1 values (3, '[]')") + tk.MustExec("insert into t1 values (4, '[2,3,4]')") + tk.MustExec("insert into t1 values (5, null)") + + t1, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) + require.NoError(t, err) + for _, index := range t1.Indices() { + if index.Meta().MVIndex { + checkCount(t, t1.IndexPrefix(), index, store, 10) + checkKey(t, t1.IndexPrefix(), index, store, [][]types.Datum{ + {types.NewDatum(nil), types.NewIntDatum(5)}, + {types.NewIntDatum(1), types.NewIntDatum(1)}, + {types.NewIntDatum(1), types.NewIntDatum(2)}, + {types.NewIntDatum(2), types.NewIntDatum(1)}, + {types.NewIntDatum(2), types.NewIntDatum(2)}, + {types.NewIntDatum(2), types.NewIntDatum(4)}, + {types.NewIntDatum(3), types.NewIntDatum(1)}, + {types.NewIntDatum(3), types.NewIntDatum(2)}, + {types.NewIntDatum(3), types.NewIntDatum(4)}, + {types.NewIntDatum(4), types.NewIntDatum(4)}, + }) + } + } + tk.MustExec("delete from t1") + for _, index := range t1.Indices() { + if index.Meta().MVIndex { + checkCount(t, t1.IndexPrefix(), index, store, 0) + } + } + + tk.MustExec("drop table t1") + tk.MustExec("create table t1(pk int primary key, a json, index idx((cast(a as char(5) array))))") + tk.MustExec("insert into t1 values (1, '[\"abc\", \"abc \"]')") + tk.MustExec("insert into t1 values (2, '[\"b\"]')") + tk.MustExec("insert into t1 values (3, '[\"b \"]')") + + t1, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) + require.NoError(t, err) + for _, index := range t1.Indices() { + if index.Meta().MVIndex { + checkCount(t, t1.IndexPrefix(), index, store, 4) + checkKey(t, t1.IndexPrefix(), index, store, [][]types.Datum{ + {types.NewBytesDatum([]byte("abc")), types.NewIntDatum(1)}, + {types.NewBytesDatum([]byte("abc ")), types.NewIntDatum(1)}, + {types.NewBytesDatum([]byte("b")), types.NewIntDatum(2)}, + {types.NewBytesDatum([]byte("b ")), types.NewIntDatum(3)}, + }) + } + } + + tk.MustExec("update t1 set a = json_array_append(a, '$', 'bcd') where pk = 1") + tk.MustExec("update t1 set a = '[]' where pk = 2") + tk.MustExec("update t1 set a = '[\"abc\"]' where pk = 3") + + for _, index := range t1.Indices() { + if index.Meta().MVIndex { + checkCount(t, t1.IndexPrefix(), index, store, 4) + checkKey(t, t1.IndexPrefix(), index, store, [][]types.Datum{ + {types.NewBytesDatum([]byte("abc")), types.NewIntDatum(1)}, + {types.NewBytesDatum([]byte("abc")), types.NewIntDatum(3)}, + {types.NewBytesDatum([]byte("abc ")), types.NewIntDatum(1)}, + {types.NewBytesDatum([]byte("bcd")), types.NewIntDatum(1)}, + }) + } + } + + tk.MustExec("delete from t1") + for _, index := range t1.Indices() { + if index.Meta().MVIndex { + checkCount(t, t1.IndexPrefix(), index, store, 0) + } + } +} + +func TestWriteMultiValuedIndexPartitionTable(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(`create table t1 +( + pk int primary key, + a json, + index idx ((cast(a as signed array))) +) partition by range columns (pk) (partition p0 values less than (10), partition p1 values less than (20));`) + tk.MustExec("insert into t1 values (1, '[1,2,2,3]')") + tk.MustExec("insert into t1 values (11, '[1,2,3]')") + tk.MustExec("insert into t1 values (2, '[]')") + tk.MustExec("insert into t1 values (12, '[2,3,4]')") + tk.MustExec("insert into t1 values (3, null)") + tk.MustExec("insert into t1 values (13, null)") + + t1, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) + require.NoError(t, err) + + expect := map[string]struct { + count int + vals [][]types.Datum + }{ + "p0": {4, [][]types.Datum{ + {types.NewDatum(nil), types.NewIntDatum(3)}, + {types.NewIntDatum(1), types.NewIntDatum(1)}, + {types.NewIntDatum(2), types.NewIntDatum(1)}, + {types.NewIntDatum(3), types.NewIntDatum(1)}, + }}, + "p1": {7, [][]types.Datum{ + {types.NewDatum(nil), types.NewIntDatum(13)}, + {types.NewIntDatum(1), types.NewIntDatum(11)}, + {types.NewIntDatum(2), types.NewIntDatum(11)}, + {types.NewIntDatum(2), types.NewIntDatum(12)}, + {types.NewIntDatum(3), types.NewIntDatum(11)}, + {types.NewIntDatum(3), types.NewIntDatum(12)}, + {types.NewIntDatum(4), types.NewIntDatum(12)}, + }}, + } + + for _, def := range t1.Meta().GetPartitionInfo().Definitions { + partition := t1.(table.PartitionedTable).GetPartition(def.ID) + for _, index := range partition.Indices() { + if index.Meta().MVIndex { + checkCount(t, partition.IndexPrefix(), index, store, expect[def.Name.L].count) + checkKey(t, partition.IndexPrefix(), index, store, expect[def.Name.L].vals) + } + } + } + + tk.MustExec("delete from t1") + for _, def := range t1.Meta().GetPartitionInfo().Definitions { + partition := t1.(table.PartitionedTable).GetPartition(def.ID) + for _, index := range partition.Indices() { + if index.Meta().MVIndex { + checkCount(t, partition.IndexPrefix(), index, store, 0) + } + } + } +} + +func TestWriteMultiValuedIndexUnique(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t1(pk int primary key, a json, unique index idx((cast(a as signed array))))") + tk.MustExec("insert into t1 values (1, '[1,2,2]')") + tk.MustGetErrCode("insert into t1 values (2, '[1]')", errno.ErrDupEntry) + tk.MustExec("insert into t1 values (3, '[3,3,4]')") + + t1, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) + require.NoError(t, err) + for _, index := range t1.Indices() { + if index.Meta().MVIndex { + checkCount(t, t1.IndexPrefix(), index, store, 4) + checkKey(t, t1.IndexPrefix(), index, store, [][]types.Datum{ + {types.NewIntDatum(1)}, + {types.NewIntDatum(2)}, + {types.NewIntDatum(3)}, + {types.NewIntDatum(4)}, + }) + } + } +} + +func TestWriteMultiValuedIndexComposite(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t1(pk int primary key, a json, c int, d int, index idx(c, (cast(a as signed array)), d))") + tk.MustExec("insert into t1 values (1, '[1,2,2]', 1, 1)") + tk.MustExec("insert into t1 values (2, '[2,2,2]', 2, 2)") + tk.MustExec("insert into t1 values (3, '[3,3,4]', 3, 3)") + tk.MustExec("insert into t1 values (4, null, 4, 4)") + tk.MustExec("insert into t1 values (5, '[]', 5, 5)") + + t1, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) + require.NoError(t, err) + for _, index := range t1.Indices() { + if index.Meta().MVIndex { + checkCount(t, t1.IndexPrefix(), index, store, 6) + checkKey(t, t1.IndexPrefix(), index, store, [][]types.Datum{ + {types.NewIntDatum(1), types.NewIntDatum(1), types.NewIntDatum(1), types.NewIntDatum(1)}, + {types.NewIntDatum(1), types.NewIntDatum(2), types.NewIntDatum(1), types.NewIntDatum(1)}, + {types.NewIntDatum(2), types.NewIntDatum(2), types.NewIntDatum(2), types.NewIntDatum(2)}, + {types.NewIntDatum(3), types.NewIntDatum(3), types.NewIntDatum(3), types.NewIntDatum(3)}, + {types.NewIntDatum(3), types.NewIntDatum(4), types.NewIntDatum(3), types.NewIntDatum(3)}, + {types.NewIntDatum(4), types.NewDatum(nil), types.NewIntDatum(4), types.NewIntDatum(4)}, + }) + } + } +} + +func checkCount(t *testing.T, prefix kv.Key, index table.Index, store kv.Storage, except int) { + c := 0 + checkIndex(t, prefix, index, store, func(it kv.Iterator) { + c++ + }) + require.Equal(t, except, c) +} + +func checkKey(t *testing.T, prefix kv.Key, index table.Index, store kv.Storage, except [][]types.Datum) { + idx := 0 + checkIndex(t, prefix, index, store, func(it kv.Iterator) { + indexKey := decodeIndexKey(t, it.Key()) + require.Equal(t, except[idx], indexKey) + idx++ + }) +} + +func checkIndex(t *testing.T, prefix kv.Key, index table.Index, store kv.Storage, fn func(kv.Iterator)) { + startKey := codec.EncodeInt(prefix, index.Meta().ID) + prefix.Next() + se := testkit.NewTestKit(t, store).Session() + err := sessiontxn.NewTxn(context.Background(), se) + require.NoError(t, err) + txn, err := se.Txn(true) + require.NoError(t, err) + it, err := txn.Iter(startKey, prefix.PrefixNext()) + require.NoError(t, err) + for it.Valid() && it.Key().HasPrefix(prefix) { + fn(it) + err = it.Next() + require.NoError(t, err) + } + it.Close() + se.Close() +} + +func decodeIndexKey(t *testing.T, key kv.Key) []types.Datum { + var idLen = 8 + var prefixLen = 1 + idLen /*tableID*/ + 2 + _, _, isRecord, err := tablecodec.DecodeKeyHead(key) + require.NoError(t, err) + require.False(t, isRecord) + indexKey := key[prefixLen+idLen:] + var datumValues []types.Datum + for len(indexKey) > 0 { + remain, d, err := codec.DecodeOne(indexKey) + require.NoError(t, err) + datumValues = append(datumValues, d) + indexKey = remain + } + return datumValues +} diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index f60c1f617e467..592bb55f79619 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -5739,7 +5739,10 @@ func (b *PlanBuilder) buildUpdateLists(ctx context.Context, tableList []*ast.Tab } } + o := b.allowBuildCastArray + b.allowBuildCastArray = true newExpr, np, err = b.rewriteWithPreprocess(ctx, assign.Expr, p, nil, nil, false, rewritePreprocess(assign)) + b.allowBuildCastArray = o if err != nil { return nil, nil, false, err } diff --git a/planner/core/plan_to_pb.go b/planner/core/plan_to_pb.go index a68fdae38f5de..71a7f11c09a65 100644 --- a/planner/core/plan_to_pb.go +++ b/planner/core/plan_to_pb.go @@ -370,7 +370,7 @@ func (p *PhysicalIndexScan) ToPB(_ sessionctx.Context, _ kv.StoreType) (*tipb.Ex idxExec := &tipb.IndexScan{ TableId: p.Table.ID, IndexId: p.Index.ID, - Columns: util.ColumnsToProto(columns, p.Table.PKIsHandle), + Columns: util.ColumnsToProto(columns, p.Table.PKIsHandle, true), Desc: p.Desc, PrimaryColumnIds: pkColIds, } diff --git a/planner/core/plan_to_pb_test.go b/planner/core/plan_to_pb_test.go index 9ea55a23babdd..cb108678629f2 100644 --- a/planner/core/plan_to_pb_test.go +++ b/planner/core/plan_to_pb_test.go @@ -35,16 +35,16 @@ func TestColumnToProto(t *testing.T) { col := &model.ColumnInfo{ FieldType: *tp, } - pc := util.ColumnToProto(col) + pc := util.ColumnToProto(col, false) expect := &tipb.ColumnInfo{ColumnId: 0, Tp: 3, Collation: 83, ColumnLen: 11, Decimal: 0, Flag: 10, Elems: []string(nil), DefaultVal: []uint8(nil), PkHandle: false, XXX_unrecognized: []uint8(nil)} require.Equal(t, expect, pc) cols := []*model.ColumnInfo{col, col} - pcs := util.ColumnsToProto(cols, false) + pcs := util.ColumnsToProto(cols, false, false) for _, v := range pcs { require.Equal(t, int32(10), v.GetFlag()) } - pcs = util.ColumnsToProto(cols, true) + pcs = util.ColumnsToProto(cols, true, false) for _, v := range pcs { require.Equal(t, int32(10), v.GetFlag()) } @@ -56,19 +56,19 @@ func TestColumnToProto(t *testing.T) { col1 := &model.ColumnInfo{ FieldType: *tp, } - pc = util.ColumnToProto(col1) + pc = util.ColumnToProto(col1, false) require.Equal(t, int32(8), pc.Collation) collate.SetNewCollationEnabledForTest(true) - pc = util.ColumnToProto(col) + pc = util.ColumnToProto(col, false) expect = &tipb.ColumnInfo{ColumnId: 0, Tp: 3, Collation: -83, ColumnLen: 11, Decimal: 0, Flag: 10, Elems: []string(nil), DefaultVal: []uint8(nil), PkHandle: false, XXX_unrecognized: []uint8(nil)} require.Equal(t, expect, pc) - pcs = util.ColumnsToProto(cols, true) + pcs = util.ColumnsToProto(cols, true, false) for _, v := range pcs { require.Equal(t, int32(-83), v.Collation) } - pc = util.ColumnToProto(col1) + pc = util.ColumnToProto(col1, false) require.Equal(t, int32(-8), pc.Collation) tp = types.NewFieldType(mysql.TypeEnum) @@ -77,6 +77,6 @@ func TestColumnToProto(t *testing.T) { col2 := &model.ColumnInfo{ FieldType: *tp, } - pc = util.ColumnToProto(col2) + pc = util.ColumnToProto(col2, false) require.Len(t, pc.Elems, 2) } diff --git a/table/tables/index.go b/table/tables/index.go index 446a2e7288595..607afb9640aad 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -103,10 +103,61 @@ func (c *index) GenIndexValue(sc *stmtctx.StatementContext, distinct bool, index return tablecodec.GenIndexValuePortal(sc, c.tblInfo, c.idxInfo, c.needRestoredData, distinct, false, indexedValues, h, c.phyTblID, restoredData) } +// getIndexedValue will produce the result like: +// 1. If not multi-valued index, return directly. +// 2. (i1, [m1,m2], i2, ...) ==> [(i1, m1, i2, ...), (i1, m2, i2, ...)] +// 3. (i1, null, i2, ...) ==> [(i1, null, i2, ...)] +// 4. (i1, [], i2, ...) ==> nothing. +func (c *index) getIndexedValue(indexedValues []types.Datum) [][]types.Datum { + if !c.idxInfo.MVIndex { + return [][]types.Datum{indexedValues} + } + + vals := make([][]types.Datum, 0, 16) + jsonIdx := 0 + jsonIsNull := false + existsVals := make(map[string]struct{}) + var buf []byte + for !jsonIsNull { + val := make([]types.Datum, 0, len(indexedValues)) + for i, v := range indexedValues { + if !c.tblInfo.Columns[c.idxInfo.Columns[i].Offset].FieldType.IsArray() { + val = append(val, v) + } else { + if v.IsNull() { + val = append(val, v) + jsonIsNull = true + continue + } + elemCount := v.GetMysqlJSON().GetElemCount() + for { + // JSON cannot be indexed, if the value is JSON type, it must be multi-valued index. + if jsonIdx >= elemCount { + goto out + } + binaryJSON := v.GetMysqlJSON().ArrayGetElem(jsonIdx) + jsonIdx++ + buf = buf[:0] + key := string(binaryJSON.HashValue(buf)) + if _, exists := existsVals[key]; exists { + continue + } + existsVals[key] = struct{}{} + val = append(val, types.NewDatum(binaryJSON.GetValue())) + break + } + } + } + vals = append(vals, val) + } +out: + return vals +} + // Create creates a new entry in the kvIndex data. // If the index is unique and there is an existing entry with the same key, // Create will return the existing entry's handle as the first return value, ErrKeyExists as the second return value. -func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...table.CreateIdxOptFunc) (kv.Handle, error) { +func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...table.CreateIdxOptFunc) (kv.Handle, error) { if c.Meta().Unique { txn.CacheTableInfo(c.phyTblID, c.tblInfo) } @@ -114,225 +165,241 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue for _, fn := range opts { fn(&opt) } + + indexedValues := c.getIndexedValue(indexedValue) + ctx := opt.Ctx + if ctx != nil { + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("index.Create", opentracing.ChildOf(span.Context())) + defer span1.Finish() + ctx = opentracing.ContextWithSpan(ctx, span1) + } + } else { + ctx = context.TODO() + } vars := sctx.GetSessionVars() writeBufs := vars.GetWriteStmtBufs() skipCheck := vars.StmtCtx.BatchCheck - key, distinct, err := c.GenIndexKey(vars.StmtCtx, indexedValues, h, writeBufs.IndexKeyBuf) - if err != nil { - return nil, err - } - - var ( - tempKey []byte - keyVer byte - keyIsTempIdxKey bool - ) - if !opt.FromBackFill { - key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key) - if keyVer == TempIndexKeyTypeBackfill || keyVer == TempIndexKeyTypeDelete { - key, tempKey = tempKey, nil - keyIsTempIdxKey = true + for _, value := range indexedValues { + key, distinct, err := c.GenIndexKey(vars.StmtCtx, value, h, writeBufs.IndexKeyBuf) + if err != nil { + return nil, err } - } - ctx := opt.Ctx - if opt.Untouched { - txn, err1 := sctx.Txn(true) - if err1 != nil { - return nil, err1 + var ( + tempKey []byte + keyVer byte + keyIsTempIdxKey bool + ) + if !opt.FromBackFill { + key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key) + if keyVer == TempIndexKeyTypeBackfill || keyVer == TempIndexKeyTypeDelete { + key, tempKey = tempKey, nil + keyIsTempIdxKey = true + } } - // If the index kv was untouched(unchanged), and the key/value already exists in mem-buffer, - // should not overwrite the key with un-commit flag. - // So if the key exists, just do nothing and return. - v, err := txn.GetMemBuffer().Get(ctx, key) - if err == nil { - if len(v) != 0 { - return nil, nil + + if opt.Untouched { + txn, err1 := sctx.Txn(true) + if err1 != nil { + return nil, err1 } - // The key is marked as deleted in the memory buffer, as the existence check is done lazily - // for optimistic transactions by default. The "untouched" key could still exist in the store, - // it's needed to commit this key to do the existence check so unset the untouched flag. - if !txn.IsPessimistic() { - keyFlags, err := txn.GetMemBuffer().GetFlags(key) - if err != nil { - return nil, err + // If the index kv was untouched(unchanged), and the key/value already exists in mem-buffer, + // should not overwrite the key with un-commit flag. + // So if the key exists, just do nothing and return. + v, err := txn.GetMemBuffer().Get(ctx, key) + if err == nil { + if len(v) != 0 { + continue } - if keyFlags.HasPresumeKeyNotExists() { - opt.Untouched = false + // The key is marked as deleted in the memory buffer, as the existence check is done lazily + // for optimistic transactions by default. The "untouched" key could still exist in the store, + // it's needed to commit this key to do the existence check so unset the untouched flag. + if !txn.IsPessimistic() { + keyFlags, err := txn.GetMemBuffer().GetFlags(key) + if err != nil { + return nil, err + } + if keyFlags.HasPresumeKeyNotExists() { + opt.Untouched = false + } } } } - } - - // save the key buffer to reuse. - writeBufs.IndexKeyBuf = key - c.initNeedRestoreData.Do(func() { - c.needRestoredData = NeedRestoredData(c.idxInfo.Columns, c.tblInfo.Columns) - }) - idxVal, err := tablecodec.GenIndexValuePortal(sctx.GetSessionVars().StmtCtx, c.tblInfo, c.idxInfo, c.needRestoredData, distinct, opt.Untouched, indexedValues, h, c.phyTblID, handleRestoreData) - if err != nil { - return nil, err - } - - opt.IgnoreAssertion = opt.IgnoreAssertion || c.idxInfo.State != model.StatePublic - if !distinct || skipCheck || opt.Untouched { - if keyIsTempIdxKey && !opt.Untouched { // Untouched key-values never occur in the storage. - idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) - } - err = txn.GetMemBuffer().Set(key, idxVal) + // save the key buffer to reuse. + writeBufs.IndexKeyBuf = key + c.initNeedRestoreData.Do(func() { + c.needRestoredData = NeedRestoredData(c.idxInfo.Columns, c.tblInfo.Columns) + }) + idxVal, err := tablecodec.GenIndexValuePortal(sctx.GetSessionVars().StmtCtx, c.tblInfo, c.idxInfo, c.needRestoredData, distinct, opt.Untouched, value, h, c.phyTblID, handleRestoreData) if err != nil { return nil, err } - if len(tempKey) > 0 { - if !opt.Untouched { // Untouched key-values never occur in the storage. + + opt.IgnoreAssertion = opt.IgnoreAssertion || c.idxInfo.State != model.StatePublic + + if !distinct || skipCheck || opt.Untouched { + if keyIsTempIdxKey && !opt.Untouched { // Untouched key-values never occur in the storage. idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) } - err = txn.GetMemBuffer().Set(tempKey, idxVal) + err = txn.GetMemBuffer().Set(key, idxVal) if err != nil { return nil, err } - } - if !opt.IgnoreAssertion && (!opt.Untouched) { - if sctx.GetSessionVars().LazyCheckKeyNotExists() && !txn.IsPessimistic() { - err = txn.SetAssertion(key, kv.SetAssertUnknown) - } else { - err = txn.SetAssertion(key, kv.SetAssertNotExist) + if len(tempKey) > 0 { + if !opt.Untouched { // Untouched key-values never occur in the storage. + idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) + } + err = txn.GetMemBuffer().Set(tempKey, idxVal) + if err != nil { + return nil, err + } } + if !opt.IgnoreAssertion && (!opt.Untouched) { + if sctx.GetSessionVars().LazyCheckKeyNotExists() && !txn.IsPessimistic() { + err = txn.SetAssertion(key, kv.SetAssertUnknown) + } else { + err = txn.SetAssertion(key, kv.SetAssertNotExist) + } + } + if err != nil { + return nil, err + } + continue } - return nil, err - } - if ctx != nil { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("index.Create", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) + var value []byte + if c.tblInfo.TempTableType != model.TempTableNone { + // Always check key for temporary table because it does not write to TiKV + value, err = txn.Get(ctx, key) + } else if sctx.GetSessionVars().LazyCheckKeyNotExists() { + value, err = txn.GetMemBuffer().Get(ctx, key) + } else { + value, err = txn.Get(ctx, key) } - } else { - ctx = context.TODO() - } - - var value []byte - if c.tblInfo.TempTableType != model.TempTableNone { - // Always check key for temporary table because it does not write to TiKV - value, err = txn.Get(ctx, key) - } else if sctx.GetSessionVars().LazyCheckKeyNotExists() { - value, err = txn.GetMemBuffer().Get(ctx, key) - } else { - value, err = txn.Get(ctx, key) - } - if err != nil && !kv.IsErrNotFound(err) { - return nil, err - } - if err != nil || len(value) == 0 || (keyIsTempIdxKey && tablecodec.CheckTempIndexValueIsDelete(value)) { - lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil - var needPresumeKey TempIndexKeyState - if keyIsTempIdxKey { - idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) - needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, key, distinct, h, c.tblInfo.IsCommonHandle) + if err != nil && !kv.IsErrNotFound(err) { + return nil, err + } + if err != nil || len(value) == 0 || (keyIsTempIdxKey && tablecodec.CheckTempIndexValueIsDelete(value)) { + lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil + var needPresumeKey TempIndexKeyState + if keyIsTempIdxKey { + idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) + needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, key, distinct, h, c.tblInfo.IsCommonHandle) + if err != nil { + return nil, err + } + } else { + if len(tempKey) > 0 { + needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) + if err != nil { + return nil, err + } + } + } + if lazyCheck { + var flags []kv.FlagsOp + if needPresumeKey != KeyInTempIndexIsDeleted { + flags = []kv.FlagsOp{kv.SetPresumeKeyNotExists} + } + if !vars.ConstraintCheckInPlacePessimistic && vars.TxnCtx.IsPessimistic && vars.InTxn() && + !vars.InRestrictedSQL && vars.ConnectionID > 0 { + flags = append(flags, kv.SetNeedConstraintCheckInPrewrite) + } + err = txn.GetMemBuffer().SetWithFlags(key, idxVal, flags...) + } else { + err = txn.GetMemBuffer().Set(key, idxVal) + } if err != nil { return nil, err } - } else { if len(tempKey) > 0 { - needPresumeKey, _, err = KeyExistInTempIndex(ctx, txn, tempKey, distinct, h, c.tblInfo.IsCommonHandle) + idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) + if lazyCheck && needPresumeKey != KeyInTempIndexIsDeleted { + err = txn.GetMemBuffer().SetWithFlags(tempKey, idxVal, kv.SetPresumeKeyNotExists) + } else { + err = txn.GetMemBuffer().Set(tempKey, idxVal) + } if err != nil { return nil, err } } - } - if lazyCheck { - var flags []kv.FlagsOp - if needPresumeKey != KeyInTempIndexIsDeleted { - flags = []kv.FlagsOp{kv.SetPresumeKeyNotExists} + if opt.IgnoreAssertion { + continue } - if !vars.ConstraintCheckInPlacePessimistic && vars.TxnCtx.IsPessimistic && vars.InTxn() && - !vars.InRestrictedSQL && vars.ConnectionID > 0 { - flags = append(flags, kv.SetNeedConstraintCheckInPrewrite) - } - err = txn.GetMemBuffer().SetWithFlags(key, idxVal, flags...) - } else { - err = txn.GetMemBuffer().Set(key, idxVal) - } - if err != nil { - return nil, err - } - if len(tempKey) > 0 { - idxVal = tablecodec.EncodeTempIndexValue(idxVal, keyVer) - if lazyCheck && needPresumeKey != KeyInTempIndexIsDeleted { - err = txn.GetMemBuffer().SetWithFlags(tempKey, idxVal, kv.SetPresumeKeyNotExists) + if lazyCheck && !txn.IsPessimistic() { + err = txn.SetAssertion(key, kv.SetAssertUnknown) } else { - err = txn.GetMemBuffer().Set(tempKey, idxVal) + err = txn.SetAssertion(key, kv.SetAssertNotExist) } if err != nil { return nil, err } + continue } - if opt.IgnoreAssertion { - return nil, nil + + if keyIsTempIdxKey { + value = tablecodec.DecodeTempIndexOriginValue(value) } - if lazyCheck && !txn.IsPessimistic() { - err = txn.SetAssertion(key, kv.SetAssertUnknown) - } else { - err = txn.SetAssertion(key, kv.SetAssertNotExist) + handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle) + if err != nil { + return nil, err } - return nil, err - } - - if keyIsTempIdxKey { - value = tablecodec.DecodeTempIndexOriginValue(value) - } - handle, err := tablecodec.DecodeHandleInUniqueIndexValue(value, c.tblInfo.IsCommonHandle) - if err != nil { - return nil, err + return handle, kv.ErrKeyExists } - return handle, kv.ErrKeyExists + return nil, nil } // Delete removes the entry for handle h and indexedValues from KV index. -func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error { - key, distinct, err := c.GenIndexKey(sc, indexedValues, h, nil) - if err != nil { - return err - } +func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle) error { + indexedValues := c.getIndexedValue(indexedValue) + for _, value := range indexedValues { + key, distinct, err := c.GenIndexKey(sc, value, h, nil) + if err != nil { + return err + } - key, tempKey, tempKeyVer := GenTempIdxKeyByState(c.idxInfo, key) + key, tempKey, tempKeyVer := GenTempIdxKeyByState(c.idxInfo, key) - if distinct { - if len(key) > 0 { - err = txn.GetMemBuffer().DeleteWithFlags(key, kv.SetNeedLocked) - if err != nil { - return err + if distinct { + if len(key) > 0 { + err = txn.GetMemBuffer().DeleteWithFlags(key, kv.SetNeedLocked) + if err != nil { + return err + } } - } - if len(tempKey) > 0 { - tempVal := tablecodec.EncodeTempIndexValueDeletedUnique(h, tempKeyVer) - err = txn.GetMemBuffer().Set(tempKey, tempVal) - if err != nil { - return err + if len(tempKey) > 0 { + tempVal := tablecodec.EncodeTempIndexValueDeletedUnique(h, tempKeyVer) + err = txn.GetMemBuffer().Set(tempKey, tempVal) + if err != nil { + return err + } } - } - } else { - if len(key) > 0 { - err = txn.GetMemBuffer().Delete(key) - if err != nil { - return err + } else { + if len(key) > 0 { + err = txn.GetMemBuffer().Delete(key) + if err != nil { + return err + } } - } - if len(tempKey) > 0 { - tempVal := tablecodec.EncodeTempIndexValueDeleted(tempKeyVer) - err = txn.GetMemBuffer().Set(tempKey, tempVal) - if err != nil { - return err + if len(tempKey) > 0 { + tempVal := tablecodec.EncodeTempIndexValueDeleted(tempKeyVer) + err = txn.GetMemBuffer().Set(tempKey, tempVal) + if err != nil { + return err + } } } + if c.idxInfo.State == model.StatePublic { + // If the index is in public state, delete this index means it must exists. + err = txn.SetAssertion(key, kv.SetAssertExist) + } + if err != nil { + return err + } } - if c.idxInfo.State == model.StatePublic { - // If the index is in public state, delete this index means it must exists. - err = txn.SetAssertion(key, kv.SetAssertExist) - } - return err + return nil } const ( diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index e4513b8cae409..8445a266e3d2d 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -236,7 +236,7 @@ func checkIndexKeys( } for i, v := range decodedIndexValues { - fieldType := &t.Columns[indexInfo.Columns[i].Offset].FieldType + fieldType := t.Columns[indexInfo.Columns[i].Offset].FieldType.ArrayType() datum, err := tablecodec.DecodeColumnValue(v, fieldType, sessVars.Location()) if err != nil { return errors.Trace(err) @@ -347,9 +347,27 @@ func compareIndexData( cols[indexInfo.Columns[i].Offset].ColumnInfo, ) - comparison, err := decodedMutationDatum.Compare(sc, &expectedDatum, collate.GetCollator(decodedMutationDatum.Collation())) - if err != nil { - return errors.Trace(err) + var comparison int + var err error + // If it is multi-valued index, we should check the JSON contains the indexed value. + if cols[indexInfo.Columns[i].Offset].ColumnInfo.FieldType.IsArray() && expectedDatum.Kind() == types.KindMysqlJSON { + bj := expectedDatum.GetMysqlJSON() + count := bj.GetElemCount() + for elemIdx := 0; elemIdx < count; elemIdx++ { + jsonDatum := types.NewJSONDatum(bj.ArrayGetElem(elemIdx)) + comparison, err = jsonDatum.Compare(sc, &decodedMutationDatum, collate.GetBinaryCollator()) + if err != nil { + return errors.Trace(err) + } + if comparison == 0 { + break + } + } + } else { + comparison, err = decodedMutationDatum.Compare(sc, &expectedDatum, collate.GetCollator(decodedMutationDatum.Collation())) + if err != nil { + return errors.Trace(err) + } } if comparison != 0 { diff --git a/table/tables/tables.go b/table/tables/tables.go index 46d037c3197ac..709318a42332c 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -1948,7 +1948,7 @@ func BuildTableScanFromInfos(tableInfo *model.TableInfo, columnInfos []*model.Co pkColIds := TryGetCommonPkColumnIds(tableInfo) tsExec := &tipb.TableScan{ TableId: tableInfo.ID, - Columns: util.ColumnsToProto(columnInfos, tableInfo.PKIsHandle), + Columns: util.ColumnsToProto(columnInfos, tableInfo.PKIsHandle, false), PrimaryColumnIds: pkColIds, } if tableInfo.IsCommonHandle { @@ -1962,7 +1962,7 @@ func BuildPartitionTableScanFromInfos(tableInfo *model.TableInfo, columnInfos [] pkColIds := TryGetCommonPkColumnIds(tableInfo) tsExec := &tipb.PartitionTableScan{ TableId: tableInfo.ID, - Columns: util.ColumnsToProto(columnInfos, tableInfo.PKIsHandle), + Columns: util.ColumnsToProto(columnInfos, tableInfo.PKIsHandle, false), PrimaryColumnIds: pkColIds, IsFastScan: &fastScan, } diff --git a/types/convert.go b/types/convert.go index 96a12f64ca641..1b97b7336ef66 100644 --- a/types/convert.go +++ b/types/convert.go @@ -758,6 +758,8 @@ func ToString(value interface{}) (string, error) { return v.String(), nil case Set: return v.String(), nil + case BinaryJSON: + return v.String(), nil default: return "", errors.Errorf("cannot convert %v(type %T) to string", value, value) } diff --git a/types/datum.go b/types/datum.go index 34639cdf10bdb..231de4dd9a069 100644 --- a/types/datum.go +++ b/types/datum.go @@ -2016,6 +2016,10 @@ func (d *Datum) ToMysqlJSON() (j BinaryJSON, err error) { in = d.GetBinaryLiteral().ToString() case KindNull: in = nil + case KindMysqlTime: + in = d.GetMysqlTime() + case KindMysqlDuration: + in = d.GetMysqlDuration() default: in, err = d.ToString() } diff --git a/types/json_binary.go b/types/json_binary.go index 6fe01d2b4f28e..cacf5b69b025a 100644 --- a/types/json_binary.go +++ b/types/json_binary.go @@ -31,6 +31,8 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/util/hack" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" "golang.org/x/exp/slices" ) @@ -578,6 +580,26 @@ func (bj BinaryJSON) HashValue(buf []byte) []byte { return buf } +// GetValue return the primitive value of the JSON. +func (bj BinaryJSON) GetValue() any { + switch bj.TypeCode { + case JSONTypeCodeInt64: + return bj.GetInt64() + case JSONTypeCodeUint64: + return bj.GetUint64() + case JSONTypeCodeDuration: + return bj.GetDuration() + case JSONTypeCodeFloat64: + return bj.GetFloat64() + case JSONTypeCodeString: + return bj.GetString() + case JSONTypeCodeDate, JSONTypeCodeDatetime: + return bj.GetTime() + } + logutil.BgLogger().Error("unreachable JSON type", zap.Any("type", bj.TypeCode)) + return nil +} + // CreateBinaryJSON creates a BinaryJSON from interface. func CreateBinaryJSON(in interface{}) BinaryJSON { bj, err := CreateBinaryJSONWithCheck(in) diff --git a/util/misc.go b/util/misc.go index 0e28baa5f62fc..d336aa5765451 100644 --- a/util/misc.go +++ b/util/misc.go @@ -394,10 +394,10 @@ func TLSCipher2String(n uint16) string { } // ColumnsToProto converts a slice of model.ColumnInfo to a slice of tipb.ColumnInfo. -func ColumnsToProto(columns []*model.ColumnInfo, pkIsHandle bool) []*tipb.ColumnInfo { +func ColumnsToProto(columns []*model.ColumnInfo, pkIsHandle bool, forIndex bool) []*tipb.ColumnInfo { cols := make([]*tipb.ColumnInfo, 0, len(columns)) for _, c := range columns { - col := ColumnToProto(c) + col := ColumnToProto(c, forIndex) // TODO: Here `PkHandle`'s meaning is changed, we will change it to `IsHandle` when tikv's old select logic // is abandoned. if (pkIsHandle && mysql.HasPriKeyFlag(c.GetFlag())) || c.ID == model.ExtraHandleID { @@ -411,7 +411,7 @@ func ColumnsToProto(columns []*model.ColumnInfo, pkIsHandle bool) []*tipb.Column } // ColumnToProto converts model.ColumnInfo to tipb.ColumnInfo. -func ColumnToProto(c *model.ColumnInfo) *tipb.ColumnInfo { +func ColumnToProto(c *model.ColumnInfo, forIndex bool) *tipb.ColumnInfo { pc := &tipb.ColumnInfo{ ColumnId: c.ID, Collation: collate.RewriteNewCollationIDIfNeeded(int32(mysql.CollationNames[c.GetCollate()])), @@ -420,7 +420,12 @@ func ColumnToProto(c *model.ColumnInfo) *tipb.ColumnInfo { Flag: int32(c.GetFlag()), Elems: c.GetElems(), } - pc.Tp = int32(c.GetType()) + if forIndex { + // Use array type for read the multi-valued index. + pc.Tp = int32(c.FieldType.ArrayType().GetType()) + } else { + pc.Tp = int32(c.GetType()) + } return pc } diff --git a/util/misc_test.go b/util/misc_test.go index c1510625593f0..7a5baeb197d28 100644 --- a/util/misc_test.go +++ b/util/misc_test.go @@ -174,8 +174,8 @@ func TestToPB(t *testing.T) { } column2.SetCollate("utf8mb4_bin") - assert.Equal(t, "column_id:1 collation:-45 columnLen:-1 decimal:-1 ", ColumnToProto(column).String()) - assert.Equal(t, "column_id:1 collation:-45 columnLen:-1 decimal:-1 ", ColumnsToProto([]*model.ColumnInfo{column, column2}, false)[0].String()) + assert.Equal(t, "column_id:1 collation:-45 columnLen:-1 decimal:-1 ", ColumnToProto(column, false).String()) + assert.Equal(t, "column_id:1 collation:-45 columnLen:-1 decimal:-1 ", ColumnsToProto([]*model.ColumnInfo{column, column2}, false, false)[0].String()) } func TestComposeURL(t *testing.T) {