From aed5d87ea382e0dc80cb11e5274639dd4fe4940f Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Wed, 17 Aug 2022 18:50:51 +0800
Subject: [PATCH] sink(ticdc): translate INSERT automatically when mysql sink disables safe mode (#6278) (#6634)

close pingcap/tiflow#3589, close pingcap/tiflow#5611
---
 cdc/model/sink.go                          |   2 +
 cdc/processor/pipeline/sink.go             |   1 -
 cdc/processor/pipeline/sorter.go           |  39 ++-
 cdc/processor/pipeline/sorter_test.go      |  97 +++++-
 cdc/processor/pipeline/table.go            |   2 +-
 cdc/processor/pipeline/table_actor.go      |   8 +-
 cdc/processor/pipeline/table_actor_test.go |  10 +-
 cdc/sink/mysql/mysql.go                    |  84 +++--
 cdc/sink/mysql/mysql_test.go               | 351 ++++++++++++++++++++-
 cdc/sink/mysql/txns_heap.go                |  12 +-
 dm/pkg/retry/errors.go                     |   1 +
 tests/integration_tests/sink_retry/run.sh  |   7 +-
 12 files changed, 559 insertions(+), 55 deletions(-)

diff --git a/cdc/model/sink.go b/cdc/model/sink.go
index 4d9190b4e8d..418741149b4 100644
--- a/cdc/model/sink.go
+++ b/cdc/model/sink.go
@@ -270,6 +270,8 @@ type RowChangedEvent struct {
 
     // SplitTxn marks this RowChangedEvent as the first line of a new txn.
     SplitTxn bool `json:"-" msg:"-"`
+    // ReplicatingTs is the ts when a table starts replicating events to the downstream.
+    ReplicatingTs Ts `json:"-" msg:"-"`
 }
 
 // IsDelete returns true if the row is a delete event
diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go
index 954271480a5..9b0b3514178 100755
--- a/cdc/processor/pipeline/sink.go
+++ b/cdc/processor/pipeline/sink.go
@@ -359,7 +359,6 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
     if err := n.verifySplitTxn(event); err != nil {
         return false, errors.Trace(err)
     }
-
     if event.IsResolved() {
         if n.status.Load() == TableStatusInitializing {
             n.status.Store(TableStatusRunning)
diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go
index dde89e8d13e..02e175ee140 100644
--- a/cdc/processor/pipeline/sorter.go
+++ b/cdc/processor/pipeline/sorter.go
@@ -34,6 +34,8 @@ import (
     cerror "github.com/pingcap/tiflow/pkg/errors"
     "github.com/pingcap/tiflow/pkg/pipeline"
     pmessage "github.com/pingcap/tiflow/pkg/pipeline/message"
+    "github.com/tikv/client-go/v2/oracle"
+    pd "github.com/tikv/pd/client"
     "go.uber.org/zap"
     "golang.org/x/sync/errgroup"
 )
@@ -43,10 +45,12 @@ const (
 )
 
 type sorterNode struct {
-    sorter sorter.EventSorter
+    changefeed model.ChangeFeedID
+    pdClient   pd.Client
+    sorter     sorter.EventSorter
 
     tableID   model.TableID
-    tableName string // quoted schema and table, used in metircs only
+    tableName string // quoted schema and table, used in metrics only
 
     // for per-table flow control
     flowController tableFlowController
@@ -72,6 +76,8 @@ func newSorterNode(
     tableName string, tableID model.TableID, startTs model.Ts,
     flowController tableFlowController, mounter entry.Mounter,
     replConfig *config.ReplicaConfig,
+    changefeed model.ChangeFeedID,
+    pdClient pd.Client,
 ) *sorterNode {
     return &sorterNode{
         tableName: tableName,
@@ -81,12 +87,18 @@ func newSorterNode(
         resolvedTs:     startTs,
         barrierTs:      startTs,
         replConfig:     replConfig,
+        pdClient:       pdClient,
+        changefeed:     changefeed,
     }
 }
 
 func (n *sorterNode) Init(ctx pipeline.NodeContext) error {
     wg := errgroup.Group{}
-    return n.start(ctx, false, &wg, 0, nil)
+    sorter, err := createSorter(ctx, n.tableName, n.tableID)
+    if err != nil {
+        return errors.Trace(err)
+    }
+    return n.start(ctx, false, &wg, 0, nil, sorter)
 }
 
 func createSorter(ctx pipeline.NodeContext, tableName string, tableID model.TableID) (sorter.EventSorter, error) {
@@ -133,17 +145,13 @@ func createSorter(ctx pipeline.NodeContext, tableName string, tableID model.Tabl
 func (n *sorterNode) start(
     ctx pipeline.NodeContext, isTableActorMode bool, eg *errgroup.Group,
     tableActorID actor.ID, tableActorRouter *actor.Router[pmessage.Message],
+    eventSorter sorter.EventSorter,
 ) error {
     n.isTableActorMode = isTableActorMode
     n.eg = eg
     stdCtx, cancel := context.WithCancel(ctx)
     n.cancel = cancel
 
-    eventSorter, err := createSorter(ctx, n.tableName, n.tableID)
-    if err != nil {
-        return errors.Trace(err)
-    }
-
     failpoint.Inject("ProcessorAddTableError", func() {
         failpoint.Return(errors.New("processor add table injected error"))
     })
@@ -175,6 +183,18 @@ func (n *sorterNode) start(
             }
         }
 
+        phy, logic, err := n.pdClient.GetTS(ctx)
+        if err != nil {
+            return errors.Trace(err)
+        }
+        replicateTs := oracle.ComposeTS(phy, logic)
+        log.Info("table is replicating",
+            zap.Int64("tableID", n.tableID),
+            zap.String("tableName", n.tableName),
+            zap.Uint64("replicateTs", replicateTs),
+            zap.String("namespace", n.changefeed.Namespace),
+            zap.String("changefeed", n.changefeed.ID))
+
         for {
             // We must call `sorter.Output` before receiving resolved events.
             // Skip calling `sorter.Output` and caching output channel may fail
@@ -205,6 +225,9 @@ func (n *sorterNode) start(
                     resolvedTsInterpolateFunc(commitTs)
                 }
 
+                // For all rows, we add the table's replicating ts, so the mysql
+                // sink can determine when to turn off safe-mode.
+                msg.Row.ReplicatingTs = replicateTs
                 // We calculate memory consumption by RowChangedEvent size.
                 // It's much larger than RawKVEntry.
                 size := uint64(msg.Row.ApproximateBytes())
diff --git a/cdc/processor/pipeline/sorter_test.go b/cdc/processor/pipeline/sorter_test.go
index f669fda304a..9f9b6c31896 100644
--- a/cdc/processor/pipeline/sorter_test.go
+++ b/cdc/processor/pipeline/sorter_test.go
@@ -15,19 +15,25 @@ package pipeline
 
 import (
     "context"
+    "math"
     "strings"
     "testing"
 
+    "github.com/pingcap/tiflow/cdc/entry"
     "github.com/pingcap/tiflow/cdc/model"
     "github.com/pingcap/tiflow/cdc/redo"
     "github.com/pingcap/tiflow/cdc/sorter"
     "github.com/pingcap/tiflow/cdc/sorter/memory"
     "github.com/pingcap/tiflow/cdc/sorter/unified"
+    "github.com/pingcap/tiflow/pkg/actor"
     "github.com/pingcap/tiflow/pkg/config"
     cdcContext "github.com/pingcap/tiflow/pkg/context"
     "github.com/pingcap/tiflow/pkg/pipeline"
     pmessage "github.com/pingcap/tiflow/pkg/pipeline/message"
     "github.com/stretchr/testify/require"
+    "github.com/tikv/client-go/v2/oracle"
+    pd "github.com/tikv/pd/client"
+    "golang.org/x/sync/errgroup"
 )
 
 func TestUnifiedSorterFileLockConflict(t *testing.T) {
@@ -59,7 +65,7 @@ func TestSorterResolvedTs(t *testing.T) {
     t.Parallel()
     sn := newSorterNode("tableName", 1, 1, nil, nil, &config.ReplicaConfig{
         Consistent: &config.ConsistentConfig{},
-    })
+    }, model.DefaultChangeFeedID("changefeed-id-test"), &mockPD{})
     sn.sorter = memory.NewEntrySorter()
     require.EqualValues(t, 1, sn.ResolvedTs())
     nctx := pipeline.NewNodeContext(
@@ -101,13 +107,100 @@ func (c *checkSorter) Output() <-chan *model.PolymorphicEvent {
     return c.ch
 }
 
+type mockPD struct {
+    pd.Client
+    ts int64
+}
+
+func (p *mockPD) GetTS(ctx context.Context) (int64, int64, error) {
+    if p.ts != 0 {
+        return p.ts, p.ts, nil
+    }
+    return math.MaxInt64, math.MaxInt64, nil
+}
+
+type mockSorter struct {
+    sorter.EventSorter
+
+    outCh         chan *model.PolymorphicEvent
+    expectStartTs model.Ts
+}
+
+func (s *mockSorter) EmitStartTs(ctx context.Context, ts model.Ts) {
+    if ts != s.expectStartTs {
+        panic(ts)
+    }
+}
+
+func (s *mockSorter) Output() <-chan *model.PolymorphicEvent {
+    return s.outCh
+}
+
+func (s *mockSorter) Run(ctx context.Context) error {
+    return nil
+}
+
+type mockMounter struct {
+    entry.Mounter
+}
+
+func (mockMounter) DecodeEvent(ctx context.Context, event *model.PolymorphicEvent) error {
+    return nil
+}
+
+func TestSorterReplicateTs(t *testing.T) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    p := &mockPD{ts: 1}
+    ts := oracle.ComposeTS(1, 1)
+    sn := newSorterNode("tableName", 1, 1, &mockFlowController{}, mockMounter{},
+        &config.ReplicaConfig{
+            Consistent: &config.ConsistentConfig{},
+        }, model.DefaultChangeFeedID("changefeed-id-test"), p)
+    sn.sorter = memory.NewEntrySorter()
+
+    require.Equal(t, model.Ts(1), sn.ResolvedTs())
+
+    eg := &errgroup.Group{}
+    router := actor.NewRouter[pmessage.Message](t.Name())
+    ctx1 := newContext(
+        ctx, t.Name(), router, 1, &cdcContext.ChangefeedVars{},
+        &cdcContext.GlobalVars{}, func(err error) {})
+    s := &mockSorter{
+        outCh:         make(chan *model.PolymorphicEvent, 1),
+        expectStartTs: 1,
+    }
+    sn.start(ctx1, true, eg, 1, router, s)
+
+    s.outCh <- &model.PolymorphicEvent{
+        CRTs: 1, RawKV: &model.RawKVEntry{OpType: model.OpTypePut}, Row: &model.RowChangedEvent{
+            Table:    &model.TableName{},
+            CommitTs: 1,
+            Columns: []*model.Column{
+                {
+                    Name:  "col1",
+                    Flag:  model.BinaryFlag,
+                    Value: "col1-value-updated",
+                },
+            },
+        },
+    }
+
+    outM := <-ctx1.outputCh
+    require.EqualValues(t, ts, outM.PolymorphicEvent.Row.ReplicatingTs)
+
+    cancel()
+    eg.Wait()
+}
+
 func TestSorterResolvedTsLessEqualBarrierTs(t *testing.T) {
     t.Parallel()
     sch := make(chan *model.PolymorphicEvent, 1)
     s := &checkSorter{ch: sch}
     sn := newSorterNode("tableName", 1, 1, nil, nil, &config.ReplicaConfig{
         Consistent: &config.ConsistentConfig{},
-    })
+    }, model.DefaultChangeFeedID("changefeed-id-test"), &mockPD{})
     sn.sorter = s
 
     ch := make(chan pmessage.Message, 1)
diff --git a/cdc/processor/pipeline/table.go b/cdc/processor/pipeline/table.go
index 0a84d1c8291..bc27c1856e3 100644
--- a/cdc/processor/pipeline/table.go
+++ b/cdc/processor/pipeline/table.go
@@ -219,7 +219,7 @@ func NewTablePipeline(ctx cdcContext.Context,
     p := pipeline.NewPipeline(ctx, 500*time.Millisecond, runnerSize, defaultOutputChannelSize)
 
     sorterNode := newSorterNode(tableName, tableID, replicaInfo.StartTs,
-        flowController, mounter, replConfig)
+        flowController, mounter, replConfig, changefeed, upstream.PDClient)
     sinkNode := newSinkNode(tableID, sink, replicaInfo.StartTs, targetTs,
         flowController, splitTxn, redoManager)
 
diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go
index e4387d56529..b228934ff56 100644
--- a/cdc/processor/pipeline/table_actor.go
+++ b/cdc/processor/pipeline/table_actor.go
@@ -288,7 +288,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error {
         t.redoManager.Enabled(), splitTxn)
 
     sorterNode := newSorterNode(t.tableName, t.tableID, t.replicaInfo.StartTs, flowController,
-        t.mounter, t.replicaConfig,
+        t.mounter, t.replicaConfig, t.changefeedID, t.upStream.PDClient,
     )
     t.sortNode = sorterNode
     sortActorNodeContext := newContext(sdtTableContext, t.tableName,
@@ -529,5 +529,9 @@ var startPuller = func(t *tableActor, ctx *actorNodeContext) error {
 }
 
 var startSorter = func(t *tableActor, ctx *actorNodeContext) error {
-    return t.sortNode.start(ctx, true, t.wg, t.actorID, t.router)
+    eventSorter, err := createSorter(ctx, t.tableName, t.tableID)
+    if err != nil {
+        return errors.Trace(err)
+    }
+    return t.sortNode.start(ctx, true, t.wg, t.actorID, t.router, eventSorter)
 }
diff --git a/cdc/processor/pipeline/table_actor_test.go b/cdc/processor/pipeline/table_actor_test.go
index 3f4c8a13984..6961ec513b9 100644
--- a/cdc/processor/pipeline/table_actor_test.go
+++ b/cdc/processor/pipeline/table_actor_test.go
@@ -29,6 +29,7 @@ import (
     serverConfig "github.com/pingcap/tiflow/pkg/config"
     cdcContext "github.com/pingcap/tiflow/pkg/context"
     pmessage "github.com/pingcap/tiflow/pkg/pipeline/message"
+    "github.com/pingcap/tiflow/pkg/upstream"
     "github.com/stretchr/testify/require"
     "golang.org/x/sync/errgroup"
 )
@@ -50,6 +51,7 @@ func TestAsyncStopFailed(t *testing.T) {
         cancel:      func() {},
         reportErr:   func(err error) {},
         redoManager: redo.NewDisabledManager(),
+        upStream:    upstream.NewUpstream4Test(&mockPD{}),
     }
     tbl.sinkNode = newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}, false, tbl.redoManager)
     require.True(t, tbl.AsyncStop(1))
@@ -77,6 +79,7 @@ func TestTableActorInterface(t *testing.T) {
                 Level: "node",
             },
         },
+        upStream: upstream.NewUpstream4Test(&mockPD{}),
     }
     tableID, markID := tbl.ID()
     require.Equal(t, int64(1), tableID)
@@ -119,6 +122,7 @@ func TestTableActorCancel(t *testing.T) {
         router:    tableActorRouter,
         cancel:    func() {},
         reportErr: func(err error) {},
+        upStream:  upstream.NewUpstream4Test(&mockPD{}),
     }
     mb := actor.NewMailbox[pmessage.Message](actor.ID(1), 0)
     tbl.actorID = actor.ID(1)
@@ -364,7 +368,7 @@ func TestNewTableActor(t *testing.T) {
     startSorter = func(t *tableActor, ctx *actorNodeContext) error {
         return nil
     }
-    tbl, err := NewTableActor(cctx, nil, nil, 1, "t1",
+    tbl, err := NewTableActor(cctx, upstream.NewUpstream4Test(&mockPD{}), nil, 1, "t1",
         &model.TableReplicaInfo{
             StartTs:     0,
             MarkTableID: 1,
@@ -380,7 +384,7 @@ func TestNewTableActor(t *testing.T) {
         return errors.New("failed to start puller")
     }
 
-    tbl, err = NewTableActor(cctx, nil, nil, 1, "t1",
+    tbl, err = NewTableActor(cctx, upstream.NewUpstream4Test(&mockPD{}), nil, 1, "t1",
         &model.TableReplicaInfo{
             StartTs:     0,
             MarkTableID: 1,
@@ -426,6 +430,7 @@ func TestTableActorStart(t *testing.T) {
             MarkTableID: 1,
         },
         replicaConfig: config.GetDefaultReplicaConfig(),
+        upStream:      upstream.NewUpstream4Test(&mockPD{}),
     }
     require.Nil(t, tbl.start(ctx))
     require.Equal(t, 1, len(tbl.nodes))
@@ -445,6 +450,7 @@ func TestTableActorStart(t *testing.T) {
             MarkTableID: 1,
         },
         replicaConfig: config.GetDefaultReplicaConfig(),
+        upStream:      upstream.NewUpstream4Test(&mockPD{}),
     }
     tbl.cyclicEnabled = true
     require.Nil(t, tbl.start(ctx))
diff --git a/cdc/sink/mysql/mysql.go b/cdc/sink/mysql/mysql.go
index f42f698b9ba..35adb3ba082 100644
--- a/cdc/sink/mysql/mysql.go
+++ b/cdc/sink/mysql/mysql.go
@@ -16,6 +16,7 @@ package mysql
 import (
     "context"
     "database/sql"
+    "database/sql/driver"
     "fmt"
     "net"
     "net/url"
@@ -30,9 +31,14 @@ import (
     "github.com/pingcap/log"
     "github.com/pingcap/tidb/parser/charset"
     timodel "github.com/pingcap/tidb/parser/model"
-    "github.com/pingcap/tidb/parser/mysql"
+    "github.com/pingcap/tidb/util/dbutil"
+    "github.com/prometheus/client_golang/prometheus"
+    "go.uber.org/atomic"
+    "go.uber.org/zap"
+
     "github.com/pingcap/tiflow/cdc/model"
     "github.com/pingcap/tiflow/cdc/sink/metrics"
+    dmretry "github.com/pingcap/tiflow/dm/pkg/retry"
     dmutils "github.com/pingcap/tiflow/dm/pkg/utils"
     "github.com/pingcap/tiflow/pkg/config"
     "github.com/pingcap/tiflow/pkg/cyclic"
@@ -43,9 +49,6 @@ import (
     "github.com/pingcap/tiflow/pkg/notify"
     "github.com/pingcap/tiflow/pkg/quotes"
     "github.com/pingcap/tiflow/pkg/retry"
-    "github.com/prometheus/client_golang/prometheus"
-    "go.uber.org/atomic"
-    "go.uber.org/zap"
 )
 
 const (
@@ -610,9 +613,21 @@ func (s *mysqlSink) getTableResolvedTs(tableID model.TableID) (model.ResolvedTs,
     return resolved, ok
 }
 
-func logDMLTxnErr(err error) error {
+func logDMLTxnErr(
+    err error, start time.Time, changefeed model.ChangeFeedID, query string, count int,
+) error {
     if isRetryableDMLError(err) {
-        log.Warn("execute DMLs with error, retry later", zap.Error(err))
+        log.Warn("execute DMLs with error, retry later",
+            zap.Error(err), zap.Duration("duration", time.Since(start)),
+            zap.String("query", query), zap.Int("count", count),
+            zap.String("namespace", changefeed.Namespace),
+            zap.String("changefeed", changefeed.ID))
+    } else {
+        log.Error("execute DMLs with error, can not retry",
+            zap.Error(err), zap.Duration("duration", time.Since(start)),
+            zap.String("query", query), zap.Int("count", count),
+            zap.String("namespace", changefeed.Namespace),
+            zap.String("changefeed", changefeed.ID))
     }
     return err
 }
@@ -621,17 +636,12 @@ func isRetryableDMLError(err error) bool {
     if !cerror.IsRetryableError(err) {
         return false
     }
-
-    errCode, ok := getSQLErrCode(err)
-    if !ok {
+    // Check if the error is a connection error that can be retried safely.
+    if dmretry.IsConnectionError(err) {
         return true
     }
-
-    switch errCode {
-    case mysql.ErrNoSuchTable, mysql.ErrBadDB:
-        return false
-    }
-    return true
+    // Check if the error is a retryable TiDB error or MySQL error.
+    return dbutil.IsRetryableError(err)
 }
 
@@ -641,18 +651,23 @@ func (s *mysqlSink) execDMLWithMaxRetries(ctx context.Context, dmls *preparedDML
             zap.Any("values", dmls.values))
     }
 
+    start := time.Now()
     return retry.Do(ctx, func() error {
         failpoint.Inject("MySQLSinkTxnRandomError", func() {
-            failpoint.Return(logDMLTxnErr(errors.Trace(dmysql.ErrInvalidConn)))
+            failpoint.Return(
+                logDMLTxnErr(
+                    errors.Trace(driver.ErrBadConn),
+                    start, s.params.changefeedID, "failpoint", 0))
         })
         failpoint.Inject("MySQLSinkHangLongTime", func() {
             time.Sleep(time.Hour)
         })
-
         err := s.statistics.RecordBatchExecution(func() (int, error) {
             tx, err := s.db.BeginTx(ctx, nil)
             if err != nil {
-                return 0, logDMLTxnErr(cerror.WrapError(cerror.ErrMySQLTxnError, err))
+                return 0, logDMLTxnErr(
+                    cerror.WrapError(cerror.ErrMySQLTxnError, err),
+                    start, s.params.changefeedID, "BEGIN", dmls.rowCount)
             }
 
             for i, query := range dmls.sqls {
@@ -661,8 +676,13 @@ func (s *mysqlSink) execDMLWithMaxRetries(ctx context.Context, dmls *preparedDML
                 if _, err := tx.ExecContext(ctx, query, args...); err != nil {
                     if rbErr := tx.Rollback(); rbErr != nil {
                         log.Warn("failed to rollback txn", zap.Error(err))
+                        _ = logDMLTxnErr(
+                            cerror.WrapError(cerror.ErrMySQLTxnError, err),
+                            start, s.params.changefeedID, query, dmls.rowCount)
                     }
-                    return 0, logDMLTxnErr(cerror.WrapError(cerror.ErrMySQLTxnError, err))
+                    return 0, logDMLTxnErr(
+                        cerror.WrapError(cerror.ErrMySQLTxnError, err),
+                        start, s.params.changefeedID, query, dmls.rowCount)
                 }
             }
 
@@ -672,12 +692,16 @@ func (s *mysqlSink) execDMLWithMaxRetries(ctx context.Context, dmls *preparedDML
                 if rbErr := tx.Rollback(); rbErr != nil {
                     log.Warn("failed to rollback txn", zap.Error(err))
                 }
-                return 0, logDMLTxnErr(cerror.WrapError(cerror.ErrMySQLTxnError, err))
+                return 0, logDMLTxnErr(
+                    cerror.WrapError(cerror.ErrMySQLTxnError, err),
+                    start, s.params.changefeedID, dmls.markSQL, dmls.rowCount)
             }
         }
 
         if err = tx.Commit(); err != nil {
-            return 0, logDMLTxnErr(cerror.WrapError(cerror.ErrMySQLTxnError, err))
+            return 0, logDMLTxnErr(
+                cerror.WrapError(cerror.ErrMySQLTxnError, err),
+                start, s.params.changefeedID, "COMMIT", dmls.rowCount)
         }
         return dmls.rowCount, nil
     })
@@ -711,6 +735,15 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, replicaID uint64,
     rowCount := 0
     // translateToInsert control the update and insert behavior
     translateToInsert := s.params.enableOldValue && !s.params.safeMode
+    for _, row := range rows {
+        if !translateToInsert {
+            break
+        }
+        // It can be translated into an INSERT if the row is committed after
+        // we start replicating the table, which means it has not been
+        // replicated before and there is no such row in the downstream MySQL.
+        translateToInsert = row.CommitTs > row.ReplicatingTs
+    }
 
     // flush cached batch replace or insert, to keep the sequence of DMLs
     flushCacheDMLs := func() {
@@ -1005,15 +1038,6 @@ func whereSlice(cols []*model.Column, forceReplicate bool) (colNames []string, a
     return
 }
 
-func getSQLErrCode(err error) (errors.ErrCode, bool) {
-    mysqlErr, ok := errors.Cause(err).(*dmysql.MySQLError)
-    if !ok {
-        return -1, false
-    }
-
-    return errors.ErrCode(mysqlErr.Number), true
-}
-
 func buildColumnList(names []string) string {
     var b strings.Builder
     for i, name := range names {
diff --git a/cdc/sink/mysql/mysql_test.go b/cdc/sink/mysql/mysql_test.go
index 618e01e78e8..ba82db98ccd 100644
--- a/cdc/sink/mysql/mysql_test.go
+++ b/cdc/sink/mysql/mysql_test.go
@@ -1081,7 +1081,6 @@ func (s *mockUnavailableMySQL) Stop() {
 }
 
 func TestNewMySQLTimeout(t *testing.T) {
-    t.Parallel()
     addr := "127.0.0.1:33333"
     mockMySQL := newMockUnavailableMySQL(addr, t)
     defer mockMySQL.Stop()
@@ -1089,7 +1088,7 @@ func TestNewMySQLTimeout(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
     changefeed := "test-changefeed"
-    sinkURI, err := url.Parse(fmt.Sprintf("mysql://%s/?read-timeout=2s&timeout=2s", addr))
+    sinkURI, err := url.Parse(fmt.Sprintf("mysql://%s/?read-timeout=1s&timeout=1s", addr))
     require.Nil(t, err)
     rc := config.GetDefaultReplicaConfig()
     f, err := filter.NewFilter(rc)
@@ -1521,6 +1520,77 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) {
     require.Nil(t, err)
 }
 
+func TestMysqlSinkNotRetryErrDupEntry(t *testing.T) {
+    errDup := mysql.NewErr(mysql.ErrDupEntry)
+    rows := []*model.RowChangedEvent{
+        {
+            StartTs:       2,
+            CommitTs:      3,
+            ReplicatingTs: 1,
+            Table:         &model.TableName{Schema: "s1", Table: "t1", TableID: 1},
+            Columns: []*model.Column{
+                {
+                    Name:  "a",
+                    Type:  mysql.TypeLong,
+                    Flag:  model.HandleKeyFlag | model.PrimaryKeyFlag,
+                    Value: 1,
+                },
+            },
+        },
+    }
+
+    dbIndex := 0
+    mockDBInsertDupEntry := func(ctx context.Context, dsnStr string) (*sql.DB, error) {
+        defer func() {
+            dbIndex++
+        }()
+        if dbIndex == 0 {
+            // test db
+            db, err := mockTestDB(true)
+            require.Nil(t, err)
+            return db, nil
+        }
+        // normal db
+        db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
+        require.Nil(t, err)
+        mock.ExpectBegin()
+        mock.ExpectExec("INSERT INTO `s1`.`t1`(`a`) VALUES (?)").
+            WithArgs(1).
+            WillReturnResult(sqlmock.NewResult(1, 1))
+        mock.ExpectCommit().
+            WillReturnError(errDup)
+        mock.ExpectClose()
+        return db, nil
+    }
+    backupGetDBConn := GetDBConnImpl
+    GetDBConnImpl = mockDBInsertDupEntry
+    backupMaxRetry := defaultDMLMaxRetry
+    defaultDMLMaxRetry = 2
+    defer func() {
+        GetDBConnImpl = backupGetDBConn
+        defaultDMLMaxRetry = backupMaxRetry
+    }()
+
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    changefeed := "test-changefeed"
+    sinkURI, err := url.Parse(
+        "mysql://127.0.0.1:4000/?time-zone=UTC&worker-count=1&safe-mode=false")
+    require.Nil(t, err)
+    rc := config.GetDefaultReplicaConfig()
+    f, err := filter.NewFilter(rc)
+    require.Nil(t, err)
+    sink, err := NewMySQLSink(
+        ctx, model.DefaultChangeFeedID(changefeed), sinkURI, f, rc, map[string]string{})
+    require.Nil(t, err)
+
+    err = sink.execDMLs(ctx, rows, 0, 1 /* bucket */)
+    require.Equal(t, errDup, errors.Cause(err))
+
+    err = sink.Close(ctx)
+    require.Nil(t, err)
+}
+
 func TestNewMySQLSinkExecDDL(t *testing.T) {
     dbIndex := 0
     mockGetDBConn := func(ctx context.Context, dsnStr string) (*sql.DB, error) {
@@ -2146,3 +2216,280 @@ func TestMySQLSinkExecDMLError(t *testing.T) {
 
     _ = sink.Close(ctx)
 }
+
+func TestMysqlSinkSafeModeOff(t *testing.T) {
+    t.Parallel()
+
+    testCases := []struct {
+        name     string
+        input    []*model.RowChangedEvent
+        expected *preparedDMLs
+    }{
+        {
+            name:     "empty",
+            input:    []*model.RowChangedEvent{},
+            expected: &preparedDMLs{sqls: []string{}, values: [][]interface{}{}},
+        }, {
+            name: "insert without PK",
+            input: []*model.RowChangedEvent{
+                {
+                    StartTs:       418658114257813514,
+                    CommitTs:      418658114257813515,
+                    ReplicatingTs: 418658114257813513,
+                    Table:         &model.TableName{Schema: "common_1", Table: "uk_without_pk"},
+                    Columns: []*model.Column{nil, {
+                        Name:  "a1",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
+                        Value: 1,
+                    }, {
+                        Name:  "a3",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
+                        Value: 1,
+                    }},
+                },
+            },
+            expected: &preparedDMLs{
+                sqls: []string{
+                    "INSERT INTO `common_1`.`uk_without_pk`(`a1`,`a3`) VALUES (?,?);",
+                },
+                values:   [][]interface{}{{1, 1}},
+                rowCount: 1,
+            },
+        }, {
+            name: "insert with PK",
+            input: []*model.RowChangedEvent{
+                {
+                    StartTs:       418658114257813514,
+                    CommitTs:      418658114257813515,
+                    ReplicatingTs: 418658114257813513,
+                    Table:         &model.TableName{Schema: "common_1", Table: "pk"},
+                    Columns: []*model.Column{nil, {
+                        Name:  "a1",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.HandleKeyFlag | model.PrimaryKeyFlag,
+                        Value: 1,
+                    }, {
+                        Name:  "a3",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
+                        Value: 1,
+                    }},
+                },
+            },
+            expected: &preparedDMLs{
+                sqls:     []string{"INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);"},
+                values:   [][]interface{}{{1, 1}},
+                rowCount: 1,
+            },
+        }, {
+            name: "update without PK",
+            input: []*model.RowChangedEvent{
+                {
+                    StartTs:       418658114257813516,
+                    CommitTs:      418658114257813517,
+                    ReplicatingTs: 418658114257813515,
+                    Table:         &model.TableName{Schema: "common_1", Table: "uk_without_pk"},
+                    PreColumns: []*model.Column{nil, {
+                        Name:  "a1",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
+                        Value: 2,
+                    }, {
+                        Name:  "a3",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
+                        Value: 2,
+                    }},
+                    Columns: []*model.Column{nil, {
+                        Name:  "a1",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
+                        Value: 3,
+                    }, {
+                        Name:  "a3",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
+                        Value: 3,
+                    }},
+                },
+            },
+            expected: &preparedDMLs{
+                sqls: []string{
+                    "UPDATE `common_1`.`uk_without_pk` SET `a1`=?,`a3`=? " +
+                        "WHERE `a1`=? AND `a3`=? LIMIT 1;",
+                },
+                values:   [][]interface{}{{3, 3, 2, 2}},
+                rowCount: 1,
+            },
+        }, {
+            name: "update with PK",
+            input: []*model.RowChangedEvent{
+                {
+                    StartTs:       418658114257813516,
+                    CommitTs:      418658114257813517,
+                    ReplicatingTs: 418658114257813515,
+                    Table:         &model.TableName{Schema: "common_1", Table: "pk"},
+                    PreColumns: []*model.Column{nil, {
+                        Name:  "a1",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.HandleKeyFlag | model.PrimaryKeyFlag,
+                        Value: 2,
+                    }, {
+                        Name:  "a3",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
+                        Value: 2,
+                    }},
+                    Columns: []*model.Column{nil, {
+                        Name:  "a1",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.HandleKeyFlag | model.PrimaryKeyFlag,
+                        Value: 3,
+                    }, {
+                        Name:  "a3",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
+                        Value: 3,
+                    }},
+                },
+            },
+            expected: &preparedDMLs{
+                sqls: []string{"UPDATE `common_1`.`pk` SET `a1`=?,`a3`=? " +
+                    "WHERE `a1`=? AND `a3`=? LIMIT 1;"},
+                values:   [][]interface{}{{3, 3, 2, 2}},
+                rowCount: 1,
+            },
+        }, {
+            name: "batch insert with PK",
+            input: []*model.RowChangedEvent{
+                {
+                    StartTs:       418658114257813516,
+                    CommitTs:      418658114257813517,
+                    ReplicatingTs: 418658114257813515,
+                    Table:         &model.TableName{Schema: "common_1", Table: "pk"},
+                    Columns: []*model.Column{nil, {
+                        Name:  "a1",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.HandleKeyFlag | model.PrimaryKeyFlag,
+                        Value: 3,
+                    }, {
+                        Name:  "a3",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
+                        Value: 3,
+                    }},
+                },
+                {
+                    StartTs:       418658114257813516,
+                    CommitTs:      418658114257813517,
+                    ReplicatingTs: 418658114257813515,
+                    Table:         &model.TableName{Schema: "common_1", Table: "pk"},
+                    Columns: []*model.Column{nil, {
+                        Name:  "a1",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.HandleKeyFlag | model.PrimaryKeyFlag,
+                        Value: 5,
+                    }, {
+                        Name:  "a3",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
+                        Value: 5,
+                    }},
+                },
+            },
+            expected: &preparedDMLs{
+                sqls: []string{
+                    "INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
+                    "INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
+                },
+                values:   [][]interface{}{{3, 3}, {5, 5}},
+                rowCount: 2,
+            },
+        }, {
+            name: "safe mode on commit ts < replicating ts",
+            input: []*model.RowChangedEvent{
+                {
+                    StartTs:       418658114257813516,
+                    CommitTs:      418658114257813517,
+                    ReplicatingTs: 418658114257813518,
+                    Table:         &model.TableName{Schema: "common_1", Table: "pk"},
+                    Columns: []*model.Column{nil, {
+                        Name:  "a1",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.HandleKeyFlag | model.PrimaryKeyFlag,
+                        Value: 3,
+                    }, {
+                        Name:  "a3",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
+                        Value: 3,
+                    }},
+                },
+            },
+            expected: &preparedDMLs{
+                sqls: []string{
+                    "REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
+                },
+                values:   [][]interface{}{{3, 3}},
+                rowCount: 1,
+            },
+        }, {
+            name: "safe mode on one row commit ts < replicating ts",
+            input: []*model.RowChangedEvent{
+                {
+                    StartTs:       418658114257813516,
+                    CommitTs:      418658114257813517,
+                    ReplicatingTs: 418658114257813518,
+                    Table:         &model.TableName{Schema: "common_1", Table: "pk"},
+                    Columns: []*model.Column{nil, {
+                        Name:  "a1",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.HandleKeyFlag | model.PrimaryKeyFlag,
+                        Value: 3,
+                    }, {
+                        Name:  "a3",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
+                        Value: 3,
+                    }},
+                },
+                {
+                    StartTs:       418658114257813516,
+                    CommitTs:      418658114257813517,
+                    ReplicatingTs: 418658114257813515,
+                    Table:         &model.TableName{Schema: "common_1", Table: "pk"},
+                    Columns: []*model.Column{nil, {
+                        Name:  "a1",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.HandleKeyFlag | model.PrimaryKeyFlag,
+                        Value: 5,
+                    }, {
+                        Name:  "a3",
+                        Type:  mysql.TypeLong,
+                        Flag:  model.BinaryFlag | model.MultipleKeyFlag | model.HandleKeyFlag,
+                        Value: 5,
+                    }},
+                },
+            },
+            expected: &preparedDMLs{
+                sqls: []string{
+                    "REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
+                    "REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
+                },
+                values:   [][]interface{}{{3, 3}, {5, 5}},
+                rowCount: 2,
+            },
+        },
+    }
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    ms := newMySQLSink4Test(ctx, t)
+    ms.params.safeMode = false
+    ms.params.enableOldValue = true
+    for _, tc := range testCases {
+        dmls := ms.prepareDMLs(tc.input, 0, 0)
+        require.Equal(t, tc.expected, dmls, tc.name)
+    }
+}
diff --git a/cdc/sink/mysql/txns_heap.go b/cdc/sink/mysql/txns_heap.go
index 65ae95650ac..f5ae7eb6058 100644
--- a/cdc/sink/mysql/txns_heap.go
+++ b/cdc/sink/mysql/txns_heap.go
@@ -22,12 +22,12 @@ import (
 type innerTxnsHeap []innerHeapEntry
 
 type innerHeapEntry struct {
-    ts     uint64
-    bucket int
+    minCommitTs uint64
+    bucket      int
 }
 
 func (h innerTxnsHeap) Len() int { return len(h) }
-func (h innerTxnsHeap) Less(i, j int) bool { return h[i].ts < h[j].ts }
+func (h innerTxnsHeap) Less(i, j int) bool { return h[i].minCommitTs < h[j].minCommitTs }
 func (h innerTxnsHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
 
 func (h *innerTxnsHeap) Push(x interface{}) {
@@ -60,7 +60,7 @@ func newTxnsHeap(txnsMap map[model.TableID][]*model.SingleTableTxn) *txnsHeap {
         if len(txns) == 0 {
             continue
         }
-        entry := innerHeapEntry{ts: txns[0].CommitTs, bucket: bucket}
+        entry := innerHeapEntry{minCommitTs: txns[0].CommitTs, bucket: bucket}
         heap.Push(&inner, entry)
     }
     return &txnsHeap{inner: &inner, txnsGroup: txnsGroup}
@@ -77,8 +77,8 @@ func (h *txnsHeap) iter(fn func(txn *model.SingleTableTxn)) {
         h.txnsGroup[bucket] = h.txnsGroup[bucket][1:]
         if len(h.txnsGroup[bucket]) > 0 {
             heap.Push(h.inner, innerHeapEntry{
-                ts:     h.txnsGroup[bucket][0].CommitTs,
-                bucket: bucket,
+                minCommitTs: h.txnsGroup[bucket][0].CommitTs,
+                bucket:      bucket,
             })
         }
     }
diff --git a/dm/pkg/retry/errors.go b/dm/pkg/retry/errors.go
index 8bd7367983d..e9617b136b9 100644
--- a/dm/pkg/retry/errors.go
+++ b/dm/pkg/retry/errors.go
@@ -81,6 +81,7 @@ var (
 )
 
 // IsConnectionError tells whether this error should reconnect to Database.
+// Returning true also means the caller can retry the SQL safely.
 func IsConnectionError(err error) bool {
     err = errors.Cause(err)
     switch err {
diff --git a/tests/integration_tests/sink_retry/run.sh b/tests/integration_tests/sink_retry/run.sh
index fd5c17a43b6..c5d91808c63 100755
--- a/tests/integration_tests/sink_retry/run.sh
+++ b/tests/integration_tests/sink_retry/run.sh
@@ -34,10 +34,15 @@ function run() {
         run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
     fi
 
-    check_table_exists "sink_retry.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
+    run_sql "CREATE TABLE sink_retry.finish_mark_1 (a int primary key);"
+    sleep 30
+    check_table_exists "sink_retry.finish_mark_1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60
     check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
 
     go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=sink_retry
+    run_sql "CREATE TABLE sink_retry.finish_mark_2 (a int primary key);"
+    sleep 30
+    check_table_exists "sink_retry.finish_mark_2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 60
     check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml
 
     cleanup_process $CDC_BINARY
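
The behavioral core of this patch is the per-batch check added to prepareDMLs: with old value enabled and safe mode disabled, a batch is written with plain INSERTs only if every row was committed after the table started replicating (CommitTs > ReplicatingTs); otherwise the sink keeps safe mode for that batch and continues to emit REPLACE INTO. Below is a minimal, self-contained sketch of that rule only; the row struct and the helper name are simplified for illustration and are not the sink's actual types or API.

package main

import "fmt"

// row is a simplified stand-in for model.RowChangedEvent, carrying only the
// two timestamps the rule compares.
type row struct {
    commitTs      uint64
    replicatingTs uint64 // PD timestamp taken when the table starts replicating
}

// canTranslateToInsert reports whether a batch may be written as plain INSERTs.
// Safe mode must stay on (REPLACE INTO) as soon as any row could already exist
// downstream, i.e. commitTs <= replicatingTs.
func canTranslateToInsert(enableOldValue, safeMode bool, rows []row) bool {
    ok := enableOldValue && !safeMode
    for _, r := range rows {
        if !ok {
            break
        }
        ok = r.commitTs > r.replicatingTs
    }
    return ok
}

func main() {
    fresh := []row{{commitTs: 3, replicatingTs: 1}}
    replayed := []row{{commitTs: 3, replicatingTs: 1}, {commitTs: 2, replicatingTs: 4}}
    fmt.Println(canTranslateToInsert(true, false, fresh))    // true  -> INSERT
    fmt.Println(canTranslateToInsert(true, false, replayed)) // false -> REPLACE INTO
}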