diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 7ab9cd1faa76d..e11058c6c7f3c 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -253,7 +253,7 @@ func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Cont }() } -func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, workID int) (err error) { +func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, workID int) error { if e.runtimeStats != nil { collExec := true e.dagPBs[workID].CollectExecutionSummaries = &collExec @@ -288,6 +288,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, if e.isCorColInPartialFilters[workID] { // We got correlated column, so need to refresh Selection operator. + var err error if e.dagPBs[workID].Executors, _, err = constructDistExec(e.ctx, e.partialPlans[workID]); err != nil { worker.syncErr(e.resultCh, err) return @@ -359,7 +360,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, return nil } -func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, workID int) (err error) { +func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, workID int) error { ts := e.partialPlans[workID][0].(*plannercore.PhysicalTableScan) tbls := make([]table.Table, 0, 1) @@ -376,6 +377,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, defer e.idxWorkerWg.Done() util.WithRecovery( func() { + var err error partialTableReader := &TableReaderExecutor{ baseExecutor: newBaseExecutor(e.ctx, ts.Schema(), e.getPartitalPlanID(workID)), dagPB: e.dagPBs[workID], @@ -415,8 +417,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, // init partialTableReader and partialTableWorker again for the next table partialTableReader.table = tbl - err := partialTableReader.Open(ctx) - if err != nil { + if err = partialTableReader.Open(ctx); err != nil { logutil.Logger(ctx).Error("open Select result failed:", zap.Error(err)) worker.syncErr(e.resultCh, err) break @@ -438,7 +439,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, // release related resources cancel() - if err := worker.tableReader.Close(); err != nil { + if err = worker.tableReader.Close(); err != nil { logutil.Logger(ctx).Error("close Select result failed:", zap.Error(err)) } e.ctx.StoreQueryFeedback(e.feedbacks[workID])