From bf7ce3dbf210bd5af7c6622a33cbf7e324a976b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Wed, 8 Nov 2023 21:12:42 +0800 Subject: [PATCH 1/5] This is an automated cherry-pick of #48025 Signed-off-by: ti-chi-bot --- br/pkg/task/BUILD.bazel | 12 ++++++ br/pkg/task/backup.go | 20 +++++++++ br/pkg/task/common.go | 10 +++++ br/pkg/task/common_test.go | 84 ++++++++++++++++++++++++++++++++++++++ br/pkg/task/restore.go | 42 +++++++++++++++++++ executor/brie.go | 30 +++++++------- 6 files changed, 182 insertions(+), 16 deletions(-) diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index e947d84cbe5c4..9b8854549c934 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -93,6 +93,10 @@ go_test( ], embed = [":task"], flaky = True, +<<<<<<< HEAD +======= + shard_count = 21, +>>>>>>> 632cd843b0e (executor/brie: use the default value from flags (#48025)) deps = [ "//br/pkg/conn", "//br/pkg/errors", @@ -101,10 +105,18 @@ go_test( "//br/pkg/storage", "//br/pkg/stream", "//br/pkg/utils", +<<<<<<< HEAD "//config", "//parser/model", "//statistics/handle", "//tablecodec", +======= + "//pkg/config", + "//pkg/parser/model", + "//pkg/statistics/handle/util", + "//pkg/tablecodec", + "//pkg/util/table-filter", +>>>>>>> 632cd843b0e (executor/brie: use the default value from flags (#48025)) "@com_github_golang_protobuf//proto", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/brpb", diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index d6d0dea32146a..1ef3d401d6f37 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -36,6 +36,11 @@ import ( "github.com/pingcap/tidb/util/mathutil" "github.com/spf13/pflag" "github.com/tikv/client-go/v2/oracle" +<<<<<<< HEAD +======= + kvutil "github.com/tikv/client-go/v2/util" + "go.uber.org/multierr" +>>>>>>> 632cd843b0e (executor/brie: use the default value from flags (#48025)) "go.uber.org/zap" ) @@ -711,6 +716,21 @@ func ParseTSString(ts string, tzCheck bool) (uint64, error) { return oracle.GoTimeToTS(t1), nil } +func DefaultBackupConfig() BackupConfig { + fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError) + DefineCommonFlags(fs) + DefineBackupFlags(fs) + cfg := BackupConfig{} + err := multierr.Combine( + cfg.ParseFromFlags(fs), + cfg.Config.ParseFromFlags(fs), + ) + if err != nil { + log.Panic("infallible operation failed.", zap.Error(err)) + } + return cfg +} + func parseCompressionType(s string) (backuppb.CompressionType, error) { var ct backuppb.CompressionType switch s { diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 26b96d4414318..0708119f5de08 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -334,6 +334,16 @@ func HiddenFlagsForStream(flags *pflag.FlagSet) { storage.HiddenFlagsForStream(flags) } +func DefaultConfig() Config { + fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError) + DefineCommonFlags(fs) + cfg := Config{} + if err := cfg.ParseFromFlags(fs); err != nil { + log.Panic("infallible operation failed.", zap.Error(err)) + } + return cfg +} + // DefineDatabaseFlags defines the required --db flag for `db` subcommand. func DefineDatabaseFlags(command *cobra.Command) { command.Flags().String(flagDatabase, "", "database name") diff --git a/br/pkg/task/common_test.go b/br/pkg/task/common_test.go index b124f6977b9fa..be16f87a3aae4 100644 --- a/br/pkg/task/common_test.go +++ b/br/pkg/task/common_test.go @@ -9,7 +9,14 @@ import ( backup "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/encryptionpb" +<<<<<<< HEAD "github.com/pingcap/tidb/config" +======= + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/pkg/config" + filter "github.com/pingcap/tidb/pkg/util/table-filter" +>>>>>>> 632cd843b0e (executor/brie: use the default value from flags (#48025)) "github.com/spf13/pflag" "github.com/stretchr/testify/require" ) @@ -157,3 +164,80 @@ func TestCheckCipherKey(t *testing.T) { } } } + +func must[T any](t T, err error) T { + if err != nil { + panic(err) + } + return t +} + +func expectedDefaultConfig() Config { + return Config{ + BackendOptions: storage.BackendOptions{S3: storage.S3BackendOptions{ForcePathStyle: true}}, + PD: []string{"127.0.0.1:2379"}, + ChecksumConcurrency: 4, + Checksum: true, + SendCreds: true, + CheckRequirements: true, + FilterStr: []string(nil), + TableFilter: filter.CaseInsensitive(must(filter.Parse([]string{"*.*"}))), + Schemas: map[string]struct{}{}, + Tables: map[string]struct{}{}, + SwitchModeInterval: 300000000000, + GRPCKeepaliveTime: 10000000000, + GRPCKeepaliveTimeout: 3000000000, + CipherInfo: backup.CipherInfo{CipherType: 1}, + MetadataDownloadBatchSize: 0x80, + } +} + +func expectedDefaultBackupConfig() BackupConfig { + return BackupConfig{ + Config: expectedDefaultConfig(), + GCTTL: utils.DefaultBRGCSafePointTTL, + CompressionConfig: CompressionConfig{ + CompressionType: backup.CompressionType_ZSTD, + }, + IgnoreStats: true, + UseBackupMetaV2: true, + UseCheckpoint: true, + } +} + +func expectedDefaultRestoreConfig() RestoreConfig { + defaultConfig := expectedDefaultConfig() + defaultConfig.Concurrency = defaultRestoreConcurrency + return RestoreConfig{ + Config: defaultConfig, + RestoreCommonConfig: RestoreCommonConfig{Online: false, + MergeSmallRegionSizeBytes: 0x6000000, + MergeSmallRegionKeyCount: 0xea600, + WithSysTable: false, + ResetSysUsers: []string{"cloud_admin", "root"}}, + NoSchema: false, + PDConcurrency: 0x1, + BatchFlushInterval: 16000000000, + DdlBatchSize: 0x80, + WithPlacementPolicy: "STRICT", + UseCheckpoint: true, + } +} + +func TestDefault(t *testing.T) { + def := DefaultConfig() + defaultConfig := expectedDefaultConfig() + require.Equal(t, defaultConfig, def) +} + +func TestDefaultBackup(t *testing.T) { + def := DefaultBackupConfig() + defaultConfig := expectedDefaultBackupConfig() + require.Equal(t, defaultConfig, def) +} + +func TestDefaultRestore(t *testing.T) { + def := DefaultRestoreConfig() + defaultConfig := expectedDefaultRestoreConfig() + require.Equal(t, defaultConfig, def) +} diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 7a1a043e7c682..dcc4d4cfc50da 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -497,6 +497,48 @@ func IsStreamRestore(cmdName string) bool { return cmdName == PointRestoreCmd } +<<<<<<< HEAD +======= +func registerTaskToPD(ctx context.Context, etcdCLI *clientv3.Client) (closeF func(context.Context) error, err error) { + register := utils.NewTaskRegister(etcdCLI, utils.RegisterRestore, fmt.Sprintf("restore-%s", uuid.New())) + err = register.RegisterTask(ctx) + return register.Close, errors.Trace(err) +} + +func removeCheckpointDataForSnapshotRestore(ctx context.Context, storageName string, taskName string, config *Config) error { + _, s, err := GetStorage(ctx, storageName, config) + if err != nil { + return errors.Trace(err) + } + return errors.Trace(checkpoint.RemoveCheckpointDataForRestore(ctx, s, taskName)) +} + +func removeCheckpointDataForLogRestore(ctx context.Context, storageName string, taskName string, clusterID uint64, config *Config) error { + _, s, err := GetStorage(ctx, storageName, config) + if err != nil { + return errors.Trace(err) + } + return errors.Trace(checkpoint.RemoveCheckpointDataForLogRestore(ctx, s, taskName, clusterID)) +} + +func DefaultRestoreConfig() RestoreConfig { + fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError) + DefineCommonFlags(fs) + DefineRestoreFlags(fs) + cfg := RestoreConfig{} + err := multierr.Combine( + cfg.ParseFromFlags(fs), + cfg.RestoreCommonConfig.ParseFromFlags(fs), + cfg.Config.ParseFromFlags(fs), + ) + if err != nil { + log.Panic("infallible failed.", zap.Error(err)) + } + + return cfg +} + +>>>>>>> 632cd843b0e (executor/brie: use the default value from flags (#48025)) // RunRestore starts a restore task inside the current goroutine. func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { if err := checkTaskExists(c, cfg); err != nil { diff --git a/executor/brie.go b/executor/brie.go index 1d48d98864e43..f5ba3e6b7a3e4 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -219,21 +219,15 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) } tidbCfg := config.GetGlobalConfig() - cfg := task.Config{ - TLS: task.TLSConfig{ - CA: tidbCfg.Security.ClusterSSLCA, - Cert: tidbCfg.Security.ClusterSSLCert, - Key: tidbCfg.Security.ClusterSSLKey, - }, - PD: strings.Split(tidbCfg.Path, ","), - Concurrency: 4, - Checksum: true, - SendCreds: true, - LogProgress: true, - CipherInfo: backuppb.CipherInfo{ - CipherType: encryptionpb.EncryptionMethod_PLAINTEXT, - }, + tlsCfg := task.TLSConfig{ + CA: tidbCfg.Security.ClusterSSLCA, + Cert: tidbCfg.Security.ClusterSSLCert, + Key: tidbCfg.Security.ClusterSSLKey, } + pds := strings.Split(tidbCfg.Path, ",") + cfg := task.DefaultConfig() + cfg.PD = pds + cfg.TLS = tlsCfg storageURL, err := storage.ParseRawURL(s.Storage) if err != nil { @@ -301,7 +295,9 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) switch s.Kind { case ast.BRIEKindBackup: - e.backupCfg = &task.BackupConfig{Config: cfg} + bcfg := task.DefaultBackupConfig() + bcfg.Config = cfg + e.backupCfg = &bcfg for _, opt := range s.Options { switch opt.Tp { @@ -329,7 +325,9 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) } case ast.BRIEKindRestore: - e.restoreCfg = &task.RestoreConfig{Config: cfg} + rcfg := task.DefaultRestoreConfig() + rcfg.Config = cfg + e.restoreCfg = &rcfg for _, opt := range s.Options { switch opt.Tp { case ast.BRIEOptionOnline: From 885d822d6944dbf7b9c50d4dc37b5680392fa187 Mon Sep 17 00:00:00 2001 From: hillium Date: Thu, 23 Nov 2023 17:53:43 +0800 Subject: [PATCH 2/5] resolve conflicts Signed-off-by: hillium --- br/pkg/task/BUILD.bazel | 13 +------------ br/pkg/task/backup.go | 4 ---- br/pkg/task/common_test.go | 8 ++------ br/pkg/task/restore.go | 25 ------------------------- 4 files changed, 3 insertions(+), 47 deletions(-) diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index 9b8854549c934..f5f03627f39ba 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -67,6 +67,7 @@ go_library( "@com_github_spf13_pflag//:pflag", "@com_github_tikv_client_go_v2//config", "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//util", "@com_github_tikv_pd_client//:client", "@com_google_cloud_go_storage//:storage", "@io_etcd_go_etcd_client_pkg_v3//transport", @@ -93,10 +94,7 @@ go_test( ], embed = [":task"], flaky = True, -<<<<<<< HEAD -======= shard_count = 21, ->>>>>>> 632cd843b0e (executor/brie: use the default value from flags (#48025)) deps = [ "//br/pkg/conn", "//br/pkg/errors", @@ -105,18 +103,9 @@ go_test( "//br/pkg/storage", "//br/pkg/stream", "//br/pkg/utils", -<<<<<<< HEAD - "//config", "//parser/model", "//statistics/handle", "//tablecodec", -======= - "//pkg/config", - "//pkg/parser/model", - "//pkg/statistics/handle/util", - "//pkg/tablecodec", - "//pkg/util/table-filter", ->>>>>>> 632cd843b0e (executor/brie: use the default value from flags (#48025)) "@com_github_golang_protobuf//proto", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/brpb", diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index 1ef3d401d6f37..eb1b172c52257 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -36,11 +36,7 @@ import ( "github.com/pingcap/tidb/util/mathutil" "github.com/spf13/pflag" "github.com/tikv/client-go/v2/oracle" -<<<<<<< HEAD -======= - kvutil "github.com/tikv/client-go/v2/util" "go.uber.org/multierr" ->>>>>>> 632cd843b0e (executor/brie: use the default value from flags (#48025)) "go.uber.org/zap" ) diff --git a/br/pkg/task/common_test.go b/br/pkg/task/common_test.go index be16f87a3aae4..858644dc22807 100644 --- a/br/pkg/task/common_test.go +++ b/br/pkg/task/common_test.go @@ -9,14 +9,10 @@ import ( backup "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/encryptionpb" -<<<<<<< HEAD - "github.com/pingcap/tidb/config" -======= "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/utils" - "github.com/pingcap/tidb/pkg/config" - filter "github.com/pingcap/tidb/pkg/util/table-filter" ->>>>>>> 632cd843b0e (executor/brie: use the default value from flags (#48025)) + "github.com/pingcap/tidb/config" + filter "github.com/pingcap/tidb/util/table-filter" "github.com/spf13/pflag" "github.com/stretchr/testify/require" ) diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index dcc4d4cfc50da..c3054fd32c2a9 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -497,30 +497,6 @@ func IsStreamRestore(cmdName string) bool { return cmdName == PointRestoreCmd } -<<<<<<< HEAD -======= -func registerTaskToPD(ctx context.Context, etcdCLI *clientv3.Client) (closeF func(context.Context) error, err error) { - register := utils.NewTaskRegister(etcdCLI, utils.RegisterRestore, fmt.Sprintf("restore-%s", uuid.New())) - err = register.RegisterTask(ctx) - return register.Close, errors.Trace(err) -} - -func removeCheckpointDataForSnapshotRestore(ctx context.Context, storageName string, taskName string, config *Config) error { - _, s, err := GetStorage(ctx, storageName, config) - if err != nil { - return errors.Trace(err) - } - return errors.Trace(checkpoint.RemoveCheckpointDataForRestore(ctx, s, taskName)) -} - -func removeCheckpointDataForLogRestore(ctx context.Context, storageName string, taskName string, clusterID uint64, config *Config) error { - _, s, err := GetStorage(ctx, storageName, config) - if err != nil { - return errors.Trace(err) - } - return errors.Trace(checkpoint.RemoveCheckpointDataForLogRestore(ctx, s, taskName, clusterID)) -} - func DefaultRestoreConfig() RestoreConfig { fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError) DefineCommonFlags(fs) @@ -538,7 +514,6 @@ func DefaultRestoreConfig() RestoreConfig { return cfg } ->>>>>>> 632cd843b0e (executor/brie: use the default value from flags (#48025)) // RunRestore starts a restore task inside the current goroutine. func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { if err := checkTaskExists(c, cfg); err != nil { From 203c8b4a68446c075f1f0856d203840674d7747f Mon Sep 17 00:00:00 2001 From: hillium Date: Thu, 23 Nov 2023 19:16:41 +0800 Subject: [PATCH 3/5] fix build Signed-off-by: hillium --- executor/brie.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/executor/brie.go b/executor/brie.go index f5ba3e6b7a3e4..0f32cb7c668dc 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -23,8 +23,6 @@ import ( "time" "github.com/pingcap/errors" - backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/kvproto/pkg/encryptionpb" "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/task" From a0969c57a346b7bbe14462f8ebca199eda192943 Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 24 Nov 2023 11:03:41 +0800 Subject: [PATCH 4/5] make bazel_prepare Signed-off-by: hillium --- br/pkg/task/BUILD.bazel | 3 ++- executor/BUILD.bazel | 2 -- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/br/pkg/task/BUILD.bazel b/br/pkg/task/BUILD.bazel index f5f03627f39ba..219ea0d304aa8 100644 --- a/br/pkg/task/BUILD.bazel +++ b/br/pkg/task/BUILD.bazel @@ -67,7 +67,6 @@ go_library( "@com_github_spf13_pflag//:pflag", "@com_github_tikv_client_go_v2//config", "@com_github_tikv_client_go_v2//oracle", - "@com_github_tikv_client_go_v2//util", "@com_github_tikv_pd_client//:client", "@com_google_cloud_go_storage//:storage", "@io_etcd_go_etcd_client_pkg_v3//transport", @@ -103,9 +102,11 @@ go_test( "//br/pkg/storage", "//br/pkg/stream", "//br/pkg/utils", + "//config", "//parser/model", "//statistics/handle", "//tablecodec", + "//util/table-filter", "@com_github_golang_protobuf//proto", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/brpb", diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index bee442fef7bc5..459ce00020bc5 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -206,11 +206,9 @@ go_library( "@com_github_opentracing_opentracing_go//:opentracing-go", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", - "@com_github_pingcap_kvproto//pkg/brpb", "@com_github_pingcap_kvproto//pkg/coprocessor", "@com_github_pingcap_kvproto//pkg/deadlock", "@com_github_pingcap_kvproto//pkg/diagnosticspb", - "@com_github_pingcap_kvproto//pkg/encryptionpb", "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_kvproto//pkg/tikvpb", From 90df52bd7e5207374fc0375fc0bfe9f20bfb6571 Mon Sep 17 00:00:00 2001 From: hillium Date: Fri, 24 Nov 2023 11:59:14 +0800 Subject: [PATCH 5/5] fix build Signed-off-by: hillium --- br/pkg/task/common_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/br/pkg/task/common_test.go b/br/pkg/task/common_test.go index 858644dc22807..858c441f18d05 100644 --- a/br/pkg/task/common_test.go +++ b/br/pkg/task/common_test.go @@ -216,7 +216,6 @@ func expectedDefaultRestoreConfig() RestoreConfig { BatchFlushInterval: 16000000000, DdlBatchSize: 0x80, WithPlacementPolicy: "STRICT", - UseCheckpoint: true, } }