ddl: fix job's row count for global sort #59898

Merged: 2 commits, Mar 4, 2025
144 changes: 50 additions & 94 deletions pkg/ddl/backfilling_operators.go
@@ -181,8 +181,8 @@ func NewAddIndexIngestPipeline(
 	srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, backendCtx)
 	scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt,
 		reorgMeta.GetBatchSize(), rm, backendCtx)
-	ingestOp := NewIndexIngestOperator(ctx, copCtx, backendCtx, sessPool,
-		tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta, rowCntListener)
+	ingestOp := NewIndexIngestOperator(ctx, copCtx, sessPool,
+		tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta)
 	sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, indexes, rowCntListener)
 
 	operator.Compose[TableScanTask](srcOp, scanOp)
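
The hunk above is the wiring for the fix: NewIndexIngestOperator no longer receives backendCtx or the row-count listener, and only newIndexWriteResultSink keeps rowCntListener, so a single downstream component owns row accounting for both local and global sort. For orientation, a minimal sketch of the listener contract as it can be inferred from the calls exercised in this diff (Written and SetTotal); the actual interface in pkg/ddl may define more than this:

// Sketch only: inferred from rowCntListener.Written / SetTotal
// usage in this diff, not copied from pkg/ddl.
type RowCountListener interface {
	// Written reports rows added by one completed result chunk.
	Written(rowCnt int)
	// SetTotal replaces the accumulated count with an authoritative
	// total, e.g. the local backend's key count after the final flush.
	SetTotal(total int)
}
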
@@ -667,19 +667,17 @@ func NewWriteExternalStoreOperator(
 		writers = append(writers, writer)
 	}
 
-	return &indexIngestExternalWorker{
-		indexIngestBaseWorker: indexIngestBaseWorker{
-			ctx:          ctx,
-			tbl:          tbl,
-			indexes:      indexes,
-			copCtx:       copCtx,
-			se:           nil,
-			sessPool:     sessPool,
-			writers:      writers,
-			srcChunkPool: srcChunkPool,
-			reorgMeta:    reorgMeta,
-			totalCount:   totalCount,
-		},
+	return &indexIngestWorker{
+		ctx:          ctx,
+		tbl:          tbl,
+		indexes:      indexes,
+		copCtx:       copCtx,
+		se:           nil,
+		sessPool:     sessPool,
+		writers:      writers,
+		srcChunkPool: srcChunkPool,
+		reorgMeta:    reorgMeta,
+		totalCount:   totalCount,
 	}
 })
 return &WriteExternalStoreOperator{
@@ -700,7 +698,6 @@ func (o *WriteExternalStoreOperator) Close() error {
 type IndexWriteResult struct {
 	ID    int
 	Added int
-	Total int
 }
 
 // IndexIngestOperator writes index records to ingest engine.
@@ -712,15 +709,13 @@ type IndexIngestOperator struct {
 func NewIndexIngestOperator(
 	ctx *OperatorCtx,
 	copCtx copr.CopContext,
-	backendCtx ingest.BackendCtx,
 	sessPool opSessPool,
 	tbl table.PhysicalTable,
 	indexes []table.Index,
 	engines []ingest.Engine,
 	srcChunkPool *sync.Pool,
 	concurrency int,
 	reorgMeta *model.DDLReorgMeta,
-	rowCntListener RowCountListener,
 ) *IndexIngestOperator {
 	writerCfg := getLocalWriterConfig(len(indexes), concurrency)
 
@@ -742,77 +737,25 @@ func NewIndexIngestOperator(
 		writers = append(writers, writer)
 	}
 
-	return &indexIngestLocalWorker{
-		indexIngestBaseWorker: indexIngestBaseWorker{
-			ctx:     ctx,
-			tbl:     tbl,
-			indexes: indexes,
-			copCtx:  copCtx,
-
-			se:           nil,
-			sessPool:     sessPool,
-			writers:      writers,
-			srcChunkPool: srcChunkPool,
-			reorgMeta:    reorgMeta,
-		},
-		backendCtx:     backendCtx,
-		rowCntListener: rowCntListener,
+	return &indexIngestWorker{
+		ctx:     ctx,
+		tbl:     tbl,
+		indexes: indexes,
+		copCtx:  copCtx,
+
+		se:           nil,
+		sessPool:     sessPool,
+		writers:      writers,
+		srcChunkPool: srcChunkPool,
+		reorgMeta:    reorgMeta,
 	}
 })
 return &IndexIngestOperator{
 	AsyncOperator: operator.NewAsyncOperator[IndexRecordChunk, IndexWriteResult](ctx, pool),
 }
 }
 
-type indexIngestExternalWorker struct {
-	indexIngestBaseWorker
-}
-
-func (w *indexIngestExternalWorker) HandleTask(ck IndexRecordChunk, send func(IndexWriteResult)) {
-	defer func() {
-		if ck.Chunk != nil {
-			w.srcChunkPool.Put(ck.Chunk)
-		}
-	}()
-	rs, err := w.indexIngestBaseWorker.HandleTask(ck)
-	if err != nil {
-		w.ctx.onError(err)
-		return
-	}
-	send(rs)
-}
-
-type indexIngestLocalWorker struct {
-	indexIngestBaseWorker
-	backendCtx     ingest.BackendCtx
-	rowCntListener RowCountListener
-}
-
-func (w *indexIngestLocalWorker) HandleTask(ck IndexRecordChunk, send func(IndexWriteResult)) {
-	defer func() {
-		if ck.Chunk != nil {
-			w.srcChunkPool.Put(ck.Chunk)
-		}
-	}()
-	rs, err := w.indexIngestBaseWorker.HandleTask(ck)
-	if err != nil {
-		w.ctx.onError(err)
-		return
-	}
-	if rs.Added == 0 {
-		return
-	}
-	w.rowCntListener.Written(rs.Added)
-	err = w.backendCtx.IngestIfQuotaExceeded(w.ctx, ck.ID, rs.Added)
-	if err != nil {
-		w.ctx.onError(err)
-		return
-	}
-	rs.Total = w.backendCtx.TotalKeyCount()
-	send(rs)
-}
-
-type indexIngestBaseWorker struct {
+type indexIngestWorker struct {
 	ctx *OperatorCtx
 
 	tbl table.PhysicalTable
@@ -830,23 +773,28 @@ type indexIngestBaseWorker struct {
 	totalCount *atomic.Int64
 }
 
-func (w *indexIngestBaseWorker) HandleTask(rs IndexRecordChunk) (IndexWriteResult, error) {
+func (w *indexIngestWorker) HandleTask(ck IndexRecordChunk, send func(IndexWriteResult)) {
+	defer func() {
+		if ck.Chunk != nil {
+			w.srcChunkPool.Put(ck.Chunk)
+		}
+	}()
 	failpoint.Inject("injectPanicForIndexIngest", func() {
 		panic("mock panic")
 	})
 
 	result := IndexWriteResult{
-		ID: rs.ID,
+		ID: ck.ID,
 	}
 	w.initSessCtx()
-	count, _, err := w.WriteChunk(&rs)
+	count, _, err := w.WriteChunk(&ck)
 	if err != nil {
 		w.ctx.onError(err)
-		return result, err
+		return
 	}
 	if count == 0 {
-		logutil.Logger(w.ctx).Info("finish a index ingest task", zap.Int("id", rs.ID))
-		return result, nil
+		logutil.Logger(w.ctx).Info("finish a index ingest task", zap.Int("id", ck.ID))
+		return
 	}
 	if w.totalCount != nil {
 		w.totalCount.Add(int64(count))
@@ -855,10 +803,10 @@ func (w *indexIngestBaseWorker) HandleTask(rs IndexRecordChunk) (IndexWriteResult, error) {
 	if ResultCounterForTest != nil {
 		ResultCounterForTest.Add(1)
 	}
-	return result, nil
+	send(result)
 }
 
-func (w *indexIngestBaseWorker) initSessCtx() {
+func (w *indexIngestWorker) initSessCtx() {
 	if w.se == nil {
 		sessCtx, err := w.sessPool.Get()
 		if err != nil {
@@ -874,7 +822,7 @@ func (w *indexIngestBaseWorker) initSessCtx() {
 	}
 }
 
-func (w *indexIngestBaseWorker) Close() {
+func (w *indexIngestWorker) Close() {
 	// TODO(lance6716): unify the real write action for engineInfo and external
 	// writer.
 	for _, writer := range w.writers {
@@ -894,7 +842,7 @@ func (w *indexIngestBaseWorker) Close() {
 }
 
 // WriteChunk will write index records to lightning engine.
-func (w *indexIngestBaseWorker) WriteChunk(rs *IndexRecordChunk) (count int, nextKey kv.Key, err error) {
+func (w *indexIngestWorker) WriteChunk(rs *IndexRecordChunk) (count int, nextKey kv.Key, err error) {
 	failpoint.Inject("mockWriteLocalError", func(_ failpoint.Value) {
 		failpoint.Return(0, nil, errors.New("mock write local error"))
 	})
@@ -955,20 +903,28 @@ func (s *indexWriteResultSink) collectResult() error {
 		select {
 		case <-s.ctx.Done():
 			return s.ctx.Err()
-		case _, ok := <-s.source.Channel():
+		case rs, ok := <-s.source.Channel():
 			if !ok {
 				err := s.flush()
 				if err != nil {
 					s.ctx.onError(err)
 				}
-				if s.backendCtx != nil {
+				if s.backendCtx != nil { // for local sort only
 					total := s.backendCtx.TotalKeyCount()
 					if total > 0 {
 						s.rowCntListener.SetTotal(total)
 					}
 				}
 				return err
 			}
+			s.rowCntListener.Written(rs.Added)
+			if s.backendCtx != nil { // for local sort only
+				err := s.backendCtx.IngestIfQuotaExceeded(s.ctx, rs.ID, rs.Added)
+				if err != nil {
+					s.ctx.onError(err)
+					return err
+				}
+			}
 		}
 	}
 }
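
This hunk is the substance of the fix. As the removed code above suggests, only indexIngestLocalWorker used to call rowCntListener.Written, and under global sort no such worker exists (there is no local backend), so the job's row count stayed at zero. Now every IndexWriteResult reaching the sink is counted, and the backendCtx-dependent steps are explicitly scoped to local sort. A condensed sketch of the resulting loop, with identifiers taken from the diff; this is a paraphrase, not the verbatim implementation:

// Condensed sketch of indexWriteResultSink.collectResult after this change.
func (s *indexWriteResultSink) collectResultSketch() error {
	for {
		select {
		case <-s.ctx.Done():
			return s.ctx.Err()
		case rs, ok := <-s.source.Channel():
			if !ok {
				// Source exhausted: flush, then for local sort let the
				// backend's key count override the incremental sum.
				err := s.flush()
				if s.backendCtx != nil {
					if total := s.backendCtx.TotalKeyCount(); total > 0 {
						s.rowCntListener.SetTotal(total)
					}
				}
				return err
			}
			// Global and local sort both account rows here now.
			s.rowCntListener.Written(rs.Added)
			if s.backendCtx != nil { // local sort may need an early ingest
				if err := s.backendCtx.IngestIfQuotaExceeded(s.ctx, rs.ID, rs.Added); err != nil {
					return err
				}
			}
		}
	}
}
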
15 changes: 12 additions & 3 deletions tests/realtikvtest/addindextest2/global_sort_test.go
@@ -72,6 +72,15 @@ func checkFileCleaned(t *testing.T, jobID int64, sortStorageURI string) {
 	require.Equal(t, 0, len(statFiles))
 }
 
+func checkDataAndShowJobs(t *testing.T, tk *testkit.TestKit, count int) {
+	tk.MustExec("admin check table t;")
+	rs := tk.MustQuery("admin show ddl jobs 1;").Rows()
+	require.Len(t, rs, 1)
+	require.Contains(t, rs[0][12], "ingest")
+	require.Contains(t, rs[0][12], "cloud")
+	require.Equal(t, rs[0][7], strconv.Itoa(count))
+}
+
 func TestGlobalSortBasic(t *testing.T) {
 	gcsHost, gcsPort, cloudStorageURI := genStorageURI(t)
 	opt := fakestorage.Options{
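
The new helper checks the fix at the SQL surface: admin show ddl jobs 1 returns the most recent DDL job, and the assertions index its row positionally. Presumably rs[0][7] is the ROW_COUNT column and rs[0][12] is the column recording the backfill mode, so each ADD INDEX in the next hunk verifies both that the job went through the cloud ingest path and that its row count matches the table size rather than staying at zero. A hypothetical reading of one asserted row for a 100-row table (column positions assumed, not confirmed here):

// rs[0][7]  == "100"                       // ROW_COUNT; "0" under global sort before this PR
// rs[0][12] contains "ingest" and "cloud"  // backfill-mode annotations
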
@@ -120,18 +129,18 @@ func TestGlobalSortBasic(t *testing.T) {
 	})
 
 	tk.MustExec("alter table t add index idx(a);")
-	tk.MustExec("admin check table t;")
+	checkDataAndShowJobs(t, tk, size)
 	<-ch
 	checkFileCleaned(t, jobID, cloudStorageURI)
 
 	testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/forceMergeSort", "return()")
 	tk.MustExec("alter table t add index idx1(a);")
-	tk.MustExec("admin check table t;")
+	checkDataAndShowJobs(t, tk, size)
 	<-ch
 	checkFileCleaned(t, jobID, cloudStorageURI)
 
 	tk.MustExec("alter table t add unique index idx2(a);")
-	tk.MustExec("admin check table t;")
+	checkDataAndShowJobs(t, tk, size)
 	<-ch
 	checkFileCleaned(t, jobID, cloudStorageURI)
 }
9 changes: 4 additions & 5 deletions tests/realtikvtest/addindextest3/operator_test.go
@@ -171,8 +171,8 @@ func TestBackfillOperators(t *testing.T) {
 	src := testutil.NewOperatorTestSource(chunkResults...)
 	reorgMeta := ddl.NewDDLReorgMeta(tk.Session())
 	ingestOp := ddl.NewIndexIngestOperator(
-		opCtx, copCtx, bcCtx, sessPool, pTbl, []table.Index{index}, []ingest.Engine{mockEngine},
-		srcChkPool, 3, reorgMeta, &ddl.EmptyRowCntListener{})
+		opCtx, copCtx, sessPool, pTbl, []table.Index{index}, []ingest.Engine{mockEngine},
+		srcChkPool, 3, reorgMeta)
 	sink := testutil.NewOperatorTestSink[ddl.IndexWriteResult]()
 
 	operator.Compose[ddl.IndexRecordChunk](src, ingestOp)
@@ -443,9 +443,8 @@ func TestTuneWorkerPoolSize(t *testing.T) {
 	require.NoError(t, err)
 	defer bcCtx.Close()
 	mockEngine := ingest.NewMockEngineInfo(nil)
-	ingestOp := ddl.NewIndexIngestOperator(opCtx, copCtx, bcCtx, sessPool, pTbl, []table.Index{index},
-		[]ingest.Engine{mockEngine}, nil, 2, nil,
-		&ddl.EmptyRowCntListener{})
+	ingestOp := ddl.NewIndexIngestOperator(opCtx, copCtx, sessPool, pTbl, []table.Index{index},
+		[]ingest.Engine{mockEngine}, nil, 2, nil)
 
 	ingestOp.Open()
 	require.Equal(t, ingestOp.GetWorkerPoolSize(), int32(2))