diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go
index fa6542c3940..1a171b371cd 100644
--- a/cdc/processor/sinkmanager/table_sink_worker.go
+++ b/cdc/processor/sinkmanager/table_sink_worker.go
@@ -124,23 +124,9 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 		&task.span,
 		task.lowerBound,
 		task.getUpperBound(task.tableSink.getUpperBoundTs()))
-	if w.eventCache != nil {
-		drained, err := w.fetchFromCache(task, &lowerBound, &upperBound)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		// We have drained all events from the cache, we can return directly.
-		// No need to get events from the source manager again.
-		if drained {
-			task.callback(lowerBound.Prev())
-			return nil
-		}
-	}
 
 	allEventSize := uint64(0)
 	allEventCount := 0
-	// lowerBound and upperBound are both closed intervals.
-	iter := w.sourceManager.FetchByTable(task.span, lowerBound, upperBound, w.sinkMemQuota)
 
 	defer func() {
 		// Collect metrics.
@@ -153,13 +139,6 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 			task.tableSink.updateRangeEventCounts(eventCount)
 		}
 
-		if err := iter.Close(); err != nil {
-			log.Error("Sink worker fails to close iterator",
-				zap.String("namespace", w.changefeedID.Namespace),
-				zap.String("changefeed", w.changefeedID.ID),
-				zap.Stringer("span", &task.span),
-				zap.Error(err))
-		}
 		log.Debug("Sink task finished",
 			zap.String("namespace", w.changefeedID.Namespace),
 			zap.String("changefeed", w.changefeedID.ID),
@@ -187,7 +166,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 				w.sinkMemQuota.ClearTable(task.tableSink.span)
 
 				// Restart the table sink based on the checkpoint position.
-				if finalErr = task.tableSink.restart(ctx); finalErr == nil {
+				if err := task.tableSink.restart(ctx); err == nil {
 					checkpointTs, _, _ := task.tableSink.getCheckpointTs()
 					ckpt := checkpointTs.ResolvedMark()
 					lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt}
@@ -196,13 +175,43 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 						zap.String("namespace", w.changefeedID.Namespace),
 						zap.String("changefeed", w.changefeedID.ID),
 						zap.Stringer("span", &task.span),
-						zap.Any("lastWrittenPos", lastWrittenPos))
+						zap.Any("lastWrittenPos", lastWrittenPos),
+						zap.String("sinkError", finalErr.Error()))
+					finalErr = err
 				}
 			default:
 			}
 		}
 	}()
 
+	if w.eventCache != nil {
+		drained, err := w.fetchFromCache(task, &lowerBound, &upperBound)
+		failpoint.Inject("TableSinkWorkerFetchFromCache", func() {
+			err = tablesink.NewSinkInternalError(errors.New("TableSinkWorkerFetchFromCacheInjected"))
+		})
+		if err != nil {
+			return errors.Trace(err)
+		}
+		// We have drained all events from the cache, we can return directly.
+		// No need to get events from the source manager again.
+		if drained {
+			task.callback(lowerBound.Prev())
+			return nil
+		}
+	}
+
+	// lowerBound and upperBound are both closed intervals.
+	iter := w.sourceManager.FetchByTable(task.span, lowerBound, upperBound, w.sinkMemQuota)
+	defer func() {
+		if err := iter.Close(); err != nil {
+			log.Error("Sink worker fails to close iterator",
+				zap.String("namespace", w.changefeedID.Namespace),
+				zap.String("changefeed", w.changefeedID.ID),
+				zap.Stringer("span", &task.span),
+				zap.Error(err))
+		}
+	}()
+
 	// 1. We have enough memory to collect events.
 	// 2. The task is not canceled.
 	for advancer.hasEnoughMem() && !task.isCanceled() {
diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go
index 031b643026b..6165fbacaf2 100644
--- a/cdc/processor/sinkmanager/table_sink_worker_test.go
+++ b/cdc/processor/sinkmanager/table_sink_worker_test.go
@@ -19,6 +19,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tiflow/cdc/entry"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/processor/memquota"
@@ -666,3 +667,53 @@ func (suite *tableSinkWorkerSuite) TestHandleTaskUseDifferentBatchIDEveryTime()
 	require.Equal(suite.T(), uint64(5), batchID.Load(), "The batchID should be 5, "+
 		"because the first task has 3 events, the second task has 1 event")
 }
+
+func (suite *tableSinkWorkerSuite) TestFetchFromCacheWithFailure() {
+	ctx, cancel := context.WithCancel(context.Background())
+	events := []*model.PolymorphicEvent{
+		genPolymorphicEvent(1, 3, suite.testSpan),
+		genPolymorphicEvent(1, 3, suite.testSpan),
+		genPolymorphicEvent(1, 3, suite.testSpan),
+		genPolymorphicResolvedEvent(4),
+	}
+	// Only for three events.
+	eventSize := uint64(testEventSize * 3)
+	w, e := suite.createWorker(ctx, eventSize, true)
+	w.eventCache = newRedoEventCache(suite.testChangefeedID, 1024*1024)
+	defer w.sinkMemQuota.Close()
+	suite.addEventsToSortEngine(events, e)
+
+	_ = failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/TableSinkWorkerFetchFromCache", "return")
+	defer func() {
+		_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/TableSinkWorkerFetchFromCache")
+	}()
+
+	taskChan := make(chan *sinkTask)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err := w.handleTasks(ctx, taskChan)
+		require.Equal(suite.T(), context.Canceled, err)
+	}()
+
+	wrapper, sink := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan)
+	defer sink.Close()
+
+	chShouldBeClosed := make(chan struct{}, 1)
+	callback := func(lastWritePos engine.Position) {
+		close(chShouldBeClosed)
+	}
+	taskChan <- &sinkTask{
+		span:          suite.testSpan,
+		lowerBound:    genLowerBound(),
+		getUpperBound: genUpperBoundGetter(4),
+		tableSink:     wrapper,
+		callback:      callback,
+		isCanceled:    func() bool { return false },
+	}
+
+	<-chShouldBeClosed
+	cancel()
+	wg.Wait()
+}
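
Side note on the failpoint pattern used by this patch (not part of the diff itself): failpoint.Inject from github.com/pingcap/failpoint only marks an injection point; it fires once the sources have been rewritten by failpoint-ctl (tiflow typically wraps this in a Makefile target such as failpoint-enable) and a test has activated the path via failpoint.Enable. The sketch below is a minimal, self-contained illustration of the Enable/Inject/Disable flow, under those assumptions. The package path github.com/example/fpdemo, the failpoint name DemoInjectError, and the function doWork are hypothetical names chosen for the example, not identifiers from tiflow.

package fpdemo

import (
	"errors"
	"testing"

	"github.com/pingcap/failpoint"
)

// demoFailpoint is the full failpoint path: package import path plus failpoint
// name, matching the convention used in TestFetchFromCacheWithFailure above.
// Both parts are hypothetical and exist only for this sketch.
const demoFailpoint = "github.com/example/fpdemo/DemoInjectError"

// doWork normally succeeds; when the DemoInjectError failpoint is active (and
// the source has been rewritten by failpoint-ctl), it returns an injected
// error instead, mimicking how the worker injects an error after fetchFromCache.
func doWork() error {
	var err error
	failpoint.Inject("DemoInjectError", func() {
		err = errors.New("DemoInjectErrorInjected")
	})
	return err
}

func TestDoWorkWithInjectedError(t *testing.T) {
	// Activate the failpoint for the duration of the test, mirroring the
	// Enable/Disable pair in the test added by this patch.
	if err := failpoint.Enable(demoFailpoint, "return"); err != nil {
		t.Fatal(err)
	}
	defer func() {
		_ = failpoint.Disable(demoFailpoint)
	}()

	if err := doWork(); err == nil {
		t.Fatal("expected the injected error, got nil")
	}
}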