Skip to content

Commit

Permalink
redo(ticdc): owner and processores shouldn't share one redo log writer (
Browse files Browse the repository at this point in the history
#6671) (#6729)

close #6695
  • Loading branch information
ti-chi-bot authored Aug 17, 2022
1 parent 5e2fdba commit e0bc0f7
Show file tree
Hide file tree
Showing 11 changed files with 440 additions and 314 deletions.
28 changes: 20 additions & 8 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,9 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
zap.Uint64("flushedResolvedTs", flushedResolvedTs),
zap.Uint64("flushedCheckpointTs", flushedCheckpointTs),
zap.Uint64("newResolvedTs", newResolvedTs),
zap.Uint64("newCheckpointTs", newCheckpointTs))
zap.Uint64("newCheckpointTs", newCheckpointTs),
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID))
if flushedResolvedTs != 0 {
// It's not necessary to replace newCheckpointTs with flushedResolvedTs,
// as cdc can ensure newCheckpointTs can never exceed prevResolvedTs.
Expand All @@ -310,7 +312,9 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
log.Debug("owner prepares to update status",
zap.Uint64("prevResolvedTs", prevResolvedTs),
zap.Uint64("newResolvedTs", newResolvedTs),
zap.Uint64("newCheckpointTs", newCheckpointTs))
zap.Uint64("newCheckpointTs", newCheckpointTs),
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID))
// resolvedTs should never regress but checkpointTs can, as checkpointTs has already
// been decreased when the owner is initialized.
if newResolvedTs < prevResolvedTs {
Expand Down Expand Up @@ -415,12 +419,15 @@ LOOP:
}()

stdCtx := contextutil.PutChangefeedIDInCtx(cancelCtx, c.id)
redoManagerOpts := &redo.ManagerOptions{EnableBgRunner: true}
redoManager, err := redo.NewManager(stdCtx, c.state.Info.Config.Consistent, redoManagerOpts)
redoManagerOpts := redo.NewOwnerManagerOptions(c.errCh)
mgr, err := redo.NewManager(stdCtx, c.state.Info.Config.Consistent, redoManagerOpts)
c.redoManager = mgr
if err != nil {
return err
}
c.redoManager = redoManager
log.Info("owner creates redo manager",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID))

// init metrics
c.metricsChangefeedBarrierTsGauge = changefeedBarrierTsGauge.
Expand Down Expand Up @@ -629,6 +636,8 @@ func (c *changefeed) asyncExecDDLJob(ctx cdcContext.Context,
zap.Reflect("job", job), zap.Error(err))
return false, errors.Trace(err)
}
c.ddlEventCache = ddlEvents

// We can't use the latest schema directly,
// we need to make sure we receive the ddl before we start or stop broadcasting checkpoint ts.
// So let's remember the name of the table before processing and cache the DDL.
Expand All @@ -640,9 +649,12 @@ func (c *changefeed) asyncExecDDLJob(ctx cdcContext.Context,
if err != nil {
return false, errors.Trace(err)
}
c.ddlEventCache = ddlEvents
for _, ddlEvent := range ddlEvents {
if c.redoManager.Enabled() {

if c.redoManager.Enabled() {
for _, ddlEvent := range c.ddlEventCache {
// FIXME: seems it's not necessary to emit DDL to redo storage,
// because for a given redo meta with range (checkpointTs, resolvedTs],
// there must be no pending DDLs not flushed into DDL sink.
err = c.redoManager.EmitDDLEvent(ctx, ddlEvent)
if err != nil {
return false, err
Expand Down
9 changes: 3 additions & 6 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,20 +180,17 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er
resolved = model.NewResolvedTs(currentBarrierTs)
}
if n.redoManager != nil && n.redoManager.Enabled() {
redoTs := n.redoManager.GetMinResolvedTs()
redoFlushed := n.redoManager.GetResolvedTs(n.tableID)
if currentBarrierTs > redoTs {
if currentBarrierTs > redoFlushed {
// NOTE: How can barrierTs be greater than rodoTs?
// When scheduler moves a table from one place to another place, the table
// start position will be checkpointTs instead of resolvedTs, which means
// redoTs can be less than barrierTs.
log.Info("redoTs is less than current barrierTs",
log.Info("redo flushedTs is less than current barrierTs",
zap.Int64("tableID", n.tableID),
zap.Uint64("redoTs", redoTs),
zap.Uint64("barrierTs", currentBarrierTs),
zap.Uint64("tableRedoFlushed", redoFlushed))
}
if currentBarrierTs > redoFlushed {

// The latest above comment shows why barrierTs can be greater than redoTs.
// So here the aim is to avoid slow tables holding back the whole processor.
resolved = model.NewResolvedTs(redoFlushed)
Expand Down
10 changes: 9 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,11 @@ func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR
p.doGCSchemaStorage(ctx)
p.metricSyncTableNumGauge.Set(float64(len(p.tables)))

if p.redoManager != nil && p.redoManager.Enabled() {
ckpt := p.changefeed.Status.CheckpointTs
p.redoManager.UpdateCheckpointTs(ckpt)
}

if err := p.agent.Tick(ctx); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -499,11 +504,14 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
log.Info("processor try new sink success",
zap.Duration("duration", time.Since(start)))

redoManagerOpts := &redo.ManagerOptions{EnableBgRunner: true, ErrCh: errCh}
redoManagerOpts := redo.NewProcessorManagerOptions(errCh)
p.redoManager, err = redo.NewManager(stdCtx, p.changefeed.Info.Config.Consistent, redoManagerOpts)
if err != nil {
return err
}
log.Info("processor creates redo manager",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID))

p.agent, err = p.newAgent(ctx)
if err != nil {
Expand Down
Loading

0 comments on commit e0bc0f7

Please sign in to comment.