diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index e947d84cbe5c4..219ea0d304aa8 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -93,6 +93,7 @@ go_test( ], embed = [":task"], flaky = True, + shard_count = 21, deps = [ "//br/pkg/conn", "//br/pkg/errors", @@ -105,6 +106,7 @@ go_test( "//parser/model", "//statistics/handle", "//tablecodec", + "//util/table-filter", "@com_github_golang_protobuf//proto", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/brpb", diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index d6d0dea32146a..eb1b172c52257 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -36,6 +36,7 @@ import ( "github.com/pingcap/tidb/util/mathutil" "github.com/spf13/pflag" "github.com/tikv/client-go/v2/oracle" + "go.uber.org/multierr" "go.uber.org/zap" ) @@ -711,6 +712,21 @@ func ParseTSString(ts string, tzCheck bool) (uint64, error) { return oracle.GoTimeToTS(t1), nil } +func DefaultBackupConfig() BackupConfig { + fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError) + DefineCommonFlags(fs) + DefineBackupFlags(fs) + cfg := BackupConfig{} + err := multierr.Combine( + cfg.ParseFromFlags(fs), + cfg.Config.ParseFromFlags(fs), + ) + if err != nil { + log.Panic("infallible operation failed.", zap.Error(err)) + } + return cfg +} + func parseCompressionType(s string) (backuppb.CompressionType, error) { var ct backuppb.CompressionType switch s { diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 26b96d4414318..0708119f5de08 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -334,6 +334,16 @@ func HiddenFlagsForStream(flags *pflag.FlagSet) { storage.HiddenFlagsForStream(flags) } +func DefaultConfig() Config { + fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError) + DefineCommonFlags(fs) + cfg := Config{} + if err := cfg.ParseFromFlags(fs); err != nil { + log.Panic("infallible operation failed.", zap.Error(err)) + } + return cfg +} + // DefineDatabaseFlags defines the required --db flag for `db` subcommand. func DefineDatabaseFlags(command *cobra.Command) { command.Flags().String(flagDatabase, "", "database name") diff --git a/br/pkg/task/common_test.go b/br/pkg/task/common_test.go index b124f6977b9fa..858c441f18d05 100644 --- a/br/pkg/task/common_test.go +++ b/br/pkg/task/common_test.go @@ -9,7 +9,10 @@ import ( backup "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/encryptionpb" + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/config" + filter "github.com/pingcap/tidb/util/table-filter" "github.com/spf13/pflag" "github.com/stretchr/testify/require" ) @@ -157,3 +160,79 @@ func TestCheckCipherKey(t *testing.T) { } } } + +func must[T any](t T, err error) T { + if err != nil { + panic(err) + } + return t +} + +func expectedDefaultConfig() Config { + return Config{ + BackendOptions: storage.BackendOptions{S3: storage.S3BackendOptions{ForcePathStyle: true}}, + PD: []string{"127.0.0.1:2379"}, + ChecksumConcurrency: 4, + Checksum: true, + SendCreds: true, + CheckRequirements: true, + FilterStr: []string(nil), + TableFilter: filter.CaseInsensitive(must(filter.Parse([]string{"*.*"}))), + Schemas: map[string]struct{}{}, + Tables: map[string]struct{}{}, + SwitchModeInterval: 300000000000, + GRPCKeepaliveTime: 10000000000, + GRPCKeepaliveTimeout: 3000000000, + CipherInfo: backup.CipherInfo{CipherType: 1}, + MetadataDownloadBatchSize: 0x80, + } +} + +func expectedDefaultBackupConfig() BackupConfig { + return BackupConfig{ + Config: expectedDefaultConfig(), + GCTTL: utils.DefaultBRGCSafePointTTL, + CompressionConfig: CompressionConfig{ + CompressionType: backup.CompressionType_ZSTD, + }, + IgnoreStats: true, + UseBackupMetaV2: true, + UseCheckpoint: true, + } +} + +func expectedDefaultRestoreConfig() RestoreConfig { + defaultConfig := expectedDefaultConfig() + defaultConfig.Concurrency = defaultRestoreConcurrency + return RestoreConfig{ + Config: defaultConfig, + RestoreCommonConfig: RestoreCommonConfig{Online: false, + MergeSmallRegionSizeBytes: 0x6000000, + MergeSmallRegionKeyCount: 0xea600, + WithSysTable: false, + ResetSysUsers: []string{"cloud_admin", "root"}}, + NoSchema: false, + PDConcurrency: 0x1, + BatchFlushInterval: 16000000000, + DdlBatchSize: 0x80, + WithPlacementPolicy: "STRICT", + } +} + +func TestDefault(t *testing.T) { + def := DefaultConfig() + defaultConfig := expectedDefaultConfig() + require.Equal(t, defaultConfig, def) +} + +func TestDefaultBackup(t *testing.T) { + def := DefaultBackupConfig() + defaultConfig := expectedDefaultBackupConfig() + require.Equal(t, defaultConfig, def) +} + +func TestDefaultRestore(t *testing.T) { + def := DefaultRestoreConfig() + defaultConfig := expectedDefaultRestoreConfig() + require.Equal(t, defaultConfig, def) +} diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 7a1a043e7c682..c3054fd32c2a9 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -497,6 +497,23 @@ func IsStreamRestore(cmdName string) bool { return cmdName == PointRestoreCmd } +func DefaultRestoreConfig() RestoreConfig { + fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError) + DefineCommonFlags(fs) + DefineRestoreFlags(fs) + cfg := RestoreConfig{} + err := multierr.Combine( + cfg.ParseFromFlags(fs), + cfg.RestoreCommonConfig.ParseFromFlags(fs), + cfg.Config.ParseFromFlags(fs), + ) + if err != nil { + log.Panic("infallible failed.", zap.Error(err)) + } + + return cfg +} + // RunRestore starts a restore task inside the current goroutine. func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { if err := checkTaskExists(c, cfg); err != nil { diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index bee442fef7bc5..459ce00020bc5 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -206,11 +206,9 @@ go_library( "@com_github_opentracing_opentracing_go//:opentracing-go", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", - "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/coprocessor", "@com_github_pingcap_kvproto//pkg/deadlock", "@com_github_pingcap_kvproto//pkg/diagnosticspb", - "@com_github_pingcap_kvproto//pkg/encryptionpb", "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_kvproto//pkg/tikvpb", diff --git a/executor/brie.go b/executor/brie.go index 1d48d98864e43..0f32cb7c668dc 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -23,8 +23,6 @@ import ( "time" "github.com/pingcap/errors" - backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/kvproto/pkg/encryptionpb" "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/task" @@ -219,21 +217,15 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) } tidbCfg := config.GetGlobalConfig() - cfg := task.Config{ - TLS: task.TLSConfig{ - CA: tidbCfg.Security.ClusterSSLCA, - Cert: tidbCfg.Security.ClusterSSLCert, - Key: tidbCfg.Security.ClusterSSLKey, - }, - PD: strings.Split(tidbCfg.Path, ","), - Concurrency: 4, - Checksum: true, - SendCreds: true, - LogProgress: true, - CipherInfo: backuppb.CipherInfo{ - CipherType: encryptionpb.EncryptionMethod_PLAINTEXT, - }, + tlsCfg := task.TLSConfig{ + CA: tidbCfg.Security.ClusterSSLCA, + Cert: tidbCfg.Security.ClusterSSLCert, + Key: tidbCfg.Security.ClusterSSLKey, } + pds := strings.Split(tidbCfg.Path, ",") + cfg := task.DefaultConfig() + cfg.PD = pds + cfg.TLS = tlsCfg storageURL, err := storage.ParseRawURL(s.Storage) if err != nil { @@ -301,7 +293,9 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) switch s.Kind { case ast.BRIEKindBackup: - e.backupCfg = &task.BackupConfig{Config: cfg} + bcfg := task.DefaultBackupConfig() + bcfg.Config = cfg + e.backupCfg = &bcfg for _, opt := range s.Options { switch opt.Tp { @@ -329,7 +323,9 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) } case ast.BRIEKindRestore: - e.restoreCfg = &task.RestoreConfig{Config: cfg} + rcfg := task.DefaultRestoreConfig() + rcfg.Config = cfg + e.restoreCfg = &rcfg for _, opt := range s.Options { switch opt.Tp { case ast.BRIEOptionOnline: