From e1c4a1b4894aafe07318a07afa4c3471cfbde1cf Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 6 Sep 2023 16:16:58 +0800 Subject: [PATCH] some updates for code Signed-off-by: qupeng --- cdc/processor/sinkmanager/redo_log_worker.go | 15 +++---- .../sinkmanager/redo_log_worker_test.go | 44 +++++++++++++++++++ .../sinkmanager/table_sink_worker.go | 2 + .../sinkmanager/table_sink_worker_test.go | 44 +++++++++++++++++++ 4 files changed, 96 insertions(+), 9 deletions(-) diff --git a/cdc/processor/sinkmanager/redo_log_worker.go b/cdc/processor/sinkmanager/redo_log_worker.go index 20f5bd18ea9..0a237ae8e87 100644 --- a/cdc/processor/sinkmanager/redo_log_worker.go +++ b/cdc/processor/sinkmanager/redo_log_worker.go @@ -68,22 +68,23 @@ func (w *redoWorker) handleTasks(ctx context.Context, taskChan <-chan *redoTask) } func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr error) { + advancer := newRedoLogAdvancer(task, w.memQuota, requestMemSize, w.redoDMLManager) + // The task is finished and some required memory isn't used. + defer advancer.cleanup() + lowerBound, upperBound := validateAndAdjustBound( w.changefeedID, &task.span, task.lowerBound, task.getUpperBound(task.tableSink.getReceivedSorterResolvedTs()), ) + advancer.lastPos = lowerBound.Prev() var cache *eventAppender if w.eventCache != nil { cache = w.eventCache.maybeCreateAppender(task.span, lowerBound) } - advancer := newRedoLogAdvancer(task, w.memQuota, requestMemSize, w.redoDMLManager) - // The task is finished and some required memory isn't used. - defer advancer.cleanup() - iter := w.sourceManager.FetchByTable(task.span, lowerBound, upperBound, w.memQuota) allEventCount := 0 cachedSize := uint64(0) @@ -127,11 +128,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e cache.pushBatch(nil, 0, upperBound) } - return advancer.finish( - ctx, - cachedSize, - upperBound, - ) + return advancer.finish(ctx, cachedSize, upperBound) } allEventCount += 1 diff --git a/cdc/processor/sinkmanager/redo_log_worker_test.go b/cdc/processor/sinkmanager/redo_log_worker_test.go index 8ed9af15f60..e0d8a77f0b0 100644 --- a/cdc/processor/sinkmanager/redo_log_worker_test.go +++ b/cdc/processor/sinkmanager/redo_log_worker_test.go @@ -283,3 +283,47 @@ func (suite *redoLogWorkerSuite) TestHandleTaskWithSplitTxnAndAdvanceIfNoWorkloa cancel() wg.Wait() } + +// When starts to handle a task, advancer.lastPos should be set to a correct position. +// Otherwise if advancer.lastPos isn't updated during scanning, callback will get an +// invalid `advancer.lastPos`. +func (suite *redoLogWorkerSuite) TestHandleTaskWithoutMemory() { + ctx, cancel := context.WithCancel(context.Background()) + events := []*model.PolymorphicEvent{ + genPolymorphicEvent(1, 3, suite.testSpan), + genPolymorphicResolvedEvent(4), + } + w, e, _ := suite.createWorker(ctx, 0) + defer w.memQuota.Close() + suite.addEventsToSortEngine(events, e) + + taskChan := make(chan *redoTask) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := w.handleTasks(ctx, taskChan) + require.Equal(suite.T(), context.Canceled, err) + }() + + wrapper, sink := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan) + defer sink.Close() + + chShouldBeClosed := make(chan struct{}, 1) + callback := func(lastWritePos engine.Position) { + require.Equal(suite.T(), genLowerBound().Prev(), lastWritePos) + close(chShouldBeClosed) + } + taskChan <- &redoTask{ + span: suite.testSpan, + lowerBound: genLowerBound(), + getUpperBound: genUpperBoundGetter(4), + tableSink: wrapper, + callback: callback, + isCanceled: func() bool { return true }, + } + + <-chShouldBeClosed + cancel() + wg.Wait() +} diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index 906571954b4..f86a63d1df8 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -129,6 +129,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e &task.span, task.lowerBound, task.getUpperBound(task.tableSink.getUpperBoundTs())) + advancer.lastPos = lowerBound.Prev() allEventSize := uint64(0) allEventCount := 0 @@ -211,6 +212,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e performCallback(lowerBound.Prev()) return nil } + advancer.lastPos = lowerBound.Prev() } // lowerBound and upperBound are both closed intervals. diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go index 1fea817b1c5..6c259e9b540 100644 --- a/cdc/processor/sinkmanager/table_sink_worker_test.go +++ b/cdc/processor/sinkmanager/table_sink_worker_test.go @@ -716,3 +716,47 @@ func (suite *tableSinkWorkerSuite) TestFetchFromCacheWithFailure() { cancel() wg.Wait() } + +// When starts to handle a task, advancer.lastPos should be set to a correct position. +// Otherwise if advancer.lastPos isn't updated during scanning, callback will get an +// invalid `advancer.lastPos`. +func (suite *tableSinkWorkerSuite) TestHandleTaskWithoutMemory() { + ctx, cancel := context.WithCancel(context.Background()) + events := []*model.PolymorphicEvent{ + genPolymorphicEvent(1, 3, suite.testSpan), + genPolymorphicResolvedEvent(4), + } + w, e := suite.createWorker(ctx, 0, true) + defer w.sinkMemQuota.Close() + suite.addEventsToSortEngine(events, e) + + taskChan := make(chan *sinkTask) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := w.handleTasks(ctx, taskChan) + require.Equal(suite.T(), context.Canceled, err) + }() + + wrapper, sink := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan) + defer sink.Close() + + chShouldBeClosed := make(chan struct{}, 1) + callback := func(lastWritePos engine.Position) { + require.Equal(suite.T(), genLowerBound().Prev(), lastWritePos) + close(chShouldBeClosed) + } + taskChan <- &sinkTask{ + span: suite.testSpan, + lowerBound: genLowerBound(), + getUpperBound: genUpperBoundGetter(4), + tableSink: wrapper, + callback: callback, + isCanceled: func() bool { return true }, + } + + <-chShouldBeClosed + cancel() + wg.Wait() +}