From c410cff8ecbeb7f1e159af618984441dc1c26ea2 Mon Sep 17 00:00:00 2001 From: qupeng Date: Mon, 11 Sep 2023 14:28:42 +0800 Subject: [PATCH] sink(cdc): improve table sink advance timeout machanism (#9666) close pingcap/tiflow#9695 --- cdc/processor/sinkmanager/manager.go | 26 ++--- cdc/processor/sinkmanager/manager_test.go | 6 +- .../sinkmanager/table_sink_advancer_test.go | 24 ++-- .../sinkmanager/table_sink_worker.go | 2 +- .../sinkmanager/table_sink_worker_test.go | 6 +- .../sinkmanager/table_sink_wrapper.go | 65 +++++++---- .../sinkmanager/table_sink_wrapper_test.go | 108 +++++++++++++++++- 7 files changed, 179 insertions(+), 58 deletions(-) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index cc961ed5371..349f8c2db78 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -268,6 +268,7 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er zap.Error(err)) m.clearSinkFactory() + // To release memory quota ASAP, close all table sinks manually. start := time.Now() log.Info("Sink manager is closing all table sinks", zap.String("namespace", m.changefeedID.Namespace), @@ -371,22 +372,17 @@ func (m *SinkManager) clearSinkFactory() { } } -func (m *SinkManager) putSinkFactoryError(err error, version uint64) { +func (m *SinkManager) putSinkFactoryError(err error, version uint64) (success bool) { m.sinkFactory.Lock() defer m.sinkFactory.Unlock() - skipped := true if version == m.sinkFactory.version { select { case m.sinkFactory.errors <- err: - skipped = false default: } + return true } - log.Info("Sink manager tries to put an sink error", - zap.String("namespace", m.changefeedID.Namespace), - zap.String("changefeed", m.changefeedID.ID), - zap.Bool("skipped", skipped), - zap.String("error", err.Error())) + return false } func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool) { @@ -434,7 +430,7 @@ func (m *SinkManager) backgroundGC(errors chan<- error) { if time.Since(sink.lastCleanTime) < cleanTableInterval { return true } - checkpointTs, _, _ := sink.getCheckpointTs() + checkpointTs := sink.getCheckpointTs() resolvedMark := checkpointTs.ResolvedMark() if resolvedMark == 0 { return true @@ -906,7 +902,7 @@ func (m *SinkManager) RemoveTable(span tablepb.Span) { zap.String("changefeed", m.changefeedID.ID), zap.Stringer("span", &span)) } - checkpointTs, _, _ := value.(*tableSinkWrapper).getCheckpointTs() + checkpointTs := value.(*tableSinkWrapper).getCheckpointTs() log.Info("Remove table sink successfully", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), @@ -975,18 +971,18 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats { } tableSink := value.(*tableSinkWrapper) - checkpointTs, version, advanced := tableSink.getCheckpointTs() + checkpointTs := tableSink.getCheckpointTs() m.sinkMemQuota.Release(span, checkpointTs) m.redoMemQuota.Release(span, checkpointTs) advanceTimeoutInSec := util.GetOrZero(m.changefeedInfo.Config.Sink.AdvanceTimeoutInSec) if advanceTimeoutInSec <= 0 { - log.Warn("AdvanceTimeoutInSec is not set, use default value", zap.Any("sinkConfig", m.changefeedInfo.Config.Sink)) advanceTimeoutInSec = config.DefaultAdvanceTimeoutInSec } stuckCheck := time.Duration(advanceTimeoutInSec) * time.Second - if version > 0 && time.Since(advanced) > stuckCheck && - oracle.GetTimeFromTS(tableSink.getUpperBoundTs()).Sub(oracle.GetTimeFromTS(checkpointTs.Ts)) > stuckCheck { + + isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck) + if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) { log.Warn("Table checkpoint is stuck too long, will restart the sink backend", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), @@ -994,8 +990,6 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats { zap.Any("checkpointTs", checkpointTs), zap.Float64("stuckCheck", stuckCheck.Seconds()), zap.Uint64("factoryVersion", version)) - tableSink.updateTableSinkAdvanced() - m.putSinkFactoryError(errors.New("table sink stuck"), version) } var resolvedTs model.Ts diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index d79657e749f..96b6749f137 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -197,7 +197,7 @@ func TestGenerateTableSinkTaskWithBarrierTs(t *testing.T) { require.Eventually(t, func() bool { tableSink, ok := manager.tableSinks.Load(span) require.True(t, ok) - checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs() + checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs() return checkpointTS.ResolvedMark() == 4 }, 5*time.Second, 10*time.Millisecond) } @@ -228,7 +228,7 @@ func TestGenerateTableSinkTaskWithResolvedTs(t *testing.T) { require.Eventually(t, func() bool { tableSink, ok := manager.tableSinks.Load(span) require.True(t, ok) - checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs() + checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs() return checkpointTS.ResolvedMark() == 3 }, 5*time.Second, 10*time.Millisecond) } @@ -283,7 +283,7 @@ func TestDoNotGenerateTableSinkTaskWhenTableIsNotReplicating(t *testing.T) { tableSink, ok := manager.tableSinks.Load(span) require.True(t, ok) require.NotNil(t, tableSink) - checkpointTS, _, _ := tableSink.(*tableSinkWrapper).getCheckpointTs() + checkpointTS := tableSink.(*tableSinkWrapper).getCheckpointTs() require.Equal(t, uint64(1), checkpointTS.Ts) } diff --git a/cdc/processor/sinkmanager/table_sink_advancer_test.go b/cdc/processor/sinkmanager/table_sink_advancer_test.go index b1c2af8303a..f28c5f9496c 100644 --- a/cdc/processor/sinkmanager/table_sink_advancer_test.go +++ b/cdc/processor/sinkmanager/table_sink_advancer_test.go @@ -136,7 +136,7 @@ func (suite *tableSinkAdvancerSuite) TestAdvanceTableSinkWithBatchID() { expectedResolvedTs := model.NewResolvedTs(2) expectedResolvedTs.Mode = model.BatchResolvedMode expectedResolvedTs.BatchID = 1 - checkpointTs, _, _ := task.tableSink.getCheckpointTs() + checkpointTs := task.tableSink.getCheckpointTs() require.Equal(suite.T(), expectedResolvedTs, checkpointTs) } @@ -151,7 +151,7 @@ func (suite *tableSinkAdvancerSuite) TestAdvanceTableSink() { require.NoError(suite.T(), err) expectedResolvedTs := model.NewResolvedTs(2) - checkpointTs, _, _ := task.tableSink.getCheckpointTs() + checkpointTs := task.tableSink.getCheckpointTs() require.Equal(suite.T(), expectedResolvedTs, checkpointTs) } @@ -290,7 +290,7 @@ func (suite *tableSinkAdvancerSuite) TestAdvanceTheSameCommitTsEventsWithCommitF require.Len(suite.T(), sink.GetEvents(), 3) sink.AckAllEvents() require.Eventually(suite.T(), func() bool { - checkpointTs, _, _ := task.tableSink.getCheckpointTs() + checkpointTs := task.tableSink.getCheckpointTs() return checkpointTs == model.NewResolvedTs(2) }, 5*time.Second, 10*time.Millisecond) require.Equal(suite.T(), uint64(0), advancer.committedTxnSize) @@ -337,7 +337,7 @@ func (suite *tableSinkAdvancerSuite) TestAdvanceTheSameCommitTsEventsWithoutComm expectedResolvedTs := model.NewResolvedTs(3) expectedResolvedTs.Mode = model.BatchResolvedMode expectedResolvedTs.BatchID = 1 - checkpointTs, _, _ := task.tableSink.getCheckpointTs() + checkpointTs := task.tableSink.getCheckpointTs() return checkpointTs == expectedResolvedTs }, 5*time.Second, 10*time.Millisecond) require.Equal(suite.T(), uint64(0), advancer.committedTxnSize) @@ -388,7 +388,7 @@ func (suite *tableSinkAdvancerSuite) TestAdvanceDifferentCommitTsEventsWithSplit expectedResolvedTs := model.NewResolvedTs(3) expectedResolvedTs.Mode = model.BatchResolvedMode expectedResolvedTs.BatchID = 1 - checkpointTs, _, _ := task.tableSink.getCheckpointTs() + checkpointTs := task.tableSink.getCheckpointTs() return checkpointTs == expectedResolvedTs }, 5*time.Second, 10*time.Millisecond) require.Equal(suite.T(), uint64(0), advancer.committedTxnSize) @@ -443,7 +443,7 @@ func (suite *tableSinkAdvancerSuite) TestAdvanceDifferentCommitTsEventsWithoutSp sink.AckAllEvents() require.Eventually(suite.T(), func() bool { expectedResolvedTs := model.NewResolvedTs(2) - checkpointTs, _, _ := task.tableSink.getCheckpointTs() + checkpointTs := task.tableSink.getCheckpointTs() return checkpointTs == expectedResolvedTs }, 5*time.Second, 10*time.Millisecond) require.Equal(suite.T(), uint64(0), advancer.committedTxnSize) @@ -499,7 +499,7 @@ func (suite *tableSinkAdvancerSuite) TestLastTimeAdvanceDifferentCommitTsEventsW sink.AckAllEvents() require.Eventually(suite.T(), func() bool { expectedResolvedTs := model.NewResolvedTs(2) - checkpointTs, _, _ := task.tableSink.getCheckpointTs() + checkpointTs := task.tableSink.getCheckpointTs() return checkpointTs == expectedResolvedTs }, 5*time.Second, 10*time.Millisecond) require.Equal(suite.T(), uint64(0), advancer.committedTxnSize) @@ -557,7 +557,7 @@ func (suite *tableSinkAdvancerSuite) TestTryAdvanceWhenExceedAvailableMem() { sink.AckAllEvents() require.Eventually(suite.T(), func() bool { expectedResolvedTs := model.NewResolvedTs(3) - checkpointTs, _, _ := task.tableSink.getCheckpointTs() + checkpointTs := task.tableSink.getCheckpointTs() return checkpointTs == expectedResolvedTs }, 5*time.Second, 10*time.Millisecond) require.Equal(suite.T(), uint64(0), advancer.committedTxnSize) @@ -607,7 +607,7 @@ func (suite *tableSinkAdvancerSuite) TestTryAdvanceWhenReachTheMaxUpdateIntSizeA sink.AckAllEvents() require.Eventually(suite.T(), func() bool { expectedResolvedTs := model.NewResolvedTs(3) - checkpointTs, _, _ := task.tableSink.getCheckpointTs() + checkpointTs := task.tableSink.getCheckpointTs() return checkpointTs == expectedResolvedTs }, 5*time.Second, 10*time.Millisecond) require.Equal(suite.T(), uint64(0), advancer.committedTxnSize) @@ -660,7 +660,7 @@ func (suite *tableSinkAdvancerSuite) TestFinish() { sink.AckAllEvents() require.Eventually(suite.T(), func() bool { expectedResolvedTs := model.NewResolvedTs(4) - checkpointTs, _, _ := task.tableSink.getCheckpointTs() + checkpointTs := task.tableSink.getCheckpointTs() return checkpointTs == expectedResolvedTs }, 5*time.Second, 10*time.Millisecond) require.Equal(suite.T(), uint64(0), advancer.committedTxnSize) @@ -710,7 +710,7 @@ func (suite *tableSinkAdvancerSuite) TestTryAdvanceAndForceAcquireWithoutSplitTx sink.AckAllEvents() require.Eventually(suite.T(), func() bool { expectedResolvedTs := model.NewResolvedTs(3) - checkpointTs, _, _ := task.tableSink.getCheckpointTs() + checkpointTs := task.tableSink.getCheckpointTs() return checkpointTs == expectedResolvedTs }, 5*time.Second, 10*time.Millisecond) require.Equal(suite.T(), uint64(0), advancer.committedTxnSize) @@ -775,7 +775,7 @@ func (suite *tableSinkAdvancerSuite) TestTryAdvanceAndBlockAcquireWithSplitTxn() <-down require.Eventually(suite.T(), func() bool { expectedResolvedTs := model.NewResolvedTs(3) - checkpointTs, _, _ := task.tableSink.getCheckpointTs() + checkpointTs := task.tableSink.getCheckpointTs() return checkpointTs == expectedResolvedTs }, 5*time.Second, 10*time.Millisecond) require.Equal(suite.T(), uint64(0), advancer.committedTxnSize) diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index b60339b9afa..687583219c0 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -176,7 +176,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e // Restart the table sink based on the checkpoint position. if err := task.tableSink.restart(ctx); err == nil { - checkpointTs, _, _ := task.tableSink.getCheckpointTs() + checkpointTs := task.tableSink.getCheckpointTs() ckpt := checkpointTs.ResolvedMark() lastWrittenPos := engine.Position{StartTs: ckpt - 1, CommitTs: ckpt} performCallback(lastWrittenPos) diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go index 32fd22587e3..6e65c5e97b0 100644 --- a/cdc/processor/sinkmanager/table_sink_worker_test.go +++ b/cdc/processor/sinkmanager/table_sink_worker_test.go @@ -535,7 +535,7 @@ func (suite *tableSinkWorkerSuite) TestHandleTaskWithSplitTxnAndAdvanceTableWhen receivedEvents := sink.GetEvents() receivedEvents[0].Callback() require.Len(suite.T(), sink.GetEvents(), 1, "No more events should be sent to sink") - checkpointTs, _, _ := wrapper.getCheckpointTs() + checkpointTs := wrapper.getCheckpointTs() require.Equal(suite.T(), uint64(4), checkpointTs.ResolvedMark()) } @@ -581,7 +581,7 @@ func (suite *tableSinkWorkerSuite) TestHandleTaskWithSplitTxnAndAdvanceTableIfNo isCanceled: func() bool { return false }, } require.Eventually(suite.T(), func() bool { - checkpointTs, _, _ := wrapper.getCheckpointTs() + checkpointTs := wrapper.getCheckpointTs() return checkpointTs.ResolvedMark() == 4 }, 5*time.Second, 10*time.Millisecond, "Directly advance resolved mark to 4") cancel() @@ -639,7 +639,7 @@ func (suite *tableSinkWorkerSuite) TestHandleTaskUseDifferentBatchIDEveryTime() require.Equal(suite.T(), uint64(3), batchID.Load()) sink.AckAllEvents() require.Eventually(suite.T(), func() bool { - checkpointTs, _, _ := wrapper.getCheckpointTs() + checkpointTs := wrapper.getCheckpointTs() return checkpointTs.ResolvedMark() == 2 }, 5*time.Second, 10*time.Millisecond) diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index 95cdd17c4c5..d41c10c2928 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -51,10 +51,13 @@ type tableSinkWrapper struct { // tableSink is the underlying sink. tableSink struct { sync.RWMutex - s tablesink.TableSink - version uint64 // it's generated by `tableSinkCreator`. + s tablesink.TableSink + version uint64 // it's generated by `tableSinkCreater`. + + innerMu sync.Mutex + advanced time.Time + resolvedTs model.ResolvedTs checkpointTs model.ResolvedTs - advanced atomic.Int64 } // state used to control the lifecycle of the table. @@ -120,7 +123,8 @@ func newTableSinkWrapper( res.tableSink.version = 0 res.tableSink.checkpointTs = model.NewResolvedTs(startTs) - res.updateTableSinkAdvanced() + res.tableSink.resolvedTs = model.NewResolvedTs(startTs) + res.tableSink.advanced = time.Now() res.receivedSorterResolvedTs.Store(startTs) res.barrierTs.Store(startTs) @@ -197,33 +201,28 @@ func (t *tableSinkWrapper) updateResolvedTs(ts model.ResolvedTs) error { // If it's nil it means it's closed. return tablesink.NewSinkInternalError(errors.New("table sink cleared")) } + t.tableSink.innerMu.Lock() + defer t.tableSink.innerMu.Unlock() + t.tableSink.resolvedTs = ts return t.tableSink.s.UpdateResolvedTs(ts) } -// getCheckpointTs returns -// 1. checkpoint timestamp of the table; -// 2. the table sink version, which comes from `tableSinkCreator`; -// 3. recent time of the table is advanced. -func (t *tableSinkWrapper) getCheckpointTs() (model.ResolvedTs, uint64, time.Time) { +func (t *tableSinkWrapper) getCheckpointTs() model.ResolvedTs { t.tableSink.RLock() defer t.tableSink.RUnlock() + t.tableSink.innerMu.Lock() + defer t.tableSink.innerMu.Unlock() + if t.tableSink.s != nil { checkpointTs := t.tableSink.s.GetCheckpointTs() if t.tableSink.checkpointTs.Less(checkpointTs) { t.tableSink.checkpointTs = checkpointTs - t.updateTableSinkAdvanced() + t.tableSink.advanced = time.Now() + } else if !checkpointTs.Less(t.tableSink.resolvedTs) { + t.tableSink.advanced = time.Now() } } - advanced := time.Unix(t.tableSink.advanced.Load(), 0) - return t.tableSink.checkpointTs, t.tableSink.version, advanced -} - -func (t *tableSinkWrapper) updateTableSinkAdvanced() { - curr := t.tableSink.advanced.Load() - now := time.Now().Unix() - if now > curr { - t.tableSink.advanced.CompareAndSwap(curr, now) - } + return t.tableSink.checkpointTs } func (t *tableSinkWrapper) getReceivedSorterResolvedTs() model.Ts { @@ -296,7 +295,7 @@ func (t *tableSinkWrapper) initTableSink() bool { if t.tableSink.s == nil { t.tableSink.s, t.tableSink.version = t.tableSinkCreator() if t.tableSink.s != nil { - t.updateTableSinkAdvanced() + t.tableSink.advanced = time.Now() return true } return false @@ -342,12 +341,15 @@ func (t *tableSinkWrapper) doTableSinkClear() { return } checkpointTs := t.tableSink.s.GetCheckpointTs() + t.tableSink.innerMu.Lock() if t.tableSink.checkpointTs.Less(checkpointTs) { t.tableSink.checkpointTs = checkpointTs } + t.tableSink.resolvedTs = checkpointTs + t.tableSink.advanced = time.Now() + t.tableSink.innerMu.Unlock() t.tableSink.s = nil t.tableSink.version = 0 - t.tableSink.advanced.Store(time.Now().Unix()) } // When the attached sink fail, there can be some events that have already been @@ -417,6 +419,25 @@ func (t *tableSinkWrapper) cleanRangeEventCounts(upperBound engine.Position, min return shouldClean } +func (t *tableSinkWrapper) sinkMaybeStuck(stuckCheck time.Duration) (bool, uint64) { + t.getCheckpointTs() + + t.tableSink.RLock() + defer t.tableSink.RUnlock() + t.tableSink.innerMu.Lock() + defer t.tableSink.innerMu.Unlock() + + // What these conditions mean: + // 1. the table sink has been associated with a valid sink; + // 2. its checkpoint hasn't been advanced for a while; + version := t.tableSink.version + advanced := t.tableSink.advanced + if version > 0 && time.Since(advanced) > stuckCheck { + return true, version + } + return false, uint64(0) +} + func handleRowChangedEvents( changefeed model.ChangeFeedID, span tablepb.Span, events ...*model.PolymorphicEvent, diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go index 3792a85f4ed..b10df73aad4 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go @@ -18,6 +18,7 @@ import ( "math" "sync" "testing" + "time" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/tablepb" @@ -26,6 +27,7 @@ import ( "github.com/pingcap/tiflow/pkg/spanz" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" ) type mockSink struct { @@ -314,6 +316,110 @@ func TestNewTableSinkWrapper(t *testing.T) { require.NotNil(t, wrapper) require.Equal(t, uint64(10), wrapper.getUpperBoundTs()) require.Equal(t, uint64(10), wrapper.getReceivedSorterResolvedTs()) - checkpointTs, _, _ := wrapper.getCheckpointTs() + checkpointTs := wrapper.getCheckpointTs() require.Equal(t, uint64(10), checkpointTs.ResolvedMark()) } + +func TestTableSinkWrapperSinkVersion(t *testing.T) { + t.Parallel() + + innerTableSink := tablesink.New[*model.RowChangedEvent]( + model.ChangeFeedID{}, tablepb.Span{}, model.Ts(0), + newMockSink(), &dmlsink.RowChangeEventAppender{}, + prometheus.NewCounter(prometheus.CounterOpts{}), + ) + version := new(uint64) + + wrapper := newTableSinkWrapper( + model.DefaultChangeFeedID("1"), + spanz.TableIDToComparableSpan(1), + func() (tablesink.TableSink, uint64) { return nil, 0 }, + tablepb.TableStatePrepared, + model.Ts(10), + model.Ts(20), + func(_ context.Context) (model.Ts, error) { return math.MaxUint64, nil }, + ) + + require.False(t, wrapper.initTableSink()) + + wrapper.tableSinkCreator = func() (tablesink.TableSink, uint64) { + *version += 1 + return innerTableSink, *version + } + + require.True(t, wrapper.initTableSink()) + require.Equal(t, wrapper.tableSink.version, uint64(1)) + + require.True(t, wrapper.asyncCloseTableSink()) + + wrapper.doTableSinkClear() + require.Nil(t, wrapper.tableSink.s) + require.Equal(t, wrapper.tableSink.version, uint64(0)) + + require.True(t, wrapper.initTableSink()) + require.Equal(t, wrapper.tableSink.version, uint64(2)) + + wrapper.closeTableSink() + + wrapper.doTableSinkClear() + require.Nil(t, wrapper.tableSink.s) + require.Equal(t, wrapper.tableSink.version, uint64(0)) +} + +func TestTableSinkWrapperSinkInner(t *testing.T) { + t.Parallel() + + innerTableSink := tablesink.New[*model.RowChangedEvent]( + model.ChangeFeedID{}, tablepb.Span{}, model.Ts(0), + newMockSink(), &dmlsink.RowChangeEventAppender{}, + prometheus.NewCounter(prometheus.CounterOpts{}), + ) + version := new(uint64) + + wrapper := newTableSinkWrapper( + model.DefaultChangeFeedID("1"), + spanz.TableIDToComparableSpan(1), + func() (tablesink.TableSink, uint64) { + *version += 1 + return innerTableSink, *version + }, + tablepb.TableStatePrepared, + oracle.GoTimeToTS(time.Now()), + oracle.GoTimeToTS(time.Now().Add(10000*time.Second)), + func(_ context.Context) (model.Ts, error) { return math.MaxUint64, nil }, + ) + + require.True(t, wrapper.initTableSink()) + + wrapper.closeAndClearTableSink() + + // Shouldn't be stuck because version is 0. + require.Equal(t, wrapper.tableSink.version, uint64(0)) + isStuck, _ := wrapper.sinkMaybeStuck(100 * time.Millisecond) + require.False(t, isStuck) + + // Shouldn't be stuck because tableSink.advanced is just updated. + require.True(t, wrapper.initTableSink()) + isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond) + require.False(t, isStuck) + + // Shouldn't be stuck because upperbound hasn't been advanced. + time.Sleep(200 * time.Millisecond) + isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond) + require.False(t, isStuck) + + // Shouldn't be stuck because `getCheckpointTs` will update tableSink.advanced. + nowTs := oracle.GoTimeToTS(time.Now()) + wrapper.updateReceivedSorterResolvedTs(nowTs) + wrapper.barrierTs.Store(nowTs) + isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond) + require.False(t, isStuck) + + time.Sleep(200 * time.Millisecond) + nowTs = oracle.GoTimeToTS(time.Now()) + wrapper.updateReceivedSorterResolvedTs(nowTs) + wrapper.barrierTs.Store(nowTs) + wrapper.updateResolvedTs(model.NewResolvedTs(nowTs)) + isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond) + require.True(t, isStuck) +}