Skip to content

Commit

Permalink
*: fix panic when add index on prefixed pk tables (#39740) (#39744)
Browse files Browse the repository at this point in the history
close #39723
  • Loading branch information
ti-chi-bot authored Dec 8, 2022
1 parent 2bebfec commit 2ab3b2b
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 19 deletions.
43 changes: 42 additions & 1 deletion ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
Expand All @@ -36,6 +38,8 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/generic"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/timeutil"
Expand Down Expand Up @@ -113,6 +117,10 @@ type copReqSender struct {
func (c *copReqSender) run() {
p := c.senderPool
defer p.wg.Done()
var curTaskID int
defer util.Recover(metrics.LabelDDL, "copReqSender.run", func() {
p.resultsCh <- idxRecResult{id: curTaskID, err: dbterror.ErrReorgPanic}
}, false)
for {
if util.HasCancelled(c.ctx) {
return
Expand All @@ -121,13 +129,19 @@ func (c *copReqSender) run() {
if !ok {
return
}
curTaskID = task.id
logutil.BgLogger().Info("[ddl-ingest] start a cop-request task",
zap.Int("id", task.id), zap.String("task", task.String()))
rs, err := p.copCtx.buildTableScan(p.ctx, p.startTS, task.startKey, task.excludedEndKey())
if err != nil {
p.resultsCh <- idxRecResult{id: task.id, err: err}
return
}
failpoint.Inject("MockCopSenderPanic", func(val failpoint.Value) {
if val.(bool) {
panic("mock panic")
}
})
var done bool
var total int
for !done {
Expand Down Expand Up @@ -431,12 +445,39 @@ func (c *copContext) fetchTableScanResult(ctx context.Context, result distsql.Se
if err != nil {
return nil, false, errors.Trace(err)
}
rsData := tables.TryGetHandleRestoredDataWrapper(c.tblInfo, hdDt, nil, c.idxInfo)
rsData := getRestoreData(c.tblInfo, c.idxInfo, c.pkInfo, hdDt)
buf = append(buf, &indexRecord{handle: handle, key: nil, vals: idxDt, rsData: rsData, skip: false})
}
return buf, false, nil
}

func getRestoreData(tblInfo *model.TableInfo, targetIdx, pkIdx *model.IndexInfo, handleDts []types.Datum) []types.Datum {
if !collate.NewCollationEnabled() || !tblInfo.IsCommonHandle || tblInfo.CommonHandleVersion == 0 {
return nil
}
if pkIdx == nil {
return nil
}
for i, pkIdxCol := range pkIdx.Columns {
pkCol := tblInfo.Columns[pkIdxCol.Offset]
if !types.NeedRestoredData(&pkCol.FieldType) {
// Since the handle data cannot be null, we can use SetNull to
// indicate that this column does not need to be restored.
handleDts[i].SetNull()
continue
}
tables.TryTruncateRestoredData(&handleDts[i], pkCol, pkIdxCol, targetIdx)
tables.ConvertDatumToTailSpaceCount(&handleDts[i], pkCol)
}
dtToRestored := handleDts[:0]
for _, handleDt := range handleDts {
if !handleDt.IsNull() {
dtToRestored = append(dtToRestored, handleDt)
}
}
return dtToRestored
}

func buildDAGPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(sCtx.GetSessionVars().Location())
Expand Down
45 changes: 27 additions & 18 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -1890,28 +1890,37 @@ func TryGetHandleRestoredDataWrapper(tblInfo *model.TableInfo, row []types.Datum
} else {
datum = row[pkCol.Offset]
}
// Try to truncate index values.
// Says that primary key(a (8)),
// For index t(a), don't truncate the value.
// For index t(a(9)), truncate to a(9).
// For index t(a(7)), truncate to a(8).
truncateTargetCol := pkIdxCol
for _, idxCol := range idx.Columns {
if idxCol.Offset == pkCol.Offset {
truncateTargetCol = maxIndexLen(pkIdxCol, idxCol)
break
}
}
tablecodec.TruncateIndexValue(&datum, truncateTargetCol, pkCol)
if collate.IsBinCollation(pkCol.GetCollate()) {
rsData = append(rsData, types.NewIntDatum(stringutil.GetTailSpaceCount(datum.GetString())))
} else {
rsData = append(rsData, datum)
}
TryTruncateRestoredData(&datum, pkCol, pkIdxCol, idx)
ConvertDatumToTailSpaceCount(&datum, pkCol)
rsData = append(rsData, datum)
}
return rsData
}

// TryTruncateRestoredData tries to truncate index values.
// Says that primary key(a (8)),
// For index t(a), don't truncate the value.
// For index t(a(9)), truncate to a(9).
// For index t(a(7)), truncate to a(8).
func TryTruncateRestoredData(datum *types.Datum, pkCol *model.ColumnInfo,
pkIdxCol *model.IndexColumn, idx *model.IndexInfo) {
truncateTargetCol := pkIdxCol
for _, idxCol := range idx.Columns {
if idxCol.Offset == pkIdxCol.Offset {
truncateTargetCol = maxIndexLen(pkIdxCol, idxCol)
break
}
}
tablecodec.TruncateIndexValue(datum, truncateTargetCol, pkCol)
}

// ConvertDatumToTailSpaceCount converts a string datum to an int datum that represents the tail space count.
func ConvertDatumToTailSpaceCount(datum *types.Datum, col *model.ColumnInfo) {
if collate.IsBinCollation(col.GetCollate()) {
*datum = types.NewIntDatum(stringutil.GetTailSpaceCount(datum.GetString()))
}
}

func maxIndexLen(idxA, idxB *model.IndexColumn) *model.IndexColumn {
if idxA.Length == types.UnspecifiedLength {
return idxA
Expand Down
48 changes: 48 additions & 0 deletions tests/realtikvtest/addindextest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,51 @@ func TestAddIndexIngestGeneratedColumns(t *testing.T) {
tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 1 2 1", "2 2 2 4 2", "3 3 3 6 3"))
assertLastNDDLUseIngest(4)
}

func TestAddIndexIngestRestoredData(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)

tk.MustExec(`
CREATE TABLE tbl_5 (
col_21 time DEFAULT '04:48:17',
col_22 varchar(403) COLLATE utf8_unicode_ci DEFAULT NULL,
col_23 year(4) NOT NULL,
col_24 char(182) CHARACTER SET gbk COLLATE gbk_chinese_ci NOT NULL,
col_25 set('Alice','Bob','Charlie','David') COLLATE utf8_unicode_ci DEFAULT NULL,
PRIMARY KEY (col_24(3)) /*T![clustered_index] CLUSTERED */,
KEY idx_10 (col_22)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
`)
tk.MustExec("INSERT INTO tbl_5 VALUES ('15:33:15','&U+x1',2007,'','Bob');")
tk.MustExec("alter table tbl_5 add unique key idx_13 ( col_23 );")
tk.MustExec("admin check table tbl_5;")
rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
require.Len(t, rows, 1)
jobTp := rows[0][3].(string)
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
}

func TestAddIndexIngestPanicOnCopRead(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/MockCopSenderPanic", "return(true)"))
tk.MustExec("create table t (a int, b int, c int, d int, primary key (a) clustered);")
tk.MustExec("insert into t (a, b, c, d) values (1, 1, 1, 1), (2, 2, 2, 2), (3, 3, 3, 3);")
tk.MustExec("alter table t add index idx(b);")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/MockCopSenderPanic"))
rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
require.Len(t, rows, 1)
jobTp := rows[0][3].(string)
// Fallback to txn-merge process.
require.True(t, strings.Contains(jobTp, "txn-merge"), jobTp)
}

0 comments on commit 2ab3b2b

Please sign in to comment.