Skip to content

Commit

Permalink
errorutil,sink,syncer: add errorutil to handle ignorable error (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored and 3AceShowHand committed Jan 13, 2022
1 parent d99b5a6 commit d358c28
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 72 deletions.
41 changes: 14 additions & 27 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
tddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/infoschema"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tiflow/cdc/model"
Expand All @@ -39,7 +37,7 @@ import (
"github.com/pingcap/tiflow/pkg/cyclic"
"github.com/pingcap/tiflow/pkg/cyclic/mark"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/errorutil"
tifilter "github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/notify"
"github.com/pingcap/tiflow/pkg/quotes"
Expand Down Expand Up @@ -91,7 +89,7 @@ type mysqlSink struct {
db *sql.DB
params *sinkParams

filter *filter.Filter
filter *tifilter.Filter
cyclic *cyclic.Cyclic

txnCache *common.UnresolvedTxnCache
Expand All @@ -114,6 +112,16 @@ type mysqlSink struct {
cancel func()
}

func needSwitchDB(ddl *model.DDLEvent) bool {
if len(ddl.TableInfo.Schema) > 0 {
if ddl.Type == timodel.ActionCreateSchema || ddl.Type == timodel.ActionDropSchema {
return false
}
return true
}
return false
}

func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
count := s.txnCache.Append(s.filter, rows...)
s.statistics.AddRowsCount(count)
Expand Down Expand Up @@ -214,7 +222,7 @@ func (s *mysqlSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTab
func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEvent) error {
return retry.Do(ctx, func() error {
err := s.execDDL(ctx, ddl)
if isIgnorableDDLError(err) {
if errorutil.IsIgnorableMySQLDDLError(err) {
log.Info("execute DDL failed, but error can be ignored", zap.String("query", ddl.Query), zap.Error(err))
return nil
}
Expand All @@ -226,7 +234,7 @@ func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEve
}

func (s *mysqlSink) execDDL(ctx context.Context, ddl *model.DDLEvent) error {
shouldSwitchDB := len(ddl.TableInfo.Schema) > 0 && ddl.Type != timodel.ActionCreateSchema
shouldSwitchDB := needSwitchDB(ddl)

failpoint.Inject("MySQLSinkExecDDLDelay", func() {
select {
Expand Down Expand Up @@ -1288,27 +1296,6 @@ func whereSlice(cols []*model.Column, forceReplicate bool) (colNames []string, a
return
}

func isIgnorableDDLError(err error) bool {
errCode, ok := getSQLErrCode(err)
if !ok {
return false
}
// we can get error code from:
// infoschema's error definition: https://github.com/pingcap/tidb/blob/master/infoschema/infoschema.go
// DDL's error definition: https://github.com/pingcap/tidb/blob/master/ddl/ddl.go
// tidb/mysql error code definition: https://github.com/pingcap/tidb/blob/master/mysql/errcode.go
switch errCode {
case infoschema.ErrDatabaseExists.Code(), infoschema.ErrDatabaseNotExists.Code(), infoschema.ErrDatabaseDropExists.Code(),
infoschema.ErrTableExists.Code(), infoschema.ErrTableNotExists.Code(), infoschema.ErrTableDropExists.Code(),
infoschema.ErrColumnExists.Code(), infoschema.ErrColumnNotExists.Code(), infoschema.ErrIndexExists.Code(),
infoschema.ErrKeyNotExists.Code(), tddl.ErrCantDropFieldOrKey.Code(), mysql.ErrDupKeyName, mysql.ErrSameNamePartition,
mysql.ErrDropPartitionNonExistent, mysql.ErrMultiplePriKey:
return true
default:
return false
}
}

func getSQLErrCode(err error) (errors.ErrCode, bool) {
mysqlErr, ok := errors.Cause(err).(*dmysql.MySQLError)
if !ok {
Expand Down
6 changes: 4 additions & 2 deletions cdc/sink/simple_mysql_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"go.uber.org/zap"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/errorutil"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/quotes"
"go.uber.org/zap"
)

func init() {
Expand Down Expand Up @@ -176,7 +178,7 @@ func (s *simpleMySQLSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent)
sql = fmt.Sprintf("use %s;%s", ddl.TableInfo.Schema, ddl.Query)
}
_, err := s.db.ExecContext(ctx, sql)
if err != nil && isIgnorableDDLError(err) {
if err != nil && errorutil.IsIgnorableMySQLDDLError(err) {
log.Info("execute DDL failed, but error can be ignored", zap.String("query", ddl.Query), zap.Error(err))
return nil
}
Expand Down
25 changes: 2 additions & 23 deletions dm/syncer/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ import (
"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-tools/pkg/dbutil"
tddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
tmysql "github.com/pingcap/tidb/parser/mysql"
Expand All @@ -32,28 +30,9 @@ import (
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/syncer/dbconn"
"github.com/pingcap/tiflow/pkg/errorutil"
)

func ignoreDDLError(err error) bool {
err = errors.Cause(err)
mysqlErr, ok := err.(*mysql.MySQLError)
if !ok {
return false
}

errCode := errors.ErrCode(mysqlErr.Number)
switch errCode {
case infoschema.ErrDatabaseExists.Code(), infoschema.ErrDatabaseDropExists.Code(),
infoschema.ErrTableExists.Code(), infoschema.ErrTableDropExists.Code(),
infoschema.ErrColumnExists.Code(),
infoschema.ErrIndexExists.Code(),
infoschema.ErrKeyNameDuplicate.Code(), tddl.ErrCantDropFieldOrKey.Code():
return true
default:
return false
}
}

func isDropColumnWithIndexError(err error) bool {
mysqlErr, ok := errors.Cause(err).(*mysql.MySQLError)
if !ok {
Expand Down Expand Up @@ -207,7 +186,7 @@ func (s *Syncer) handleSpecialDDLError(tctx *tcontext.Context, err error, ddls [
}

tctx.L().Info("drop index success, now try to drop column", zap.Strings("index", idx2Drop))
if _, err2 = conn.ExecuteSQLWithIgnore(tctx, ignoreDDLError, ddls[index:]); err2 != nil {
if _, err2 = conn.ExecuteSQLWithIgnore(tctx, errorutil.IsIgnorableMySQLDDLError, ddls[index:]); err2 != nil {
return err2
}

Expand Down
18 changes: 0 additions & 18 deletions dm/syncer/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/infoschema"
tmysql "github.com/pingcap/tidb/parser/mysql"

"github.com/pingcap/tiflow/dm/pkg/conn"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
Expand All @@ -36,22 +34,6 @@ func newMysqlErr(number uint16, message string) *mysql.MySQLError {
}
}

func (s *testSyncerSuite) TestIgnoreDDLError(c *C) {
cases := []struct {
err error
ret bool
}{
{errors.New("raw error"), false},
{newMysqlErr(tmysql.ErrDupKeyName, "Error: Duplicate key name 'some_key'"), true},
{newMysqlErr(uint16(infoschema.ErrDatabaseExists.Code()), "Can't create database"), true},
{newMysqlErr(uint16(infoschema.ErrAccessDenied.Code()), "Access denied for user"), false},
}

for _, t := range cases {
c.Assert(ignoreDDLError(t.err), Equals, t.ret)
}
}

func (s *testSyncerSuite) TestHandleSpecialDDLError(c *C) {
var (
syncer = NewSyncer(s.cfg, nil, nil)
Expand Down
3 changes: 2 additions & 1 deletion dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import (
onlineddl "github.com/pingcap/tiflow/dm/syncer/online-ddl-tools"
sm "github.com/pingcap/tiflow/dm/syncer/safe-mode"
"github.com/pingcap/tiflow/dm/syncer/shardddl"
"github.com/pingcap/tiflow/pkg/errorutil"
)

var (
Expand Down Expand Up @@ -1162,7 +1163,7 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *dbconn.

if !ignore {
var affected int
affected, err = db.ExecuteSQLWithIgnore(tctx, ignoreDDLError, ddlJob.ddls)
affected, err = db.ExecuteSQLWithIgnore(tctx, errorutil.IsIgnorableMySQLDDLError, ddlJob.ddls)
if err != nil {
err = s.handleSpecialDDLError(tctx, err, ddlJob.ddls, affected, db)
err = terror.WithScope(err, terror.ScopeDownstream)
Expand Down
3 changes: 2 additions & 1 deletion dm/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
streamer2 "github.com/pingcap/tiflow/dm/pkg/streamer"
"github.com/pingcap/tiflow/dm/pkg/utils"
"github.com/pingcap/tiflow/dm/syncer/dbconn"
"github.com/pingcap/tiflow/pkg/errorutil"

sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/go-mysql-org/go-mysql/mysql"
Expand Down Expand Up @@ -1644,7 +1645,7 @@ func (s *testSyncerSuite) TestExecuteSQLSWithIgnore(c *C) {
mock.ExpectCommit()

tctx := tcontext.Background().WithLogger(log.With(zap.String("test", "TestExecuteSQLSWithIgnore")))
n, err := conn.ExecuteSQLWithIgnore(tctx, ignoreDDLError, sqls)
n, err := conn.ExecuteSQLWithIgnore(tctx, errorutil.IsIgnorableMySQLDDLError, sqls)
c.Assert(err, IsNil)
c.Assert(n, Equals, 2)

Expand Down
48 changes: 48 additions & 0 deletions pkg/errorutil/ignore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package errorutil

import (
dmysql "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
tddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/mysql"
)

// IsIgnorableMySQLDDLError is used to check what error can be ignored
// we can get error code from:
// infoschema's error definition: https://github.com/pingcap/tidb/blob/master/infoschema/infoschema.go
// DDL's error definition: https://github.com/pingcap/tidb/blob/master/ddl/ddl.go
// tidb/mysql error code definition: https://github.com/pingcap/tidb/blob/master/mysql/errcode.go
func IsIgnorableMySQLDDLError(err error) bool {
err = errors.Cause(err)
mysqlErr, ok := err.(*dmysql.MySQLError)
if !ok {
return false
}

errCode := errors.ErrCode(mysqlErr.Number)
switch errCode {
case infoschema.ErrDatabaseExists.Code(), infoschema.ErrDatabaseDropExists.Code(),
infoschema.ErrTableExists.Code(), infoschema.ErrTableDropExists.Code(),
infoschema.ErrColumnExists.Code(), infoschema.ErrIndexExists.Code(),
infoschema.ErrKeyNotExists.Code(), tddl.ErrCantDropFieldOrKey.Code(),
mysql.ErrDupKeyName, mysql.ErrSameNamePartition,
mysql.ErrDropPartitionNonExistent, mysql.ErrMultiplePriKey:
return true
default:
return false
}
}
47 changes: 47 additions & 0 deletions pkg/errorutil/ignore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package errorutil

import (
"errors"
"testing"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/tidb/infoschema"
tmysql "github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/assert"
)

func newMysqlErr(number uint16, message string) *mysql.MySQLError {
return &mysql.MySQLError{
Number: number,
Message: message,
}
}

func TestIgnoreMysqlDDLError(t *testing.T) {
cases := []struct {
err error
ret bool
}{
{errors.New("raw error"), false},
{newMysqlErr(tmysql.ErrDupKeyName, "Error: Duplicate key name 'some_key'"), true},
{newMysqlErr(uint16(infoschema.ErrDatabaseExists.Code()), "Can't create database"), true},
{newMysqlErr(uint16(infoschema.ErrAccessDenied.Code()), "Access denied for user"), false},
}

for _, item := range cases {
assert.Equal(t, item.ret, IsIgnorableMySQLDDLError(item.err))
}
}

0 comments on commit d358c28

Please sign in to comment.