fix "dead dmlSink" error in sink workers
Signed-off-by: qupeng <qupeng@pingcap.com>
hicqu committed Sep 5, 2023
1 parent e28b004 commit c682cfb
Showing 2 changed files with 83 additions and 23 deletions.
55 changes: 32 additions & 23 deletions cdc/processor/sinkmanager/table_sink_worker.go
@@ -124,23 +124,9 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr error) {
 		&task.span,
 		task.lowerBound,
 		task.getUpperBound(task.tableSink.getUpperBoundTs()))
-	if w.eventCache != nil {
-		drained, err := w.fetchFromCache(task, &lowerBound, &upperBound)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		// We have drained all events from the cache, we can return directly.
-		// No need to get events from the source manager again.
-		if drained {
-			task.callback(lowerBound.Prev())
-			return nil
-		}
-	}
 
 	allEventSize := uint64(0)
 	allEventCount := 0
-	// lowerBound and upperBound are both closed intervals.
-	iter := w.sourceManager.FetchByTable(task.span, lowerBound, upperBound, w.sinkMemQuota)
 
 	defer func() {
 		// Collect metrics.
@@ -153,13 +139,6 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr error) {
 			task.tableSink.updateRangeEventCounts(eventCount)
 		}
 
-		if err := iter.Close(); err != nil {
-			log.Error("Sink worker fails to close iterator",
-				zap.String("namespace", w.changefeedID.Namespace),
-				zap.String("changefeed", w.changefeedID.ID),
-				zap.Stringer("span", &task.span),
-				zap.Error(err))
-		}
 		log.Debug("Sink task finished",
 			zap.String("namespace", w.changefeedID.Namespace),
 			zap.String("changefeed", w.changefeedID.ID),
@@ -187,7 +166,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr error) {
 				w.sinkMemQuota.ClearTable(task.tableSink.span)
 
 				// Restart the table sink based on the checkpoint position.
-				if finalErr = task.tableSink.restart(ctx); finalErr == nil {
+				if err := task.tableSink.restart(ctx); err == nil {
 					checkpointTs, _, _ := task.tableSink.getCheckpointTs()
 					ckpt := checkpointTs.ResolvedMark()
 					lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt}
@@ -196,13 +175,43 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr error) {
 						zap.String("namespace", w.changefeedID.Namespace),
 						zap.String("changefeed", w.changefeedID.ID),
 						zap.Stringer("span", &task.span),
-						zap.Any("lastWrittenPos", lastWrittenPos))
+						zap.Any("lastWrittenPos", lastWrittenPos),
+						zap.String("sinkError", finalErr.Error()))
+					finalErr = err
 				}
 			default:
 			}
 		}
 	}()
 
+	if w.eventCache != nil {
+		drained, err := w.fetchFromCache(task, &lowerBound, &upperBound)
+		failpoint.Inject("TableSinkWorkerFetchFromCache", func() {
+			err = tablesink.NewSinkInternalError(errors.New("TableSinkWorkerFetchFromCacheInjected"))
+		})
+		if err != nil {
+			return errors.Trace(err)
+		}
+		// We have drained all events from the cache, we can return directly.
+		// No need to get events from the source manager again.
+		if drained {
+			task.callback(lowerBound.Prev())
+			return nil
+		}
+	}
+
+	// lowerBound and upperBound are both closed intervals.
+	iter := w.sourceManager.FetchByTable(task.span, lowerBound, upperBound, w.sinkMemQuota)
+	defer func() {
+		if err := iter.Close(); err != nil {
+			log.Error("Sink worker fails to close iterator",
+				zap.String("namespace", w.changefeedID.Namespace),
+				zap.String("changefeed", w.changefeedID.ID),
+				zap.Stringer("span", &task.span),
+				zap.Error(err))
+		}
+	}()
+
 	// 1. We have enough memory to collect events.
 	// 2. The task is not canceled.
 	for advancer.hasEnoughMem() && !task.isCanceled() {
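The TableSinkWorkerFetchFromCache marker added above follows the usual pingcap/failpoint pattern: failpoint.Inject is an empty marker function that does nothing until failpoint-ctl rewrites the source, after which an enabled failpoint runs the callback. Below is a minimal, self-contained sketch of that pattern; the package path example.com/demo/fetcher and the failpoint name ForceFetchError are invented for illustration and are not part of this commit.

// fetcher.go
package fetcher

import (
	"errors"

	"github.com/pingcap/failpoint"
)

// FetchFromCache stands in for a cache read that normally succeeds.
func FetchFromCache() (drained bool, err error) {
	drained = true
	// A no-op marker in normal builds; once failpoint-ctl has rewritten the
	// file and the failpoint is enabled, this callback overrides err, just
	// like the injection inside handleTask above.
	failpoint.Inject("ForceFetchError", func() {
		err = errors.New("ForceFetchError injected")
	})
	return drained, err
}

// fetcher_test.go
package fetcher

import (
	"testing"

	"github.com/pingcap/failpoint"
)

func TestFetchFromCacheWithFailure(t *testing.T) {
	// The failpath is the package import path plus the failpoint name.
	const fp = "example.com/demo/fetcher/ForceFetchError"
	if err := failpoint.Enable(fp, "return"); err != nil {
		t.Fatal(err)
	}
	defer func() { _ = failpoint.Disable(fp) }()

	// With failpoint rewriting enabled in the build, the injected error surfaces here.
	if _, err := FetchFromCache(); err == nil {
		t.Fatal("expected the injected error")
	}
}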
51 changes: 51 additions & 0 deletions cdc/processor/sinkmanager/table_sink_worker_test.go
@@ -19,6 +19,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tiflow/cdc/entry"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/processor/memquota"
@@ -666,3 +667,53 @@ func (suite *tableSinkWorkerSuite) TestHandleTaskUseDifferentBatchIDEveryTime() {
 	require.Equal(suite.T(), uint64(5), batchID.Load(), "The batchID should be 5, "+
 		"because the first task has 3 events, the second task has 1 event")
 }
+
+func (suite *tableSinkWorkerSuite) TestFetchFromCacheWithFailure() {
+	ctx, cancel := context.WithCancel(context.Background())
+	events := []*model.PolymorphicEvent{
+		genPolymorphicEvent(1, 3, suite.testSpan),
+		genPolymorphicEvent(1, 3, suite.testSpan),
+		genPolymorphicEvent(1, 3, suite.testSpan),
+		genPolymorphicResolvedEvent(4),
+	}
+	// Only for three events.
+	eventSize := uint64(testEventSize * 3)
+	w, e := suite.createWorker(ctx, eventSize, true)
+	w.eventCache = newRedoEventCache(suite.testChangefeedID, 1024*1024)
+	defer w.sinkMemQuota.Close()
+	suite.addEventsToSortEngine(events, e)
+
+	_ = failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/TableSinkWorkerFetchFromCache", "return")
+	defer func() {
+		_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/TableSinkWorkerFetchFromCache")
+	}()
+
+	taskChan := make(chan *sinkTask)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err := w.handleTasks(ctx, taskChan)
+		require.Equal(suite.T(), context.Canceled, err)
+	}()
+
+	wrapper, sink := createTableSinkWrapper(suite.testChangefeedID, suite.testSpan)
+	defer sink.Close()
+
+	chShouldBeClosed := make(chan struct{}, 1)
+	callback := func(lastWritePos engine.Position) {
+		close(chShouldBeClosed)
+	}
+	taskChan <- &sinkTask{
+		span:          suite.testSpan,
+		lowerBound:    genLowerBound(),
+		getUpperBound: genUpperBoundGetter(4),
+		tableSink:     wrapper,
+		callback:      callback,
+		isCanceled:    func() bool { return false },
+	}
+
+	<-chShouldBeClosed
+	cancel()
+	wg.Wait()
+}
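The injected error in TestFetchFromCacheWithFailure only fires when the failpoint markers have been rewritten into the build; otherwise failpoint.Inject stays a no-op and the failure path is not exercised. As a hedged usage note (the make targets are an assumption based on other PingCAP repositories and may differ here), the test would typically be run with something like:

	make failpoint-enable
	go test ./cdc/processor/sinkmanager/...
	make failpoint-disable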
