Skip to content

Commit

Permalink
sink(cdc): only check sink stuck for MQ sinks (pingcap#9742)
Browse files Browse the repository at this point in the history
  • Loading branch information
hicqu committed Sep 26, 2023
1 parent ea75bc5 commit eb670c0
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 15 deletions.
26 changes: 17 additions & 9 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,12 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
}
}

func (m *SinkManager) needsStuckCheck() bool {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
return m.sinkFactory.f != nil && m.sinkFactory.f.Category() == factory.CategoryMQ
}

func (m *SinkManager) initSinkFactory() (chan error, uint64) {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
Expand Down Expand Up @@ -980,15 +986,17 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
}
stuckCheck := time.Duration(advanceTimeoutInSec) * time.Second

isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck)
if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) {
log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span),
zap.Any("checkpointTs", checkpointTs),
zap.Float64("stuckCheck", stuckCheck.Seconds()),
zap.Uint64("factoryVersion", version))
if m.needsStuckCheck() {
isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck)
if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) {
log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span),
zap.Any("checkpointTs", checkpointTs),
zap.Float64("stuckCheck", stuckCheck.Seconds()),
zap.Uint64("factoryVersion", sinkVersion))
}
}

var resolvedTs model.Ts
Expand Down
15 changes: 15 additions & 0 deletions cdc/processor/sinkmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,18 @@ func TestSinkManagerRunWithErrors(t *testing.T) {
log.Panic("must get an error instead of a timeout")
}
}

func TestSinkManagerNeedsStuckCheck(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
errCh := make(chan error, 16)
changefeedInfo := getChangefeedInfo()
manager, _, _ := CreateManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, errCh)
defer func() {
cancel()
manager.Close()
}()

require.False(t, manager.needsStuckCheck())
}
4 changes: 2 additions & 2 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"go.uber.org/zap"
)

var version uint64 = 0
var tableSinkWrapperVersion uint64 = 0

// tableSinkWrapper is a wrapper of TableSink, it is used in SinkManager to manage TableSink.
// Because in the SinkManager, we write data to TableSink and RedoManager concurrently,
Expand Down Expand Up @@ -111,7 +111,7 @@ func newTableSinkWrapper(
genReplicateTs func(ctx context.Context) (model.Ts, error),
) *tableSinkWrapper {
res := &tableSinkWrapper{
version: atomic.AddUint64(&version, 1),
version: atomic.AddUint64(&tableSinkWrapperVersion, 1),
changefeed: changefeed,
span: span,
tableSinkCreater: tableSinkCreater,
Expand Down
31 changes: 29 additions & 2 deletions cdc/sink/dmlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,29 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

// Category is for different DML sink categories.
type Category = int

const (
// CategoryTxn is for Txn sink.
CategoryTxn Category = 1
// CategoryMQ is for MQ sink.
CategoryMQ = 2
// CategoryCloudStorage is for CloudStorage sink.
CategoryCloudStorage = 3
// CategoryBlackhole is for Blackhole sink.
CategoryBlackhole = 4
)

// SinkFactory is the factory of sink.
// It is responsible for creating sink and closing it.
// Because there is no way to convert the eventsink.EventSink[*model.RowChangedEvent]
// to eventsink.EventSink[eventsink.TableEvent].
// So we have to use this factory to create and store the sink.
type SinkFactory struct {
rowSink dmlsink.EventSink[*model.RowChangedEvent]
txnSink dmlsink.EventSink[*model.SingleTableTxn]
rowSink dmlsink.EventSink[*model.RowChangedEvent]
txnSink dmlsink.EventSink[*model.SingleTableTxn]
category Category
}

// New creates a new SinkFactory by schema.
Expand All @@ -67,6 +82,7 @@ func New(
return nil, err
}
s.txnSink = txnSink
s.category = CategoryTxn
case sink.KafkaScheme, sink.KafkaSSLScheme:
factoryCreator := kafka.NewSaramaFactory
if cfg.Sink.EnableKafkaSinkV2 {
Expand All @@ -78,15 +94,18 @@ func New(
return nil, err
}
s.txnSink = mqs
s.category = CategoryMQ
case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
storageSink, err := cloudstorage.NewDMLSink(ctx, sinkURI, cfg, errCh)
if err != nil {
return nil, err
}
s.txnSink = storageSink
s.category = CategoryCloudStorage
case sink.BlackHoleScheme:
bs := blackhole.NewDMLSink()
s.rowSink = bs
s.category = CategoryBlackhole
default:
return nil,
cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", schema)
Expand Down Expand Up @@ -143,3 +162,11 @@ func (s *SinkFactory) Close() {
s.txnSink.Close()
}
}

// Category returns category of s.
func (s *SinkFactory) Category() Category {
if s.category == 0 {
panic("should never happen")
}
return s.category
}
5 changes: 3 additions & 2 deletions tests/integration_tests/hang_sink_suicide/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ function run() {
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
# TODO: update the case to use kafka sink instead of mysql sink.
# run $*
# check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"

0 comments on commit eb670c0

Please sign in to comment.