diff --git a/ddl/index.go b/ddl/index.go index ace2522b89732..931580e586af0 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -709,6 +709,10 @@ func pickBackfillType(w *worker, job *model.Job) model.ReorgType { // canUseIngest indicates whether it can use ingest way to backfill index. func canUseIngest(w *worker) bool { + // We only allow one task to use ingest at the same time, in order to limit the CPU usage. + if len(ingest.LitBackCtxMgr.Keys()) > 0 { + return false + } ctx, err := w.sessPool.get() if err != nil { return false diff --git a/executor/executor.go b/executor/executor.go index 6f2126c63f4a9..80d553c1d8bde 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -560,7 +560,7 @@ func (e *DDLJobRetriever) appendJobToChunk(req *chunk.Chunk, job *model.Job, che req.AppendInt64(0, job.ID) req.AppendString(1, schemaName) req.AppendString(2, tableName) - req.AppendString(3, job.Type.String()) + req.AppendString(3, job.Type.String()+showAddIdxReorgTp(job)) req.AppendString(4, job.SchemaState.String()) req.AppendInt64(5, job.SchemaID) req.AppendInt64(6, job.TableID) @@ -595,6 +595,16 @@ func (e *DDLJobRetriever) appendJobToChunk(req *chunk.Chunk, job *model.Job, che } } +func showAddIdxReorgTp(job *model.Job) string { + if job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey { + tp := job.ReorgMeta.ReorgTp.String() + if len(tp) > 0 { + return " /* " + tp + " */" + } + } + return "" +} + func ts2Time(timestamp uint64, loc *time.Location) types.Time { duration := time.Duration(math.Pow10(9-types.DefaultFsp)) * time.Nanosecond t := model.TSConvert2Time(timestamp) diff --git a/parser/model/ddl.go b/parser/model/ddl.go index aef0c7a698c87..4f3cb96bb62f0 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -254,6 +254,19 @@ func (tp ReorgType) NeedMergeProcess() bool { return tp == ReorgTypeLitMerge || tp == ReorgTypeTxnMerge } +// String implements fmt.Stringer interface. +func (tp ReorgType) String() string { + switch tp { + case ReorgTypeTxn: + return "txn" + case ReorgTypeLitMerge: + return "ingest" + case ReorgTypeTxnMerge: + return "txn-merge" + } + return "" +} + // TimeZoneLocation represents a single time zone. type TimeZoneLocation struct { Name string `json:"name"` diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go new file mode 100644 index 0000000000000..19c8977242b3b --- /dev/null +++ b/tests/realtikvtest/addindextest/integration_test.go @@ -0,0 +1,99 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package addindextest_test + +import ( + "fmt" + "strings" + "sync" + "testing" + + "github.com/pingcap/tidb/ddl/ingest" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/tests/realtikvtest" + "github.com/stretchr/testify/require" +) + +func TestAddIndexIngestMemoryUsage(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists addindexlit;") + tk.MustExec("create database addindexlit;") + tk.MustExec("use addindexlit;") + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + + tk.MustExec("create table t (a int, b int, c int);") + var sb strings.Builder + sb.WriteString("insert into t values ") + size := 100 + for i := 0; i < size; i++ { + sb.WriteString(fmt.Sprintf("(%d, %d, %d)", i, i, i)) + if i != size-1 { + sb.WriteString(",") + } + } + sb.WriteString(";") + tk.MustExec(sb.String()) + require.Equal(t, int64(0), ingest.LitMemRoot.CurrentUsage()) + tk.MustExec("alter table t add index idx(a);") + tk.MustExec("alter table t add unique index idx1(b);") + tk.MustExec("admin check table t;") + require.Equal(t, int64(0), ingest.LitMemRoot.CurrentUsage()) +} + +func TestAddIndexIngestLimitOneBackend(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists addindexlit;") + tk.MustExec("create database addindexlit;") + tk.MustExec("use addindexlit;") + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + tk.MustExec("create table t (a int, b int);") + tk.MustExec("insert into t values (1, 1), (2, 2), (3, 3);") + + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use addindexlit;") + tk2.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + tk2.MustExec("create table t2 (a int, b int);") + tk2.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3);") + + // Mock there is a running ingest job. + ingest.LitBackCtxMgr.Store(65535, &ingest.BackendContext{}) + wg := &sync.WaitGroup{} + wg.Add(2) + go func() { + tk.MustExec("alter table t add index idx(a);") + wg.Done() + }() + go func() { + tk2.MustExec("alter table t2 add index idx_b(b);") + wg.Done() + }() + wg.Wait() + rows := tk.MustQuery("admin show ddl jobs 2;").Rows() + require.Len(t, rows, 2) + require.False(t, strings.Contains(rows[0][3].(string) /* job_type */, "ingest")) + require.False(t, strings.Contains(rows[1][3].(string) /* job_type */, "ingest")) + require.Equal(t, rows[0][7].(string) /* row_count */, "3") + require.Equal(t, rows[1][7].(string) /* row_count */, "3") + + // Remove the running ingest job. + ingest.LitBackCtxMgr.Delete(65535) + tk.MustExec("alter table t add index idx_a(a);") + rows = tk.MustQuery("admin show ddl jobs 1;").Rows() + require.Len(t, rows, 1) + require.True(t, strings.Contains(rows[0][3].(string) /* job_type */, "ingest")) + require.Equal(t, rows[0][7].(string) /* row_count */, "3") +} diff --git a/tests/realtikvtest/addindextest/memory_test.go b/tests/realtikvtest/addindextest/memory_test.go deleted file mode 100644 index 8e482458d4955..0000000000000 --- a/tests/realtikvtest/addindextest/memory_test.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package addindextest - -import ( - "fmt" - "strings" - "testing" - - "github.com/pingcap/tidb/ddl/ingest" - "github.com/pingcap/tidb/testkit" - "github.com/pingcap/tidb/tests/realtikvtest" - "github.com/stretchr/testify/require" -) - -func TestLitAddIndexMemoryUsage(t *testing.T) { - store := realtikvtest.CreateMockStoreAndSetup(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec("drop database if exists addindexlit;") - tk.MustExec("create database addindexlit;") - tk.MustExec("use addindexlit;") - tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) - - tk.MustExec("create table t (a int, b int, c int);") - var sb strings.Builder - sb.WriteString("insert into t values ") - size := 100 - for i := 0; i < size; i++ { - sb.WriteString(fmt.Sprintf("(%d, %d, %d)", i, i, i)) - if i != size-1 { - sb.WriteString(",") - } - } - sb.WriteString(";") - tk.MustExec(sb.String()) - require.Equal(t, int64(0), ingest.LitMemRoot.CurrentUsage()) - tk.MustExec("alter table t add index idx(a);") - tk.MustExec("alter table t add unique index idx1(b);") - tk.MustExec("admin check table t;") - require.Equal(t, int64(0), ingest.LitMemRoot.CurrentUsage()) -}