From cc5c8d578d85f6c706964d81b777e3bc106203b8 Mon Sep 17 00:00:00 2001 From: fengou1 Date: Mon, 27 Sep 2021 19:33:55 +0800 Subject: [PATCH 01/13] fix issue 27015 --- br/pkg/conn/conn.go | 7 ++++ br/pkg/conn/conn_test.go | 85 +++++++++++++++++++++++++++++++++++++++- br/pkg/utils/retry.go | 6 +++ go.mod | 1 + 4 files changed, 97 insertions(+), 2 deletions(-) diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index ac37dc5f582c5..67db05219e510 100755 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -184,6 +184,13 @@ func GetAllTiKVStoresWithRetry(ctx context.Context, } }) + failpoint.Inject("hint-GetAllTiKVStores-cancel", func(val failpoint.Value) { + if val.(bool) { + logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-cancel injected.") + err = status.Error(codes.Canceled, "Cancel Retry") + } + }) + return errors.Trace(err) }, utils.NewPDReqBackoffer(), diff --git a/br/pkg/conn/conn_test.go b/br/pkg/conn/conn_test.go index 22e434d3e8d6f..89f0d80900ce2 100644 --- a/br/pkg/conn/conn_test.go +++ b/br/pkg/conn/conn_test.go @@ -6,10 +6,14 @@ import ( "context" "testing" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/br/pkg/pdutil" + "github.com/pkg/errors" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type fakePDClient struct { @@ -21,9 +25,86 @@ func (c fakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*me return append([]*metapb.Store{}, c.stores...), nil } -func TestCheckStoresAlive(t *testing.T) { - t.Parallel() +func TestGetAllTiKVStoresWithRetryCancel(t *testing.T) { + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-cancel", "return(true)") + defer func() { + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-cancel") + }() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stores := []*metapb.Store{ + { 
+ Id: 1, + State: metapb.StoreState_Up, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tiflash", + }, + }, + }, + { + Id: 2, + State: metapb.StoreState_Offline, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tiflash", + }, + }, + }, + } + + fpdc := fakePDClient{ + stores: stores, + } + + kvStores, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash) + require.Len(t, kvStores, 0) + require.Equal(t, codes.Canceled, status.Code(errors.Cause(err))) +} +func TestGetAllTiKVStoresWithUnknown(t *testing.T) { + _ = failpoint.Enable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-error", "return(true)") + defer func() { + _ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/conn/hint-GetAllTiKVStores-error") + }() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tiflash", + }, + }, + }, + { + Id: 2, + State: metapb.StoreState_Offline, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tiflash", + }, + }, + }, + } + + fpdc := fakePDClient{ + stores: stores, + } + + kvStores, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash) + require.Len(t, kvStores, 0) + require.Equal(t, codes.Unknown, status.Code(errors.Cause(err))) +} +func TestCheckStoresAlive(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 38e19fbc87e11..2ce397edbd8fd 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -7,7 +7,10 @@ import ( "strings" "time" + serrors "github.com/pingcap/errors" "go.uber.org/multierr" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var retryableServerError = []string{ @@ -47,6 +50,9 @@ func WithRetry( err := retryableFunc() if err != nil { allErrors = multierr.Append(allErrors, err) + if status.Code(serrors.Cause(err)) == 
codes.Canceled { // current context cancelled, stop retry + return allErrors + } select { case <-ctx.Done(): return allErrors // nolint:wrapcheck diff --git a/go.mod b/go.mod index 707903cfffd65..ac37b562af69e 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,7 @@ require ( github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5 github.com/pingcap/tidb-tools v5.0.3+incompatible github.com/pingcap/tipb v0.0.0-20210802080519-94b831c6db55 + github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 From bd531c8e7cc7ec09e0d2e5ea42806722720a876a Mon Sep 17 00:00:00 2001 From: fengou1 Date: Tue, 28 Sep 2021 09:49:39 +0800 Subject: [PATCH 02/13] fix comments - add general function isRetryableError --- br/pkg/conn/conn_test.go | 8 +++--- br/pkg/utils/retry.go | 55 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 57 insertions(+), 6 deletions(-) diff --git a/br/pkg/conn/conn_test.go b/br/pkg/conn/conn_test.go index 89f0d80900ce2..ceb3b33077549 100644 --- a/br/pkg/conn/conn_test.go +++ b/br/pkg/conn/conn_test.go @@ -60,8 +60,8 @@ func TestGetAllTiKVStoresWithRetryCancel(t *testing.T) { stores: stores, } - kvStores, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash) - require.Len(t, kvStores, 0) + _, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash) + require.Error(t, err) require.Equal(t, codes.Canceled, status.Code(errors.Cause(err))) } @@ -100,8 +100,8 @@ func TestGetAllTiKVStoresWithUnknown(t *testing.T) { stores: stores, } - kvStores, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash) - require.Len(t, kvStores, 0) + _, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash) + require.Error(t, err) require.Equal(t, codes.Unknown, status.Code(errors.Cause(err))) } func TestCheckStoresAlive(t *testing.T) { diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 2ce397edbd8fd..1368dd3b92035 100644 --- a/br/pkg/utils/retry.go +++ 
b/br/pkg/utils/retry.go @@ -4,10 +4,14 @@ package utils import ( "context" + "io" + "net" "strings" "time" - serrors "github.com/pingcap/errors" + "github.com/go-sql-driver/mysql" + "github.com/pingcap/errors" + terror "github.com/pingcap/tidb/errno" "go.uber.org/multierr" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -50,9 +54,11 @@ func WithRetry( err := retryableFunc() if err != nil { allErrors = multierr.Append(allErrors, err) - if status.Code(serrors.Cause(err)) == codes.Canceled { // current context cancelled, stop retry + retry := isRetryableError(err) + if !retry { // exited retry return allErrors } + select { case <-ctx.Done(): return allErrors // nolint:wrapcheck @@ -76,3 +82,48 @@ func MessageIsRetryableStorageError(msg string) bool { } return false } + +// IsRetryableError returns whether the error is transient (e.g. network +// connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This +// function returns `false` (irrecoverable) if `err == nil`. +// +// If the error is a multierr, returns true only if all suberrors are retryable. 
+func isRetryableError(err error) bool { + for _, singleError := range errors.Errors(err) { + if !isSingleRetryableError(singleError) { + return false + } + } + return true +} + +func isSingleRetryableError(err error) bool { + err = errors.Cause(err) + + switch err { + case nil, context.Canceled, context.DeadlineExceeded, io.EOF: + return false + } + + switch nerr := err.(type) { + case net.Error: + return nerr.Timeout() + case *mysql.MySQLError: + switch nerr.Number { + // ErrLockDeadlock can retry to commit while meet deadlock + case terror.ErrUnknown, terror.ErrLockDeadlock, terror.ErrWriteConflictInTiDB, terror.ErrPDServerTimeout, terror.ErrTiKVServerTimeout, terror.ErrTiKVServerBusy, terror.ErrResolveLockTimeout, terror.ErrRegionUnavailable: + return true + default: + return false + } + default: + switch status.Code(err) { + case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss: + return true + case codes.Unknown: + return true + default: + return false + } + } +} From 0fbd620dce789a56ec910482bf73ab7c47854728 Mon Sep 17 00:00:00 2001 From: fengou1 Date: Tue, 28 Sep 2021 10:49:52 +0800 Subject: [PATCH 03/13] remove unnecessary package errors --- br/pkg/conn/conn_test.go | 2 +- go.mod | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/br/pkg/conn/conn_test.go b/br/pkg/conn/conn_test.go index ceb3b33077549..2f77803fc3f78 100644 --- a/br/pkg/conn/conn_test.go +++ b/br/pkg/conn/conn_test.go @@ -6,10 +6,10 @@ import ( "context" "testing" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/br/pkg/pdutil" - "github.com/pkg/errors" "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" "google.golang.org/grpc/codes" diff --git a/go.mod b/go.mod index ac37b562af69e..707903cfffd65 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,6 @@ require ( 
github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5 github.com/pingcap/tidb-tools v5.0.3+incompatible github.com/pingcap/tipb v0.0.0-20210802080519-94b831c6db55 - github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 github.com/prometheus/common v0.9.1 From 6dcdbdbbb107353d3750d95d145ef3fb017837bc Mon Sep 17 00:00:00 2001 From: fengou1 Date: Wed, 29 Sep 2021 10:18:42 +0800 Subject: [PATCH 04/13] reused the retry code from lightning --- br/pkg/utils/retry.go | 55 ++----------------------------------------- 1 file changed, 2 insertions(+), 53 deletions(-) diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 1368dd3b92035..6b2c787bf060d 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -4,17 +4,11 @@ package utils import ( "context" - "io" - "net" "strings" "time" - "github.com/go-sql-driver/mysql" - "github.com/pingcap/errors" - terror "github.com/pingcap/tidb/errno" + "github.com/pingcap/tidb/br/pkg/lightning/common" "go.uber.org/multierr" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) var retryableServerError = []string{ @@ -54,7 +48,7 @@ func WithRetry( err := retryableFunc() if err != nil { allErrors = multierr.Append(allErrors, err) - retry := isRetryableError(err) + retry := common.IsRetryableError(err) if !retry { // exited retry return allErrors } @@ -82,48 +76,3 @@ func MessageIsRetryableStorageError(msg string) bool { } return false } - -// IsRetryableError returns whether the error is transient (e.g. network -// connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This -// function returns `false` (irrecoverable) if `err == nil`. -// -// If the error is a multierr, returns true only if all suberrors are retryable. 
-func isRetryableError(err error) bool { - for _, singleError := range errors.Errors(err) { - if !isSingleRetryableError(singleError) { - return false - } - } - return true -} - -func isSingleRetryableError(err error) bool { - err = errors.Cause(err) - - switch err { - case nil, context.Canceled, context.DeadlineExceeded, io.EOF: - return false - } - - switch nerr := err.(type) { - case net.Error: - return nerr.Timeout() - case *mysql.MySQLError: - switch nerr.Number { - // ErrLockDeadlock can retry to commit while meet deadlock - case terror.ErrUnknown, terror.ErrLockDeadlock, terror.ErrWriteConflictInTiDB, terror.ErrPDServerTimeout, terror.ErrTiKVServerTimeout, terror.ErrTiKVServerBusy, terror.ErrResolveLockTimeout, terror.ErrRegionUnavailable: - return true - default: - return false - } - default: - switch status.Code(err) { - case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss: - return true - case codes.Unknown: - return true - default: - return false - } - } -} From e97b450976d26decfa4272f889e8b97bd385bb55 Mon Sep 17 00:00:00 2001 From: fengou1 Date: Thu, 7 Oct 2021 14:10:00 +0800 Subject: [PATCH 05/13] refactoring retryable --- br/pkg/lightning/backend/backend.go | 4 +- br/pkg/lightning/backend/importer/importer.go | 3 +- br/pkg/lightning/backend/tidb/tidb.go | 4 +- br/pkg/lightning/common/util.go | 65 +---------------- br/pkg/lightning/common/util_test.go | 49 ------------- br/pkg/lightning/restore/checksum.go | 2 +- br/pkg/utils/retry.go | 69 ++++++++++++++++++- br/pkg/utils/retry_test.go | 62 +++++++++++++++++ 8 files changed, 137 insertions(+), 121 deletions(-) create mode 100644 br/pkg/utils/retry_test.go diff --git a/br/pkg/lightning/backend/backend.go b/br/pkg/lightning/backend/backend.go index 839928966d3c2..aa656a6576653 100644 --- a/br/pkg/lightning/backend/backend.go +++ b/br/pkg/lightning/backend/backend.go @@ -26,10 
+26,10 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" - "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/lightning/metric" "github.com/pingcap/tidb/br/pkg/lightning/mydump" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/table" "go.uber.org/zap" ) @@ -442,7 +442,7 @@ func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize int64) e for i := 0; i < importMaxRetryTimes; i++ { task := engine.logger.With(zap.Int("retryCnt", i)).Begin(zap.InfoLevel, "import") err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize) - if !common.IsRetryableError(err) { + if !utils.IsRetryableError(err) { task.End(zap.ErrorLevel, err) return err } diff --git a/br/pkg/lightning/backend/importer/importer.go b/br/pkg/lightning/backend/importer/importer.go index 828cb05c9e90e..3e06fa58034be 100644 --- a/br/pkg/lightning/backend/importer/importer.go +++ b/br/pkg/lightning/backend/importer/importer.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/lightning/tikv" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/version" "github.com/pingcap/tidb/table" "github.com/tikv/client-go/v2/oracle" @@ -249,7 +250,7 @@ outside: switch { case err == nil: continue outside - case common.IsRetryableError(err): + case utils.IsRetryableError(err): // retry next loop default: return err diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go index 8c8e524933c3c..b592c1acd0dcf 100644 --- a/br/pkg/lightning/backend/tidb/tidb.go +++ b/br/pkg/lightning/backend/tidb/tidb.go @@ -416,7 +416,7 @@ rowLoop: switch { case err == nil: continue rowLoop - case common.IsRetryableError(err): + case utils.IsRetryableError(err): // 
retry next loop default: // WriteBatchRowsToDB failed in the batch mode and can not be retried, @@ -529,7 +529,7 @@ func (be *tidbBackend) execStmts(ctx context.Context, stmtTasks []stmtTask, tabl return errors.Trace(err) } // Retry the non-batch insert here if this is not the last retry. - if common.IsRetryableError(err) && i != writeRowsMaxRetryTimes-1 { + if utils.IsRetryableError(err) && i != writeRowsMaxRetryTimes-1 { continue } firstRow := stmtTask.rows[0] diff --git a/br/pkg/lightning/common/util.go b/br/pkg/lightning/common/util.go index 95b0934ac9843..8e056265b08cf 100644 --- a/br/pkg/lightning/common/util.go +++ b/br/pkg/lightning/common/util.go @@ -18,28 +18,20 @@ import ( "context" "database/sql" "encoding/json" - stderrors "errors" "fmt" "io" - "net" "net/http" "net/url" "os" - "reflect" - "regexp" "strings" "syscall" "time" - "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/utils" - tmysql "github.com/pingcap/tidb/errno" "go.uber.org/zap" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) const ( @@ -130,7 +122,7 @@ outside: // do not retry NotFound error case errors.IsNotFound(err): break outside - case IsRetryableError(err): + case utils.IsRetryableError(err): logger.Warn(purpose+" failed but going to try again", log.ShortError(err)) continue default: @@ -193,61 +185,6 @@ func (t SQLWithRetry) Exec(ctx context.Context, purpose string, query string, ar }) } -// sqlmock uses fmt.Errorf to produce expectation failures, which will cause -// unnecessary retry if not specially handled >:( -var stdFatalErrorsRegexp = regexp.MustCompile( - `^call to (?s:.*) was not expected|arguments do not match:|could not match actual sql|mock non-retryable error`, -) -var stdErrorType = reflect.TypeOf(stderrors.New("")) - -// IsRetryableError returns whether the error is transient (e.g. 
network -// connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This -// function returns `false` (irrecoverable) if `err == nil`. -// -// If the error is a multierr, returns true only if all suberrors are retryable. -func IsRetryableError(err error) bool { - for _, singleError := range errors.Errors(err) { - if !isSingleRetryableError(singleError) { - return false - } - } - return true -} - -func isSingleRetryableError(err error) bool { - err = errors.Cause(err) - - switch err { - case nil, context.Canceled, context.DeadlineExceeded, io.EOF, sql.ErrNoRows: - return false - } - - switch nerr := err.(type) { - case net.Error: - return nerr.Timeout() - case *mysql.MySQLError: - switch nerr.Number { - // ErrLockDeadlock can retry to commit while meet deadlock - case tmysql.ErrUnknown, tmysql.ErrLockDeadlock, tmysql.ErrWriteConflictInTiDB, tmysql.ErrPDServerTimeout, tmysql.ErrTiKVServerTimeout, tmysql.ErrTiKVServerBusy, tmysql.ErrResolveLockTimeout, tmysql.ErrRegionUnavailable: - return true - default: - return false - } - default: - switch status.Code(err) { - case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss: - return true - case codes.Unknown: - if reflect.TypeOf(err) == stdErrorType { - return !stdFatalErrorsRegexp.MatchString(err.Error()) - } - return true - default: - return false - } - } -} - // IsContextCanceledError returns whether the error is caused by context // cancellation. This function should only be used when the code logic is // affected by whether the error is canceling or not. 
diff --git a/br/pkg/lightning/common/util_test.go b/br/pkg/lightning/common/util_test.go index 04a1ceecf45b1..60812841ff259 100644 --- a/br/pkg/lightning/common/util_test.go +++ b/br/pkg/lightning/common/util_test.go @@ -17,23 +17,16 @@ package common_test import ( "context" "encoding/json" - "fmt" "io" - "net" "net/http" "net/http/httptest" "time" sqlmock "github.com/DATA-DOG/go-sqlmock" - "github.com/go-sql-driver/mysql" . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/log" - tmysql "github.com/pingcap/tidb/errno" - "go.uber.org/multierr" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) type utilSuite struct{} @@ -85,48 +78,6 @@ func (s *utilSuite) TestGetJSON(c *C) { c.Assert(err, ErrorMatches, ".*http status code != 200.*") } -func (s *utilSuite) TestIsRetryableError(c *C) { - c.Assert(common.IsRetryableError(context.Canceled), IsFalse) - c.Assert(common.IsRetryableError(context.DeadlineExceeded), IsFalse) - c.Assert(common.IsRetryableError(io.EOF), IsFalse) - c.Assert(common.IsRetryableError(&net.AddrError{}), IsFalse) - c.Assert(common.IsRetryableError(&net.DNSError{}), IsFalse) - c.Assert(common.IsRetryableError(&net.DNSError{IsTimeout: true}), IsTrue) - - // MySQL Errors - c.Assert(common.IsRetryableError(&mysql.MySQLError{}), IsFalse) - c.Assert(common.IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrUnknown}), IsTrue) - c.Assert(common.IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrLockDeadlock}), IsTrue) - c.Assert(common.IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrPDServerTimeout}), IsTrue) - c.Assert(common.IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrTiKVServerTimeout}), IsTrue) - c.Assert(common.IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrTiKVServerBusy}), IsTrue) - c.Assert(common.IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrResolveLockTimeout}), IsTrue) - 
c.Assert(common.IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrRegionUnavailable}), IsTrue) - c.Assert(common.IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrWriteConflictInTiDB}), IsTrue) - - // gRPC Errors - c.Assert(common.IsRetryableError(status.Error(codes.Canceled, "")), IsFalse) - c.Assert(common.IsRetryableError(status.Error(codes.Unknown, "")), IsTrue) - c.Assert(common.IsRetryableError(status.Error(codes.DeadlineExceeded, "")), IsTrue) - c.Assert(common.IsRetryableError(status.Error(codes.NotFound, "")), IsTrue) - c.Assert(common.IsRetryableError(status.Error(codes.AlreadyExists, "")), IsTrue) - c.Assert(common.IsRetryableError(status.Error(codes.PermissionDenied, "")), IsTrue) - c.Assert(common.IsRetryableError(status.Error(codes.ResourceExhausted, "")), IsTrue) - c.Assert(common.IsRetryableError(status.Error(codes.Aborted, "")), IsTrue) - c.Assert(common.IsRetryableError(status.Error(codes.OutOfRange, "")), IsTrue) - c.Assert(common.IsRetryableError(status.Error(codes.Unavailable, "")), IsTrue) - c.Assert(common.IsRetryableError(status.Error(codes.DataLoss, "")), IsTrue) - - // sqlmock errors - c.Assert(common.IsRetryableError(fmt.Errorf("call to database Close was not expected")), IsFalse) - c.Assert(common.IsRetryableError(errors.New("call to database Close was not expected")), IsTrue) - - // multierr - c.Assert(common.IsRetryableError(multierr.Combine(context.Canceled, context.Canceled)), IsFalse) - c.Assert(common.IsRetryableError(multierr.Combine(&net.DNSError{IsTimeout: true}, &net.DNSError{IsTimeout: true})), IsTrue) - c.Assert(common.IsRetryableError(multierr.Combine(context.Canceled, &net.DNSError{IsTimeout: true})), IsFalse) -} - func (s *utilSuite) TestToDSN(c *C) { param := common.MySQLConnectParam{ Host: "127.0.0.1", diff --git a/br/pkg/lightning/restore/checksum.go b/br/pkg/lightning/restore/checksum.go index c634ee48729f5..5b7a5b1501af4 100644 --- a/br/pkg/lightning/restore/checksum.go +++ b/br/pkg/lightning/restore/checksum.go 
@@ -314,7 +314,7 @@ func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo zap.Int("concurrency", distSQLScanConcurrency), zap.Int("retry", i)) // do not retry context.Canceled error - if !common.IsRetryableError(err) { + if !utils.IsRetryableError(err) { break } if distSQLScanConcurrency > minDistSQLScanConcurrency { diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 6b2c787bf060d..b5898360bf4c5 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -4,11 +4,21 @@ package utils import ( "context" + "database/sql" + stderrors "errors" + "io" + "net" + "reflect" + "regexp" "strings" "time" - "github.com/pingcap/tidb/br/pkg/lightning/common" + "github.com/go-sql-driver/mysql" + "github.com/pingcap/errors" + tmysql "github.com/pingcap/tidb/errno" "go.uber.org/multierr" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var retryableServerError = []string{ @@ -48,7 +58,7 @@ func WithRetry( err := retryableFunc() if err != nil { allErrors = multierr.Append(allErrors, err) - retry := common.IsRetryableError(err) + retry := IsRetryableError(err) if !retry { // exited retry return allErrors } @@ -76,3 +86,58 @@ func MessageIsRetryableStorageError(msg string) bool { } return false } + +// sqlmock uses fmt.Errorf to produce expectation failures, which will cause +// unnecessary retry if not specially handled >:( +var stdFatalErrorsRegexp = regexp.MustCompile( + `^call to (?s:.*) was not expected|arguments do not match:|could not match actual sql|mock non-retryable error`, +) +var stdErrorType = reflect.TypeOf(stderrors.New("")) + +// IsRetryableError returns whether the error is transient (e.g. network +// connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This +// function returns `false` (irrecoverable) if `err == nil`. +// +// If the error is a multierr, returns true only if all suberrors are retryable. 
+func IsRetryableError(err error) bool { + for _, singleError := range errors.Errors(err) { + if !isSingleRetryableError(singleError) { + return false + } + } + return true +} + +func isSingleRetryableError(err error) bool { + err = errors.Cause(err) + + switch err { + case nil, context.Canceled, context.DeadlineExceeded, io.EOF, sql.ErrNoRows: + return false + } + + switch nerr := err.(type) { + case net.Error: + return nerr.Timeout() + case *mysql.MySQLError: + switch nerr.Number { + // ErrLockDeadlock can retry to commit while meet deadlock + case tmysql.ErrUnknown, tmysql.ErrLockDeadlock, tmysql.ErrWriteConflictInTiDB, tmysql.ErrPDServerTimeout, tmysql.ErrTiKVServerTimeout, tmysql.ErrTiKVServerBusy, tmysql.ErrResolveLockTimeout, tmysql.ErrRegionUnavailable: + return true + default: + return false + } + default: + switch status.Code(err) { + case codes.DeadlineExceeded, codes.NotFound, codes.AlreadyExists, codes.PermissionDenied, codes.ResourceExhausted, codes.Aborted, codes.OutOfRange, codes.Unavailable, codes.DataLoss: + return true + case codes.Unknown: + if reflect.TypeOf(err) == stdErrorType { + return !stdFatalErrorsRegexp.MatchString(err.Error()) + } + return true + default: + return false + } + } +} diff --git a/br/pkg/utils/retry_test.go b/br/pkg/utils/retry_test.go new file mode 100644 index 0000000000000..b5c54287f1cce --- /dev/null +++ b/br/pkg/utils/retry_test.go @@ -0,0 +1,62 @@ +package utils + +import ( + "context" + "fmt" + "io" + "net" + + "github.com/go-sql-driver/mysql" + . 
"github.com/pingcap/check" + "github.com/pingcap/errors" + tmysql "github.com/pingcap/tidb/errno" + "go.uber.org/multierr" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type utilSuite struct{} + +var _ = Suite(&utilSuite{}) + +func (s *utilSuite) TestIsRetryableError(c *C) { + c.Assert(IsRetryableError(context.Canceled), IsFalse) + c.Assert(IsRetryableError(context.DeadlineExceeded), IsFalse) + c.Assert(IsRetryableError(io.EOF), IsFalse) + c.Assert(IsRetryableError(&net.AddrError{}), IsFalse) + c.Assert(IsRetryableError(&net.DNSError{}), IsFalse) + c.Assert(IsRetryableError(&net.DNSError{IsTimeout: true}), IsTrue) + + // MySQL Errors + c.Assert(IsRetryableError(&mysql.MySQLError{}), IsFalse) + c.Assert(IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrUnknown}), IsTrue) + c.Assert(IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrLockDeadlock}), IsTrue) + c.Assert(IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrPDServerTimeout}), IsTrue) + c.Assert(IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrTiKVServerTimeout}), IsTrue) + c.Assert(IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrTiKVServerBusy}), IsTrue) + c.Assert(IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrResolveLockTimeout}), IsTrue) + c.Assert(IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrRegionUnavailable}), IsTrue) + c.Assert(IsRetryableError(&mysql.MySQLError{Number: tmysql.ErrWriteConflictInTiDB}), IsTrue) + + // gRPC Errors + c.Assert(IsRetryableError(status.Error(codes.Canceled, "")), IsFalse) + c.Assert(IsRetryableError(status.Error(codes.Unknown, "")), IsTrue) + c.Assert(IsRetryableError(status.Error(codes.DeadlineExceeded, "")), IsTrue) + c.Assert(IsRetryableError(status.Error(codes.NotFound, "")), IsTrue) + c.Assert(IsRetryableError(status.Error(codes.AlreadyExists, "")), IsTrue) + c.Assert(IsRetryableError(status.Error(codes.PermissionDenied, "")), IsTrue) + c.Assert(IsRetryableError(status.Error(codes.ResourceExhausted, "")), 
IsTrue) + c.Assert(IsRetryableError(status.Error(codes.Aborted, "")), IsTrue) + c.Assert(IsRetryableError(status.Error(codes.OutOfRange, "")), IsTrue) + c.Assert(IsRetryableError(status.Error(codes.Unavailable, "")), IsTrue) + c.Assert(IsRetryableError(status.Error(codes.DataLoss, "")), IsTrue) + + // sqlmock errors + c.Assert(IsRetryableError(fmt.Errorf("call to database Close was not expected")), IsFalse) + c.Assert(IsRetryableError(errors.New("call to database Close was not expected")), IsTrue) + + // multierr + c.Assert(IsRetryableError(multierr.Combine(context.Canceled, context.Canceled)), IsFalse) + c.Assert(IsRetryableError(multierr.Combine(&net.DNSError{IsTimeout: true}, &net.DNSError{IsTimeout: true})), IsTrue) + c.Assert(IsRetryableError(multierr.Combine(context.Canceled, &net.DNSError{IsTimeout: true})), IsFalse) +} From d4004f398d0d3b3c19b4dc3b8aadc4d6a68a3a09 Mon Sep 17 00:00:00 2001 From: fengou1 Date: Tue, 12 Oct 2021 11:27:46 +0800 Subject: [PATCH 06/13] backup failed summary should be quieter Fixes #27015 --- br/pkg/lightning/backend/backend.go | 1 + br/pkg/lightning/common/util.go | 1 + 2 files changed, 2 insertions(+) diff --git a/br/pkg/lightning/backend/backend.go b/br/pkg/lightning/backend/backend.go index ad8ccdf936de9..d2d9e64de6c18 100644 --- a/br/pkg/lightning/backend/backend.go +++ b/br/pkg/lightning/backend/backend.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/metric" "github.com/pingcap/tidb/br/pkg/lightning/mydump" "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/table" "go.uber.org/zap" ) diff --git a/br/pkg/lightning/common/util.go b/br/pkg/lightning/common/util.go index 509acc5b1b649..f8345b22e6500 100644 --- a/br/pkg/lightning/common/util.go +++ b/br/pkg/lightning/common/util.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/utils" + 
"github.com/pingcap/tidb/parser/model" "go.uber.org/zap" ) From a1f292b5a25c5d55011a1cb68be2b37dc2559897 Mon Sep 17 00:00:00 2001 From: fengou1 Date: Wed, 10 Nov 2021 12:11:19 +0800 Subject: [PATCH 07/13] optimize error log --- br/pkg/backup/client.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 225680526cd31..edab440050e08 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -429,7 +429,14 @@ func (bc *Client) BackupRanges( elctx := logutil.ContextWithField(ectx, logutil.RedactAny("range-sn", id)) err := bc.BackupRange(elctx, sk, ek, req, metaWriter, progressCallBack) if err != nil { - return errors.Trace(err) + // The error due to context cancel, stack trace is meaningless, the stack shall be suspended (also clear) + if errors.Cause(err) == context.Canceled { + errors.SuspendStack(err) + return err + } else { + return errors.Trace(err) + } + } return nil }) From fbdf057e3aa4b10f68dd47f8f2b34d940ab7ab17 Mon Sep 17 00:00:00 2001 From: fengou1 Date: Wed, 10 Nov 2021 15:17:54 +0800 Subject: [PATCH 08/13] fixed unit test for dumpling --- br/pkg/utils/retry.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index b5898360bf4c5..a076190b953d6 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -58,11 +58,6 @@ func WithRetry( err := retryableFunc() if err != nil { allErrors = multierr.Append(allErrors, err) - retry := IsRetryableError(err) - if !retry { // exited retry - return allErrors - } - select { case <-ctx.Done(): return allErrors // nolint:wrapcheck From 71d1d2821665d83498800ee268d1238524ef6673 Mon Sep 17 00:00:00 2001 From: fengou1 Date: Mon, 22 Nov 2021 15:20:53 +0800 Subject: [PATCH 09/13] add a retry test --- br/pkg/backup/client.go | 3 +-- br/pkg/summary/collector.go | 11 ++++++++++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/br/pkg/backup/client.go 
b/br/pkg/backup/client.go index e2bc5cb8fff0b..377bc28f633ca 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -454,8 +454,7 @@ func (bc *Client) BackupRanges( if err != nil { // The error due to context cancel, stack trace is meaningless, the stack shall be suspended (also clear) if errors.Cause(err) == context.Canceled { - errors.SuspendStack(err) - return err + return errors.SuspendStack(err) } else { return errors.Trace(err) } diff --git a/br/pkg/summary/collector.go b/br/pkg/summary/collector.go index 5493f82f77967..7007a793032f4 100644 --- a/br/pkg/summary/collector.go +++ b/br/pkg/summary/collector.go @@ -3,12 +3,14 @@ package summary import ( + "context" "strings" "sync" "time" "github.com/docker/go-units" "github.com/pingcap/log" + "github.com/pkg/errors" "go.uber.org/zap" ) @@ -188,9 +190,16 @@ func (tc *logCollector) Summary(name string) { } if len(tc.failureReasons) != 0 || !tc.successStatus { + canceledUnits := make([]string, 0) for unitName, reason := range tc.failureReasons { - logFields = append(logFields, zap.String("unit-name", unitName), zap.Error(reason)) + if errors.Cause(reason) != context.Canceled { + logFields = append(logFields, zap.String("unit-name", unitName), zap.Error(reason)) + } else { + canceledUnits = append(canceledUnits, unitName) + } } + // only print total number of cancel unit + log.Info("units canceled", zap.Int("cancel-unit", len(canceledUnits))) tc.log(name+" failed summary", logFields...) 
return } From dbf2006b0ba8ab207debee522e9ac7996d142b66 Mon Sep 17 00:00:00 2001 From: fengou1 Date: Thu, 25 Nov 2021 10:59:45 +0800 Subject: [PATCH 10/13] fix dumpling unit test error --- br/pkg/summary/collector.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/summary/collector.go b/br/pkg/summary/collector.go index 7007a793032f4..16736cb979a88 100644 --- a/br/pkg/summary/collector.go +++ b/br/pkg/summary/collector.go @@ -9,8 +9,8 @@ import ( "time" "github.com/docker/go-units" + berror "github.com/pingcap/errors" "github.com/pingcap/log" - "github.com/pkg/errors" "go.uber.org/zap" ) @@ -192,7 +192,7 @@ func (tc *logCollector) Summary(name string) { if len(tc.failureReasons) != 0 || !tc.successStatus { canceledUnits := make([]string, 0) for unitName, reason := range tc.failureReasons { - if errors.Cause(reason) != context.Canceled { + if berror.Cause(reason) != context.Canceled { logFields = append(logFields, zap.String("unit-name", unitName), zap.Error(reason)) } else { canceledUnits = append(canceledUnits, unitName) From dde30dddce117033e9be7838952c113b949f5aa6 Mon Sep 17 00:00:00 2001 From: fengou1 Date: Sat, 27 Nov 2021 11:45:02 +0800 Subject: [PATCH 11/13] Add new collector --- br/pkg/summary/collector.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/br/pkg/summary/collector.go b/br/pkg/summary/collector.go index 16736cb979a88..570d2c76e2b43 100644 --- a/br/pkg/summary/collector.go +++ b/br/pkg/summary/collector.go @@ -190,16 +190,16 @@ func (tc *logCollector) Summary(name string) { } if len(tc.failureReasons) != 0 || !tc.successStatus { - canceledUnits := make([]string, 0) + var canceledUnits int for unitName, reason := range tc.failureReasons { if berror.Cause(reason) != context.Canceled { logFields = append(logFields, zap.String("unit-name", unitName), zap.Error(reason)) } else { - canceledUnits = append(canceledUnits, unitName) + canceledUnits = canceledUnits + 1 } } // only print total number of 
cancel unit - log.Info("units canceled", zap.Int("cancel-unit", len(canceledUnits))) + log.Info("units canceled", zap.Int("cancel-unit", canceledUnits)) tc.log(name+" failed summary", logFields...) return } From 38aa7a0f1513049dfd86561644df023f9d6e33ce Mon Sep 17 00:00:00 2001 From: fengou1 Date: Mon, 20 Dec 2021 22:41:25 +0800 Subject: [PATCH 12/13] add unit test for download and import back off --- br/pkg/utils/backoff_test.go | 40 +++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/br/pkg/utils/backoff_test.go b/br/pkg/utils/backoff_test.go index e8af459c78de6..5b67086b4252f 100644 --- a/br/pkg/utils/backoff_test.go +++ b/br/pkg/utils/backoff_test.go @@ -102,7 +102,6 @@ func TestBackoffWithRetryableError(t *testing.T) { berrors.ErrKVEpochNotMatch, }, multierr.Errors(err)) } - func TestPdBackoffWithRetryableError(t *testing.T) { t.Parallel() @@ -133,3 +132,42 @@ func TestPdBackoffWithRetryableError(t *testing.T) { gRPCError, }, multierr.Errors(err)) } + +func TestNewImportSSTBackofferWithSucess(t *testing.T) { + t.Parallel() + + var counter int + backoffer := utils.NewImportSSTBackoffer() + err := utils.WithRetry(context.Background(), func() error { + defer func() { counter++ }() + switch counter { + case 15: + return nil + } + return berrors.ErrKVDownloadFailed + }, backoffer) + require.Equal(t, 16, counter) + require.Nil(t, err) +} + +func TestNewDownloadSSTBackofferWithCancel(t *testing.T) { + t.Parallel() + + var counter int + backoffer := utils.NewDownloadSSTBackoffer() + err := utils.WithRetry(context.Background(), func() error { + defer func() { counter++ }() + switch counter { + case 3: + return context.Canceled + } + return berrors.ErrKVIngestFailed + }, backoffer) + require.Equal(t, 4, counter) + require.Equal(t, []error{ + berrors.ErrKVIngestFailed, + berrors.ErrKVIngestFailed, + berrors.ErrKVIngestFailed, + context.Canceled, + }, multierr.Errors(err)) +} From eb7261c71c0a7088a5eac74628c1a28b9489f12c Mon 
Sep 17 00:00:00 2001 From: fengou1 Date: Mon, 20 Dec 2021 22:50:44 +0800 Subject: [PATCH 13/13] fix pr review comments --- br/pkg/summary/collector.go | 2 +- br/pkg/utils/backoff_test.go | 40 +----------------------------------- 2 files changed, 2 insertions(+), 40 deletions(-) diff --git a/br/pkg/summary/collector.go b/br/pkg/summary/collector.go index 570d2c76e2b43..6c82bf54fba25 100644 --- a/br/pkg/summary/collector.go +++ b/br/pkg/summary/collector.go @@ -195,7 +195,7 @@ func (tc *logCollector) Summary(name string) { if berror.Cause(reason) != context.Canceled { logFields = append(logFields, zap.String("unit-name", unitName), zap.Error(reason)) } else { - canceledUnits = canceledUnits + 1 + canceledUnits++ } } // only print total number of cancel unit diff --git a/br/pkg/utils/backoff_test.go b/br/pkg/utils/backoff_test.go index 5b67086b4252f..e8af459c78de6 100644 --- a/br/pkg/utils/backoff_test.go +++ b/br/pkg/utils/backoff_test.go @@ -102,6 +102,7 @@ func TestBackoffWithRetryableError(t *testing.T) { berrors.ErrKVEpochNotMatch, }, multierr.Errors(err)) } + func TestPdBackoffWithRetryableError(t *testing.T) { t.Parallel() @@ -132,42 +133,3 @@ func TestPdBackoffWithRetryableError(t *testing.T) { gRPCError, }, multierr.Errors(err)) } - -func TestNewImportSSTBackofferWithSucess(t *testing.T) { - t.Parallel() - - var counter int - backoffer := utils.NewImportSSTBackoffer() - err := utils.WithRetry(context.Background(), func() error { - defer func() { counter++ }() - switch counter { - case 15: - return nil - } - return berrors.ErrKVDownloadFailed - }, backoffer) - require.Equal(t, 16, counter) - require.Nil(t, err) -} - -func TestNewDownloadSSTBackofferWithCancel(t *testing.T) { - t.Parallel() - - var counter int - backoffer := utils.NewDownloadSSTBackoffer() - err := utils.WithRetry(context.Background(), func() error { - defer func() { counter++ }() - switch counter { - case 3: - return context.Canceled - } - return berrors.ErrKVIngestFailed - }, backoffer) - 
require.Equal(t, 4, counter) - require.Equal(t, []error{ - berrors.ErrKVIngestFailed, - berrors.ErrKVIngestFailed, - berrors.ErrKVIngestFailed, - context.Canceled, - }, multierr.Errors(err)) -}