backup: fix retry of fine-grained backup (pingcap#43252) (pingcap#43312)
* cherry-pick 1a94e5d

Signed-off-by: hillium <yujuncen@pingcap.com>

* run gofmt

Signed-off-by: hillium <yujuncen@pingcap.com>

* fix a typo

Signed-off-by: hillium <yujuncen@pingcap.com>

---------

Signed-off-by: hillium <yujuncen@pingcap.com>
YuJuncen authored Apr 23, 2023
1 parent 5d2030e commit c90896b
Showing 131 changed files with 1,249 additions and 948 deletions.
38 changes: 14 additions & 24 deletions br/pkg/backup/client.go
@@ -68,9 +68,11 @@ type Checksum struct {
 // ProgressUnit represents the unit of progress.
 type ProgressUnit string
 
-// Maximum total sleep time(in ms) for kv/cop commands.
 const (
-	backupFineGrainedMaxBackoff = 80000
+	// backupFineGrainedMaxBackoff is 1 hour.
+	// given it begins the fine-grained backup, there must be some problems in the cluster.
+	// We need to be more patient.
+	backupFineGrainedMaxBackoff = 3600000
 	backupRetryTimes            = 5
 	// RangeUnit represents the progress updated counter when a range finished.
 	RangeUnit ProgressUnit = "range"
@@ -664,20 +666,20 @@ func (bc *Client) findRegionLeader(ctx context.Context, key []byte, isRawKv bool
 		// better backoff.
 		region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
 		if err != nil || region == nil {
-			log.Error("find leader failed", zap.Error(err), zap.Reflect("region", region))
+			logutil.CL(ctx).Error("find leader failed", zap.Error(err), zap.Reflect("region", region))
 			time.Sleep(time.Millisecond * time.Duration(100*i))
 			continue
 		}
 		if region.Leader != nil {
-			log.Info("find leader",
+			logutil.CL(ctx).Info("find leader",
 				zap.Reflect("Leader", region.Leader), logutil.Key("key", key))
 			return region.Leader, nil
 		}
-		log.Warn("no region found", logutil.Key("key", key))
+		logutil.CL(ctx).Warn("no region found", logutil.Key("key", key))
 		time.Sleep(time.Millisecond * time.Duration(100*i))
 		continue
 	}
-	log.Error("can not find leader", logutil.Key("key", key))
+	logutil.CL(ctx).Error("can not find leader", logutil.Key("key", key))
 	return nil, errors.Annotatef(berrors.ErrBackupNoLeader, "can not find leader")
 }
 
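Aside from the retry fix itself, this hunk switches every bare log.* call over to logutil.CL(ctx).*, so each message is emitted through the logger carried in the request context (with whatever fields the caller attached) instead of the global one. A minimal sketch of that context-scoped-logger pattern; the key type and helper below are illustrative stand-ins, not BR's actual logutil internals:

	package main

	import (
		"context"

		"go.uber.org/zap"
	)

	type ctxLoggerKey struct{}

	// cl mimics the logutil.CL(ctx) pattern used above: fetch a
	// request-scoped *zap.Logger from the context, falling back to
	// the global logger when none was attached.
	func cl(ctx context.Context) *zap.Logger {
		if l, ok := ctx.Value(ctxLoggerKey{}).(*zap.Logger); ok {
			return l
		}
		return zap.L()
	}

	func main() {
		base, _ := zap.NewProduction()
		// Every message logged through cl(ctx) now carries the task field.
		ctx := context.WithValue(context.Background(), ctxLoggerKey{},
			base.With(zap.String("task", "backup")))
		cl(ctx).Info("find leader")
	}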
@@ -708,7 +710,7 @@ func (bc *Client) fineGrainedBackup(
 		}
 	})
 
-	bo := tikv.NewBackoffer(ctx, backupFineGrainedMaxBackoff)
+	bo := utils.AdaptTiKVBackoffer(ctx, backupFineGrainedMaxBackoff, berrors.ErrUnknown)
 	for {
 		// Step1, check whether there is any incomplete range
 		incomplete := rangeTree.GetIncompleteRange(req.StartKey, req.EndKey)
@@ -721,14 +723,10 @@
 		errCh := make(chan error, 4)
 		retry := make(chan rtree.Range, 4)
 
-		max := &struct {
-			ms int
-			mu sync.Mutex
-		}{}
 		wg := new(sync.WaitGroup)
 		for i := 0; i < 4; i++ {
 			wg.Add(1)
-			fork, _ := bo.Fork()
+			fork, _ := bo.Inner().Fork()
 			go func(boFork *tikv.Backoffer) {
 				defer wg.Done()
 				for rg := range retry {
@@ -740,11 +738,7 @@
 						return
 					}
 					if backoffMs != 0 {
-						max.mu.Lock()
-						if max.ms < backoffMs {
-							max.ms = backoffMs
-						}
-						max.mu.Unlock()
+						bo.RequestBackOff(backoffMs)
 					}
 				}
 			}(fork)
@@ -789,15 +783,11 @@
 		}
 
 		// Step3. Backoff if needed, then repeat.
-		max.mu.Lock()
-		ms := max.ms
-		max.mu.Unlock()
-		if ms != 0 {
+		if ms := bo.NextSleepInMS(); ms != 0 {
 			log.Info("handle fine grained", zap.Int("backoffMs", ms))
-			// TODO: fill a meaningful error.
-			err := bo.BackoffWithMaxSleepTxnLockFast(ms, berrors.ErrUnknown)
+			err := bo.BackOff()
 			if err != nil {
-				return errors.Trace(err)
+				return errors.Annotatef(err, "at fine-grained backup, remained ranges = %d", rangeTree.Len())
 			}
 		}
 	}
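These hunks are the substance of the fix: previously each of the four worker goroutines recorded its requested backoff into a hand-rolled mutex-guarded maximum, and the driver slept through BackoffWithMaxSleepTxnLockFast with a total budget of only 80 seconds, so a struggling cluster could exhaust the budget and fail the whole backup. Now the budget is an hour, the workers feed a shared adapted backoffer via RequestBackOff, and the driver loop drains it each round with NextSleepInMS/BackOff. A self-contained sketch of that aggregation pattern; the type below is an illustrative stand-in, not the actual br/pkg/utils implementation:

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	// aggregateBackoffer mimics the pattern introduced here: workers
	// request backoff durations concurrently, and the driver sleeps
	// once per round for the largest requested duration.
	type aggregateBackoffer struct {
		mu     sync.Mutex
		nextMS int
	}

	// RequestBackOff records a request; the largest one in a round wins.
	func (b *aggregateBackoffer) RequestBackOff(ms int) {
		b.mu.Lock()
		defer b.mu.Unlock()
		if ms > b.nextMS {
			b.nextMS = ms
		}
	}

	// NextSleepInMS reports the pending backoff for this round.
	func (b *aggregateBackoffer) NextSleepInMS() int {
		b.mu.Lock()
		defer b.mu.Unlock()
		return b.nextMS
	}

	// BackOff sleeps for the pending duration and resets it.
	func (b *aggregateBackoffer) BackOff() error {
		b.mu.Lock()
		ms := b.nextMS
		b.nextMS = 0
		b.mu.Unlock()
		time.Sleep(time.Duration(ms) * time.Millisecond)
		return nil
	}

	func main() {
		bo := &aggregateBackoffer{}
		var wg sync.WaitGroup
		for i := 1; i <= 4; i++ {
			wg.Add(1)
			go func(ms int) {
				defer wg.Done()
				bo.RequestBackOff(ms * 100) // each worker asks for its own delay
			}(i)
		}
		wg.Wait()
		if ms := bo.NextSleepInMS(); ms != 0 {
			fmt.Println("backing off for", ms, "ms") // prints 400
			_ = bo.BackOff()
		}
	}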
10 changes: 7 additions & 3 deletions br/pkg/glue/console_glue.go
@@ -91,8 +91,10 @@ func (t *Table) maxKeyLen() int {
 
 // Print prints the table.
 // The format would be like:
-// Key1: <Value>
-// Other: <Value>
+//
+//	Key1: <Value>
+//	Other: <Value>
+//
 // LongKey: <Value>
 // The format may change if the terminal size is small.
 func (t *Table) Print() {
@@ -267,7 +269,9 @@ func (ps PrettyString) slicePointOf(s int) (realSlicePoint, endAt int) {
 // It is the abstraction of some subarea of the terminal,
 // you might imagine it as a panel in the tmux, but with infinity height.
 // For example, printing a frame with the width of 10 chars, and 4 chars offset left, would be like:
-// v~~~~~~~~~~v Here is the "width of a frame".
+//
+//	v~~~~~~~~~~v Here is the "width of a frame".
+//
 // +--+----------+--+
 // |  | Hello, wor |
 // |  | ld.        |
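This hunk, like most of the remaining ones, comes from the "run gofmt" commit: Go 1.19's gofmt rewrites doc comments so that examples and diagrams become tab-indented blocks set off by blank "//" lines, which go/doc then renders verbatim instead of reflowing. A tiny illustration with a hypothetical function:

	package demo

	// Greet formats a greeting. The output would be like:
	//
	//	Hello: <Name>
	//
	// The tab-indented line above is a doc-comment code block under the
	// Go 1.19 conventions; gofmt inserts the surrounding blank "//"
	// lines automatically, which is exactly the churn seen in these hunks.
	func Greet(name string) string { return "Hello: " + name }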
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/duplicate.go
@@ -245,6 +245,7 @@ type DupKVStream interface {
 
 // LocalDupKVStream implements the interface of DupKVStream.
 // It collects duplicate key-value pairs from a pebble.DB.
+//
 //goland:noinspection GoNameStartsWithPackageName
 type LocalDupKVStream struct {
 	iter Iter
16 changes: 8 additions & 8 deletions br/pkg/lightning/backend/noop/noop.go
@@ -97,14 +97,14 @@ func (b noopBackend) CheckRequirements(context.Context, *backend.CheckCtx) error
 // name. The returned table info does not need to be precise if the encoder,
 // is not requiring them, but must at least fill in the following fields for
 // TablesFromMeta to succeed:
-//  - Name
-//  - State (must be model.StatePublic)
-//  - ID
-//  - Columns
-//     * Name
-//     * State (must be model.StatePublic)
-//     * Offset (must be 0, 1, 2, ...)
-//  - PKIsHandle (true = do not generate _tidb_rowid)
+//   - Name
+//   - State (must be model.StatePublic)
+//   - ID
+//   - Columns
+//   - Name
+//   - State (must be model.StatePublic)
+//   - Offset (must be 0, 1, 2, ...)
+//   - PKIsHandle (true = do not generate _tidb_rowid)
 func (b noopBackend) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
 	return nil, nil
 }
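For readers implementing this contract, a minimal table info filling exactly the listed fields might look like the sketch below. Field names follow the doc comment; the parser/model import path and exact field types are assumptions to verify against the repo:

	package demo

	import "github.com/pingcap/tidb/parser/model"

	// minimalTableInfo builds the smallest *model.TableInfo the comment
	// above says TablesFromMeta needs. Illustrative only; consult the
	// model package for the authoritative definitions.
	func minimalTableInfo() *model.TableInfo {
		return &model.TableInfo{
			ID:    1,
			Name:  model.NewCIStr("t1"),
			State: model.StatePublic, // must be public
			Columns: []*model.ColumnInfo{
				{
					Name:   model.NewCIStr("id"),
					State:  model.StatePublic,
					Offset: 0, // offsets must be 0, 1, 2, ...
				},
			},
			PKIsHandle: true, // true = do not generate _tidb_rowid
		}
	}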
13 changes: 8 additions & 5 deletions br/pkg/lightning/backend/tidb/tidb.go
@@ -472,7 +472,8 @@ type stmtTask struct {
 }
 
 // WriteBatchRowsToDB write rows in batch mode, which will insert multiple rows like this:
-// insert into t1 values (111), (222), (333), (444);
+//
+//	insert into t1 values (111), (222), (333), (444);
 func (be *tidbBackend) WriteBatchRowsToDB(ctx context.Context, tableName string, columnNames []string, r kv.Rows) error {
 	rows := r.(tidbRows)
 	insertStmt := be.checkAndBuildStmt(rows, tableName, columnNames)
@@ -500,10 +501,12 @@ func (be *tidbBackend) checkAndBuildStmt(rows tidbRows, tableName string, column
 }
 
 // WriteRowsToDB write rows in row-by-row mode, which will insert multiple rows like this:
-// insert into t1 values (111);
-// insert into t1 values (222);
-// insert into t1 values (333);
-// insert into t1 values (444);
+//
+//	insert into t1 values (111);
+//	insert into t1 values (222);
+//	insert into t1 values (333);
+//	insert into t1 values (444);
+//
 // See more details in br#1366: https://github.com/pingcap/br/issues/1366
 func (be *tidbBackend) WriteRowsToDB(ctx context.Context, tableName string, columnNames []string, r kv.Rows) error {
 	rows := r.(tidbRows)
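The two doc comments above describe the backend's two INSERT shapes: one multi-values statement per batch, or one statement per row as the fallback (see br#1366). A toy sketch of the difference, with illustrative helpers rather than the tidbBackend code:

	package main

	import (
		"fmt"
		"strings"
	)

	// buildBatchInsert renders one multi-values statement, the shape
	// WriteBatchRowsToDB's doc comment shows:
	//
	//	insert into t1 values (111), (222), (333), (444);
	func buildBatchInsert(table string, rows []string) string {
		return fmt.Sprintf("insert into %s values %s;", table, strings.Join(rows, ", "))
	}

	// buildRowInserts renders one statement per row, the row-by-row
	// shape WriteRowsToDB's doc comment shows.
	func buildRowInserts(table string, rows []string) []string {
		stmts := make([]string, 0, len(rows))
		for _, r := range rows {
			stmts = append(stmts, fmt.Sprintf("insert into %s values %s;", table, r))
		}
		return stmts
	}

	func main() {
		rows := []string{"(111)", "(222)", "(333)", "(444)"}
		fmt.Println(buildBatchInsert("t1", rows))
		for _, s := range buildRowInserts("t1", rows) {
			fmt.Println(s)
		}
	}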
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/tidb/tidb_test.go
@@ -214,6 +214,7 @@ func TestWriteRowsErrorOnDup(t *testing.T) {
 }
 
 // TODO: temporarily disable this test before we fix strict mode
+//
 //nolint:unused,deadcode
 func testStrictMode(t *testing.T) {
 	s := createMysqlSuite(t)
19 changes: 10 additions & 9 deletions br/pkg/lightning/lightning.go
@@ -235,11 +235,12 @@ func (l *Lightning) goServe(statusAddr string, realAddrWriter io.Writer) error {
 }
 
 // RunOnce is used by binary lightning and host when using lightning as a library.
-//  - for binary lightning, taskCtx could be context.Background which means taskCtx wouldn't be canceled directly by its
-//    cancel function, but only by Lightning.Stop or HTTP DELETE using l.cancel. and glue could be nil to let lightning
-//    use a default glue later.
-//  - for lightning as a library, taskCtx could be a meaningful context that get canceled outside, and glue could be a
-//    caller implemented glue.
+//   - for binary lightning, taskCtx could be context.Background which means taskCtx wouldn't be canceled directly by its
+//     cancel function, but only by Lightning.Stop or HTTP DELETE using l.cancel. and glue could be nil to let lightning
+//     use a default glue later.
+//   - for lightning as a library, taskCtx could be a meaningful context that get canceled outside, and glue could be a
+//     caller implemented glue.
+//
 // deprecated: use RunOnceWithOptions instead.
 func (l *Lightning) RunOnce(taskCtx context.Context, taskCfg *config.Config, glue glue.Glue) error {
 	if err := taskCfg.Adjust(taskCtx); err != nil {
@@ -278,10 +279,10 @@
 }
 
 // RunOnceWithOptions is used by binary lightning and host when using lightning as a library.
-//  - for binary lightning, taskCtx could be context.Background which means taskCtx wouldn't be canceled directly by its
-//    cancel function, but only by Lightning.Stop or HTTP DELETE using l.cancel. No need to set Options
-//  - for lightning as a library, taskCtx could be a meaningful context that get canceled outside, and there Options may
-//    be used:
+//   - for binary lightning, taskCtx could be context.Background which means taskCtx wouldn't be canceled directly by its
+//     cancel function, but only by Lightning.Stop or HTTP DELETE using l.cancel. No need to set Options
+//   - for lightning as a library, taskCtx could be a meaningful context that get canceled outside, and there Options may
+//     be used:
 //   - WithGlue: set a caller implemented glue. Otherwise, lightning will use a default glue later.
 //   - WithDumpFileStorage: caller has opened an external storage for lightning. Otherwise, lightning will open a
 //     storage by config
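The doc comment above describes the library-embedding mode: a cancellable caller-owned context plus functional options. A usage sketch under those assumptions; the option names come from the comment, but the constructor and exact signatures are assumptions to check against lightning.go before copying:

	package main

	import (
		"context"

		"github.com/pingcap/tidb/br/pkg/lightning"
		"github.com/pingcap/tidb/br/pkg/lightning/config"
	)

	func main() {
		// Cancelling ctx is how a library caller stops the task,
		// per the doc comment above.
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		l := lightning.New(config.NewGlobalConfig()) // assumed constructor
		taskCfg := config.NewConfig()
		// e.g. lightning.WithGlue(myGlue), lightning.WithDumpFileStorage(store)
		_ = l.RunOnceWithOptions(ctx, taskCfg)
	}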
2 changes: 1 addition & 1 deletion br/pkg/lightning/mydump/loader.go
@@ -88,7 +88,7 @@ func (m *MDTableMeta) GetSchema(ctx context.Context, store storage.ExternalStora
 }
 
 /*
-Mydumper File Loader
+	Mydumper File Loader
 */
 type MDLoader struct {
 	store storage.ExternalStorage
2 changes: 1 addition & 1 deletion br/pkg/lightning/mydump/parser_generated.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion br/pkg/lightning/mydump/region_test.go
@@ -36,7 +36,7 @@ import (
 // }
 
 /*
-TODO : test with specified 'regionBlockSize' ...
+	TODO : test with specified 'regionBlockSize' ...
 */
 func TestTableRegion(t *testing.T) {
 	cfg := newConfigWithSourceDir("./examples")
8 changes: 4 additions & 4 deletions br/pkg/lightning/restore/check_info.go
@@ -754,10 +754,10 @@ func (rc *Controller) SchemaIsValid(ctx context.Context, tableInfo *mydump.MDTab
 }
 
 // checkCSVHeader try to check whether the csv header config is consistent with the source csv files by:
-// 1. pick one table with two CSV files and a unique/primary key
-// 2. read the first row of those two CSV files
-// 3. checks if the content of those first rows are compatible with the table schema, and whether the
-//    two rows are identical, to determine if the first rows are a header rows.
+//  1. pick one table with two CSV files and a unique/primary key
+//  2. read the first row of those two CSV files
+//  3. checks if the content of those first rows are compatible with the table schema, and whether the
+//     two rows are identical, to determine if the first rows are a header rows.
 func (rc *Controller) checkCSVHeader(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta) error {
 	// if cfg set header = true but source files actually contain no header, former SchemaCheck should
 	// return error in this situation, so we need do it again.
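The three steps in the doc comment above amount to a simple heuristic: under a unique or primary key, two data rows from different files should differ, so two identical first rows that also fail to fit the column types are almost certainly headers. A sketch of that check as described, not the BR code itself:

	package demo

	// looksLikeHeader sketches the documented heuristic: take the first
	// rows of two CSV files of the same table; if the two rows are
	// identical and their fields do not fit the column types (e.g. a
	// non-numeric value in an int column), they are likely headers.
	func looksLikeHeader(firstRowA, firstRowB []string, fitsSchema func([]string) bool) bool {
		if len(firstRowA) != len(firstRowB) {
			return false
		}
		for i := range firstRowA {
			if firstRowA[i] != firstRowB[i] {
				return false // data rows under a unique/primary key should differ
			}
		}
		return !fitsSchema(firstRowA) // identical and not schema-compatible => header
	}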
1 change: 0 additions & 1 deletion br/pkg/restore/db.go
@@ -169,7 +169,6 @@ func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
 	return errors.Trace(err)
 }
 
-//
 func (db *DB) restoreSequence(ctx context.Context, table *metautil.Table) error {
 	var restoreMetaSQL string
 	var err error
10 changes: 6 additions & 4 deletions br/pkg/restore/split_test.go
@@ -319,8 +319,9 @@ func TestScatterFinishInTime(t *testing.T) {
 // range: [aaa, aae), [aae, aaz), [ccd, ccf), [ccf, ccj)
 // rewrite rules: aa -> xx, cc -> bb
 // expected regions after split:
-// [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
-// [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
+//
+//	[, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
+//	[bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
 func TestSplitAndScatter(t *testing.T) {
 	t.Run("BatchScatter", func(t *testing.T) {
 		client := initTestClient()
@@ -448,8 +449,9 @@ func initRewriteRules() *restore.RewriteRules {
 }
 
 // expected regions after split:
-// [, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
-// [bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
+//
+//	[, aay), [aay, bba), [bba, bbf), [bbf, bbh), [bbh, bbj),
+//	[bbj, cca), [cca, xxe), [xxe, xxz), [xxz, )
 func validateRegions(regions map[uint64]*restore.RegionInfo) bool {
 	keys := [...]string{"", "aay", "bba", "bbf", "bbh", "bbj", "cca", "xxe", "xxz", ""}
 	if len(regions) != len(keys)-1 {
10 changes: 6 additions & 4 deletions br/pkg/restore/stream_metas.go
@@ -51,10 +51,12 @@ func (ms *StreamMetadataSet) iterateDataFiles(f func(d *backuppb.DataFileInfo) (
 }
 
 // IterateFilesFullyBefore runs the function over all files contain data before the timestamp only.
-// 0                                          before
-// |------------------------------------------|
-//  |-file1---------------|    <- File contains records in this TS range would be found.
-//                          |-file2--------------|    <- File contains any record out of this won't be found.
+//
+//	0                                          before
+//	|------------------------------------------|
+//	 |-file1---------------|    <- File contains records in this TS range would be found.
+//	                         |-file2--------------|    <- File contains any record out of this won't be found.
+//
 // This function would call the `f` over file1 only.
 func (ms *StreamMetadataSet) IterateFilesFullyBefore(before uint64, f func(d *backuppb.DataFileInfo) (shouldBreak bool)) {
 	ms.iterateDataFiles(func(d *backuppb.DataFileInfo) (shouldBreak bool) {
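The diagram above pins down the filter: a file qualifies only if every record it holds falls before the cutoff, i.e. its whole timestamp range ends before `before`. A sketch of that predicate with a stand-in struct rather than backuppb.DataFileInfo:

	package demo

	// fileInfo stands in for the TS range fields of a data file's metadata.
	type fileInfo struct {
		MinTS, MaxTS uint64
	}

	// fullyBefore sketches the filter IterateFilesFullyBefore applies per
	// the diagram: file1 (MaxTS < before) is visited, file2 (crossing the
	// cutoff) is skipped.
	func fullyBefore(f fileInfo, before uint64) bool {
		return f.MaxTS < before
	}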
3 changes: 2 additions & 1 deletion br/pkg/stream/stream_status.go
@@ -230,7 +230,8 @@ var logCountSumRe = regexp.MustCompile(`tikv_stream_handle_kv_batch_sum ([0-9]+)
 
 // MaybeQPS get a number like the QPS of last seconds for each store via the prometheus interface.
 // TODO: this is a temporary solution(aha, like in a Hackathon),
-// we MUST find a better way for providing this information.
+//
+//	we MUST find a better way for providing this information.
 func MaybeQPS(ctx context.Context, mgr *conn.Mgr) (float64, error) {
 	c := mgr.GetPDClient()
 	prefix := "http://"
2 changes: 1 addition & 1 deletion br/pkg/task/stream.go
@@ -445,7 +445,7 @@ func (s *streamMgr) backupFullSchemas(ctx context.Context, g glue.Glue) error {
 	return nil
 }
 
-// RunStreamCommand run all kinds of `stream task``
+// RunStreamCommand run all kinds of `stream task`
 func RunStreamCommand(
 	ctx context.Context,
 	g glue.Glue,