From c682cfbf51d04061ccc3eda60e45321d1fefa4e0 Mon Sep 17 00:00:00 2001
From: qupeng
Date: Tue, 5 Sep 2023 18:10:00 +0800
Subject: [PATCH 1/4] fix "dead dmlSink" error in sink workers

Signed-off-by: qupeng
---
 .../sinkmanager/table_sink_worker.go          | 55 +++++++++++--------
 .../sinkmanager/table_sink_worker_test.go     | 51 +++++++++++++++++
 2 files changed, 83 insertions(+), 23 deletions(-)

diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go
index fa6542c3940..1a171b371cd 100644
--- a/cdc/processor/sinkmanager/table_sink_worker.go
+++ b/cdc/processor/sinkmanager/table_sink_worker.go
@@ -124,23 +124,9 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 		&task.span,
 		task.lowerBound,
 		task.getUpperBound(task.tableSink.getUpperBoundTs()))
-	if w.eventCache != nil {
-		drained, err := w.fetchFromCache(task, &lowerBound, &upperBound)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		// We have drained all events from the cache, we can return directly.
-		// No need to get events from the source manager again.
-		if drained {
-			task.callback(lowerBound.Prev())
-			return nil
-		}
-	}
 
 	allEventSize := uint64(0)
 	allEventCount := 0
-	// lowerBound and upperBound are both closed intervals.
-	iter := w.sourceManager.FetchByTable(task.span, lowerBound, upperBound, w.sinkMemQuota)
 
 	defer func() {
 		// Collect metrics.
@@ -153,13 +139,6 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 			task.tableSink.updateRangeEventCounts(eventCount)
 		}
 
-		if err := iter.Close(); err != nil {
-			log.Error("Sink worker fails to close iterator",
-				zap.String("namespace", w.changefeedID.Namespace),
-				zap.String("changefeed", w.changefeedID.ID),
-				zap.Stringer("span", &task.span),
-				zap.Error(err))
-		}
 		log.Debug("Sink task finished",
 			zap.String("namespace", w.changefeedID.Namespace),
 			zap.String("changefeed", w.changefeedID.ID),
@@ -187,7 +166,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 			w.sinkMemQuota.ClearTable(task.tableSink.span)
 
 			// Restart the table sink based on the checkpoint position.
-			if finalErr = task.tableSink.restart(ctx); finalErr == nil {
+			if err := task.tableSink.restart(ctx); err == nil {
 				checkpointTs, _, _ := task.tableSink.getCheckpointTs()
 				ckpt := checkpointTs.ResolvedMark()
 				lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt}
@@ -196,13 +175,43 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 				zap.String("namespace", w.changefeedID.Namespace),
 				zap.String("changefeed", w.changefeedID.ID),
 				zap.Stringer("span", &task.span),
-				zap.Any("lastWrittenPos", lastWrittenPos))
+				zap.Any("lastWrittenPos", lastWrittenPos),
+				zap.String("sinkError", finalErr.Error()))
+				finalErr = err
 			}
 		default:
 		}
 	}()
 
+	if w.eventCache != nil {
+		drained, err := w.fetchFromCache(task, &lowerBound, &upperBound)
+		failpoint.Inject("TableSinkWorkerFetchFromCache", func() {
+			err = tablesink.NewSinkInternalError(errors.New("TableSinkWorkerFetchFromCacheInjected"))
+		})
+		if err != nil {
+			return errors.Trace(err)
+		}
+		// We have drained all events from the cache, we can return directly.
+		// No need to get events from the source manager again.
+		if drained {
+			task.callback(lowerBound.Prev())
+			return nil
+		}
+	}
+
+	// lowerBound and upperBound are both closed intervals.
+
+	iter := w.sourceManager.FetchByTable(task.span, lowerBound, upperBound, w.sinkMemQuota)
+	defer func() {
+		if err := iter.Close(); err != nil {
+			log.Error("Sink worker fails to close iterator",
+				zap.String("namespace", w.changefeedID.Namespace),
+				zap.String("changefeed", w.changefeedID.ID),
+				zap.Stringer("span", &task.span),
+				zap.Error(err))
+		}
+	}()
+
 	// 1. We have enough memory to collect events.
 	// 2. The task is not canceled.
 	for advancer.hasEnoughMem() && !task.isCanceled() {
diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go
index 031b643026b..6165fbacaf2 100644
--- a/cdc/processor/sinkmanager/table_sink_worker_test.go
+++ b/cdc/processor/sinkmanager/table_sink_worker_test.go
@@ -19,6 +19,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tiflow/cdc/entry"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/processor/memquota"
@@ -666,3 +667,53 @@ func (suite *tableSinkWorkerSuite) TestHandleTaskUseDifferentBatchIDEveryTime()
 	require.Equal(suite.T(), uint64(5), batchID.Load(), "The batchID should be 5, "+
 		"because the first task has 3 events, the second task has 1 event")
 }
+
+func (suite *tableSinkWorkerSuite) TestFetchFromCacheWithFailure() {
+	ctx, cancel := context.WithCancel(context.Background())
+	events := []*model.PolymorphicEvent{
+		genPolymorphicEvent(1, 3, suite.testSpan),
+		genPolymorphicEvent(1, 3, suite.testSpan),
+		genPolymorphicEvent(1, 3, suite.testSpan),
+		genPolymorphicResolvedEvent(4),
+	}
+	// Only for three events.
+	eventSize := uint64(testEventSize * 3)
+	w, e := suite.createWorker(ctx, eventSize, true)
+	w.eventCache = newRedoEventCache(suite.testChangefeedID, 1024*1024)
+	defer w.sinkMemQuota.Close()
+	suite.addEventsToSortEngine(events, e)
+
+	_ = failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/TableSinkWorkerFetchFromCache", "return")
+	defer func() {
+		_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/TableSinkWorkerFetchFromCache")
+	}()
+
+	taskChan := make(chan *sinkTask)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err := w.handleTasks(ctx, taskChan)
+		require.Equal(suite.T(), context.Canceled, err)
+	}()
+
+	wrapper, sink := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan)
+	defer sink.Close()
+
+	chShouldBeClosed := make(chan struct{}, 1)
+	callback := func(lastWritePos engine.Position) {
+		close(chShouldBeClosed)
+	}
+	taskChan <- &sinkTask{
+		span:          suite.testSpan,
+		lowerBound:    genLowerBound(),
+		getUpperBound: genUpperBoundGetter(4),
+		tableSink:     wrapper,
+		callback:      callback,
+		isCanceled:    func() bool { return false },
+	}
+
+	<-chShouldBeClosed
+	cancel()
+	wg.Wait()
+}

From 002149b67a4129740efb0d05c808db2922d5e6f1 Mon Sep 17 00:00:00 2001
From: qupeng
Date: Tue, 5 Sep 2023 20:27:08 +0800
Subject: [PATCH 2/4] fix a bug: perform the sink task callback at most once

Signed-off-by: qupeng
---
 cdc/processor/sinkmanager/table_sink_worker.go | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go
index 1a171b371cd..662849cc320 100644
--- a/cdc/processor/sinkmanager/table_sink_worker.go
+++ b/cdc/processor/sinkmanager/table_sink_worker.go
@@ -128,6 +128,16 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 	allEventSize := uint64(0)
 	allEventCount := 0
 
+	callbackIsPerformed := false
+	performCallback := func(pos engine.Position) {
+		if !callbackIsPerformed {
+			task.callback(pos)
+			callbackIsPerformed = true
+		} else {
+			panic("should never be performed twice")
+		}
+	}
+
 	defer func() {
 		// Collect metrics.
 		w.metricRedoEventCacheMiss.Add(float64(allEventSize))
@@ -153,7 +163,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 		// Otherwise we can't ensure all events before `lastPos` are emitted.
 		if finalErr == nil {
-			task.callback(advancer.lastPos)
+			performCallback(advancer.lastPos)
 		} else {
 			switch errors.Cause(finalErr).(type) {
 			// If it's a warning, close the table sink and wait all pending
@@ -170,7 +180,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 				checkpointTs, _, _ := task.tableSink.getCheckpointTs()
 				ckpt := checkpointTs.ResolvedMark()
 				lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt}
-				task.callback(lastWrittenPos)
+				performCallback(lastWrittenPos)
 				log.Info("table sink has been restarted",
 					zap.String("namespace", w.changefeedID.Namespace),
 					zap.String("changefeed", w.changefeedID.ID),
@@ -195,7 +205,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 		// We have drained all events from the cache, we can return directly.
 		// No need to get events from the source manager again.
 		if drained {
-			task.callback(lowerBound.Prev())
+			performCallback(lowerBound.Prev())
 			return nil
 		}
 	}

From a7935b2a68d9c3589a32a0b169618cab7a010eea Mon Sep 17 00:00:00 2001
From: qupeng
Date: Wed, 6 Sep 2023 14:37:24 +0800
Subject: [PATCH 3/4] fix: don't panic when performCallback is reached twice

Signed-off-by: qupeng
---
 cdc/processor/sinkmanager/table_sink_worker.go | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go
index 662849cc320..fa164746d45 100644
--- a/cdc/processor/sinkmanager/table_sink_worker.go
+++ b/cdc/processor/sinkmanager/table_sink_worker.go
@@ -133,8 +133,6 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 		if !callbackIsPerformed {
 			task.callback(pos)
 			callbackIsPerformed = true
-		} else {
-			panic("should never be performed twice")
 		}
 	}
 
@@ -202,9 +200,9 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 		if err != nil {
 			return errors.Trace(err)
 		}
-		// We have drained all events from the cache, we can return directly.
-		// No need to get events from the source manager again.
 		if drained {
+			// If drained is true, it means we have drained all events from the cache, so
+			// we can return directly instead of getting events from the source manager again.
 			performCallback(lowerBound.Prev())
 			return nil
 		}

From 396f6e5d2ca13a75163583ba534de230a290f60e Mon Sep 17 00:00:00 2001
From: qupeng
Date: Wed, 6 Sep 2023 16:16:58 +0800
Subject: [PATCH 4/4] initialize advancer.lastPos before scanning and add tests

Signed-off-by: qupeng
---
 cdc/processor/sinkmanager/redo_log_worker.go  | 15 +++----
 .../sinkmanager/redo_log_worker_test.go       | 44 +++++++++++++++++++
 .../sinkmanager/table_sink_worker.go          |  2 +
 .../sinkmanager/table_sink_worker_test.go     | 44 +++++++++++++++++++
 4 files changed, 96 insertions(+), 9 deletions(-)

diff --git a/cdc/processor/sinkmanager/redo_log_worker.go b/cdc/processor/sinkmanager/redo_log_worker.go
index 9c7373d676e..f2f9fe1aca5 100644
--- a/cdc/processor/sinkmanager/redo_log_worker.go
+++ b/cdc/processor/sinkmanager/redo_log_worker.go
@@ -65,22 +65,23 @@ func (w *redoWorker) handleTasks(ctx context.Context, taskChan <-chan *redoTask)
 }
 
 func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr error) {
+	advancer := newRedoLogAdvancer(task, w.memQuota, requestMemSize, w.redoDMLManager)
+	// The task is finished and some required memory isn't used.
+	defer advancer.cleanup()
+
 	lowerBound, upperBound := validateAndAdjustBound(
 		w.changefeedID,
 		&task.span,
 		task.lowerBound,
 		task.getUpperBound(task.tableSink.getReceivedSorterResolvedTs()),
 	)
+	advancer.lastPos = lowerBound.Prev()
 
 	var cache *eventAppender
 	if w.eventCache != nil {
 		cache = w.eventCache.maybeCreateAppender(task.span, lowerBound)
 	}
 
-	advancer := newRedoLogAdvancer(task, w.memQuota, requestMemSize, w.redoDMLManager)
-	// The task is finished and some required memory isn't used.
-	defer advancer.cleanup()
-
 	iter := w.sourceManager.FetchByTable(task.span, lowerBound, upperBound, w.memQuota)
 	allEventCount := 0
 	cachedSize := uint64(0)
@@ -124,11 +125,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
 			cache.pushBatch(nil, 0, upperBound)
 		}
 
-		return advancer.finish(
-			ctx,
-			cachedSize,
-			upperBound,
-		)
+		return advancer.finish(ctx, cachedSize, upperBound)
 	}
 
 	allEventCount += 1
diff --git a/cdc/processor/sinkmanager/redo_log_worker_test.go b/cdc/processor/sinkmanager/redo_log_worker_test.go
index 3bcb85021b2..98e107d067d 100644
--- a/cdc/processor/sinkmanager/redo_log_worker_test.go
+++ b/cdc/processor/sinkmanager/redo_log_worker_test.go
@@ -284,3 +284,47 @@ func (suite *redoLogWorkerSuite) TestHandleTaskWithSplitTxnAndAdvanceIfNoWorkloa
 	cancel()
 	wg.Wait()
 }
+
+// When starting to handle a task, advancer.lastPos should be set to a correct position.
+// Otherwise, if advancer.lastPos isn't updated during scanning, the callback will get an
+// invalid `advancer.lastPos`.
+func (suite *redoLogWorkerSuite) TestHandleTaskWithoutMemory() {
+	ctx, cancel := context.WithCancel(context.Background())
+	events := []*model.PolymorphicEvent{
+		genPolymorphicEvent(1, 3, suite.testSpan),
+		genPolymorphicResolvedEvent(4),
+	}
+	w, e, _ := suite.createWorker(ctx, 0)
+	defer w.memQuota.Close()
+	suite.addEventsToSortEngine(events, e)
+
+	taskChan := make(chan *redoTask)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err := w.handleTasks(ctx, taskChan)
+		require.Equal(suite.T(), context.Canceled, err)
+	}()
+
+	wrapper, sink := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan)
+	defer sink.Close()
+
+	chShouldBeClosed := make(chan struct{}, 1)
+	callback := func(lastWritePos engine.Position) {
+		require.Equal(suite.T(), genLowerBound().Prev(), lastWritePos)
+		close(chShouldBeClosed)
+	}
+	taskChan <- &redoTask{
+		span:          suite.testSpan,
+		lowerBound:    genLowerBound(),
+		getUpperBound: genUpperBoundGetter(4),
+		tableSink:     wrapper,
+		callback:      callback,
+		isCanceled:    func() bool { return true },
+	}
+
+	<-chShouldBeClosed
+	cancel()
+	wg.Wait()
+}
diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go
index fa164746d45..b60339b9afa 100644
--- a/cdc/processor/sinkmanager/table_sink_worker.go
+++ b/cdc/processor/sinkmanager/table_sink_worker.go
@@ -124,6 +124,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 		&task.span,
 		task.lowerBound,
 		task.getUpperBound(task.tableSink.getUpperBoundTs()))
+	advancer.lastPos = lowerBound.Prev()
 
 	allEventSize := uint64(0)
 	allEventCount := 0
@@ -206,6 +207,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 			performCallback(lowerBound.Prev())
 			return nil
 		}
+		advancer.lastPos = lowerBound.Prev()
 	}
 
 	// lowerBound and upperBound are both closed intervals.
diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go
index 6165fbacaf2..32fd22587e3 100644
--- a/cdc/processor/sinkmanager/table_sink_worker_test.go
+++ b/cdc/processor/sinkmanager/table_sink_worker_test.go
@@ -717,3 +717,47 @@ func (suite *tableSinkWorkerSuite) TestFetchFromCacheWithFailure() {
 	cancel()
 	wg.Wait()
 }
+
+// When starting to handle a task, advancer.lastPos should be set to a correct position.
+// Otherwise, if advancer.lastPos isn't updated during scanning, the callback will get an
+// invalid `advancer.lastPos`.
+func (suite *tableSinkWorkerSuite) TestHandleTaskWithoutMemory() { + ctx, cancel := context.WithCancel(context.Background()) + events := []*model.PolymorphicEvent{ + genPolymorphicEvent(1, 3, suite.testSpan), + genPolymorphicResolvedEvent(4), + } + w, e := suite.createWorker(ctx, 0, true) + defer w.sinkMemQuota.Close() + suite.addEventsToSortEngine(events, e) + + taskChan := make(chan *sinkTask) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := w.handleTasks(ctx, taskChan) + require.Equal(suite.T(), context.Canceled, err) + }() + + wrapper, sink := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan) + defer sink.Close() + + chShouldBeClosed := make(chan struct{}, 1) + callback := func(lastWritePos engine.Position) { + require.Equal(suite.T(), genLowerBound().Prev(), lastWritePos) + close(chShouldBeClosed) + } + taskChan <- &sinkTask{ + span: suite.testSpan, + lowerBound: genLowerBound(), + getUpperBound: genUpperBoundGetter(4), + tableSink: wrapper, + callback: callback, + isCanceled: func() bool { return true }, + } + + <-chShouldBeClosed + cancel() + wg.Wait() +}
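
---
The core of PATCH 2/4 and PATCH 3/4 is a once-only guard around the task
callback: handleTask has several exit paths (events drained from the cache,
normal finish in a defer, error recovery), and more than one of them can
reach the callback for the same task. A minimal standalone Go sketch of the
pattern follows; it is illustrative only, with Position, sinkTask and the
concrete exit paths as simplified stand-ins, not tiflow code.

package main

import "fmt"

// Position is a simplified stand-in for engine.Position.
type Position struct {
	StartTs, CommitTs uint64
}

// sinkTask is a simplified stand-in for the real task type.
type sinkTask struct {
	callback func(Position)
}

func handleTask(task *sinkTask) (finalErr error) {
	callbackIsPerformed := false
	// performCallback makes the callback idempotent, so whichever exit
	// path reaches it first wins and any later call is a no-op.
	performCallback := func(pos Position) {
		if !callbackIsPerformed {
			task.callback(pos)
			callbackIsPerformed = true
		}
	}

	// Error-recovery path, mirroring the defer in the real handleTask.
	defer func() {
		if finalErr != nil {
			performCallback(Position{StartTs: 0, CommitTs: 1})
		}
	}()

	// Early-return path, e.g. all events were drained from the cache.
	performCallback(Position{StartTs: 2, CommitTs: 3})
	return nil
}

func main() {
	_ = handleTask(&sinkTask{callback: func(p Position) {
		fmt.Printf("callback performed once, at %+v\n", p)
	}})
}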
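---
PATCH 1/4 exercises the new error path with github.com/pingcap/failpoint, as
in TestFetchFromCacheWithFailure. Below is a small sketch of that testing
pattern; the failpoint name and fetchFromCache here are illustrative, and
note that failpoint.Inject markers only fire after the sources have been
rewritten with failpoint-ctl (tiflow drives this via `make failpoint-enable`);
without the rewrite they are no-ops.

package main

import (
	"errors"
	"fmt"

	"github.com/pingcap/failpoint"
)

// fetchFromCache stands in for the method under test. The failpoint marker
// lets a test overwrite err without provoking a real cache failure.
func fetchFromCache() (err error) {
	failpoint.Inject("FetchFromCacheFailed", func() {
		err = errors.New("injected cache failure")
	})
	return err
}

func main() {
	// The full failpoint name is the import path plus the marker name.
	fp := "main/FetchFromCacheFailed"
	_ = failpoint.Enable(fp, "return")
	defer func() { _ = failpoint.Disable(fp) }()

	// Prints the injected error once the markers are rewritten;
	// otherwise Inject is a no-op and this prints <nil>.
	fmt.Println(fetchFromCache())
}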