From 4fe792a5dce9831e7ea8de011ab944f9d4af584f Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 6 Sep 2023 14:37:24 +0800 Subject: [PATCH] fix Signed-off-by: qupeng --- cdc/processor/sinkmanager/table_sink_worker.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index 662849cc320..3911b90092d 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -199,14 +199,11 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e failpoint.Inject("TableSinkWorkerFetchFromCache", func() { err = tablesink.NewSinkInternalError(errors.New("TableSinkWorkerFetchFromCacheInjected")) }) - if err != nil { - return errors.Trace(err) - } - // We have drained all events from the cache, we can return directly. - // No need to get events from the source manager again. - if drained { - performCallback(lowerBound.Prev()) - return nil + performCallback(lowerBound.Prev()) + if err != nil || drained { + // If drained is true it means we have drained all events from the cache, + // we can return directly instead of get events from the source manager again. + return err } }