diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index 1a171b371cd..662849cc320 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -128,6 +128,16 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e allEventSize := uint64(0) allEventCount := 0 + callbackIsPerformed := false + performCallback := func(pos engine.Position) { + if !callbackIsPerformed { + task.callback(pos) + callbackIsPerformed = true + } else { + panic("should never be performed twice") + } + } + defer func() { // Collect metrics. w.metricRedoEventCacheMiss.Add(float64(allEventSize)) @@ -153,7 +163,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e // Otherwise we can't ensure all events before `lastPos` are emitted. if finalErr == nil { - task.callback(advancer.lastPos) + performCallback(advancer.lastPos) } else { switch errors.Cause(finalErr).(type) { // If it's a warning, close the table sink and wait all pending @@ -170,7 +180,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e checkpointTs, _, _ := task.tableSink.getCheckpointTs() ckpt := checkpointTs.ResolvedMark() lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt} - task.callback(lastWrittenPos) + performCallback(lastWrittenPos) log.Info("table sink has been restarted", zap.String("namespace", w.changefeedID.Namespace), zap.String("changefeed", w.changefeedID.ID), @@ -195,7 +205,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e // We have drained all events from the cache, we can return directly. // No need to get events from the source manager again. if drained { - task.callback(lowerBound.Prev()) + performCallback(lowerBound.Prev()) return nil } }