Skip to content

Commit

Permalink
processor,sink(cdc): let sink report resolved ts and do not skip buff…
Browse files Browse the repository at this point in the history
…er sink flush (#3540)
  • Loading branch information
overvenus authored Nov 22, 2021
1 parent 9cec140 commit 20626ba
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 10 deletions.
14 changes: 9 additions & 5 deletions cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/entry"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/redo"
"github.com/pingcap/ticdc/cdc/sink"
"github.com/pingcap/ticdc/cdc/sink/common"
serverConfig "github.com/pingcap/ticdc/pkg/config"
Expand Down Expand Up @@ -89,10 +88,15 @@ func (t *tablePipelineImpl) ResolvedTs() model.Ts {
// will be able to cooperate replication status directly. Then we will add
// another replication barrier for consistent replication instead of reusing
// the global resolved-ts.
if redo.IsConsistentEnabled(t.replConfig.Consistent.Level) {
return t.sinkNode.ResolvedTs()
}
return t.sorterNode.ResolvedTs()

// Always report resolved ts from sink for resolving #3503.
// TODO uncomment the following lines.
// if redo.IsConsistentEnabled(t.replConfig.Consistent.Level) {
// return t.sinkNode.ResolvedTs()
// }
// return t.sorterNode.ResolvedTs()

return t.sinkNode.ResolvedTs()
}

// CheckpointTs returns the checkpoint ts in this table pipeline
Expand Down
9 changes: 6 additions & 3 deletions cdc/sink/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,12 @@ func (m *Manager) flushBackendSink(ctx context.Context) (model.Ts, error) {
// NOTICE: Because all table sinks will try to flush backend sink,
// which will cause a lot of lock contention and blocking in high concurrency cases.
// So here we use flushing as a lightweight lock to improve the lock competition problem.
if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) {
return m.getCheckpointTs(), nil
}
//
// Do not skip flushing for resolving #3503.
// TODO uncomment the following return.
// if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) {
// return m.getCheckpointTs(), nil
// }
m.flushMu.Lock()
defer func() {
m.flushMu.Unlock()
Expand Down
26 changes: 24 additions & 2 deletions cdc/sink/table_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/redo"
"go.uber.org/zap"
)

type tableSink struct {
Expand Down Expand Up @@ -55,6 +57,16 @@ func (t *tableSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
// is required to be no more than global resolvedTs, table barrierTs and table
// redo log watermarkTs.
func (t *tableSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
// Log abnormal checkpoint that is large than resolved ts.
logAbnormalCheckpoint := func(ckpt uint64) {
if ckpt > resolvedTs {
log.L().WithOptions(zap.AddCallerSkip(1)).
Warn("checkpoint ts > resolved ts, flushed more than emitted",
zap.Int64("tableID", t.tableID),
zap.Uint64("resolvedTs", resolvedTs),
zap.Uint64("checkpointTs", ckpt))
}
}
i := sort.Search(len(t.buffer), func(i int) bool {
return t.buffer[i].CommitTs > resolvedTs
})
Expand All @@ -64,7 +76,12 @@ func (t *tableSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64
if err != nil {
return ckpt, err
}
return t.manager.flushBackendSink(ctx)
ckpt, err = t.manager.flushBackendSink(ctx)
if err != nil {
return ckpt, err
}
logAbnormalCheckpoint(ckpt)
return ckpt, err
}
resolvedRows := t.buffer[:i]
t.buffer = append(make([]*model.RowChangedEvent, 0, len(t.buffer[i:])), t.buffer[i:]...)
Expand All @@ -78,7 +95,12 @@ func (t *tableSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64
if err != nil {
return ckpt, err
}
return t.manager.flushBackendSink(ctx)
ckpt, err = t.manager.flushBackendSink(ctx)
if err != nil {
return ckpt, err
}
logAbnormalCheckpoint(ckpt)
return ckpt, err
}

func (t *tableSink) flushRedoLogs(ctx context.Context, resolvedTs uint64) (uint64, error) {
Expand Down

0 comments on commit 20626ba

Please sign in to comment.