Skip to content

Commit

Permalink
sink(cdc): only check sink stuck for MQ sinks (#9742)
Browse files Browse the repository at this point in the history
close #9736
  • Loading branch information
hicqu authored Sep 14, 2023
1 parent a2819ad commit 141c9a7
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 15 deletions.
26 changes: 17 additions & 9 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,12 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
}
}

func (m *SinkManager) needsStuckCheck() bool {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
return m.sinkFactory.f != nil && m.sinkFactory.f.Category() == factory.CategoryMQ
}

func (m *SinkManager) initSinkFactory() (chan error, uint64) {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
Expand Down Expand Up @@ -981,15 +987,17 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
}
stuckCheck := time.Duration(advanceTimeoutInSec) * time.Second

isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck)
if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) {
log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span),
zap.Any("checkpointTs", checkpointTs),
zap.Float64("stuckCheck", stuckCheck.Seconds()),
zap.Uint64("factoryVersion", version))
if m.needsStuckCheck() {
isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck)
if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) {
log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span),
zap.Any("checkpointTs", checkpointTs),
zap.Float64("stuckCheck", stuckCheck.Seconds()),
zap.Uint64("factoryVersion", sinkVersion))
}
}

var resolvedTs model.Ts
Expand Down
15 changes: 15 additions & 0 deletions cdc/processor/sinkmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,18 @@ func TestSinkManagerRunWithErrors(t *testing.T) {
log.Panic("must get an error instead of a timeout")
}
}

func TestSinkManagerNeedsStuckCheck(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
errCh := make(chan error, 16)
changefeedInfo := getChangefeedInfo()
manager, _, _ := CreateManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, errCh)
defer func() {
cancel()
manager.Close()
}()

require.False(t, manager.needsStuckCheck())
}
4 changes: 2 additions & 2 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"go.uber.org/zap"
)

var version uint64 = 0
var tableSinkWrapperVersion uint64 = 0

// tableSinkWrapper is a wrapper of TableSink, it is used in SinkManager to manage TableSink.
// Because in the SinkManager, we write data to TableSink and RedoManager concurrently,
Expand Down Expand Up @@ -111,7 +111,7 @@ func newTableSinkWrapper(
genReplicateTs func(ctx context.Context) (model.Ts, error),
) *tableSinkWrapper {
res := &tableSinkWrapper{
version: atomic.AddUint64(&version, 1),
version: atomic.AddUint64(&tableSinkWrapperVersion, 1),
changefeed: changefeed,
span: span,
tableSinkCreator: tableSinkCreater,
Expand Down
32 changes: 30 additions & 2 deletions cdc/sink/dmlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,29 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

// Category is for different DML sink categories.
type Category = int

const (
// CategoryTxn is for Txn sink.
CategoryTxn Category = 1
// CategoryMQ is for MQ sink.
CategoryMQ = 2
// CategoryCloudStorage is for CloudStorage sink.
CategoryCloudStorage = 3
// CategoryBlackhole is for Blackhole sink.
CategoryBlackhole = 4
)

// SinkFactory is the factory of sink.
// It is responsible for creating sink and closing it.
// Because there is no way to convert the eventsink.EventSink[*model.RowChangedEvent]
// to eventsink.EventSink[eventsink.TableEvent].
// So we have to use this factory to create and store the sink.
type SinkFactory struct {
rowSink dmlsink.EventSink[*model.RowChangedEvent]
txnSink dmlsink.EventSink[*model.SingleTableTxn]
rowSink dmlsink.EventSink[*model.RowChangedEvent]
txnSink dmlsink.EventSink[*model.SingleTableTxn]
category Category
}

// New creates a new SinkFactory by schema.
Expand All @@ -71,6 +86,7 @@ func New(
return nil, err
}
s.txnSink = txnSink
s.category = CategoryTxn
case sink.KafkaScheme, sink.KafkaSSLScheme:
factoryCreator := kafka.NewSaramaFactory
if util.GetOrZero(cfg.Sink.EnableKafkaSinkV2) {
Expand All @@ -82,15 +98,18 @@ func New(
return nil, err
}
s.txnSink = mqs
s.category = CategoryMQ
case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
storageSink, err := cloudstorage.NewDMLSink(ctx, changefeedID, sinkURI, cfg, errCh)
if err != nil {
return nil, err
}
s.txnSink = storageSink
s.category = CategoryCloudStorage
case sink.BlackHoleScheme:
bs := blackhole.NewDMLSink()
s.rowSink = bs
s.category = CategoryBlackhole
case sink.PulsarScheme:
mqs, err := mq.NewPulsarDMLSink(ctx, changefeedID, sinkURI, cfg, errCh,
manager.NewPulsarTopicManager,
Expand All @@ -99,6 +118,7 @@ func New(
return nil, err
}
s.txnSink = mqs
s.category = CategoryMQ
default:
return nil,
cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", schema)
Expand Down Expand Up @@ -155,3 +175,11 @@ func (s *SinkFactory) Close() {
s.txnSink.Close()
}
}

// Category returns category of s.
func (s *SinkFactory) Category() Category {
if s.category == 0 {
panic("should never happen")
}
return s.category
}
5 changes: 3 additions & 2 deletions tests/integration_tests/hang_sink_suicide/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ function run() {
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
# TODO: update the case to use kafka sink instead of mysql sink.
# run $*
# check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"

0 comments on commit 141c9a7

Please sign in to comment.