diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index 662849cc320..3911b90092d 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -199,14 +199,11 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e failpoint.Inject("TableSinkWorkerFetchFromCache", func() { err = tablesink.NewSinkInternalError(errors.New("TableSinkWorkerFetchFromCacheInjected")) }) - if err != nil { - return errors.Trace(err) - } - // We have drained all events from the cache, we can return directly. - // No need to get events from the source manager again. - if drained { - performCallback(lowerBound.Prev()) - return nil + performCallback(lowerBound.Prev()) + if err != nil || drained { + // If drained is true it means we have drained all events from the cache, + // we can return directly instead of get events from the source manager again. + return err } }