diff --git a/pkg/logutil/logging.go b/pkg/logutil/logging.go index 1bd5467d3..7e803ba64 100644 --- a/pkg/logutil/logging.go +++ b/pkg/logutil/logging.go @@ -11,6 +11,7 @@ import ( backuppb "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -186,3 +187,21 @@ func Keys(keys [][]byte) zap.Field { func ShortError(err error) zap.Field { return zap.String("error", err.Error()) } + +var loggerToTerm, _, _ = log.InitLogger(new(log.Config), zap.AddCallerSkip(1)) + +// WarnTerm put a log both to terminal and to the log file. +func WarnTerm(message string, fields ...zap.Field) { + log.Warn(message, fields...) + if loggerToTerm != nil { + loggerToTerm.Warn(message, fields...) + } +} + +// RedactAny constructs a redacted field that carries an interface{}. +func RedactAny(fieldKey string, key interface{}) zap.Field { + if redact.NeedRedact() { + return zap.String(fieldKey, "?") + } + return zap.Any(fieldKey, key) +} diff --git a/pkg/task/backup.go b/pkg/task/backup.go index 59f41f93e..abaa68e4c 100644 --- a/pkg/task/backup.go +++ b/pkg/task/backup.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/docker/go-units" "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/backup" @@ -24,6 +25,7 @@ import ( "github.com/pingcap/br/pkg/backup" berrors "github.com/pingcap/br/pkg/errors" "github.com/pingcap/br/pkg/glue" + "github.com/pingcap/br/pkg/logutil" "github.com/pingcap/br/pkg/storage" "github.com/pingcap/br/pkg/summary" "github.com/pingcap/br/pkg/utils" @@ -164,12 +166,27 @@ func parseCompressionFlags(flags *pflag.FlagSet) (*CompressionConfig, error) { // so that both binary and TiDB will use same default value. func (cfg *BackupConfig) adjustBackupConfig() { cfg.adjust() + usingDefaultConcurrency := false if cfg.Config.Concurrency == 0 { cfg.Config.Concurrency = defaultBackupConcurrency + usingDefaultConcurrency = true } if cfg.Config.Concurrency > maxBackupConcurrency { cfg.Config.Concurrency = maxBackupConcurrency } + if cfg.RateLimit != unlimited { + // TiKV limits the upload rate by each backup request. + // When the backup requests are sent concurrently, + // the ratelimit couldn't work as intended. + // Degenerating to sequentially sending backup requests to avoid this. + if !usingDefaultConcurrency { + logutil.WarnTerm("setting `--ratelimit` and `--concurrency` at the same time, "+ + "ignoring `--concurrency`: `--ratelimit` forces sequential (i.e. concurrency = 1) backup", + zap.String("ratelimit", units.HumanSize(float64(cfg.RateLimit))+"/s"), + zap.Uint32("concurrency-specified", cfg.Config.Concurrency)) + } + cfg.Config.Concurrency = 1 + } if cfg.GCTTL == 0 { cfg.GCTTL = utils.DefaultBRGCSafePointTTL diff --git a/pkg/task/common.go b/pkg/task/common.go index 0a6959688..cbf99ab7c 100644 --- a/pkg/task/common.go +++ b/pkg/task/common.go @@ -69,6 +69,8 @@ const ( defaultSwitchInterval = 5 * time.Minute defaultGRPCKeepaliveTime = 10 * time.Second defaultGRPCKeepaliveTimeout = 3 * time.Second + + unlimited = 0 ) // TLSConfig is the common configuration for TLS connection. @@ -146,7 +148,7 @@ func DefineCommonFlags(flags *pflag.FlagSet) { flags.Uint(flagChecksumConcurrency, variable.DefChecksumTableConcurrency, "The concurrency of table checksumming") _ = flags.MarkHidden(flagChecksumConcurrency) - flags.Uint64(flagRateLimit, 0, "The rate limit of the task, MB/s per node") + flags.Uint64(flagRateLimit, unlimited, "The rate limit of the task, MB/s per node") flags.Bool(flagChecksum, true, "Run checksum at end of task") flags.Bool(flagRemoveTiFlash, true, "Remove TiFlash replicas before backup or restore, for unsupported versions of TiFlash")