Skip to content


ddl: support online create multi-valued index (#40304)
Browse files Browse the repository at this point in the history
close #40337
  • Loading branch information
xiongjiwei authored Jan 6, 2023
1 parent af968f2 commit 43ebc64
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 27 deletions.
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ go_test(
Expand Down
60 changes: 34 additions & 26 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,7 @@ type addIndexWorker struct {
// The following attributes are used to reduce memory allocation.
idxKeyBufs [][]byte
batchCheckKeys []kv.Key
batchCheckValues [][]byte
distinctCheckFlags []bool

Expand Down Expand Up @@ -1468,6 +1469,7 @@ func (w *addIndexWorker) initBatchCheckBufs(batchCount int) {

w.batchCheckKeys = w.batchCheckKeys[:0]
w.batchCheckValues = w.batchCheckValues[:0]
w.distinctCheckFlags = w.distinctCheckFlags[:0]

Expand Down Expand Up @@ -1517,16 +1519,28 @@ func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*i

stmtCtx := w.sessCtx.GetSessionVars().StmtCtx
cnt := 0
for i, record := range idxRecords {
idxKey, distinct, err := w.index.GenIndexKey(stmtCtx, record.vals, record.handle, w.idxKeyBufs[i])
if err != nil {
return errors.Trace(err)
iter := w.index.GenIndexKVIter(stmtCtx, record.vals, record.handle, idxRecords[i].rsData)
for iter.Valid() {
var buf []byte
if cnt < len(w.idxKeyBufs) {
buf = w.idxKeyBufs[cnt]
key, val, distinct, err := iter.Next(buf)
if err != nil {
return errors.Trace(err)
if cnt < len(w.idxKeyBufs) {
w.idxKeyBufs[cnt] = key
} else {
w.idxKeyBufs = append(w.idxKeyBufs, key)
w.batchCheckKeys = append(w.batchCheckKeys, key)
w.batchCheckValues = append(w.batchCheckValues, val)
w.distinctCheckFlags = append(w.distinctCheckFlags, distinct)
// save the buffer to reduce memory allocations.
w.idxKeyBufs[i] = idxKey

w.batchCheckKeys = append(w.batchCheckKeys, idxKey)
w.distinctCheckFlags = append(w.distinctCheckFlags, distinct)

batchVals, err := txn.BatchGet(context.Background(), w.batchCheckKeys)
Expand All @@ -1548,12 +1562,7 @@ func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*i
} else if w.distinctCheckFlags[i] {
// The keys in w.batchCheckKeys also maybe duplicate,
// so we need to backfill the not found key into `batchVals` map.
needRsData := tables.NeedRestoredData(w.index.Meta().Columns, w.table.Meta().Columns)
val, err := tablecodec.GenIndexValuePortal(stmtCtx, w.table.Meta(), w.index.Meta(), needRsData, w.distinctCheckFlags[i], false, idxRecords[i].vals, idxRecords[i].handle, 0, idxRecords[i].rsData)
if err != nil {
return errors.Trace(err)
batchVals[string(key)] = val
batchVals[string(key)] = w.batchCheckValues[i]
// Constrains is already checked.
Expand Down Expand Up @@ -1641,19 +1650,18 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
} else { // The lightning environment is ready.
vars := w.sessCtx.GetSessionVars()
sCtx, writeBufs := vars.StmtCtx, vars.GetWriteStmtBufs()
key, distinct, err := w.index.GenIndexKey(sCtx, idxRecord.vals, idxRecord.handle, writeBufs.IndexKeyBuf)
if err != nil {
return errors.Trace(err)
idxVal, err := w.index.GenIndexValue(sCtx, distinct, idxRecord.vals, idxRecord.handle, idxRecord.rsData)
if err != nil {
return errors.Trace(err)
err = w.writerCtx.WriteRow(key, idxVal)
if err != nil {
return errors.Trace(err)
iter := w.index.GenIndexKVIter(sCtx, idxRecord.vals, idxRecord.handle, idxRecord.rsData)
for iter.Valid() {
key, idxVal, _, err := iter.Next(writeBufs.IndexKeyBuf)
if err != nil {
return errors.Trace(err)
err = w.writerCtx.WriteRow(key, idxVal)
if err != nil {
return errors.Trace(err)
writeBufs.IndexKeyBuf = key
writeBufs.IndexKeyBuf = key
Expand Down
71 changes: 71 additions & 0 deletions ddl/mv_index_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2023 PingCAP, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl_test

import (


func TestMultiValuedIndexOnlineDDL(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec("drop table if exists t")
tk.MustExec("create table t (pk int primary key, a json) partition by hash(pk) partitions 32;")
var sb strings.Builder
sb.WriteString("insert into t values ")
for i := 0; i < 100; i++ {
sb.WriteString(fmt.Sprintf("(%d, '[%d, %d, %d]')", i, i+1, i+2, i+3))
if i != 99 {

internalTK := testkit.NewTestKit(t, store)
internalTK.MustExec("use test")

hook := &ddl.TestDDLCallback{Do: dom}
n := 100
hook.OnJobRunBeforeExported = func(job *model.Job) {
internalTK.MustExec(fmt.Sprintf("insert into t values (%d, '[%d, %d, %d]')", n, n, n+1, n+2))
internalTK.MustExec(fmt.Sprintf("delete from t where pk = %d", n-4))
internalTK.MustExec(fmt.Sprintf("update t set a = '[%d, %d, %d]' where pk = %d", n-3, n-2, n+1000, n-3))
o := dom.DDL().GetHook()

tk.MustExec("alter table t add index idx((cast(a as signed array)))")
tk.MustExec("admin check table t")

tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (pk int primary key, a json);")
tk.MustExec("insert into t values (1, '[1,2,3]');")
tk.MustExec("insert into t values (2, '[2,3,4]');")
tk.MustExec("insert into t values (3, '[3,4,5]');")
tk.MustExec("insert into t values (4, '[-4,5,6]');")
tk.MustGetErrCode("alter table t add unique index idx((cast(a as signed array)));", errno.ErrDupEntry)
tk.MustGetErrMsg("alter table t add index idx((cast(a as unsigned array)));", "[ddl:8202]Cannot decode index value, because [types:1690]constant -4 overflows bigint")
10 changes: 9 additions & 1 deletion table/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ func WithCtx(ctx context.Context) CreateIdxOptFunc {

// IndexIter is index kvs iter.
type IndexIter interface {
Next(kb []byte) ([]byte, []byte, bool, error)
Valid() bool

// Index is the interface for index data on KV store.
type Index interface {
// Meta returns IndexInfo.
Expand All @@ -79,9 +85,11 @@ type Index interface {
Create(ctx sessionctx.Context, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...CreateIdxOptFunc) (kv.Handle, error)
// Delete supports delete from statement.
Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error
// GenIndexKVIter generate index key and value for multi-valued index, use iterator to reduce the memory allocation.
GenIndexKVIter(sc *stmtctx.StatementContext, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum) IndexIter
// Exist supports check index exists or not.
Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) (bool, kv.Handle, error)
// GenIndexKey generates an index key.
// GenIndexKey generates an index key. If the index is a multi-valued index, use GenIndexKVIter instead.
GenIndexKey(sc *stmtctx.StatementContext, indexedValues []types.Datum, h kv.Handle, buf []byte) (key []byte, distinct bool, err error)
// GenIndexValue generates an index value.
GenIndexValue(sc *stmtctx.StatementContext, distinct bool, indexedValues []types.Datum, h kv.Handle, restoredData []types.Datum) ([]byte, error)
Expand Down
40 changes: 40 additions & 0 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,46 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed
return nil

func (c *index) GenIndexKVIter(sc *stmtctx.StatementContext, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum) table.IndexIter {
indexedValues := c.getIndexedValue(indexedValue)
return &indexGenerator{
c: c,
sctx: sc,
indexedVals: indexedValues,
h: h,
handleRestoreData: handleRestoreData,
i: 0,

type indexGenerator struct {
c *index
sctx *stmtctx.StatementContext
indexedVals [][]types.Datum
h kv.Handle
handleRestoreData []types.Datum

i int

func (s *indexGenerator) Next(kb []byte) ([]byte, []byte, bool, error) {
val := s.indexedVals[s.i]
key, distinct, err := s.c.GenIndexKey(s.sctx, val, s.h, kb)
if err != nil {
return nil, nil, false, err
idxVal, err := s.c.GenIndexValue(s.sctx, distinct, val, s.h, s.handleRestoreData)
if err != nil {
return nil, nil, false, err
return key, idxVal, distinct, err

func (s *indexGenerator) Valid() bool {
return s.i < len(s.indexedVals)

const (
// TempIndexKeyTypeNone means the key is not a temporary index key.
TempIndexKeyTypeNone byte = 0
Expand Down
48 changes: 48 additions & 0 deletions tests/realtikvtest/addindextest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,54 @@ func TestAddIndexIngestWriterCountOnPartitionTable(t *testing.T) {
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)

func TestIngestMVIndexOnPartitionTable(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)

tk.MustExec("create table t (pk int primary key, a json) partition by hash(pk) partitions 32;")
var sb strings.Builder
sb.WriteString("insert into t values ")
for i := 0; i < 10240; i++ {
sb.WriteString(fmt.Sprintf("(%d, '[%d, %d, %d]')", i, i+1, i+2, i+3))
if i != 10240-1 {
tk.MustExec("alter table t add index idx((cast(a as signed array)));")
rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
require.Len(t, rows, 1)
jobTp := rows[0][3].(string)
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
tk.MustExec("admin check table t")

tk.MustExec("drop table t")
tk.MustExec("create table t (pk int primary key, a json) partition by hash(pk) partitions 32;")
var wg sync.WaitGroup
go func() {
n := 10240
internalTK := testkit.NewTestKit(t, store)
internalTK.MustExec("use addindexlit;")

for i := 0; i < 1024; i++ {
internalTK.MustExec(fmt.Sprintf("insert into t values (%d, '[%d, %d, %d]')", n, n, n+1, n+2))
internalTK.MustExec(fmt.Sprintf("delete from t where pk = %d", n-10))
internalTK.MustExec(fmt.Sprintf("update t set a = '[%d, %d, %d]' where pk = %d", n-3, n-2, n+1000, n-5))
tk.MustExec("alter table t add index idx((cast(a as signed array)));")
tk.MustExec("admin check table t")

func TestAddIndexIngestAdjustBackfillWorker(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
Expand Down

0 comments on commit 43ebc64

Please sign in to comment.