From d358c28c61ca997c5aba8882cf8715a7ca68e185 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 21 Dec 2021 18:11:47 +0800 Subject: [PATCH] errorutil,sink,syncer: add errorutil to handle ignorable error (#3264) (#3995) --- cdc/sink/mysql.go | 41 ++++++++++------------------ cdc/sink/simple_mysql_tester.go | 6 +++-- dm/syncer/error.go | 25 ++--------------- dm/syncer/error_test.go | 18 ------------- dm/syncer/syncer.go | 3 ++- dm/syncer/syncer_test.go | 3 ++- pkg/errorutil/ignore.go | 48 +++++++++++++++++++++++++++++++++ pkg/errorutil/ignore_test.go | 47 ++++++++++++++++++++++++++++++++ 8 files changed, 119 insertions(+), 72 deletions(-) create mode 100644 pkg/errorutil/ignore.go create mode 100644 pkg/errorutil/ignore_test.go diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index aeab222697d..044face3385 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -29,8 +29,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" - tddl "github.com/pingcap/tidb/ddl" - "github.com/pingcap/tidb/infoschema" timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/cdc/model" @@ -39,7 +37,7 @@ import ( "github.com/pingcap/tiflow/pkg/cyclic" "github.com/pingcap/tiflow/pkg/cyclic/mark" cerror "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/filter" + "github.com/pingcap/tiflow/pkg/errorutil" tifilter "github.com/pingcap/tiflow/pkg/filter" "github.com/pingcap/tiflow/pkg/notify" "github.com/pingcap/tiflow/pkg/quotes" @@ -91,7 +89,7 @@ type mysqlSink struct { db *sql.DB params *sinkParams - filter *filter.Filter + filter *tifilter.Filter cyclic *cyclic.Cyclic txnCache *common.UnresolvedTxnCache @@ -114,6 +112,16 @@ type mysqlSink struct { cancel func() } +func needSwitchDB(ddl *model.DDLEvent) bool { + if len(ddl.TableInfo.Schema) > 0 { + if ddl.Type == timodel.ActionCreateSchema || ddl.Type == timodel.ActionDropSchema { + return false + } + return true + } + return false +} + func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { count := s.txnCache.Append(s.filter, rows...) s.statistics.AddRowsCount(count) @@ -214,7 +222,7 @@ func (s *mysqlSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTab func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEvent) error { return retry.Do(ctx, func() error { err := s.execDDL(ctx, ddl) - if isIgnorableDDLError(err) { + if errorutil.IsIgnorableMySQLDDLError(err) { log.Info("execute DDL failed, but error can be ignored", zap.String("query", ddl.Query), zap.Error(err)) return nil } @@ -226,7 +234,7 @@ func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEve } func (s *mysqlSink) execDDL(ctx context.Context, ddl *model.DDLEvent) error { - shouldSwitchDB := len(ddl.TableInfo.Schema) > 0 && ddl.Type != timodel.ActionCreateSchema + shouldSwitchDB := needSwitchDB(ddl) failpoint.Inject("MySQLSinkExecDDLDelay", func() { select { @@ -1288,27 +1296,6 @@ func whereSlice(cols []*model.Column, forceReplicate bool) (colNames []string, a return } -func isIgnorableDDLError(err error) bool { - errCode, ok := getSQLErrCode(err) - if !ok { - return false - } - // we can get error code from: - // infoschema's error definition: https://github.com/pingcap/tidb/blob/master/infoschema/infoschema.go - // DDL's error definition: https://github.com/pingcap/tidb/blob/master/ddl/ddl.go - // tidb/mysql error code definition: https://github.com/pingcap/tidb/blob/master/mysql/errcode.go - switch errCode { - case infoschema.ErrDatabaseExists.Code(), infoschema.ErrDatabaseNotExists.Code(), infoschema.ErrDatabaseDropExists.Code(), - infoschema.ErrTableExists.Code(), infoschema.ErrTableNotExists.Code(), infoschema.ErrTableDropExists.Code(), - infoschema.ErrColumnExists.Code(), infoschema.ErrColumnNotExists.Code(), infoschema.ErrIndexExists.Code(), - infoschema.ErrKeyNotExists.Code(), tddl.ErrCantDropFieldOrKey.Code(), mysql.ErrDupKeyName, mysql.ErrSameNamePartition, - mysql.ErrDropPartitionNonExistent, mysql.ErrMultiplePriKey: - return true - default: - return false - } -} - func getSQLErrCode(err error) (errors.ErrCode, bool) { mysqlErr, ok := errors.Cause(err).(*dmysql.MySQLError) if !ok { diff --git a/cdc/sink/simple_mysql_tester.go b/cdc/sink/simple_mysql_tester.go index 4bfd5abf7ae..95a8eb9cba4 100644 --- a/cdc/sink/simple_mysql_tester.go +++ b/cdc/sink/simple_mysql_tester.go @@ -25,12 +25,14 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" + "go.uber.org/zap" + "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/errorutil" "github.com/pingcap/tiflow/pkg/filter" "github.com/pingcap/tiflow/pkg/quotes" - "go.uber.org/zap" ) func init() { @@ -176,7 +178,7 @@ func (s *simpleMySQLSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) sql = fmt.Sprintf("use %s;%s", ddl.TableInfo.Schema, ddl.Query) } _, err := s.db.ExecContext(ctx, sql) - if err != nil && isIgnorableDDLError(err) { + if err != nil && errorutil.IsIgnorableMySQLDDLError(err) { log.Info("execute DDL failed, but error can be ignored", zap.String("query", ddl.Query), zap.Error(err)) return nil } diff --git a/dm/syncer/error.go b/dm/syncer/error.go index a1638b932a5..c1aec276d8e 100644 --- a/dm/syncer/error.go +++ b/dm/syncer/error.go @@ -21,9 +21,7 @@ import ( "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" "github.com/pingcap/tidb-tools/pkg/dbutil" - tddl "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/errno" - "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" tmysql "github.com/pingcap/tidb/parser/mysql" @@ -32,28 +30,9 @@ import ( tcontext "github.com/pingcap/tiflow/dm/pkg/context" "github.com/pingcap/tiflow/dm/pkg/log" "github.com/pingcap/tiflow/dm/syncer/dbconn" + "github.com/pingcap/tiflow/pkg/errorutil" ) -func ignoreDDLError(err error) bool { - err = errors.Cause(err) - mysqlErr, ok := err.(*mysql.MySQLError) - if !ok { - return false - } - - errCode := errors.ErrCode(mysqlErr.Number) - switch errCode { - case infoschema.ErrDatabaseExists.Code(), infoschema.ErrDatabaseDropExists.Code(), - infoschema.ErrTableExists.Code(), infoschema.ErrTableDropExists.Code(), - infoschema.ErrColumnExists.Code(), - infoschema.ErrIndexExists.Code(), - infoschema.ErrKeyNameDuplicate.Code(), tddl.ErrCantDropFieldOrKey.Code(): - return true - default: - return false - } -} - func isDropColumnWithIndexError(err error) bool { mysqlErr, ok := errors.Cause(err).(*mysql.MySQLError) if !ok { @@ -207,7 +186,7 @@ func (s *Syncer) handleSpecialDDLError(tctx *tcontext.Context, err error, ddls [ } tctx.L().Info("drop index success, now try to drop column", zap.Strings("index", idx2Drop)) - if _, err2 = conn.ExecuteSQLWithIgnore(tctx, ignoreDDLError, ddls[index:]); err2 != nil { + if _, err2 = conn.ExecuteSQLWithIgnore(tctx, errorutil.IsIgnorableMySQLDDLError, ddls[index:]); err2 != nil { return err2 } diff --git a/dm/syncer/error_test.go b/dm/syncer/error_test.go index a6ca84e91f9..9f84997bec3 100644 --- a/dm/syncer/error_test.go +++ b/dm/syncer/error_test.go @@ -21,8 +21,6 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/tidb/errno" - "github.com/pingcap/tidb/infoschema" - tmysql "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/dm/pkg/conn" tcontext "github.com/pingcap/tiflow/dm/pkg/context" @@ -36,22 +34,6 @@ func newMysqlErr(number uint16, message string) *mysql.MySQLError { } } -func (s *testSyncerSuite) TestIgnoreDDLError(c *C) { - cases := []struct { - err error - ret bool - }{ - {errors.New("raw error"), false}, - {newMysqlErr(tmysql.ErrDupKeyName, "Error: Duplicate key name 'some_key'"), true}, - {newMysqlErr(uint16(infoschema.ErrDatabaseExists.Code()), "Can't create database"), true}, - {newMysqlErr(uint16(infoschema.ErrAccessDenied.Code()), "Access denied for user"), false}, - } - - for _, t := range cases { - c.Assert(ignoreDDLError(t.err), Equals, t.ret) - } -} - func (s *testSyncerSuite) TestHandleSpecialDDLError(c *C) { var ( syncer = NewSyncer(s.cfg, nil, nil) diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 585f3cfa674..ea3c22f73f1 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -70,6 +70,7 @@ import ( onlineddl "github.com/pingcap/tiflow/dm/syncer/online-ddl-tools" sm "github.com/pingcap/tiflow/dm/syncer/safe-mode" "github.com/pingcap/tiflow/dm/syncer/shardddl" + "github.com/pingcap/tiflow/pkg/errorutil" ) var ( @@ -1162,7 +1163,7 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *dbconn. if !ignore { var affected int - affected, err = db.ExecuteSQLWithIgnore(tctx, ignoreDDLError, ddlJob.ddls) + affected, err = db.ExecuteSQLWithIgnore(tctx, errorutil.IsIgnorableMySQLDDLError, ddlJob.ddls) if err != nil { err = s.handleSpecialDDLError(tctx, err, ddlJob.ddls, affected, db) err = terror.WithScope(err, terror.ScopeDownstream) diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index dbec3bbde03..3467cd1e8e9 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -40,6 +40,7 @@ import ( streamer2 "github.com/pingcap/tiflow/dm/pkg/streamer" "github.com/pingcap/tiflow/dm/pkg/utils" "github.com/pingcap/tiflow/dm/syncer/dbconn" + "github.com/pingcap/tiflow/pkg/errorutil" sqlmock "github.com/DATA-DOG/go-sqlmock" "github.com/go-mysql-org/go-mysql/mysql" @@ -1644,7 +1645,7 @@ func (s *testSyncerSuite) TestExecuteSQLSWithIgnore(c *C) { mock.ExpectCommit() tctx := tcontext.Background().WithLogger(log.With(zap.String("test", "TestExecuteSQLSWithIgnore"))) - n, err := conn.ExecuteSQLWithIgnore(tctx, ignoreDDLError, sqls) + n, err := conn.ExecuteSQLWithIgnore(tctx, errorutil.IsIgnorableMySQLDDLError, sqls) c.Assert(err, IsNil) c.Assert(n, Equals, 2) diff --git a/pkg/errorutil/ignore.go b/pkg/errorutil/ignore.go new file mode 100644 index 00000000000..6a48dd203f6 --- /dev/null +++ b/pkg/errorutil/ignore.go @@ -0,0 +1,48 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package errorutil + +import ( + dmysql "github.com/go-sql-driver/mysql" + "github.com/pingcap/errors" + tddl "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/parser/mysql" +) + +// IsIgnorableMySQLDDLError is used to check what error can be ignored +// we can get error code from: +// infoschema's error definition: https://github.com/pingcap/tidb/blob/master/infoschema/infoschema.go +// DDL's error definition: https://github.com/pingcap/tidb/blob/master/ddl/ddl.go +// tidb/mysql error code definition: https://github.com/pingcap/tidb/blob/master/mysql/errcode.go +func IsIgnorableMySQLDDLError(err error) bool { + err = errors.Cause(err) + mysqlErr, ok := err.(*dmysql.MySQLError) + if !ok { + return false + } + + errCode := errors.ErrCode(mysqlErr.Number) + switch errCode { + case infoschema.ErrDatabaseExists.Code(), infoschema.ErrDatabaseDropExists.Code(), + infoschema.ErrTableExists.Code(), infoschema.ErrTableDropExists.Code(), + infoschema.ErrColumnExists.Code(), infoschema.ErrIndexExists.Code(), + infoschema.ErrKeyNotExists.Code(), tddl.ErrCantDropFieldOrKey.Code(), + mysql.ErrDupKeyName, mysql.ErrSameNamePartition, + mysql.ErrDropPartitionNonExistent, mysql.ErrMultiplePriKey: + return true + default: + return false + } +} diff --git a/pkg/errorutil/ignore_test.go b/pkg/errorutil/ignore_test.go new file mode 100644 index 00000000000..7c2dbd0f7e0 --- /dev/null +++ b/pkg/errorutil/ignore_test.go @@ -0,0 +1,47 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package errorutil + +import ( + "errors" + "testing" + + "github.com/go-sql-driver/mysql" + "github.com/pingcap/tidb/infoschema" + tmysql "github.com/pingcap/tidb/parser/mysql" + "github.com/stretchr/testify/assert" +) + +func newMysqlErr(number uint16, message string) *mysql.MySQLError { + return &mysql.MySQLError{ + Number: number, + Message: message, + } +} + +func TestIgnoreMysqlDDLError(t *testing.T) { + cases := []struct { + err error + ret bool + }{ + {errors.New("raw error"), false}, + {newMysqlErr(tmysql.ErrDupKeyName, "Error: Duplicate key name 'some_key'"), true}, + {newMysqlErr(uint16(infoschema.ErrDatabaseExists.Code()), "Can't create database"), true}, + {newMysqlErr(uint16(infoschema.ErrAccessDenied.Code()), "Access denied for user"), false}, + } + + for _, item := range cases { + assert.Equal(t, item.ret, IsIgnorableMySQLDDLError(item.err)) + } +}