From a6094f769e2f6c493d0dbe513506a4ca7a93b7f6 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 4 Mar 2025 20:45:48 +0800 Subject: [PATCH] ddl: fix job's row count for global sort (#59898) close pingcap/tidb#59897 --- pkg/ddl/backfilling_operators.go | 144 ++++++------------ .../addindextest2/global_sort_test.go | 15 +- .../addindextest3/operator_test.go | 9 +- 3 files changed, 66 insertions(+), 102 deletions(-) diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index 2c19f8a90800f..05eb333af3031 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -181,8 +181,8 @@ func NewAddIndexIngestPipeline( srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, backendCtx) scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, reorgMeta.GetBatchSize(), rm, backendCtx) - ingestOp := NewIndexIngestOperator(ctx, copCtx, backendCtx, sessPool, - tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta, rowCntListener) + ingestOp := NewIndexIngestOperator(ctx, copCtx, sessPool, + tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta) sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, indexes, rowCntListener) operator.Compose[TableScanTask](srcOp, scanOp) @@ -667,19 +667,17 @@ func NewWriteExternalStoreOperator( writers = append(writers, writer) } - return &indexIngestExternalWorker{ - indexIngestBaseWorker: indexIngestBaseWorker{ - ctx: ctx, - tbl: tbl, - indexes: indexes, - copCtx: copCtx, - se: nil, - sessPool: sessPool, - writers: writers, - srcChunkPool: srcChunkPool, - reorgMeta: reorgMeta, - totalCount: totalCount, - }, + return &indexIngestWorker{ + ctx: ctx, + tbl: tbl, + indexes: indexes, + copCtx: copCtx, + se: nil, + sessPool: sessPool, + writers: writers, + srcChunkPool: srcChunkPool, + reorgMeta: reorgMeta, + totalCount: totalCount, } }) return &WriteExternalStoreOperator{ @@ -700,7 +698,6 @@ func (o *WriteExternalStoreOperator) Close() error { type IndexWriteResult struct { ID int Added int - Total int } // IndexIngestOperator writes index records to ingest engine. @@ -712,7 +709,6 @@ type IndexIngestOperator struct { func NewIndexIngestOperator( ctx *OperatorCtx, copCtx copr.CopContext, - backendCtx ingest.BackendCtx, sessPool opSessPool, tbl table.PhysicalTable, indexes []table.Index, @@ -720,7 +716,6 @@ func NewIndexIngestOperator( srcChunkPool *sync.Pool, concurrency int, reorgMeta *model.DDLReorgMeta, - rowCntListener RowCountListener, ) *IndexIngestOperator { writerCfg := getLocalWriterConfig(len(indexes), concurrency) @@ -742,21 +737,17 @@ func NewIndexIngestOperator( writers = append(writers, writer) } - return &indexIngestLocalWorker{ - indexIngestBaseWorker: indexIngestBaseWorker{ - ctx: ctx, - tbl: tbl, - indexes: indexes, - copCtx: copCtx, - - se: nil, - sessPool: sessPool, - writers: writers, - srcChunkPool: srcChunkPool, - reorgMeta: reorgMeta, - }, - backendCtx: backendCtx, - rowCntListener: rowCntListener, + return &indexIngestWorker{ + ctx: ctx, + tbl: tbl, + indexes: indexes, + copCtx: copCtx, + + se: nil, + sessPool: sessPool, + writers: writers, + srcChunkPool: srcChunkPool, + reorgMeta: reorgMeta, } }) return &IndexIngestOperator{ @@ -764,55 +755,7 @@ func NewIndexIngestOperator( } } -type indexIngestExternalWorker struct { - indexIngestBaseWorker -} - -func (w *indexIngestExternalWorker) HandleTask(ck IndexRecordChunk, send func(IndexWriteResult)) { - defer func() { - if ck.Chunk != nil { - w.srcChunkPool.Put(ck.Chunk) - } - }() - rs, err := w.indexIngestBaseWorker.HandleTask(ck) - if err != nil { - w.ctx.onError(err) - return - } - send(rs) -} - -type indexIngestLocalWorker struct { - indexIngestBaseWorker - backendCtx ingest.BackendCtx - rowCntListener RowCountListener -} - -func (w *indexIngestLocalWorker) HandleTask(ck IndexRecordChunk, send func(IndexWriteResult)) { - defer func() { - if ck.Chunk != nil { - w.srcChunkPool.Put(ck.Chunk) - } - }() - rs, err := w.indexIngestBaseWorker.HandleTask(ck) - if err != nil { - w.ctx.onError(err) - return - } - if rs.Added == 0 { - return - } - w.rowCntListener.Written(rs.Added) - err = w.backendCtx.IngestIfQuotaExceeded(w.ctx, ck.ID, rs.Added) - if err != nil { - w.ctx.onError(err) - return - } - rs.Total = w.backendCtx.TotalKeyCount() - send(rs) -} - -type indexIngestBaseWorker struct { +type indexIngestWorker struct { ctx *OperatorCtx tbl table.PhysicalTable @@ -830,23 +773,28 @@ type indexIngestBaseWorker struct { totalCount *atomic.Int64 } -func (w *indexIngestBaseWorker) HandleTask(rs IndexRecordChunk) (IndexWriteResult, error) { +func (w *indexIngestWorker) HandleTask(ck IndexRecordChunk, send func(IndexWriteResult)) { + defer func() { + if ck.Chunk != nil { + w.srcChunkPool.Put(ck.Chunk) + } + }() failpoint.Inject("injectPanicForIndexIngest", func() { panic("mock panic") }) result := IndexWriteResult{ - ID: rs.ID, + ID: ck.ID, } w.initSessCtx() - count, _, err := w.WriteChunk(&rs) + count, _, err := w.WriteChunk(&ck) if err != nil { w.ctx.onError(err) - return result, err + return } if count == 0 { - logutil.Logger(w.ctx).Info("finish a index ingest task", zap.Int("id", rs.ID)) - return result, nil + logutil.Logger(w.ctx).Info("finish a index ingest task", zap.Int("id", ck.ID)) + return } if w.totalCount != nil { w.totalCount.Add(int64(count)) @@ -855,10 +803,10 @@ func (w *indexIngestBaseWorker) HandleTask(rs IndexRecordChunk) (IndexWriteResul if ResultCounterForTest != nil { ResultCounterForTest.Add(1) } - return result, nil + send(result) } -func (w *indexIngestBaseWorker) initSessCtx() { +func (w *indexIngestWorker) initSessCtx() { if w.se == nil { sessCtx, err := w.sessPool.Get() if err != nil { @@ -874,7 +822,7 @@ func (w *indexIngestBaseWorker) initSessCtx() { } } -func (w *indexIngestBaseWorker) Close() { +func (w *indexIngestWorker) Close() { // TODO(lance6716): unify the real write action for engineInfo and external // writer. for _, writer := range w.writers { @@ -894,7 +842,7 @@ func (w *indexIngestBaseWorker) Close() { } // WriteChunk will write index records to lightning engine. -func (w *indexIngestBaseWorker) WriteChunk(rs *IndexRecordChunk) (count int, nextKey kv.Key, err error) { +func (w *indexIngestWorker) WriteChunk(rs *IndexRecordChunk) (count int, nextKey kv.Key, err error) { failpoint.Inject("mockWriteLocalError", func(_ failpoint.Value) { failpoint.Return(0, nil, errors.New("mock write local error")) }) @@ -955,13 +903,13 @@ func (s *indexWriteResultSink) collectResult() error { select { case <-s.ctx.Done(): return s.ctx.Err() - case _, ok := <-s.source.Channel(): + case rs, ok := <-s.source.Channel(): if !ok { err := s.flush() if err != nil { s.ctx.onError(err) } - if s.backendCtx != nil { + if s.backendCtx != nil { // for local sort only total := s.backendCtx.TotalKeyCount() if total > 0 { s.rowCntListener.SetTotal(total) @@ -969,6 +917,14 @@ func (s *indexWriteResultSink) collectResult() error { } return err } + s.rowCntListener.Written(rs.Added) + if s.backendCtx != nil { // for local sort only + err := s.backendCtx.IngestIfQuotaExceeded(s.ctx, rs.ID, rs.Added) + if err != nil { + s.ctx.onError(err) + return err + } + } } } } diff --git a/tests/realtikvtest/addindextest2/global_sort_test.go b/tests/realtikvtest/addindextest2/global_sort_test.go index cea17a1c5d289..6793cc12aae9b 100644 --- a/tests/realtikvtest/addindextest2/global_sort_test.go +++ b/tests/realtikvtest/addindextest2/global_sort_test.go @@ -72,6 +72,15 @@ func checkFileCleaned(t *testing.T, jobID int64, sortStorageURI string) { require.Equal(t, 0, len(statFiles)) } +func checkDataAndShowJobs(t *testing.T, tk *testkit.TestKit, count int) { + tk.MustExec("admin check table t;") + rs := tk.MustQuery("admin show ddl jobs 1;").Rows() + require.Len(t, rs, 1) + require.Contains(t, rs[0][12], "ingest") + require.Contains(t, rs[0][12], "cloud") + require.Equal(t, rs[0][7], strconv.Itoa(count)) +} + func TestGlobalSortBasic(t *testing.T) { gcsHost, gcsPort, cloudStorageURI := genStorageURI(t) opt := fakestorage.Options{ @@ -120,18 +129,18 @@ func TestGlobalSortBasic(t *testing.T) { }) tk.MustExec("alter table t add index idx(a);") - tk.MustExec("admin check table t;") + checkDataAndShowJobs(t, tk, size) <-ch checkFileCleaned(t, jobID, cloudStorageURI) testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/forceMergeSort", "return()") tk.MustExec("alter table t add index idx1(a);") - tk.MustExec("admin check table t;") + checkDataAndShowJobs(t, tk, size) <-ch checkFileCleaned(t, jobID, cloudStorageURI) tk.MustExec("alter table t add unique index idx2(a);") - tk.MustExec("admin check table t;") + checkDataAndShowJobs(t, tk, size) <-ch checkFileCleaned(t, jobID, cloudStorageURI) } diff --git a/tests/realtikvtest/addindextest3/operator_test.go b/tests/realtikvtest/addindextest3/operator_test.go index 525525b3e2daf..96cfad5627c20 100644 --- a/tests/realtikvtest/addindextest3/operator_test.go +++ b/tests/realtikvtest/addindextest3/operator_test.go @@ -171,8 +171,8 @@ func TestBackfillOperators(t *testing.T) { src := testutil.NewOperatorTestSource(chunkResults...) reorgMeta := ddl.NewDDLReorgMeta(tk.Session()) ingestOp := ddl.NewIndexIngestOperator( - opCtx, copCtx, bcCtx, sessPool, pTbl, []table.Index{index}, []ingest.Engine{mockEngine}, - srcChkPool, 3, reorgMeta, &ddl.EmptyRowCntListener{}) + opCtx, copCtx, sessPool, pTbl, []table.Index{index}, []ingest.Engine{mockEngine}, + srcChkPool, 3, reorgMeta) sink := testutil.NewOperatorTestSink[ddl.IndexWriteResult]() operator.Compose[ddl.IndexRecordChunk](src, ingestOp) @@ -443,9 +443,8 @@ func TestTuneWorkerPoolSize(t *testing.T) { require.NoError(t, err) defer bcCtx.Close() mockEngine := ingest.NewMockEngineInfo(nil) - ingestOp := ddl.NewIndexIngestOperator(opCtx, copCtx, bcCtx, sessPool, pTbl, []table.Index{index}, - []ingest.Engine{mockEngine}, nil, 2, nil, - &ddl.EmptyRowCntListener{}) + ingestOp := ddl.NewIndexIngestOperator(opCtx, copCtx, sessPool, pTbl, []table.Index{index}, + []ingest.Engine{mockEngine}, nil, 2, nil) ingestOp.Open() require.Equal(t, ingestOp.GetWorkerPoolSize(), int32(2))