From f144aed969d60c47cfb89ed4da7032754ddc771a Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 16 Nov 2023 13:58:48 +0800 Subject: [PATCH 01/15] backup error refactor --- br/pkg/backup/client.go | 1 + br/pkg/errors/errors.go | 2 + br/pkg/utils/retry.go | 140 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 143 insertions(+) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 1721fab2c45ba..c19ea82560b10 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -1234,6 +1234,7 @@ func OnBackupResponse( log.Error("backup occur cluster ID error", zap.Reflect("error", v), zap.Uint64("storeID", storeID)) return nil, 0, errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v on storeID: %d", resp.Error, storeID) default: + // blcok-list // UNSAFE! TODO: use meaningful error code instead of unstructured message to find failed to write error. if utils.MessageIsRetryableStorageError(resp.GetError().GetMsg()) { log.Warn("backup occur storage error", zap.String("error", resp.GetError().GetMsg())) diff --git a/br/pkg/errors/errors.go b/br/pkg/errors/errors.go index 1c18f0b45fe6e..9c032bbbb3259 100644 --- a/br/pkg/errors/errors.go +++ b/br/pkg/errors/errors.go @@ -49,6 +49,8 @@ var ( ErrBackupInvalidRange = errors.Normalize("backup range invalid", errors.RFCCodeText("BR:Backup:ErrBackupInvalidRange")) ErrBackupNoLeader = errors.Normalize("backup no leader", errors.RFCCodeText("BR:Backup:ErrBackupNoLeader")) ErrBackupGCSafepointExceeded = errors.Normalize("backup GC safepoint exceeded", errors.RFCCodeText("BR:Backup:ErrBackupGCSafepointExceeded")) + ErrBackupKeyIsLocked = errors.Normalize("backup key is locked", errors.RFCCodeText("BR:Backup:ErrBackupKeyIsLocked")) + ErrBackupRegion = errors.Normalize("backup region error", errors.RFCCodeText("BR:Backup:ErrBackupRegion")) ErrRestoreModeMismatch = errors.Normalize("restore mode mismatch", errors.RFCCodeText("BR:Restore:ErrRestoreModeMismatch")) ErrRestoreRangeMismatch = errors.Normalize("restore range mismatch", errors.RFCCodeText("BR:Restore:ErrRestoreRangeMismatch")) diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 670521fc94c02..19f7065fc6bd5 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -4,15 +4,21 @@ package utils import ( "context" + "fmt" "strings" "sync" "time" + "github.com/pingcap/br/pkg/utils" "github.com/pingcap/errors" + backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/log" + berrors "github.com/pingcap/tidb/br/pkg/errors" tmysql "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/parser/terror" "github.com/tikv/client-go/v2/tikv" "go.uber.org/multierr" + "go.uber.org/zap" ) var retryableServerError = []string{ @@ -32,6 +38,140 @@ var retryableServerError = []string{ "end of file before message length reached", } +type ErrorStrategy int + +const ( + // This type can be retry but consume the backoffer attempts. + Retry ErrorStrategy = iota + // This type can be retry immediately. + LosslessRetry + // This type means un-recover error and the whole progress should exits + // for example: + // 1. permission not valid. + // 2. data has not found. + // 3. retry too many times + GiveUp + // Do nothing + Ignore + // This type means we need do some resolve work before retry. + // current the work is only resolve locks. + Resolve +) + +type ErrorContext struct { + // encounter times for one context on a store + // we may use this value to determine the retry policy + encounterTimesOnStore int + // whether need retry + needRetry bool + // whether canbe ignore this error + canbeIgnore bool + // whether in backup or restore + scenario string + + errorMsg string + + errorType errors.Error +} + +func (ec *ErrorContext) HandleBackup(err backuppb.Error, storeID int) ErrorStrategy { + if len(err.Msg) != 0 { + return ec.handleErrorMsg(err.Msg, storeID) + } + return ec.handleErrorPb(err, storeID) +} + +func (ec *ErrorContext) handleErrorMsg(msg string, storeID int) ErrorStrategy { + // UNSAFE! TODO: use meaningful error code instead of unstructured message to find failed to write error. + logger := log.L().With(zap.String("scenario", ec.scenario)) + if utils.MessageIsNotFoundStorageError(msg) { + // giveup outside + ec.errorMsg = fmt.Sprintf("File or directory not found on TiKV Node (store id: %v; Address: %s). "+ + "work around:please ensure br and tikv nodes share a same storage and the user of br and tikv has same uid.", + storeID) + return GiveUp + } + if utils.MessageIsPermissionDeniedStorageError(msg) { + // giveup outside + ec.errorMsg = fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v; Address: %s). "+ + "work around:please ensure tikv has permission to read from & write to the storage.", + storeID) + return GiveUp + } + + if utils.MessageIsRetryableStorageError(msg) { + logger.Warn("occur storage error", zap.String("error", msg)) + // infinite retry outside + // finite retry outside + ec.errorType = *berrors.ErrStorageUnknown + return Retry + } else { + // retry enough on same store + // if not enough + // return Retry + // if enough + // return GiveUp + } + return GiveUp +} + +func (ec *ErrorContext) handleErrorPb(e backuppb.Error, storeID int) ErrorStrategy { + logger := log.L().With(zap.String("scenario", ec.scenario)) + switch v := e.Detail.(type) { + case *backuppb.Error_KvError: + if ec.canbeIgnore { + return Ignore + } + if lockErr := v.KvError.Locked; lockErr != nil { + logger.Warn("occur kv error", zap.Reflect("error", v)) + // resolve outside X + // ignore outside + ec.errorType = *berrors.ErrBackupKeyIsLocked + return Resolve + } + // Backup should not meet error other than KeyLocked. + // giveup outside X + // ignore outside + ec.errorType = *berrors.ErrKVUnknown + return GiveUp + + case *backuppb.Error_RegionError: + if ec.canbeIgnore { + return Ignore + } + regionErr := v.RegionError + // Ignore following errors. + if !(regionErr.EpochNotMatch != nil || + regionErr.NotLeader != nil || + regionErr.RegionNotFound != nil || + regionErr.ServerIsBusy != nil || + regionErr.StaleCommand != nil || + regionErr.StoreNotMatch != nil || + regionErr.ReadIndexNotReady != nil || + regionErr.ProposalInMergingMode != nil) { + logger.Error("unexpect region error", zap.Reflect("RegionError", regionErr)) + // giveup outside + // ignore outside + ec.errorType = *berrors.ErrKVUnknown + return GiveUp + } + logger.Warn("occur region error", + zap.Reflect("RegionError", regionErr), + zap.Int("storeID", storeID)) + // ignore outside + // finite retry outside + ec.errorType = *berrors.ErrBackupRegion + return Retry + + case *backuppb.Error_ClusterIdError: + logger.Error("occur cluster ID error", zap.Reflect("error", v), zap.Int("storeID", storeID)) + // giveup outside + ec.errorType = *berrors.ErrKVUnknown + return GiveUp + } + return GiveUp +} + // RetryableFunc presents a retryable operation. type RetryableFunc func() error From e7a8eb8a19cfbb8b74c1a59ffd189983f382ad6b Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 16 Nov 2023 15:55:48 +0800 Subject: [PATCH 02/15] import error refactor --- br/pkg/backup/client.go | 51 +++++------------- br/pkg/backup/push.go | 39 ++++---------- br/pkg/utils/backoff.go | 14 +++-- br/pkg/utils/retry.go | 117 ++++++++++++++++------------------------ 4 files changed, 78 insertions(+), 143 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index c19ea82560b10..e7b8d1a595850 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -1185,17 +1185,18 @@ func OnBackupResponse( backupTS uint64, lockResolver *txnlock.LockResolver, resp *backuppb.BackupResponse, + errContext *utils.ErrorContext, ) (*backuppb.BackupResponse, int, error) { log.Debug("OnBackupResponse", zap.Reflect("resp", resp)) if resp.Error == nil { return resp, 0, nil } backoffMs := 0 - switch v := resp.Error.Detail.(type) { + + err := resp.Error + switch v := err.Detail.(type) { case *backuppb.Error_KvError: if lockErr := v.KvError.Locked; lockErr != nil { - // Try to resolve lock. - log.Warn("backup occur kv error", zap.Reflect("error", v)) msBeforeExpired, err1 := lockResolver.ResolveLocks( bo, backupTS, []*txnlock.Lock{txnlock.NewLock(lockErr)}) if err1 != nil { @@ -1206,45 +1207,16 @@ func OnBackupResponse( } return nil, backoffMs, nil } - // Backup should not meet error other than KeyLocked. - log.Error("unexpect kv error", zap.Reflect("KvError", v.KvError)) - return nil, backoffMs, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %v", storeID, v) - - case *backuppb.Error_RegionError: - regionErr := v.RegionError - // Ignore following errors. - if !(regionErr.EpochNotMatch != nil || - regionErr.NotLeader != nil || - regionErr.RegionNotFound != nil || - regionErr.ServerIsBusy != nil || - regionErr.StaleCommand != nil || - regionErr.StoreNotMatch != nil || - regionErr.ReadIndexNotReady != nil || - regionErr.ProposalInMergingMode != nil) { - log.Error("unexpect region error", zap.Reflect("RegionError", regionErr)) - return nil, backoffMs, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %v", storeID, v) - } - log.Warn("backup occur region error", - zap.Reflect("RegionError", regionErr), - zap.Uint64("storeID", storeID)) - // TODO: a better backoff. - backoffMs = 1000 /* 1s */ - return nil, backoffMs, nil - case *backuppb.Error_ClusterIdError: - log.Error("backup occur cluster ID error", zap.Reflect("error", v), zap.Uint64("storeID", storeID)) - return nil, 0, errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v on storeID: %d", resp.Error, storeID) default: - // blcok-list - // UNSAFE! TODO: use meaningful error code instead of unstructured message to find failed to write error. - if utils.MessageIsRetryableStorageError(resp.GetError().GetMsg()) { - log.Warn("backup occur storage error", zap.String("error", resp.GetError().GetMsg())) - // back off 3000ms, for S3 is 99.99% available (i.e. the max outage time would less than 52.56mins per year), - // this time would be probably enough for s3 to resume. + res := errContext.HandleErrorOnStore(resp.Error, storeID, false) + switch res.Strategy { + case utils.GiveUp: + return nil, 0, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %s", storeID, res.Reason) + case utils.Retry: return nil, 3000, nil } - log.Error("backup occur unknown error", zap.String("error", resp.Error.GetMsg()), zap.Uint64("storeID", storeID)) - return nil, 0, errors.Annotatef(berrors.ErrKVUnknown, "%v on storeID: %d", resp.Error, storeID) } + return nil, 3000, errors.Annotatef(berrors.ErrKVUnknown, "unreachable") } func (bc *Client) handleFineGrained( @@ -1274,12 +1246,13 @@ func (bc *Client) handleFineGrained( } hasProgress := false backoffMill := 0 + errContext := utils.NewErrorContext("handleFineGrainedBackup") err = SendBackup( ctx, storeID, client, req, // Handle responses with the same backoffer. func(resp *backuppb.BackupResponse) error { response, shouldBackoff, err1 := - OnBackupResponse(storeID, bo, req.EndVersion, lockResolver, resp) + OnBackupResponse(storeID, bo, req.EndVersion, lockResolver, resp, errContext) if err1 != nil { return err1 } diff --git a/br/pkg/backup/push.go b/br/pkg/backup/push.go index 83df6f46ff3a3..d9e0d6b893273 100644 --- a/br/pkg/backup/push.go +++ b/br/pkg/backup/push.go @@ -4,7 +4,6 @@ package backup import ( "context" - "fmt" "sync" "github.com/opentracing/opentracing-go" @@ -73,6 +72,7 @@ func (push *pushDown) pushBackup( }) wg := new(sync.WaitGroup) + errContext := utils.NewErrorContext("pushBackup") for _, s := range stores { store := s storeID := s.GetId() @@ -183,35 +183,10 @@ func (push *pushDown) pushBackup( progressCallBack(RegionUnit) } else { errPb := resp.GetError() - switch v := errPb.Detail.(type) { - case *backuppb.Error_KvError: - logutil.CL(ctx).Warn("backup occur kv error", zap.Reflect("error", v)) - - case *backuppb.Error_RegionError: - logutil.CL(ctx).Warn("backup occur region error", zap.Reflect("error", v)) - - case *backuppb.Error_ClusterIdError: - logutil.CL(ctx).Error("backup occur cluster ID error", zap.Reflect("error", v)) - return errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v", errPb) - default: - if utils.MessageIsRetryableStorageError(errPb.GetMsg()) { - logutil.CL(ctx).Warn("backup occur storage error", zap.String("error", errPb.GetMsg())) - continue - } - var errMsg string - if utils.MessageIsNotFoundStorageError(errPb.GetMsg()) { - errMsg = fmt.Sprintf("File or directory not found on TiKV Node (store id: %v; Address: %s). "+ - "work around:please ensure br and tikv nodes share a same storage and the user of br and tikv has same uid.", - store.GetId(), redact.String(store.GetAddress())) - logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg)) - } - if utils.MessageIsPermissionDeniedStorageError(errPb.GetMsg()) { - errMsg = fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v; Address: %s). "+ - "work around:please ensure tikv has permission to read from & write to the storage.", - store.GetId(), redact.String(store.GetAddress())) - logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg)) - } - + res := errContext.HandleErrorOnStore(errPb, store.GetId(), true) + switch res.Strategy { + case utils.GiveUp: + errMsg := res.Reason if len(errMsg) <= 0 { errMsg = errPb.Msg } @@ -220,6 +195,10 @@ func (push *pushDown) pushBackup( redact.String(store.GetAddress()), errMsg, ) + default: + // other type just continue for next response + // and finally handle the range in fineGrainedBackup + continue } } case err := <-push.errCh: diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go index 8c7712ecb1b1d..584aefaa26365 100644 --- a/br/pkg/utils/backoff.go +++ b/br/pkg/utils/backoff.go @@ -120,28 +120,34 @@ type importerBackoffer struct { attempt int delayTime time.Duration maxDelayTime time.Duration + errContext *ErrorContext } // NewBackoffer creates a new controller regulating a truncated exponential backoff. -func NewBackoffer(attempt int, delayTime, maxDelayTime time.Duration) Backoffer { +func NewBackoffer(attempt int, delayTime, maxDelayTime time.Duration, errContext *ErrorContext) Backoffer { return &importerBackoffer{ attempt: attempt, delayTime: delayTime, maxDelayTime: maxDelayTime, + errContext: errContext, } } func NewImportSSTBackoffer() Backoffer { - return NewBackoffer(importSSTRetryTimes, importSSTWaitInterval, importSSTMaxWaitInterval) + errContext := NewErrorContext("import sst", 3) + return NewBackoffer(importSSTRetryTimes, importSSTWaitInterval, importSSTMaxWaitInterval, errContext) } func NewDownloadSSTBackoffer() Backoffer { - return NewBackoffer(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval) + errContext := NewErrorContext("download sst", 3) + return NewBackoffer(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval, errContext) } func (bo *importerBackoffer) NextBackoff(err error) time.Duration { log.Warn("retry to import ssts", zap.Int("attempt", bo.attempt), zap.Error(err)) - if MessageIsRetryableStorageError(err.Error()) { + // we don't care storeID here. + res := bo.errContext.HandleErrorMsg(err.Error(), 0) + if res.Strategy == Retry { bo.delayTime = 2 * bo.delayTime bo.attempt-- } else { diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 19f7065fc6bd5..cf5d357318a2f 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -9,11 +9,9 @@ import ( "sync" "time" - "github.com/pingcap/br/pkg/utils" "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" - berrors "github.com/pingcap/tidb/br/pkg/errors" tmysql "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/parser/terror" "github.com/tikv/client-go/v2/tikv" @@ -38,106 +36,93 @@ var retryableServerError = []string{ "end of file before message length reached", } +type ErrorResult struct { + Strategy ErrorStrategy + Reason string +} + type ErrorStrategy int const ( // This type can be retry but consume the backoffer attempts. Retry ErrorStrategy = iota - // This type can be retry immediately. - LosslessRetry // This type means un-recover error and the whole progress should exits // for example: // 1. permission not valid. // 2. data has not found. // 3. retry too many times GiveUp - // Do nothing - Ignore - // This type means we need do some resolve work before retry. - // current the work is only resolve locks. - Resolve ) type ErrorContext struct { // encounter times for one context on a store // we may use this value to determine the retry policy - encounterTimesOnStore int - // whether need retry - needRetry bool - // whether canbe ignore this error - canbeIgnore bool + encounterTimesOnStore map[uint64]int + // unknown error retry limitation. + // encouter many times error makes Retry to GiveUp. + encounterTimesLimitation int // whether in backup or restore scenario string +} - errorMsg string - - errorType errors.Error +func NewErrorContext(scenario string, limitation int) *ErrorContext { + return &ErrorContext{ + scenario: scenario, + encounterTimesOnStore: make(map[uint64]int), + encounterTimesLimitation: limitation, + } } -func (ec *ErrorContext) HandleBackup(err backuppb.Error, storeID int) ErrorStrategy { +func (ec *ErrorContext) HandleErrorOnStore(err *backuppb.Error, storeID uint64, canIgnore bool) ErrorResult { if len(err.Msg) != 0 { - return ec.handleErrorMsg(err.Msg, storeID) + return ec.HandleErrorMsg(err.Msg, storeID) } - return ec.handleErrorPb(err, storeID) + return ec.handleErrorPb(err, storeID, canIgnore) } -func (ec *ErrorContext) handleErrorMsg(msg string, storeID int) ErrorStrategy { +func (ec *ErrorContext) HandleErrorMsg(msg string, storeID uint64) ErrorResult { // UNSAFE! TODO: use meaningful error code instead of unstructured message to find failed to write error. logger := log.L().With(zap.String("scenario", ec.scenario)) - if utils.MessageIsNotFoundStorageError(msg) { + if MessageIsNotFoundStorageError(msg) { // giveup outside - ec.errorMsg = fmt.Sprintf("File or directory not found on TiKV Node (store id: %v; Address: %s). "+ + reason := fmt.Sprintf("File or directory not found on TiKV Node (store id: %v; Address: %s). "+ "work around:please ensure br and tikv nodes share a same storage and the user of br and tikv has same uid.", storeID) - return GiveUp + return ErrorResult{GiveUp, reason} } - if utils.MessageIsPermissionDeniedStorageError(msg) { + if MessageIsPermissionDeniedStorageError(msg) { // giveup outside - ec.errorMsg = fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v; Address: %s). "+ + reason := fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v; Address: %s). "+ "work around:please ensure tikv has permission to read from & write to the storage.", storeID) - return GiveUp + return ErrorResult{GiveUp, reason} } - if utils.MessageIsRetryableStorageError(msg) { + if MessageIsRetryableStorageError(msg) { logger.Warn("occur storage error", zap.String("error", msg)) - // infinite retry outside - // finite retry outside - ec.errorType = *berrors.ErrStorageUnknown - return Retry - } else { - // retry enough on same store - // if not enough - // return Retry - // if enough - // return GiveUp + return ErrorResult{Retry, "retrable error"} + } + // retry enough on same store + ec.encounterTimesOnStore[storeID]++ + if ec.encounterTimesOnStore[storeID] < ec.encounterTimesLimitation { + return ErrorResult{Retry, "unknown error, retry it for few times"} } - return GiveUp + return ErrorResult{GiveUp, "unknown error and retry too many times, give up"} } -func (ec *ErrorContext) handleErrorPb(e backuppb.Error, storeID int) ErrorStrategy { +func (ec *ErrorContext) handleErrorPb(e *backuppb.Error, storeID uint64, canIgnore bool) ErrorResult { logger := log.L().With(zap.String("scenario", ec.scenario)) switch v := e.Detail.(type) { case *backuppb.Error_KvError: - if ec.canbeIgnore { - return Ignore + if canIgnore { + return ErrorResult{Retry, "retry it due to setting"} } - if lockErr := v.KvError.Locked; lockErr != nil { - logger.Warn("occur kv error", zap.Reflect("error", v)) - // resolve outside X - // ignore outside - ec.errorType = *berrors.ErrBackupKeyIsLocked - return Resolve - } - // Backup should not meet error other than KeyLocked. - // giveup outside X - // ignore outside - ec.errorType = *berrors.ErrKVUnknown - return GiveUp + // should not meet error other than KeyLocked. + return ErrorResult{GiveUp, "unknown kv error"} case *backuppb.Error_RegionError: - if ec.canbeIgnore { - return Ignore + if canIgnore { + return ErrorResult{Retry, "retry it due to setting"} } regionErr := v.RegionError // Ignore following errors. @@ -150,26 +135,18 @@ func (ec *ErrorContext) handleErrorPb(e backuppb.Error, storeID int) ErrorStrate regionErr.ReadIndexNotReady != nil || regionErr.ProposalInMergingMode != nil) { logger.Error("unexpect region error", zap.Reflect("RegionError", regionErr)) - // giveup outside - // ignore outside - ec.errorType = *berrors.ErrKVUnknown - return GiveUp + return ErrorResult{GiveUp, "unknown kv error"} } logger.Warn("occur region error", zap.Reflect("RegionError", regionErr), - zap.Int("storeID", storeID)) - // ignore outside - // finite retry outside - ec.errorType = *berrors.ErrBackupRegion - return Retry + zap.Uint64("storeID", storeID)) + return ErrorResult{Retry, "retrable error"} case *backuppb.Error_ClusterIdError: - logger.Error("occur cluster ID error", zap.Reflect("error", v), zap.Int("storeID", storeID)) - // giveup outside - ec.errorType = *berrors.ErrKVUnknown - return GiveUp + logger.Error("occur cluster ID error", zap.Reflect("error", v), zap.Uint64("storeID", storeID)) + return ErrorResult{GiveUp, "cluster ID mismatch"} } - return GiveUp + return ErrorResult{GiveUp, "unreachable error"} } // RetryableFunc presents a retryable operation. From 940bb9c8d9c15e695d271de8ae573f62841b8df7 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 16 Nov 2023 16:12:35 +0800 Subject: [PATCH 03/15] update --- br/pkg/backup/client.go | 2 +- br/pkg/backup/push.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index e7b8d1a595850..666a3d480743a 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -1246,7 +1246,7 @@ func (bc *Client) handleFineGrained( } hasProgress := false backoffMill := 0 - errContext := utils.NewErrorContext("handleFineGrainedBackup") + errContext := utils.NewErrorContext("handleFineGrainedBackup", 10) err = SendBackup( ctx, storeID, client, req, // Handle responses with the same backoffer. diff --git a/br/pkg/backup/push.go b/br/pkg/backup/push.go index d9e0d6b893273..3f0111409b451 100644 --- a/br/pkg/backup/push.go +++ b/br/pkg/backup/push.go @@ -72,7 +72,7 @@ func (push *pushDown) pushBackup( }) wg := new(sync.WaitGroup) - errContext := utils.NewErrorContext("pushBackup") + errContext := utils.NewErrorContext("pushBackup", 10) for _, s := range stores { store := s storeID := s.GetId() From b7f1d3a04d9d3fdc7ebc0f74079226ebcb2b494d Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 16 Nov 2023 17:32:41 +0800 Subject: [PATCH 04/15] udpate --- br/pkg/backup/client.go | 2 +- br/pkg/backup/push.go | 2 +- br/pkg/utils/permission.go | 6 +++--- br/pkg/utils/retry.go | 16 +++++++--------- 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 666a3d480743a..85f287cc15be3 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -1208,7 +1208,7 @@ func OnBackupResponse( return nil, backoffMs, nil } default: - res := errContext.HandleErrorOnStore(resp.Error, storeID, false) + res := errContext.HandleError(resp.Error, storeID, false) switch res.Strategy { case utils.GiveUp: return nil, 0, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %s", storeID, res.Reason) diff --git a/br/pkg/backup/push.go b/br/pkg/backup/push.go index 3f0111409b451..9c471c5c7e1c6 100644 --- a/br/pkg/backup/push.go +++ b/br/pkg/backup/push.go @@ -183,7 +183,7 @@ func (push *pushDown) pushBackup( progressCallBack(RegionUnit) } else { errPb := resp.GetError() - res := errContext.HandleErrorOnStore(errPb, store.GetId(), true) + res := errContext.HandleError(errPb, store.GetId(), true) switch res.Strategy { case utils.GiveUp: errMsg := res.Reason diff --git a/br/pkg/utils/permission.go b/br/pkg/utils/permission.go index e18c28dbbbe1c..3c0795db11c47 100644 --- a/br/pkg/utils/permission.go +++ b/br/pkg/utils/permission.go @@ -7,14 +7,14 @@ var ( permissionDeniedMsg = "permissiondenied" ) -// MessageIsNotFoundStorageError checks whether the message returning from TiKV is "NotFound" storage I/O error -func MessageIsNotFoundStorageError(msg string) bool { +// messageIsNotFoundStorageError checks whether the message returning from TiKV is "NotFound" storage I/O error +func messageIsNotFoundStorageError(msg string) bool { msgLower := strings.ToLower(msg) return strings.Contains(msgLower, "io") && strings.Contains(msgLower, ioNotFoundMsg) } // MessageIsPermissionDeniedStorageError checks whether the message returning from TiKV is "PermissionDenied" storage I/O error -func MessageIsPermissionDeniedStorageError(msg string) bool { +func messageIsPermissionDeniedStorageError(msg string) bool { msgLower := strings.ToLower(msg) return strings.Contains(msgLower, permissionDeniedMsg) } diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index cf5d357318a2f..65ac002436373 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -57,7 +57,7 @@ const ( type ErrorContext struct { // encounter times for one context on a store // we may use this value to determine the retry policy - encounterTimesOnStore map[uint64]int + encounterTimes map[uint64]int // unknown error retry limitation. // encouter many times error makes Retry to GiveUp. encounterTimesLimitation int @@ -68,12 +68,12 @@ type ErrorContext struct { func NewErrorContext(scenario string, limitation int) *ErrorContext { return &ErrorContext{ scenario: scenario, - encounterTimesOnStore: make(map[uint64]int), + encounterTimes: make(map[uint64]int), encounterTimesLimitation: limitation, } } -func (ec *ErrorContext) HandleErrorOnStore(err *backuppb.Error, storeID uint64, canIgnore bool) ErrorResult { +func (ec *ErrorContext) HandleError(err *backuppb.Error, storeID uint64, canIgnore bool) ErrorResult { if len(err.Msg) != 0 { return ec.HandleErrorMsg(err.Msg, storeID) } @@ -83,15 +83,13 @@ func (ec *ErrorContext) HandleErrorOnStore(err *backuppb.Error, storeID uint64, func (ec *ErrorContext) HandleErrorMsg(msg string, storeID uint64) ErrorResult { // UNSAFE! TODO: use meaningful error code instead of unstructured message to find failed to write error. logger := log.L().With(zap.String("scenario", ec.scenario)) - if MessageIsNotFoundStorageError(msg) { - // giveup outside + if messageIsNotFoundStorageError(msg) { reason := fmt.Sprintf("File or directory not found on TiKV Node (store id: %v; Address: %s). "+ "work around:please ensure br and tikv nodes share a same storage and the user of br and tikv has same uid.", storeID) return ErrorResult{GiveUp, reason} } - if MessageIsPermissionDeniedStorageError(msg) { - // giveup outside + if messageIsPermissionDeniedStorageError(msg) { reason := fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v; Address: %s). "+ "work around:please ensure tikv has permission to read from & write to the storage.", storeID) @@ -103,8 +101,8 @@ func (ec *ErrorContext) HandleErrorMsg(msg string, storeID uint64) ErrorResult { return ErrorResult{Retry, "retrable error"} } // retry enough on same store - ec.encounterTimesOnStore[storeID]++ - if ec.encounterTimesOnStore[storeID] < ec.encounterTimesLimitation { + ec.encounterTimes[storeID]++ + if ec.encounterTimes[storeID] < ec.encounterTimesLimitation { return ErrorResult{Retry, "unknown error, retry it for few times"} } return ErrorResult{GiveUp, "unknown error and retry too many times, give up"} From 7912ea0d4e9f8a83b0db0ff32deb9223ba58db7b Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 16 Nov 2023 21:29:58 +0800 Subject: [PATCH 05/15] lock --- br/pkg/utils/retry.go | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 65ac002436373..f4d286d0f2494 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -55,6 +55,7 @@ const ( ) type ErrorContext struct { + mu sync.Mutex // encounter times for one context on a store // we may use this value to determine the retry policy encounterTimes map[uint64]int @@ -73,26 +74,26 @@ func NewErrorContext(scenario string, limitation int) *ErrorContext { } } -func (ec *ErrorContext) HandleError(err *backuppb.Error, storeID uint64, canIgnore bool) ErrorResult { +func (ec *ErrorContext) HandleError(err *backuppb.Error, uuid uint64, canIgnore bool) ErrorResult { if len(err.Msg) != 0 { - return ec.HandleErrorMsg(err.Msg, storeID) + return ec.HandleErrorMsg(err.Msg, uuid) } - return ec.handleErrorPb(err, storeID, canIgnore) + return ec.handleErrorPb(err, uuid, canIgnore) } -func (ec *ErrorContext) HandleErrorMsg(msg string, storeID uint64) ErrorResult { +func (ec *ErrorContext) HandleErrorMsg(msg string, uuid uint64) ErrorResult { // UNSAFE! TODO: use meaningful error code instead of unstructured message to find failed to write error. logger := log.L().With(zap.String("scenario", ec.scenario)) if messageIsNotFoundStorageError(msg) { - reason := fmt.Sprintf("File or directory not found on TiKV Node (store id: %v; Address: %s). "+ + reason := fmt.Sprintf("File or directory not found on TiKV Node (store id: %v). "+ "work around:please ensure br and tikv nodes share a same storage and the user of br and tikv has same uid.", - storeID) + uuid) return ErrorResult{GiveUp, reason} } if messageIsPermissionDeniedStorageError(msg) { - reason := fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v; Address: %s). "+ + reason := fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v). "+ "work around:please ensure tikv has permission to read from & write to the storage.", - storeID) + uuid) return ErrorResult{GiveUp, reason} } @@ -101,26 +102,31 @@ func (ec *ErrorContext) HandleErrorMsg(msg string, storeID uint64) ErrorResult { return ErrorResult{Retry, "retrable error"} } // retry enough on same store - ec.encounterTimes[storeID]++ - if ec.encounterTimes[storeID] < ec.encounterTimesLimitation { + mu.Lock() + defer mu.Unlock() + ec.encounterTimes[uuid]++ + if ec.encounterTimes[uuid] < ec.encounterTimesLimitation { return ErrorResult{Retry, "unknown error, retry it for few times"} } return ErrorResult{GiveUp, "unknown error and retry too many times, give up"} } -func (ec *ErrorContext) handleErrorPb(e *backuppb.Error, storeID uint64, canIgnore bool) ErrorResult { +func (ec *ErrorContext) handleErrorPb(e *backuppb.Error, uuid uint64, canIgnore bool) ErrorResult { + if e == nil { + return ErrorResult{Retry, "unreachable code"} + } logger := log.L().With(zap.String("scenario", ec.scenario)) switch v := e.Detail.(type) { case *backuppb.Error_KvError: if canIgnore { - return ErrorResult{Retry, "retry it due to setting"} + return ErrorResult{Retry, "retry outside because the error can be ignored"} } // should not meet error other than KeyLocked. return ErrorResult{GiveUp, "unknown kv error"} case *backuppb.Error_RegionError: if canIgnore { - return ErrorResult{Retry, "retry it due to setting"} + return ErrorResult{Retry, "retry outside because the error can be ignored"} } regionErr := v.RegionError // Ignore following errors. @@ -144,7 +150,7 @@ func (ec *ErrorContext) handleErrorPb(e *backuppb.Error, storeID uint64, canIgno logger.Error("occur cluster ID error", zap.Reflect("error", v), zap.Uint64("storeID", storeID)) return ErrorResult{GiveUp, "cluster ID mismatch"} } - return ErrorResult{GiveUp, "unreachable error"} + return ErrorResult{GiveUp, "unreachable code"} } // RetryableFunc presents a retryable operation. From 4a6b938b0fb711ac248b5d0d5bd1220f4ded2415 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 5 Dec 2023 14:24:19 +0800 Subject: [PATCH 06/15] update test --- br/pkg/utils/backoff_test.go | 8 ++++---- br/pkg/utils/retry.go | 15 +++++++++++---- br/pkg/utils/retry_test.go | 35 +++++++++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 8 deletions(-) diff --git a/br/pkg/utils/backoff_test.go b/br/pkg/utils/backoff_test.go index bd0675007f6e4..a56c961aa21f0 100644 --- a/br/pkg/utils/backoff_test.go +++ b/br/pkg/utils/backoff_test.go @@ -18,7 +18,7 @@ import ( func TestBackoffWithSuccess(t *testing.T) { var counter int - backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond) + backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() switch counter { @@ -37,7 +37,7 @@ func TestBackoffWithSuccess(t *testing.T) { func TestBackoffWithFatalError(t *testing.T) { var counter int - backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond) + backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) gRPCError := status.Error(codes.Unavailable, "transport is closing") err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() @@ -65,7 +65,7 @@ func TestBackoffWithFatalError(t *testing.T) { func TestBackoffWithFatalRawGRPCError(t *testing.T) { var counter int canceledError := status.Error(codes.Canceled, "context canceled") - backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond) + backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() return canceledError // nolint:wrapcheck @@ -76,7 +76,7 @@ func TestBackoffWithFatalRawGRPCError(t *testing.T) { func TestBackoffWithRetryableError(t *testing.T) { var counter int - backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond) + backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) err := utils.WithRetry(context.Background(), func() error { defer func() { counter++ }() return berrors.ErrKVEpochNotMatch diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index f4d286d0f2494..7de207fa3fbc2 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -74,11 +74,18 @@ func NewErrorContext(scenario string, limitation int) *ErrorContext { } } +func NewDefaultContext() *ErrorContext { + return &ErrorContext{ + encounterTimes: make(map[uint64]int), + encounterTimesLimitation: 1, + } +} + func (ec *ErrorContext) HandleError(err *backuppb.Error, uuid uint64, canIgnore bool) ErrorResult { if len(err.Msg) != 0 { return ec.HandleErrorMsg(err.Msg, uuid) } - return ec.handleErrorPb(err, uuid, canIgnore) + return ec.HandleErrorPb(err, uuid, canIgnore) } func (ec *ErrorContext) HandleErrorMsg(msg string, uuid uint64) ErrorResult { @@ -111,7 +118,7 @@ func (ec *ErrorContext) HandleErrorMsg(msg string, uuid uint64) ErrorResult { return ErrorResult{GiveUp, "unknown error and retry too many times, give up"} } -func (ec *ErrorContext) handleErrorPb(e *backuppb.Error, uuid uint64, canIgnore bool) ErrorResult { +func (ec *ErrorContext) HandleErrorPb(e *backuppb.Error, uuid uint64, canIgnore bool) ErrorResult { if e == nil { return ErrorResult{Retry, "unreachable code"} } @@ -143,11 +150,11 @@ func (ec *ErrorContext) handleErrorPb(e *backuppb.Error, uuid uint64, canIgnore } logger.Warn("occur region error", zap.Reflect("RegionError", regionErr), - zap.Uint64("storeID", storeID)) + zap.Uint64("uuid", uuid)) return ErrorResult{Retry, "retrable error"} case *backuppb.Error_ClusterIdError: - logger.Error("occur cluster ID error", zap.Reflect("error", v), zap.Uint64("storeID", storeID)) + logger.Error("occur cluster ID error", zap.Reflect("error", v), zap.Uint64("uuid", uuid)) return ErrorResult{GiveUp, "cluster ID mismatch"} } return ErrorResult{GiveUp, "unreachable code"} diff --git a/br/pkg/utils/retry_test.go b/br/pkg/utils/retry_test.go index eeef8c61c0480..463c45489913b 100644 --- a/br/pkg/utils/retry_test.go +++ b/br/pkg/utils/retry_test.go @@ -9,6 +9,8 @@ import ( "time" "github.com/pingcap/errors" + backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/tidb/br/pkg/utils" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/tikv" @@ -47,3 +49,36 @@ func TestRetryAdapter(t *testing.T) { req.Greater(time.Since(begin), 200*time.Millisecond) } + +func TestHandleErrorPb(t *testing.T) { + ec := utils.NewErrorContext("test", 3) + // Test case 1: Error is nil + result := ec.HandleErrorPb(nil, 123, false) + require.Equal(t, utils.ErrorResult{utils.Retry, "unreachable code"}, result) + + // Test case 2: Error is KvError and can be ignored + kvError := &backuppb.Error_KvError{} + result = ec.HandleErrorPb(&backuppb.Error{Detail: kvError}, 123, true) + require.Equal(t, utils.ErrorResult{utils.Retry, "retry outside because the error can be ignored"}, result) + + // Test case 3: Error is KvError and cannot be ignored + result = ec.HandleErrorPb(&backuppb.Error{Detail: kvError}, 123, false) + require.Equal(t, utils.ErrorResult{utils.GiveUp, "unknown kv error"}, result) + + // Test case 4: Error is RegionError and can be ignored + regionError := &backuppb.Error_RegionError{ + RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{RegionId: 1}}} + result = ec.HandleErrorPb(&backuppb.Error{Detail: regionError}, 123, true) + require.Equal(t, utils.ErrorResult{utils.Retry, "retry outside because the error can be ignored"}, result) + + // Test case 5: Error is RegionError and cannot be ignored + regionError = &backuppb.Error_RegionError{ + RegionError: &errorpb.Error{DiskFull: &errorpb.DiskFull{}}} + result = ec.HandleErrorPb(&backuppb.Error{Detail: regionError}, 123, false) + require.Equal(t, utils.ErrorResult{utils.GiveUp, "unknown kv error"}, result) + + // Test case 6: Error is ClusterIdError + clusterIdError := &backuppb.Error_ClusterIdError{} + result = ec.HandleErrorPb(&backuppb.Error{Detail: clusterIdError}, 123, false) + require.Equal(t, utils.ErrorResult{utils.GiveUp, "cluster ID mismatch"}, result) +} From 10e22af3512d19ea43f9d159ef69c7051509d667 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 5 Dec 2023 14:42:15 +0800 Subject: [PATCH 07/15] add more test --- br/pkg/utils/retry_test.go | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/br/pkg/utils/retry_test.go b/br/pkg/utils/retry_test.go index 463c45489913b..172de0e065d66 100644 --- a/br/pkg/utils/retry_test.go +++ b/br/pkg/utils/retry_test.go @@ -82,3 +82,41 @@ func TestHandleErrorPb(t *testing.T) { result = ec.HandleErrorPb(&backuppb.Error{Detail: clusterIdError}, 123, false) require.Equal(t, utils.ErrorResult{utils.GiveUp, "cluster ID mismatch"}, result) } + +func TestHandleErrorMsg(t *testing.T) { + ec := utils.NewErrorContext("test", 3) + + // Test messageIsNotFoundStorageError + msg := "IO: files Notfound error" + uuid := uint64(456) + expectedReason := "File or directory not found on TiKV Node (store id: 456). work around:please ensure br and tikv nodes share a same storage and the user of br and tikv has same uid." + expectedResult := utils.ErrorResult{utils.GiveUp, expectedReason} + actualResult := ec.HandleErrorMsg(msg, uuid) + require.Equal(t, expectedResult, actualResult) + + // Test messageIsPermissionDeniedStorageError + msg = "I/O permissiondenied error occurs on TiKV Node(store id: 456)." + expectedReason = "I/O permission denied error occurs on TiKV Node(store id: 456). work around:please ensure tikv has permission to read from & write to the storage." + expectedResult = utils.ErrorResult{utils.GiveUp, expectedReason} + actualResult = ec.HandleErrorMsg(msg, uuid) + require.Equal(t, expectedResult, actualResult) + + // Test MessageIsRetryableStorageError + msg = "server closed" + expectedResult = utils.ErrorResult{utils.Retry, "retrable error"} + actualResult = ec.HandleErrorMsg(msg, uuid) + require.Equal(t, expectedResult, actualResult) + + // Test unknown error + msg = "unknown error" + expectedResult = utils.ErrorResult{utils.Retry, "unknown error, retry it for few times"} + actualResult = ec.HandleErrorMsg(msg, uuid) + require.Equal(t, expectedResult, actualResult) + + // Test retry too many times + actualResult = ec.HandleErrorMsg(msg, uuid) + actualResult = ec.HandleErrorMsg(msg, uuid) + expectedResult = utils.ErrorResult{utils.GiveUp, "unknown error and retry too many times, give up"} + actualResult = ec.HandleErrorMsg(msg, uuid) + require.Equal(t, expectedResult, actualResult) +} From 1405f58adcc472b736d25cac9f0e8607912cb041 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 5 Dec 2023 15:07:55 +0800 Subject: [PATCH 08/15] more test --- br/pkg/utils/backoff.go | 6 +++--- br/pkg/utils/backoff_test.go | 18 ++++++++++++++++++ br/pkg/utils/retry.go | 3 ++- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go index 584aefaa26365..e6befb9660194 100644 --- a/br/pkg/utils/backoff.go +++ b/br/pkg/utils/backoff.go @@ -157,7 +157,7 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration { bo.delayTime = 2 * bo.delayTime bo.attempt-- case berrors.ErrKVRangeIsEmpty, berrors.ErrKVRewriteRuleNotFound: - // Excepted error, finish the operation + // Expected error, finish the operation bo.delayTime = 0 bo.attempt = 0 default: @@ -166,10 +166,10 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration { bo.delayTime = 2 * bo.delayTime bo.attempt-- default: - // Unexcepted error + // Unexpected error bo.delayTime = 0 bo.attempt = 0 - log.Warn("unexcepted error, stop to retry", zap.Error(err)) + log.Warn("unexpected error, stop retrying", zap.Error(err)) } } } diff --git a/br/pkg/utils/backoff_test.go b/br/pkg/utils/backoff_test.go index a56c961aa21f0..857010bfc871a 100644 --- a/br/pkg/utils/backoff_test.go +++ b/br/pkg/utils/backoff_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/pingcap/errors" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/utils" "github.com/stretchr/testify/require" @@ -35,6 +36,23 @@ func TestBackoffWithSuccess(t *testing.T) { require.NoError(t, err) } +func TestBackoffWithUnknowneErrorSuccess(t *testing.T) { + var counter int + backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) + err := utils.WithRetry(context.Background(), func() error { + defer func() { counter++ }() + switch counter { + case 0: + return errors.New("unknown error: not in the allow list") + case 1: + return berrors.ErrKVEpochNotMatch + } + return nil + }, backoffer) + require.Equal(t, 3, counter) + require.NoError(t, err) +} + func TestBackoffWithFatalError(t *testing.T) { var counter int backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext()) diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 7de207fa3fbc2..77fe16588ad56 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -76,6 +76,7 @@ func NewErrorContext(scenario string, limitation int) *ErrorContext { func NewDefaultContext() *ErrorContext { return &ErrorContext{ + scenario: "default", encounterTimes: make(map[uint64]int), encounterTimesLimitation: 1, } @@ -112,7 +113,7 @@ func (ec *ErrorContext) HandleErrorMsg(msg string, uuid uint64) ErrorResult { mu.Lock() defer mu.Unlock() ec.encounterTimes[uuid]++ - if ec.encounterTimes[uuid] < ec.encounterTimesLimitation { + if ec.encounterTimes[uuid] <= ec.encounterTimesLimitation { return ErrorResult{Retry, "unknown error, retry it for few times"} } return ErrorResult{GiveUp, "unknown error and retry too many times, give up"} From 6a65e406ef3f642c8429bb1dfa66b75c94366729 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 5 Dec 2023 18:12:48 +0800 Subject: [PATCH 09/15] address comment --- br/pkg/backup/client.go | 4 ++-- br/pkg/backup/push.go | 2 +- br/pkg/utils/backoff.go | 2 +- br/pkg/utils/retry.go | 30 +++++++++++++++--------------- br/pkg/utils/retry_test.go | 22 +++++++++++----------- 5 files changed, 30 insertions(+), 30 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 85f287cc15be3..625d16d3ef850 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -1210,9 +1210,9 @@ func OnBackupResponse( default: res := errContext.HandleError(resp.Error, storeID, false) switch res.Strategy { - case utils.GiveUp: + case utils.GiveUpStrategy: return nil, 0, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %s", storeID, res.Reason) - case utils.Retry: + case utils.RetryStrategy: return nil, 3000, nil } } diff --git a/br/pkg/backup/push.go b/br/pkg/backup/push.go index 9c471c5c7e1c6..e29f3e1b35754 100644 --- a/br/pkg/backup/push.go +++ b/br/pkg/backup/push.go @@ -185,7 +185,7 @@ func (push *pushDown) pushBackup( errPb := resp.GetError() res := errContext.HandleError(errPb, store.GetId(), true) switch res.Strategy { - case utils.GiveUp: + case utils.GiveUpStrategy: errMsg := res.Reason if len(errMsg) <= 0 { errMsg = errPb.Msg diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go index e6befb9660194..8f872e46a2b3b 100644 --- a/br/pkg/utils/backoff.go +++ b/br/pkg/utils/backoff.go @@ -147,7 +147,7 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration { log.Warn("retry to import ssts", zap.Int("attempt", bo.attempt), zap.Error(err)) // we don't care storeID here. res := bo.errContext.HandleErrorMsg(err.Error(), 0) - if res.Strategy == Retry { + if res.Strategy == RetryStrategy { bo.delayTime = 2 * bo.delayTime bo.attempt-- } else { diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 77fe16588ad56..9467f33bf966d 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -45,13 +45,13 @@ type ErrorStrategy int const ( // This type can be retry but consume the backoffer attempts. - Retry ErrorStrategy = iota + RetryStrategy ErrorStrategy = iota // This type means un-recover error and the whole progress should exits // for example: // 1. permission not valid. // 2. data has not found. // 3. retry too many times - GiveUp + GiveUpStrategy ) type ErrorContext struct { @@ -96,45 +96,45 @@ func (ec *ErrorContext) HandleErrorMsg(msg string, uuid uint64) ErrorResult { reason := fmt.Sprintf("File or directory not found on TiKV Node (store id: %v). "+ "work around:please ensure br and tikv nodes share a same storage and the user of br and tikv has same uid.", uuid) - return ErrorResult{GiveUp, reason} + return ErrorResult{GiveUpStrategy, reason} } if messageIsPermissionDeniedStorageError(msg) { reason := fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v). "+ "work around:please ensure tikv has permission to read from & write to the storage.", uuid) - return ErrorResult{GiveUp, reason} + return ErrorResult{GiveUpStrategy, reason} } if MessageIsRetryableStorageError(msg) { logger.Warn("occur storage error", zap.String("error", msg)) - return ErrorResult{Retry, "retrable error"} + return ErrorResult{RetryStrategy, "retrable error"} } // retry enough on same store mu.Lock() defer mu.Unlock() ec.encounterTimes[uuid]++ if ec.encounterTimes[uuid] <= ec.encounterTimesLimitation { - return ErrorResult{Retry, "unknown error, retry it for few times"} + return ErrorResult{RetryStrategy, "unknown error, retry it for few times"} } - return ErrorResult{GiveUp, "unknown error and retry too many times, give up"} + return ErrorResult{GiveUpStrategy, "unknown error and retry too many times, give up"} } func (ec *ErrorContext) HandleErrorPb(e *backuppb.Error, uuid uint64, canIgnore bool) ErrorResult { if e == nil { - return ErrorResult{Retry, "unreachable code"} + return ErrorResult{RetryStrategy, "unreachable code"} } logger := log.L().With(zap.String("scenario", ec.scenario)) switch v := e.Detail.(type) { case *backuppb.Error_KvError: if canIgnore { - return ErrorResult{Retry, "retry outside because the error can be ignored"} + return ErrorResult{RetryStrategy, "retry outside because the error can be ignored"} } // should not meet error other than KeyLocked. - return ErrorResult{GiveUp, "unknown kv error"} + return ErrorResult{GiveUpStrategy, "unknown kv error"} case *backuppb.Error_RegionError: if canIgnore { - return ErrorResult{Retry, "retry outside because the error can be ignored"} + return ErrorResult{RetryStrategy, "retry outside because the error can be ignored"} } regionErr := v.RegionError // Ignore following errors. @@ -147,18 +147,18 @@ func (ec *ErrorContext) HandleErrorPb(e *backuppb.Error, uuid uint64, canIgnore regionErr.ReadIndexNotReady != nil || regionErr.ProposalInMergingMode != nil) { logger.Error("unexpect region error", zap.Reflect("RegionError", regionErr)) - return ErrorResult{GiveUp, "unknown kv error"} + return ErrorResult{GiveUpStrategy, "unknown kv error"} } logger.Warn("occur region error", zap.Reflect("RegionError", regionErr), zap.Uint64("uuid", uuid)) - return ErrorResult{Retry, "retrable error"} + return ErrorResult{RetryStrategy, "retrable error"} case *backuppb.Error_ClusterIdError: logger.Error("occur cluster ID error", zap.Reflect("error", v), zap.Uint64("uuid", uuid)) - return ErrorResult{GiveUp, "cluster ID mismatch"} + return ErrorResult{GiveUpStrategy, "cluster ID mismatch"} } - return ErrorResult{GiveUp, "unreachable code"} + return ErrorResult{GiveUpStrategy, "unreachable code"} } // RetryableFunc presents a retryable operation. diff --git a/br/pkg/utils/retry_test.go b/br/pkg/utils/retry_test.go index 172de0e065d66..61af1fe3689de 100644 --- a/br/pkg/utils/retry_test.go +++ b/br/pkg/utils/retry_test.go @@ -54,33 +54,33 @@ func TestHandleErrorPb(t *testing.T) { ec := utils.NewErrorContext("test", 3) // Test case 1: Error is nil result := ec.HandleErrorPb(nil, 123, false) - require.Equal(t, utils.ErrorResult{utils.Retry, "unreachable code"}, result) + require.Equal(t, utils.ErrorResult{utils.RetryStrategy, "unreachable code"}, result) // Test case 2: Error is KvError and can be ignored kvError := &backuppb.Error_KvError{} result = ec.HandleErrorPb(&backuppb.Error{Detail: kvError}, 123, true) - require.Equal(t, utils.ErrorResult{utils.Retry, "retry outside because the error can be ignored"}, result) + require.Equal(t, utils.ErrorResult{utils.RetryStrategy, "retry outside because the error can be ignored"}, result) // Test case 3: Error is KvError and cannot be ignored result = ec.HandleErrorPb(&backuppb.Error{Detail: kvError}, 123, false) - require.Equal(t, utils.ErrorResult{utils.GiveUp, "unknown kv error"}, result) + require.Equal(t, utils.ErrorResult{utils.GiveUpStrategy, "unknown kv error"}, result) // Test case 4: Error is RegionError and can be ignored regionError := &backuppb.Error_RegionError{ RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{RegionId: 1}}} result = ec.HandleErrorPb(&backuppb.Error{Detail: regionError}, 123, true) - require.Equal(t, utils.ErrorResult{utils.Retry, "retry outside because the error can be ignored"}, result) + require.Equal(t, utils.ErrorResult{utils.RetryStrategy, "retry outside because the error can be ignored"}, result) // Test case 5: Error is RegionError and cannot be ignored regionError = &backuppb.Error_RegionError{ RegionError: &errorpb.Error{DiskFull: &errorpb.DiskFull{}}} result = ec.HandleErrorPb(&backuppb.Error{Detail: regionError}, 123, false) - require.Equal(t, utils.ErrorResult{utils.GiveUp, "unknown kv error"}, result) + require.Equal(t, utils.ErrorResult{utils.GiveUpStrategy, "unknown kv error"}, result) // Test case 6: Error is ClusterIdError clusterIdError := &backuppb.Error_ClusterIdError{} result = ec.HandleErrorPb(&backuppb.Error{Detail: clusterIdError}, 123, false) - require.Equal(t, utils.ErrorResult{utils.GiveUp, "cluster ID mismatch"}, result) + require.Equal(t, utils.ErrorResult{utils.GiveUpStrategy, "cluster ID mismatch"}, result) } func TestHandleErrorMsg(t *testing.T) { @@ -90,33 +90,33 @@ func TestHandleErrorMsg(t *testing.T) { msg := "IO: files Notfound error" uuid := uint64(456) expectedReason := "File or directory not found on TiKV Node (store id: 456). work around:please ensure br and tikv nodes share a same storage and the user of br and tikv has same uid." - expectedResult := utils.ErrorResult{utils.GiveUp, expectedReason} + expectedResult := utils.ErrorResult{utils.GiveUpStrategy, expectedReason} actualResult := ec.HandleErrorMsg(msg, uuid) require.Equal(t, expectedResult, actualResult) // Test messageIsPermissionDeniedStorageError msg = "I/O permissiondenied error occurs on TiKV Node(store id: 456)." expectedReason = "I/O permission denied error occurs on TiKV Node(store id: 456). work around:please ensure tikv has permission to read from & write to the storage." - expectedResult = utils.ErrorResult{utils.GiveUp, expectedReason} + expectedResult = utils.ErrorResult{utils.GiveUpStrategy, expectedReason} actualResult = ec.HandleErrorMsg(msg, uuid) require.Equal(t, expectedResult, actualResult) // Test MessageIsRetryableStorageError msg = "server closed" - expectedResult = utils.ErrorResult{utils.Retry, "retrable error"} + expectedResult = utils.ErrorResult{utils.RetryStrategy, "retrable error"} actualResult = ec.HandleErrorMsg(msg, uuid) require.Equal(t, expectedResult, actualResult) // Test unknown error msg = "unknown error" - expectedResult = utils.ErrorResult{utils.Retry, "unknown error, retry it for few times"} + expectedResult = utils.ErrorResult{utils.RetryStrategy, "unknown error, retry it for few times"} actualResult = ec.HandleErrorMsg(msg, uuid) require.Equal(t, expectedResult, actualResult) // Test retry too many times actualResult = ec.HandleErrorMsg(msg, uuid) actualResult = ec.HandleErrorMsg(msg, uuid) - expectedResult = utils.ErrorResult{utils.GiveUp, "unknown error and retry too many times, give up"} + expectedResult = utils.ErrorResult{utils.GiveUpStrategy, "unknown error and retry too many times, give up"} actualResult = ec.HandleErrorMsg(msg, uuid) require.Equal(t, expectedResult, actualResult) } From 26819c9435284ea216937c3aa55c5096417e41c3 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 6 Dec 2023 17:59:38 +0800 Subject: [PATCH 10/15] address comment --- br/pkg/backup/client.go | 2 +- br/pkg/backup/push.go | 2 +- br/pkg/utils/retry.go | 50 +++++++++++++++++++++++++++----------- br/pkg/utils/retry_test.go | 16 ++++++------ 4 files changed, 46 insertions(+), 24 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 625d16d3ef850..320a1d8ad9288 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -1208,7 +1208,7 @@ func OnBackupResponse( return nil, backoffMs, nil } default: - res := errContext.HandleError(resp.Error, storeID, false) + res := errContext.HandleError(resp.Error, storeID) switch res.Strategy { case utils.GiveUpStrategy: return nil, 0, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %s", storeID, res.Reason) diff --git a/br/pkg/backup/push.go b/br/pkg/backup/push.go index e29f3e1b35754..302d810951528 100644 --- a/br/pkg/backup/push.go +++ b/br/pkg/backup/push.go @@ -183,7 +183,7 @@ func (push *pushDown) pushBackup( progressCallBack(RegionUnit) } else { errPb := resp.GetError() - res := errContext.HandleError(errPb, store.GetId(), true) + res := errContext.HandleIgnorableError(errPb, store.GetId()) switch res.Strategy { case utils.GiveUpStrategy: errMsg := res.Reason diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 9467f33bf966d..48de42eb02415 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -46,12 +46,14 @@ type ErrorStrategy int const ( // This type can be retry but consume the backoffer attempts. RetryStrategy ErrorStrategy = iota - // This type means un-recover error and the whole progress should exits + // This type means unrecoverable error and the whole progress should exits // for example: // 1. permission not valid. // 2. data has not found. // 3. retry too many times GiveUpStrategy + // This type represents Unknown error + UnknownStrategy ) type ErrorContext struct { @@ -82,11 +84,28 @@ func NewDefaultContext() *ErrorContext { } } -func (ec *ErrorContext) HandleError(err *backuppb.Error, uuid uint64, canIgnore bool) ErrorResult { - if len(err.Msg) != 0 { +func (ec *ErrorContext) HandleError(err *backuppb.Error, uuid uint64) ErrorResult { + if err == nil { + return ErrorResult{RetryStrategy, "unreachable retry"} + } + res := ec.handleErrorPb(err, uuid) + // try the best effort to save progress from error here + if res.Strategy == UnknownStrategy && len(err.Msg) != 0 { return ec.HandleErrorMsg(err.Msg, uuid) } - return ec.HandleErrorPb(err, uuid, canIgnore) + return res +} + +func (ec *ErrorContext) HandleIgnorableError(err *backuppb.Error, uuid uint64) ErrorResult { + if err == nil { + return ErrorResult{RetryStrategy, "unreachable retry"} + } + res := ec.handleIgnorableErrorPb(err, uuid) + // try the best effort to save progress from error here + if res.Strategy == UnknownStrategy && len(err.Msg) != 0 { + return ec.HandleErrorMsg(err.Msg, uuid) + } + return res } func (ec *ErrorContext) HandleErrorMsg(msg string, uuid uint64) ErrorResult { @@ -119,23 +138,26 @@ func (ec *ErrorContext) HandleErrorMsg(msg string, uuid uint64) ErrorResult { return ErrorResult{GiveUpStrategy, "unknown error and retry too many times, give up"} } -func (ec *ErrorContext) HandleErrorPb(e *backuppb.Error, uuid uint64, canIgnore bool) ErrorResult { - if e == nil { - return ErrorResult{RetryStrategy, "unreachable code"} +func (ec *ErrorContext) handleIgnorableErrorPb(e *backuppb.Error, uuid uint64) ErrorResult { + switch e.Detail.(type) { + case *backuppb.Error_KvError: + return ErrorResult{RetryStrategy, "retry outside because the error can be ignored"} + case *backuppb.Error_RegionError: + return ErrorResult{RetryStrategy, "retry outside because the error can be ignored"} + case *backuppb.Error_ClusterIdError: + return ErrorResult{GiveUpStrategy, "cluster ID mismatch"} } + return ErrorResult{UnknownStrategy, "unreachable code"} +} + +func (ec *ErrorContext) handleErrorPb(e *backuppb.Error, uuid uint64) ErrorResult { logger := log.L().With(zap.String("scenario", ec.scenario)) switch v := e.Detail.(type) { case *backuppb.Error_KvError: - if canIgnore { - return ErrorResult{RetryStrategy, "retry outside because the error can be ignored"} - } // should not meet error other than KeyLocked. return ErrorResult{GiveUpStrategy, "unknown kv error"} case *backuppb.Error_RegionError: - if canIgnore { - return ErrorResult{RetryStrategy, "retry outside because the error can be ignored"} - } regionErr := v.RegionError // Ignore following errors. if !(regionErr.EpochNotMatch != nil || @@ -158,7 +180,7 @@ func (ec *ErrorContext) HandleErrorPb(e *backuppb.Error, uuid uint64, canIgnore logger.Error("occur cluster ID error", zap.Reflect("error", v), zap.Uint64("uuid", uuid)) return ErrorResult{GiveUpStrategy, "cluster ID mismatch"} } - return ErrorResult{GiveUpStrategy, "unreachable code"} + return ErrorResult{UnknownStrategy, "unreachable code"} } // RetryableFunc presents a retryable operation. diff --git a/br/pkg/utils/retry_test.go b/br/pkg/utils/retry_test.go index 61af1fe3689de..26aa20f4d1d02 100644 --- a/br/pkg/utils/retry_test.go +++ b/br/pkg/utils/retry_test.go @@ -50,36 +50,36 @@ func TestRetryAdapter(t *testing.T) { req.Greater(time.Since(begin), 200*time.Millisecond) } -func TestHandleErrorPb(t *testing.T) { +func TestHandleError(t *testing.T) { ec := utils.NewErrorContext("test", 3) // Test case 1: Error is nil - result := ec.HandleErrorPb(nil, 123, false) - require.Equal(t, utils.ErrorResult{utils.RetryStrategy, "unreachable code"}, result) + result := ec.HandleError(nil, 123) + require.Equal(t, utils.ErrorResult{utils.RetryStrategy, "unreachable retry"}, result) // Test case 2: Error is KvError and can be ignored kvError := &backuppb.Error_KvError{} - result = ec.HandleErrorPb(&backuppb.Error{Detail: kvError}, 123, true) + result = ec.HandleIgnorableError(&backuppb.Error{Detail: kvError}, 123) require.Equal(t, utils.ErrorResult{utils.RetryStrategy, "retry outside because the error can be ignored"}, result) // Test case 3: Error is KvError and cannot be ignored - result = ec.HandleErrorPb(&backuppb.Error{Detail: kvError}, 123, false) + result = ec.HandleError(&backuppb.Error{Detail: kvError}, 123) require.Equal(t, utils.ErrorResult{utils.GiveUpStrategy, "unknown kv error"}, result) // Test case 4: Error is RegionError and can be ignored regionError := &backuppb.Error_RegionError{ RegionError: &errorpb.Error{NotLeader: &errorpb.NotLeader{RegionId: 1}}} - result = ec.HandleErrorPb(&backuppb.Error{Detail: regionError}, 123, true) + result = ec.HandleIgnorableError(&backuppb.Error{Detail: regionError}, 123) require.Equal(t, utils.ErrorResult{utils.RetryStrategy, "retry outside because the error can be ignored"}, result) // Test case 5: Error is RegionError and cannot be ignored regionError = &backuppb.Error_RegionError{ RegionError: &errorpb.Error{DiskFull: &errorpb.DiskFull{}}} - result = ec.HandleErrorPb(&backuppb.Error{Detail: regionError}, 123, false) + result = ec.HandleError(&backuppb.Error{Detail: regionError}, 123) require.Equal(t, utils.ErrorResult{utils.GiveUpStrategy, "unknown kv error"}, result) // Test case 6: Error is ClusterIdError clusterIdError := &backuppb.Error_ClusterIdError{} - result = ec.HandleErrorPb(&backuppb.Error{Detail: clusterIdError}, 123, false) + result = ec.HandleError(&backuppb.Error{Detail: clusterIdError}, 123) require.Equal(t, utils.ErrorResult{utils.GiveUpStrategy, "cluster ID mismatch"}, result) } From c2df58e1fc34b4e45040aa393a0a3a8c91ac243c Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 6 Dec 2023 18:29:46 +0800 Subject: [PATCH 11/15] fix bazel --- br/pkg/utils/BUILD.bazel | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index 092bd33e33366..16f36b6249a5c 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -93,7 +93,7 @@ go_test( ], embed = [":utils"], flaky = True, - shard_count = 34, + shard_count = 37, deps = [ "//br/pkg/errors", "//br/pkg/metautil", @@ -114,6 +114,7 @@ go_test( "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/encryptionpb", + "@com_github_pingcap_kvproto//pkg/errorpb", "@com_github_pingcap_kvproto//pkg/import_sstpb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_stretchr_testify//require", From c8b880e891d14deb2583867438a61a0b8f047639 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 6 Dec 2023 19:05:24 +0800 Subject: [PATCH 12/15] fix lint --- br/pkg/utils/retry_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/utils/retry_test.go b/br/pkg/utils/retry_test.go index 26aa20f4d1d02..eeaef51ab8dfd 100644 --- a/br/pkg/utils/retry_test.go +++ b/br/pkg/utils/retry_test.go @@ -114,8 +114,8 @@ func TestHandleErrorMsg(t *testing.T) { require.Equal(t, expectedResult, actualResult) // Test retry too many times - actualResult = ec.HandleErrorMsg(msg, uuid) - actualResult = ec.HandleErrorMsg(msg, uuid) + _ = ec.HandleErrorMsg(msg, uuid) + _ = ec.HandleErrorMsg(msg, uuid) expectedResult = utils.ErrorResult{utils.GiveUpStrategy, "unknown error and retry too many times, give up"} actualResult = ec.HandleErrorMsg(msg, uuid) require.Equal(t, expectedResult, actualResult) From 7b495dfeb633218d8766f1430eac36114f8baf24 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 6 Dec 2023 19:21:34 +0800 Subject: [PATCH 13/15] fix lint --- br/pkg/backup/client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/backup/client_test.go b/br/pkg/backup/client_test.go index 53f0fa7c37ae0..cb144a1856857 100644 --- a/br/pkg/backup/client_test.go +++ b/br/pkg/backup/client_test.go @@ -243,7 +243,7 @@ func TestOnBackupRegionErrorResponse(t *testing.T) { } for _, cs := range cases { t.Log(cs) - _, backoffMs, err := backup.OnBackupResponse(cs.storeID, cs.bo, cs.backupTS, cs.lockResolver, cs.resp) + _, backoffMs, err := backup.OnBackupResponse(cs.storeID, cs.bo, cs.backupTS, cs.lockResolver, cs.resp, nil) require.Equal(t, cs.exceptedBackoffMs, backoffMs) if cs.exceptedErr { require.Error(t, err) From dd62063c7633aad6e0d9b200a7f283dcb0c625f6 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 7 Dec 2023 10:52:42 +0800 Subject: [PATCH 14/15] fix test --- br/pkg/utils/retry.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 48de42eb02415..235abe599e4c0 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -123,6 +123,10 @@ func (ec *ErrorContext) HandleErrorMsg(msg string, uuid uint64) ErrorResult { uuid) return ErrorResult{GiveUpStrategy, reason} } + msgLower := strings.ToLower(msg) + if strings.Contains(msgLower, "context canceled") { + return ErrorResult{GiveUpStrategy, "context canceled, give up"} + } if MessageIsRetryableStorageError(msg) { logger.Warn("occur storage error", zap.String("error", msg)) From 7d10267e16f4604e609e566a9c0e2578719778c9 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 7 Dec 2023 11:15:23 +0800 Subject: [PATCH 15/15] address comment --- br/pkg/backup/client_test.go | 19 ++++++++++--------- br/pkg/utils/retry.go | 4 ++-- errors.toml | 10 ++++++++++ 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/br/pkg/backup/client_test.go b/br/pkg/backup/client_test.go index cb144a1856857..d0c61022779dd 100644 --- a/br/pkg/backup/client_test.go +++ b/br/pkg/backup/client_test.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/tidb/br/pkg/mock" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/tablecodec" @@ -230,20 +231,20 @@ func TestOnBackupRegionErrorResponse(t *testing.T) { } cases := []Case{ - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{NotLeader: &errorpb.NotLeader{}}), exceptedBackoffMs: 1000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}}), exceptedBackoffMs: 1000, exceptedErr: false}, + {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{NotLeader: &errorpb.NotLeader{}}), exceptedBackoffMs: 3000, exceptedErr: false}, + {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}}), exceptedBackoffMs: 3000, exceptedErr: false}, {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{KeyNotInRegion: &errorpb.KeyNotInRegion{}}), exceptedBackoffMs: 0, exceptedErr: true}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}), exceptedBackoffMs: 1000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}), exceptedBackoffMs: 1000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StaleCommand: &errorpb.StaleCommand{}}), exceptedBackoffMs: 1000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StoreNotMatch: &errorpb.StoreNotMatch{}}), exceptedBackoffMs: 1000, exceptedErr: false}, + {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}), exceptedBackoffMs: 3000, exceptedErr: false}, + {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}), exceptedBackoffMs: 3000, exceptedErr: false}, + {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StaleCommand: &errorpb.StaleCommand{}}), exceptedBackoffMs: 3000, exceptedErr: false}, + {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StoreNotMatch: &errorpb.StoreNotMatch{}}), exceptedBackoffMs: 3000, exceptedErr: false}, {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RaftEntryTooLarge: &errorpb.RaftEntryTooLarge{}}), exceptedBackoffMs: 0, exceptedErr: true}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ReadIndexNotReady: &errorpb.ReadIndexNotReady{}}), exceptedBackoffMs: 1000, exceptedErr: false}, - {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ProposalInMergingMode: &errorpb.ProposalInMergingMode{}}), exceptedBackoffMs: 1000, exceptedErr: false}, + {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ReadIndexNotReady: &errorpb.ReadIndexNotReady{}}), exceptedBackoffMs: 3000, exceptedErr: false}, + {storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ProposalInMergingMode: &errorpb.ProposalInMergingMode{}}), exceptedBackoffMs: 3000, exceptedErr: false}, } for _, cs := range cases { t.Log(cs) - _, backoffMs, err := backup.OnBackupResponse(cs.storeID, cs.bo, cs.backupTS, cs.lockResolver, cs.resp, nil) + _, backoffMs, err := backup.OnBackupResponse(cs.storeID, cs.bo, cs.backupTS, cs.lockResolver, cs.resp, utils.NewErrorContext("test", 1)) require.Equal(t, cs.exceptedBackoffMs, backoffMs) if cs.exceptedErr { require.Error(t, err) diff --git a/br/pkg/utils/retry.go b/br/pkg/utils/retry.go index 235abe599e4c0..cac3312b5948f 100644 --- a/br/pkg/utils/retry.go +++ b/br/pkg/utils/retry.go @@ -133,8 +133,8 @@ func (ec *ErrorContext) HandleErrorMsg(msg string, uuid uint64) ErrorResult { return ErrorResult{RetryStrategy, "retrable error"} } // retry enough on same store - mu.Lock() - defer mu.Unlock() + ec.mu.Lock() + defer ec.mu.Unlock() ec.encounterTimes[uuid]++ if ec.encounterTimes[uuid] <= ec.encounterTimesLimitation { return ErrorResult{RetryStrategy, "unknown error, retry it for few times"} diff --git a/errors.toml b/errors.toml index 65b32c380192c..d7b701ef5620e 100644 --- a/errors.toml +++ b/errors.toml @@ -16,11 +16,21 @@ error = ''' backup range invalid ''' +["BR:Backup:ErrBackupKeyIsLocked"] +error = ''' +backup key is locked +''' + ["BR:Backup:ErrBackupNoLeader"] error = ''' backup no leader ''' +["BR:Backup:ErrBackupRegion"] +error = ''' +backup region error +''' + ["BR:Common:ErrEnvNotSpecified"] error = ''' environment variable not found