Skip to content

Commit

Permalink
sink(ticdc): translate INSERT automatically when mysql sink disables …
Browse files Browse the repository at this point in the history
…safe mode (#6278) (#6634)

close #3589, close #5611
  • Loading branch information
ti-chi-bot authored Aug 17, 2022
1 parent e0bc0f7 commit aed5d87
Show file tree
Hide file tree
Showing 12 changed files with 559 additions and 55 deletions.
2 changes: 2 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ type RowChangedEvent struct {

// SplitTxn marks this RowChangedEvent as the first line of a new txn.
SplitTxn bool `json:"-" msg:"-"`
// ReplicatingTs is ts when a table starts replicating events to downstream.
ReplicatingTs Ts `json:"-" msg:"-"`
}

// IsDelete returns true if the row is a delete event
Expand Down
1 change: 0 additions & 1 deletion cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,6 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
if err := n.verifySplitTxn(event); err != nil {
return false, errors.Trace(err)
}

if event.IsResolved() {
if n.status.Load() == TableStatusInitializing {
n.status.Store(TableStatusRunning)
Expand Down
39 changes: 31 additions & 8 deletions cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pipeline"
pmessage "github.com/pingcap/tiflow/pkg/pipeline/message"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand All @@ -43,10 +45,12 @@ const (
)

type sorterNode struct {
sorter sorter.EventSorter
changefeed model.ChangeFeedID
pdClient pd.Client
sorter sorter.EventSorter

tableID model.TableID
tableName string // quoted schema and table, used in metircs only
tableName string // quoted schema and table, used in metrics only

// for per-table flow control
flowController tableFlowController
Expand All @@ -72,6 +76,8 @@ func newSorterNode(
tableName string, tableID model.TableID, startTs model.Ts,
flowController tableFlowController, mounter entry.Mounter,
replConfig *config.ReplicaConfig,
changefeed model.ChangeFeedID,
pdClient pd.Client,
) *sorterNode {
return &sorterNode{
tableName: tableName,
Expand All @@ -81,12 +87,18 @@ func newSorterNode(
resolvedTs: startTs,
barrierTs: startTs,
replConfig: replConfig,
pdClient: pdClient,
changefeed: changefeed,
}
}

func (n *sorterNode) Init(ctx pipeline.NodeContext) error {
wg := errgroup.Group{}
return n.start(ctx, false, &wg, 0, nil)
sorter, err := createSorter(ctx, n.tableName, n.tableID)
if err != nil {
return errors.Trace(err)
}
return n.start(ctx, false, &wg, 0, nil, sorter)
}

func createSorter(ctx pipeline.NodeContext, tableName string, tableID model.TableID) (sorter.EventSorter, error) {
Expand Down Expand Up @@ -133,17 +145,13 @@ func createSorter(ctx pipeline.NodeContext, tableName string, tableID model.Tabl
func (n *sorterNode) start(
ctx pipeline.NodeContext, isTableActorMode bool, eg *errgroup.Group,
tableActorID actor.ID, tableActorRouter *actor.Router[pmessage.Message],
eventSorter sorter.EventSorter,
) error {
n.isTableActorMode = isTableActorMode
n.eg = eg
stdCtx, cancel := context.WithCancel(ctx)
n.cancel = cancel

eventSorter, err := createSorter(ctx, n.tableName, n.tableID)
if err != nil {
return errors.Trace(err)
}

failpoint.Inject("ProcessorAddTableError", func() {
failpoint.Return(errors.New("processor add table injected error"))
})
Expand Down Expand Up @@ -175,6 +183,18 @@ func (n *sorterNode) start(
}
}

phy, logic, err := n.pdClient.GetTS(ctx)
if err != nil {
return errors.Trace(err)
}
replicateTs := oracle.ComposeTS(phy, logic)
log.Info("table is replicating",
zap.Int64("tableID", n.tableID),
zap.String("tableName", n.tableName),
zap.Uint64("replicateTs", replicateTs),
zap.String("namespace", n.changefeed.Namespace),
zap.String("changefeed", n.changefeed.ID))

for {
// We must call `sorter.Output` before receiving resolved events.
// Skip calling `sorter.Output` and caching output channel may fail
Expand Down Expand Up @@ -205,6 +225,9 @@ func (n *sorterNode) start(
resolvedTsInterpolateFunc(commitTs)
}

// For all rows, we add table replicate ts, so mysql sink can
// determine when to turn off safe-mode.
msg.Row.ReplicatingTs = replicateTs
// We calculate memory consumption by RowChangedEvent size.
// It's much larger than RawKVEntry.
size := uint64(msg.Row.ApproximateBytes())
Expand Down
97 changes: 95 additions & 2 deletions cdc/processor/pipeline/sorter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,25 @@ package pipeline

import (
"context"
"math"
"strings"
"testing"

"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/cdc/sorter"
"github.com/pingcap/tiflow/cdc/sorter/memory"
"github.com/pingcap/tiflow/cdc/sorter/unified"
"github.com/pingcap/tiflow/pkg/actor"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
"github.com/pingcap/tiflow/pkg/pipeline"
pmessage "github.com/pingcap/tiflow/pkg/pipeline/message"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"golang.org/x/sync/errgroup"
)

func TestUnifiedSorterFileLockConflict(t *testing.T) {
Expand Down Expand Up @@ -59,7 +65,7 @@ func TestSorterResolvedTs(t *testing.T) {
t.Parallel()
sn := newSorterNode("tableName", 1, 1, nil, nil, &config.ReplicaConfig{
Consistent: &config.ConsistentConfig{},
})
}, model.DefaultChangeFeedID("changefeed-id-test"), &mockPD{})
sn.sorter = memory.NewEntrySorter()
require.EqualValues(t, 1, sn.ResolvedTs())
nctx := pipeline.NewNodeContext(
Expand Down Expand Up @@ -101,13 +107,100 @@ func (c *checkSorter) Output() <-chan *model.PolymorphicEvent {
return c.ch
}

type mockPD struct {
pd.Client
ts int64
}

func (p *mockPD) GetTS(ctx context.Context) (int64, int64, error) {
if p.ts != 0 {
return p.ts, p.ts, nil
}
return math.MaxInt64, math.MaxInt64, nil
}

type mockSorter struct {
sorter.EventSorter

outCh chan *model.PolymorphicEvent
expectStartTs model.Ts
}

func (s *mockSorter) EmitStartTs(ctx context.Context, ts model.Ts) {
if ts != s.expectStartTs {
panic(ts)
}
}

func (s *mockSorter) Output() <-chan *model.PolymorphicEvent {
return s.outCh
}

func (s *mockSorter) Run(ctx context.Context) error {
return nil
}

type mockMounter struct {
entry.Mounter
}

func (mockMounter) DecodeEvent(ctx context.Context, event *model.PolymorphicEvent) error {
return nil
}

func TestSorterReplicateTs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

p := &mockPD{ts: 1}
ts := oracle.ComposeTS(1, 1)
sn := newSorterNode("tableName", 1, 1, &mockFlowController{}, mockMounter{},
&config.ReplicaConfig{
Consistent: &config.ConsistentConfig{},
}, model.DefaultChangeFeedID("changefeed-id-test"), p)
sn.sorter = memory.NewEntrySorter()

require.Equal(t, model.Ts(1), sn.ResolvedTs())

eg := &errgroup.Group{}
router := actor.NewRouter[pmessage.Message](t.Name())
ctx1 := newContext(
ctx, t.Name(), router, 1, &cdcContext.ChangefeedVars{},
&cdcContext.GlobalVars{}, func(err error) {})
s := &mockSorter{
outCh: make(chan *model.PolymorphicEvent, 1),
expectStartTs: 1,
}
sn.start(ctx1, true, eg, 1, router, s)

s.outCh <- &model.PolymorphicEvent{
CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{
Table: &model.TableName{},
CommitTs: 1,
Columns: []*model.Column{
{
Name: "col1",
Flag: model.BinaryFlag,
Value: "col1-value-updated",
},
},
},
}

outM := <-ctx1.outputCh
require.EqualValues(t, ts, outM.PolymorphicEvent.Row.ReplicatingTs)

cancel()
eg.Wait()
}

func TestSorterResolvedTsLessEqualBarrierTs(t *testing.T) {
t.Parallel()
sch := make(chan *model.PolymorphicEvent, 1)
s := &checkSorter{ch: sch}
sn := newSorterNode("tableName", 1, 1, nil, nil, &config.ReplicaConfig{
Consistent: &config.ConsistentConfig{},
})
}, model.DefaultChangeFeedID("changefeed-id-test"), &mockPD{})
sn.sorter = s

ch := make(chan pmessage.Message, 1)
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func NewTablePipeline(ctx cdcContext.Context,

p := pipeline.NewPipeline(ctx, 500*time.Millisecond, runnerSize, defaultOutputChannelSize)
sorterNode := newSorterNode(tableName, tableID, replicaInfo.StartTs,
flowController, mounter, replConfig)
flowController, mounter, replConfig, changefeed, upstream.PDClient)
sinkNode := newSinkNode(tableID, sink, replicaInfo.StartTs,
targetTs, flowController, splitTxn, redoManager)

Expand Down
8 changes: 6 additions & 2 deletions cdc/processor/pipeline/table_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error {
t.redoManager.Enabled(), splitTxn)
sorterNode := newSorterNode(t.tableName, t.tableID,
t.replicaInfo.StartTs, flowController,
t.mounter, t.replicaConfig,
t.mounter, t.replicaConfig, t.changefeedID, t.upStream.PDClient,
)
t.sortNode = sorterNode
sortActorNodeContext := newContext(sdtTableContext, t.tableName,
Expand Down Expand Up @@ -529,5 +529,9 @@ var startPuller = func(t *tableActor, ctx *actorNodeContext) error {
}

var startSorter = func(t *tableActor, ctx *actorNodeContext) error {
return t.sortNode.start(ctx, true, t.wg, t.actorID, t.router)
eventSorter, err := createSorter(ctx, t.tableName, t.tableID)
if err != nil {
return errors.Trace(err)
}
return t.sortNode.start(ctx, true, t.wg, t.actorID, t.router, eventSorter)
}
10 changes: 8 additions & 2 deletions cdc/processor/pipeline/table_actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
serverConfig "github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
pmessage "github.com/pingcap/tiflow/pkg/pipeline/message"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
Expand All @@ -50,6 +51,7 @@ func TestAsyncStopFailed(t *testing.T) {
cancel: func() {},
reportErr: func(err error) {},
redoManager: redo.NewDisabledManager(),
upStream: upstream.NewUpstream4Test(&mockPD{}),
}
tbl.sinkNode = newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}, false, tbl.redoManager)
require.True(t, tbl.AsyncStop(1))
Expand Down Expand Up @@ -77,6 +79,7 @@ func TestTableActorInterface(t *testing.T) {
Level: "node",
},
},
upStream: upstream.NewUpstream4Test(&mockPD{}),
}
tableID, markID := tbl.ID()
require.Equal(t, int64(1), tableID)
Expand Down Expand Up @@ -119,6 +122,7 @@ func TestTableActorCancel(t *testing.T) {
router: tableActorRouter,
cancel: func() {},
reportErr: func(err error) {},
upStream: upstream.NewUpstream4Test(&mockPD{}),
}
mb := actor.NewMailbox[pmessage.Message](actor.ID(1), 0)
tbl.actorID = actor.ID(1)
Expand Down Expand Up @@ -364,7 +368,7 @@ func TestNewTableActor(t *testing.T) {
startSorter = func(t *tableActor, ctx *actorNodeContext) error {
return nil
}
tbl, err := NewTableActor(cctx, nil, nil, 1, "t1",
tbl, err := NewTableActor(cctx, upstream.NewUpstream4Test(&mockPD{}), nil, 1, "t1",
&model.TableReplicaInfo{
StartTs: 0,
MarkTableID: 1,
Expand All @@ -380,7 +384,7 @@ func TestNewTableActor(t *testing.T) {
return errors.New("failed to start puller")
}

tbl, err = NewTableActor(cctx, nil, nil, 1, "t1",
tbl, err = NewTableActor(cctx, upstream.NewUpstream4Test(&mockPD{}), nil, 1, "t1",
&model.TableReplicaInfo{
StartTs: 0,
MarkTableID: 1,
Expand Down Expand Up @@ -426,6 +430,7 @@ func TestTableActorStart(t *testing.T) {
MarkTableID: 1,
},
replicaConfig: config.GetDefaultReplicaConfig(),
upStream: upstream.NewUpstream4Test(&mockPD{}),
}
require.Nil(t, tbl.start(ctx))
require.Equal(t, 1, len(tbl.nodes))
Expand All @@ -445,6 +450,7 @@ func TestTableActorStart(t *testing.T) {
MarkTableID: 1,
},
replicaConfig: config.GetDefaultReplicaConfig(),
upStream: upstream.NewUpstream4Test(&mockPD{}),
}
tbl.cyclicEnabled = true
require.Nil(t, tbl.start(ctx))
Expand Down
Loading

0 comments on commit aed5d87

Please sign in to comment.