Skip to content

Commit

Permalink
br: replace GetTS with GetTSWithRetry (#38948)
Browse files Browse the repository at this point in the history
ref #36910
  • Loading branch information
MoCuishle28 authored Nov 22, 2022
1 parent f4fa414 commit 74aa891
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 15 deletions.
4 changes: 2 additions & 2 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1427,7 +1427,7 @@ func (rc *Client) updateMetaAndLoadStats(ctx context.Context, input <-chan *Crea
}

// Not need to return err when failed because of update analysis-meta
restoreTS, err := rc.GetTS(ctx)
restoreTS, err := rc.GetTSWithRetry(ctx)
if err != nil {
log.Error("getTS failed", zap.Error(err))
} else {
Expand Down Expand Up @@ -2399,7 +2399,7 @@ func (rc *Client) RunGCRowsLoader(ctx context.Context) {
func (rc *Client) InsertGCRows(ctx context.Context) error {
close(rc.deleteRangeQueryCh)
rc.deleteRangeQueryWaitGroup.Wait()
ts, err := rc.GetTS(ctx)
ts, err := rc.GetTSWithRetry(ctx)
if err != nil {
return errors.Trace(err)
}
Expand Down
21 changes: 10 additions & 11 deletions br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,18 +332,17 @@ type fakePDClient struct {
pd.Client
stores []*metapb.Store

notLeader bool
notLeader bool
retryTimes *int
}

var retryTimes int

func (fpdc fakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]*metapb.Store, error) {
return append([]*metapb.Store{}, fpdc.stores...), nil
}

func (fpdc fakePDClient) GetTS(ctx context.Context) (int64, int64, error) {
retryTimes++
if retryTimes >= 3 { // the mock PD leader switched successfully
(*fpdc.retryTimes)++
if *fpdc.retryTimes >= 3 { // the mock PD leader switched successfully
fpdc.notLeader = false
}

Expand All @@ -355,24 +354,24 @@ func (fpdc fakePDClient) GetTS(ctx context.Context) (int64, int64, error) {

func TestGetTSWithRetry(t *testing.T) {
t.Run("PD leader is healthy:", func(t *testing.T) {
retryTimes = -1000
pDClient := fakePDClient{notLeader: false}
retryTimes := -1000
pDClient := fakePDClient{notLeader: false, retryTimes: &retryTimes}
client := restore.NewRestoreClient(pDClient, nil, defaultKeepaliveCfg, false)
_, err := client.GetTSWithRetry(context.Background())
require.NoError(t, err)
})

t.Run("PD leader failure:", func(t *testing.T) {
retryTimes = -1000
pDClient := fakePDClient{notLeader: true}
retryTimes := -1000
pDClient := fakePDClient{notLeader: true, retryTimes: &retryTimes}
client := restore.NewRestoreClient(pDClient, nil, defaultKeepaliveCfg, false)
_, err := client.GetTSWithRetry(context.Background())
require.Error(t, err)
})

t.Run("PD leader switch successfully", func(t *testing.T) {
retryTimes = 0
pDClient := fakePDClient{notLeader: true}
retryTimes := 0
pDClient := fakePDClient{notLeader: true, retryTimes: &retryTimes}
client := restore.NewRestoreClient(pDClient, nil, defaultKeepaliveCfg, false)
_, err := client.GetTSWithRetry(context.Background())
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
g.Record(summary.RestoreDataSize, archiveSize)
//restore from tidb will fetch a general Size issue https://github.com/pingcap/tidb/issues/27247
g.Record("Size", archiveSize)
restoreTS, err := client.GetTS(ctx)
restoreTS, err := client.GetTSWithRetry(ctx)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,7 @@ func restoreStream(
}
defer client.Close()

currentTS, err := client.GetTS(ctx)
currentTS, err := client.GetTSWithRetry(ctx)
if err != nil {
return errors.Trace(err)
}
Expand Down

0 comments on commit 74aa891

Please sign in to comment.