Skip to content

Commit

Permalink
fix a bug
Browse files Browse the repository at this point in the history
Signed-off-by: qupeng <qupeng@pingcap.com>
  • Loading branch information
hicqu committed Sep 5, 2023
1 parent c682cfb commit 002149b
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,16 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
allEventSize := uint64(0)
allEventCount := 0

callbackIsPerformed := false
performCallback := func(pos engine.Position) {
if !callbackIsPerformed {
task.callback(pos)
callbackIsPerformed = true
} else {
panic("should never be performed twice")
}
}

defer func() {
// Collect metrics.
w.metricRedoEventCacheMiss.Add(float64(allEventSize))
Expand All @@ -153,7 +163,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e

// Otherwise we can't ensure all events before `lastPos` are emitted.
if finalErr == nil {
task.callback(advancer.lastPos)
performCallback(advancer.lastPos)
} else {
switch errors.Cause(finalErr).(type) {
// If it's a warning, close the table sink and wait all pending
Expand All @@ -170,7 +180,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
checkpointTs, _, _ := task.tableSink.getCheckpointTs()
ckpt := checkpointTs.ResolvedMark()
lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt}
task.callback(lastWrittenPos)
performCallback(lastWrittenPos)
log.Info("table sink has been restarted",
zap.String("namespace", w.changefeedID.Namespace),
zap.String("changefeed", w.changefeedID.ID),
Expand All @@ -195,7 +205,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
// We have drained all events from the cache, we can return directly.
// No need to get events from the source manager again.
if drained {
task.callback(lowerBound.Prev())
performCallback(lowerBound.Prev())
return nil
}
}
Expand Down

0 comments on commit 002149b

Please sign in to comment.