Skip to content

Commit

Permalink
ddl: unregister add index job after DDL done (#39769)
Browse files Browse the repository at this point in the history
close #39768
  • Loading branch information
tangenta authored Dec 8, 2022
1 parent 2d2f4f5 commit 2a928d8
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 0 deletions.
6 changes: 6 additions & 0 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,9 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
job.Args = []interface{}{indexInfo.ID, false /*if exists*/, getPartitionIDs(tbl.Meta())}
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
ingest.LitBackCtxMgr.Unregister(job.ID)
}
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("index", tblInfo.State)
}
Expand Down Expand Up @@ -963,6 +966,9 @@ func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
// Finish this job.
if job.IsRollingback() {
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
ingest.LitBackCtxMgr.Unregister(job.ID)
}
job.Args[0] = indexInfo.ID
} else {
// the partition ids were append by convertAddIdxJob2RollbackJob, it is weird, but for the compatibility,
Expand Down
6 changes: 6 additions & 0 deletions tests/realtikvtest/addindextest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,15 @@ go_test(
"//ddl",
"//ddl/ingest",
"//ddl/testutil",
"//domain",
"//errno",
"//parser/model",
"//testkit",
"//tests/realtikvtest",
"//util/logutil",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_uber_go_zap//:zap",
],
)
71 changes: 71 additions & 0 deletions tests/realtikvtest/addindextest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,15 @@ import (
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/ingest"
"github.com/pingcap/tidb/ddl/testutil"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/tests/realtikvtest"
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestAddIndexIngestMemoryUsage(t *testing.T) {
Expand Down Expand Up @@ -320,3 +326,68 @@ func TestAddIndexIngestPanicOnCopRead(t *testing.T) {
// Fallback to txn-merge process.
require.True(t, strings.Contains(jobTp, "txn-merge"), jobTp)
}

func TestAddIndexIngestCancel(t *testing.T) {
store, dom := realtikvtest.CreateMockStoreAndDomainAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
tk.MustExec("create table t (a int, b int);")
tk.MustExec("insert into t (a, b) values (1, 1), (2, 2), (3, 3);")
defHook := dom.DDL().GetHook()
customHook := newTestCallBack(t, dom)
cancelled := false
customHook.OnJobRunBeforeExported = func(job *model.Job) {
if cancelled {
return
}
if job.Type == model.ActionAddIndex && job.SchemaState == model.StateWriteReorganization {
idx := findIdxInfo(dom, "addindexlit", "t", "idx")
if idx == nil {
return
}
if idx.BackfillState == model.BackfillStateRunning {
tk2 := testkit.NewTestKit(t, store)
rs, err := tk2.Exec(fmt.Sprintf("admin cancel ddl jobs %d", job.ID))
assert.NoError(t, err)
assert.NoError(t, rs.Close())
cancelled = true
}
}
}
dom.DDL().SetHook(customHook)
tk.MustGetErrCode("alter table t add index idx(b);", errno.ErrCancelledDDLJob)
require.True(t, cancelled)
dom.DDL().SetHook(defHook)
require.Empty(t, ingest.LitBackCtxMgr.Keys())
}

type testCallback struct {
ddl.Callback
OnJobRunBeforeExported func(job *model.Job)
}

func newTestCallBack(t *testing.T, dom *domain.Domain) *testCallback {
defHookFactory, err := ddl.GetCustomizedHook("default_hook")
require.NoError(t, err)
return &testCallback{
Callback: defHookFactory(dom),
}
}

func (c *testCallback) OnJobRunBefore(job *model.Job) {
if c.OnJobRunBeforeExported != nil {
c.OnJobRunBeforeExported(job)
}
}

func findIdxInfo(dom *domain.Domain, dbName, tbName, idxName string) *model.IndexInfo {
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr(dbName), model.NewCIStr(tbName))
if err != nil {
logutil.BgLogger().Warn("cannot find table", zap.String("dbName", dbName), zap.String("tbName", tbName))
return nil
}
return tbl.Meta().FindIndexByName(idxName)
}

0 comments on commit 2a928d8

Please sign in to comment.