From 4c09f2e57d29414c7d9651dfd6485cc6564e97e3 Mon Sep 17 00:00:00 2001 From: MoCuishle28 <32541204+MoCuishle28@users.noreply.github.com> Date: Tue, 22 Nov 2022 19:19:58 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #39173 Signed-off-by: ti-chi-bot --- br/pkg/gluetidb/glue.go | 133 ++++ br/pkg/restore/BUILD.bazel | 181 ++++++ br/pkg/restore/client.go | 100 +++ br/pkg/restore/client_test.go | 1118 +++++++++++++++++++++++++++++++++ br/pkg/task/backup.go | 4 +- br/pkg/task/common.go | 4 + br/pkg/task/restore.go | 9 +- br/pkg/utils/db.go | 87 +++ 8 files changed, 1633 insertions(+), 3 deletions(-) create mode 100644 br/pkg/restore/BUILD.bazel diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index 05709049b360a..997747c296d8f 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -298,3 +298,136 @@ func (gs *tidbSession) showCreateDatabase(db *model.DBInfo) (string, error) { func (gs *tidbSession) showCreatePlacementPolicy(policy *model.PolicyInfo) string { return executor.ConstructResultOfShowCreatePlacementPolicy(policy) } +<<<<<<< HEAD +======= + +// mockSession is used for test. +type mockSession struct { + se session.Session + globalVars map[string]string +} + +// GetSessionCtx implements glue.Glue +func (s *mockSession) GetSessionCtx() sessionctx.Context { + return s.se +} + +// Execute implements glue.Session. +func (s *mockSession) Execute(ctx context.Context, sql string) error { + return s.ExecuteInternal(ctx, sql) +} + +func (s *mockSession) ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error { + ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR) + rs, err := s.se.ExecuteInternal(ctx, sql, args...) + if err != nil { + return err + } + // Some of SQLs (like ADMIN RECOVER INDEX) may lazily take effect + // when we polling the result set. + // At least call `next` once for triggering theirs side effect. + // (Maybe we'd better drain all returned rows?) + if rs != nil { + //nolint: errcheck + defer rs.Close() + c := rs.NewChunk(nil) + if err := rs.Next(ctx, c); err != nil { + return nil + } + } + return nil +} + +// CreateDatabase implements glue.Session. +func (s *mockSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { + log.Fatal("unimplemented CreateDatabase for mock session") + return nil +} + +// CreatePlacementPolicy implements glue.Session. +func (s *mockSession) CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error { + log.Fatal("unimplemented CreateDatabase for mock session") + return nil +} + +// CreateTables implements glue.BatchCreateTableSession. +func (s *mockSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { + log.Fatal("unimplemented CreateDatabase for mock session") + return nil +} + +// CreateTable implements glue.Session. +func (s *mockSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { + log.Fatal("unimplemented CreateDatabase for mock session") + return nil +} + +// Close implements glue.Session. +func (s *mockSession) Close() { + s.se.Close() +} + +// GetGlobalVariables implements glue.Session. +func (s *mockSession) GetGlobalVariable(name string) (string, error) { + if ret, ok := s.globalVars[name]; ok { + return ret, nil + } + return "True", nil +} + +// MockGlue only used for test +type MockGlue struct { + se session.Session + GlobalVars map[string]string +} + +func (m *MockGlue) SetSession(se session.Session) { + m.se = se +} + +// GetDomain implements glue.Glue. +func (*MockGlue) GetDomain(store kv.Storage) (*domain.Domain, error) { + return nil, nil +} + +// CreateSession implements glue.Glue. +func (m *MockGlue) CreateSession(store kv.Storage) (glue.Session, error) { + glueSession := &mockSession{ + se: m.se, + globalVars: m.GlobalVars, + } + return glueSession, nil +} + +// Open implements glue.Glue. +func (*MockGlue) Open(path string, option pd.SecurityOption) (kv.Storage, error) { + return nil, nil +} + +// OwnsStorage implements glue.Glue. +func (*MockGlue) OwnsStorage() bool { + return true +} + +// StartProgress implements glue.Glue. +func (*MockGlue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress { + return nil +} + +// Record implements glue.Glue. +func (*MockGlue) Record(name string, value uint64) { +} + +// GetVersion implements glue.Glue. +func (*MockGlue) GetVersion() string { + return "mock glue" +} + +// UseOneShotSession implements glue.Glue. +func (m *MockGlue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error { + glueSession := &mockSession{ + se: m.se, + } + return fn(glueSession) +} +>>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173)) diff --git a/br/pkg/restore/BUILD.bazel b/br/pkg/restore/BUILD.bazel new file mode 100644 index 0000000000000..77c2fc2976570 --- /dev/null +++ b/br/pkg/restore/BUILD.bazel @@ -0,0 +1,181 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "restore", + srcs = [ + "batcher.go", + "client.go", + "data.go", + "db.go", + "import.go", + "import_retry.go", + "log_client.go", + "merge.go", + "pipeline_items.go", + "range.go", + "rawkv_client.go", + "search.go", + "split.go", + "stream_metas.go", + "systable_restore.go", + "util.go", + ], + importpath = "github.com/pingcap/tidb/br/pkg/restore", + visibility = ["//visibility:public"], + deps = [ + "//br/pkg/backup", + "//br/pkg/checksum", + "//br/pkg/common", + "//br/pkg/conn", + "//br/pkg/conn/util", + "//br/pkg/errors", + "//br/pkg/glue", + "//br/pkg/logutil", + "//br/pkg/metautil", + "//br/pkg/pdutil", + "//br/pkg/redact", + "//br/pkg/restore/prealloc_table_id", + "//br/pkg/restore/split", + "//br/pkg/restore/tiflashrec", + "//br/pkg/rtree", + "//br/pkg/storage", + "//br/pkg/stream", + "//br/pkg/summary", + "//br/pkg/utils", + "//br/pkg/utils/iter", + "//br/pkg/version", + "//config", + "//ddl", + "//ddl/util", + "//domain", + "//kv", + "//meta", + "//parser/model", + "//parser/mysql", + "//sessionctx/variable", + "//statistics/handle", + "//store/pdtypes", + "//tablecodec", + "//util", + "//util/codec", + "//util/collate", + "//util/hack", + "//util/mathutil", + "//util/table-filter", + "@com_github_emirpasic_gods//maps/treemap", + "@com_github_go_sql_driver_mysql//:mysql", + "@com_github_google_uuid//:uuid", + "@com_github_opentracing_opentracing_go//:opentracing-go", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/brpb", + "@com_github_pingcap_kvproto//pkg/errorpb", + "@com_github_pingcap_kvproto//pkg/import_sstpb", + "@com_github_pingcap_kvproto//pkg/kvrpcpb", + "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_pingcap_kvproto//pkg/pdpb", + "@com_github_pingcap_kvproto//pkg/recoverdatapb", + "@com_github_pingcap_log//:log", + "@com_github_tikv_client_go_v2//config", + "@com_github_tikv_client_go_v2//kv", + "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//rawkv", + "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_client_go_v2//txnkv/rangetask", + "@com_github_tikv_pd_client//:client", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//backoff", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//credentials", + "@org_golang_google_grpc//keepalive", + "@org_golang_google_grpc//status", + "@org_golang_x_exp//slices", + "@org_golang_x_sync//errgroup", + "@org_uber_go_multierr//:multierr", + "@org_uber_go_zap//:zap", + "@org_uber_go_zap//zapcore", + ], +) + +go_test( + name = "restore_test", + timeout = "short", + srcs = [ + "batcher_test.go", + "client_test.go", + "data_test.go", + "db_test.go", + "import_retry_test.go", + "log_client_test.go", + "main_test.go", + "merge_fuzz_test.go", + "merge_test.go", + "range_test.go", + "rawkv_client_test.go", + "search_test.go", + "split_test.go", + "stream_metas_test.go", + "util_test.go", + ], + embed = [":restore"], + flaky = True, + race = "on", + shard_count = 20, + deps = [ + "//br/pkg/backup", + "//br/pkg/conn", + "//br/pkg/errors", + "//br/pkg/glue", + "//br/pkg/gluetidb", + "//br/pkg/logutil", + "//br/pkg/metautil", + "//br/pkg/mock", + "//br/pkg/pdutil", + "//br/pkg/restore/split", + "//br/pkg/restore/tiflashrec", + "//br/pkg/rtree", + "//br/pkg/storage", + "//br/pkg/stream", + "//br/pkg/utils", + "//br/pkg/utils/iter", + "//infoschema", + "//kv", + "//meta/autoid", + "//parser/model", + "//parser/mysql", + "//parser/types", + "//sessionctx/stmtctx", + "//store/pdtypes", + "//tablecodec", + "//testkit", + "//testkit/testsetup", + "//types", + "//util/codec", + "@com_github_fsouza_fake_gcs_server//fakestorage", + "@com_github_golang_protobuf//proto", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/brpb", + "@com_github_pingcap_kvproto//pkg/encryptionpb", + "@com_github_pingcap_kvproto//pkg/errorpb", + "@com_github_pingcap_kvproto//pkg/import_sstpb", + "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_pingcap_kvproto//pkg/pdpb", + "@com_github_pingcap_kvproto//pkg/recoverdatapb", + "@com_github_pingcap_log//:log", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//oracle", + "@com_github_tikv_client_go_v2//rawkv", + "@com_github_tikv_client_go_v2//testutils", + "@com_github_tikv_pd_client//:client", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//keepalive", + "@org_golang_google_grpc//status", + "@org_golang_x_exp//slices", + "@org_uber_go_goleak//:goleak", + "@org_uber_go_multierr//:multierr", + "@org_uber_go_zap//:zap", + "@org_uber_go_zap//zapcore", + ], +) diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 7a2c04285c09f..2c535974956ad 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -46,6 +46,7 @@ import ( "github.com/pingcap/tidb/store/pdtypes" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/mathutil" filter "github.com/pingcap/tidb/util/table-filter" "github.com/tikv/client-go/v2/oracle" @@ -2016,3 +2017,102 @@ func (rc *Client) SaveSchemas( } return nil } +<<<<<<< HEAD +======= + +// InitFullClusterRestore init fullClusterRestore and set SkipGrantTable as needed +func (rc *Client) InitFullClusterRestore(explicitFilter bool) { + rc.fullClusterRestore = !explicitFilter && rc.IsFull() + + log.Info("full cluster restore", zap.Bool("value", rc.fullClusterRestore)) + + if rc.fullClusterRestore { + // have to skip grant table, in order to NotifyUpdatePrivilege + config.GetGlobalConfig().Security.SkipGrantTable = true + } +} + +func (rc *Client) IsFullClusterRestore() bool { + return rc.fullClusterRestore +} + +func (rc *Client) SetWithSysTable(withSysTable bool) { + rc.withSysTable = withSysTable +} + +// MockClient create a fake client used to test. +func MockClient(dbs map[string]*utils.Database) *Client { + return &Client{databases: dbs} +} + +// TidyOldSchemas produces schemas information. +func TidyOldSchemas(sr *stream.SchemasReplace) *backup.Schemas { + var schemaIsEmpty bool + schemas := backup.NewBackupSchemas() + + for _, dr := range sr.DbMap { + if dr.OldDBInfo == nil { + continue + } + + schemaIsEmpty = true + for _, tr := range dr.TableMap { + if tr.OldTableInfo == nil { + continue + } + schemas.AddSchema(dr.OldDBInfo, tr.OldTableInfo) + schemaIsEmpty = false + } + + // backup this empty schema if it has nothing table. + if schemaIsEmpty { + schemas.AddSchema(dr.OldDBInfo, nil) + } + } + return schemas +} + +func CheckNewCollationEnable( + backupNewCollationEnable string, + g glue.Glue, + storage kv.Storage, + CheckRequirements bool, +) error { + if backupNewCollationEnable == "" { + if CheckRequirements { + return errors.Annotatef(berrors.ErrUnknown, + "the config 'new_collations_enabled_on_first_bootstrap' not found in backupmeta. "+ + "you can use \"show config WHERE name='new_collations_enabled_on_first_bootstrap';\" to manually check the config. "+ + "if you ensure the config 'new_collations_enabled_on_first_bootstrap' in backup cluster is as same as restore cluster, "+ + "use --check-requirements=false to skip this check") + } + log.Warn("the config 'new_collations_enabled_on_first_bootstrap' is not in backupmeta") + return nil + } + + se, err := g.CreateSession(storage) + if err != nil { + return errors.Trace(err) + } + + newCollationEnable, err := se.GetGlobalVariable(utils.GetTidbNewCollationEnabled()) + if err != nil { + return errors.Trace(err) + } + + if !strings.EqualFold(backupNewCollationEnable, newCollationEnable) { + return errors.Annotatef(berrors.ErrUnknown, + "the config 'new_collations_enabled_on_first_bootstrap' not match, upstream:%v, downstream: %v", + backupNewCollationEnable, newCollationEnable) + } + + // collate.newCollationEnabled is set to 1 when the collate package is initialized, + // so we need to modify this value according to the config of the cluster + // before using the collate package. + enabled := newCollationEnable == "True" + // modify collate.newCollationEnabled according to the config of the cluster + collate.SetNewCollationEnabledForTest(enabled) + log.Info("set new_collation_enabled", zap.Bool("new_collation_enabled", enabled)) + return nil +} +>>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173)) diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go index 334ac67387f5d..76787c8b0028d 100644 --- a/br/pkg/restore/client_test.go +++ b/br/pkg/restore/client_test.go @@ -240,3 +240,1121 @@ func TestPreCheckTableTiFlashReplicas(t *testing.T) { require.Nil(t, tables[i].Info.TiFlashReplica) } } +<<<<<<< HEAD +======= + +// Mock ImporterClient interface +type FakeImporterClient struct { + restore.ImporterClient +} + +// Record the stores that have communicated +type RecordStores struct { + mu sync.Mutex + stores []uint64 +} + +func NewRecordStores() RecordStores { + return RecordStores{stores: make([]uint64, 0)} +} + +func (r *RecordStores) put(id uint64) { + r.mu.Lock() + defer r.mu.Unlock() + r.stores = append(r.stores, id) +} + +func (r *RecordStores) sort() { + r.mu.Lock() + defer r.mu.Unlock() + slices.Sort(r.stores) +} + +func (r *RecordStores) len() int { + r.mu.Lock() + defer r.mu.Unlock() + return len(r.stores) +} + +func (r *RecordStores) get(i int) uint64 { + r.mu.Lock() + defer r.mu.Unlock() + return r.stores[i] +} + +func (r *RecordStores) toString() string { + r.mu.Lock() + defer r.mu.Unlock() + return fmt.Sprintf("%v", r.stores) +} + +var recordStores RecordStores + +const ( + SET_SPEED_LIMIT_ERROR = 999999 + WORKING_TIME = 100 +) + +func (fakeImportCli FakeImporterClient) SetDownloadSpeedLimit( + ctx context.Context, + storeID uint64, + req *import_sstpb.SetDownloadSpeedLimitRequest, +) (*import_sstpb.SetDownloadSpeedLimitResponse, error) { + if storeID == SET_SPEED_LIMIT_ERROR { + return nil, fmt.Errorf("storeID:%v ERROR", storeID) + } + + time.Sleep(WORKING_TIME * time.Millisecond) // simulate doing 100 ms work + recordStores.put(storeID) + return nil, nil +} + +func TestSetSpeedLimit(t *testing.T) { + mockStores := []*metapb.Store{ + {Id: 1}, + {Id: 2}, + {Id: 3}, + {Id: 4}, + {Id: 5}, + {Id: 6}, + {Id: 7}, + {Id: 8}, + {Id: 9}, + {Id: 10}, + } + + // 1. The cost of concurrent communication is expected to be less than the cost of serial communication. + client := restore.NewRestoreClient(fakePDClient{ + stores: mockStores, + }, nil, defaultKeepaliveCfg, false) + ctx := context.Background() + + recordStores = NewRecordStores() + start := time.Now() + err := restore.MockCallSetSpeedLimit(ctx, FakeImporterClient{}, client, 10) + cost := time.Since(start) + require.NoError(t, err) + + recordStores.sort() + t.Logf("Total Cost: %v\n", cost) + t.Logf("Has Communicated: %v\n", recordStores.toString()) + + serialCost := len(mockStores) * WORKING_TIME + require.Less(t, cost, time.Duration(serialCost)*time.Millisecond) + require.Equal(t, len(mockStores), recordStores.len()) + for i := 0; i < recordStores.len(); i++ { + require.Equal(t, mockStores[i].Id, recordStores.get(i)) + } + + // 2. Expect the number of communicated stores to be less than the length of the mockStore + // Because subsequent unstarted communications are aborted when an error is encountered. + recordStores = NewRecordStores() + mockStores[5].Id = SET_SPEED_LIMIT_ERROR // setting a fault store + client = restore.NewRestoreClient(fakePDClient{ + stores: mockStores, + }, nil, defaultKeepaliveCfg, false) + + // Concurrency needs to be less than the number of stores + err = restore.MockCallSetSpeedLimit(ctx, FakeImporterClient{}, client, 2) + require.Error(t, err) + t.Log(err) + + recordStores.sort() + sort.Slice(mockStores, func(i, j int) bool { return mockStores[i].Id < mockStores[j].Id }) + t.Logf("Has Communicated: %v\n", recordStores.toString()) + require.Less(t, recordStores.len(), len(mockStores)) + for i := 0; i < recordStores.len(); i++ { + require.Equal(t, mockStores[i].Id, recordStores.get(i)) + } +} + +func TestDeleteRangeQuery(t *testing.T) { + ctx := context.Background() + m := mc + mockStores := []*metapb.Store{ + { + Id: 1, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tiflash", + }, + }, + }, + { + Id: 2, + Labels: []*metapb.StoreLabel{ + { + Key: "engine", + Value: "tiflash", + }, + }, + }, + } + + g := gluetidb.New() + client := restore.NewRestoreClient(fakePDClient{ + stores: mockStores, + }, nil, defaultKeepaliveCfg, false) + err := client.Init(g, m.Storage) + require.NoError(t, err) + + client.RunGCRowsLoader(ctx) + + client.InsertDeleteRangeForTable(2, []int64{3}) + client.InsertDeleteRangeForTable(4, []int64{5, 6}) + + elementID := int64(1) + client.InsertDeleteRangeForIndex(7, &elementID, 8, []int64{1}) + client.InsertDeleteRangeForIndex(9, &elementID, 10, []int64{1, 2}) + + querys := client.GetGCRows() + require.Equal(t, querys[0], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (2, 1, '748000000000000003', '748000000000000004', %[1]d)") + require.Equal(t, querys[1], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (4, 1, '748000000000000005', '748000000000000006', %[1]d),(4, 2, '748000000000000006', '748000000000000007', %[1]d)") + require.Equal(t, querys[2], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (7, 1, '7480000000000000085f698000000000000001', '7480000000000000085f698000000000000002', %[1]d)") + require.Equal(t, querys[3], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (9, 2, '74800000000000000a5f698000000000000001', '74800000000000000a5f698000000000000002', %[1]d),(9, 3, '74800000000000000a5f698000000000000002', '74800000000000000a5f698000000000000003', %[1]d)") +} + +func TestRestoreMetaKVFilesWithBatchMethod1(t *testing.T) { + files := []*backuppb.DataFileInfo{} + batchCount := 0 + + client := restore.MockClient(nil) + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + files, + files, + nil, + nil, + nil, + func( + ctx context.Context, + defaultFiles []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + batchCount++ + return nil, nil + }, + ) + require.Nil(t, err) + require.Equal(t, batchCount, 0) +} + +func TestRestoreMetaKVFilesWithBatchMethod2(t *testing.T) { + files := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 120, + }, + } + batchCount := 0 + result := make(map[int][]*backuppb.DataFileInfo) + + client := restore.MockClient(nil) + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + files, + nil, + nil, + nil, + nil, + func( + ctx context.Context, + fs []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + if len(fs) > 0 { + result[batchCount] = fs + batchCount++ + } + return nil, nil + }, + ) + require.Nil(t, err) + require.Equal(t, batchCount, 1) + require.Equal(t, len(result), 1) + require.Equal(t, result[0], files) +} + +func TestRestoreMetaKVFilesWithBatchMethod3(t *testing.T) { + defaultFiles := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 120, + }, + { + Path: "f2", + MinTs: 100, + MaxTs: 120, + }, + { + Path: "f3", + MinTs: 110, + MaxTs: 130, + }, + { + Path: "f4", + MinTs: 140, + MaxTs: 150, + }, + { + Path: "f5", + MinTs: 150, + MaxTs: 160, + }, + } + writeFiles := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 120, + }, + { + Path: "f2", + MinTs: 100, + MaxTs: 120, + }, + { + Path: "f3", + MinTs: 110, + MaxTs: 130, + }, + { + Path: "f4", + MinTs: 135, + MaxTs: 150, + }, + { + Path: "f5", + MinTs: 150, + MaxTs: 160, + }, + } + + batchCount := 0 + result := make(map[int][]*backuppb.DataFileInfo) + resultKV := make(map[int]int) + + client := restore.MockClient(nil) + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + defaultFiles, + writeFiles, + nil, + nil, + nil, + func( + ctx context.Context, + fs []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + result[batchCount] = fs + t.Log(filterTS) + resultKV[batchCount] = len(entries) + batchCount++ + return make([]*restore.KvEntryWithTS, batchCount), nil + }, + ) + require.Nil(t, err) + require.Equal(t, len(result), 4) + require.Equal(t, result[0], defaultFiles[0:3]) + require.Equal(t, resultKV[0], 0) + require.Equal(t, result[1], writeFiles[0:4]) + require.Equal(t, resultKV[1], 0) + require.Equal(t, result[2], defaultFiles[3:]) + require.Equal(t, resultKV[2], 1) + require.Equal(t, result[3], writeFiles[4:]) + require.Equal(t, resultKV[3], 2) +} + +func TestRestoreMetaKVFilesWithBatchMethod4(t *testing.T) { + defaultFiles := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 100, + }, + { + Path: "f2", + MinTs: 100, + MaxTs: 100, + }, + { + Path: "f3", + MinTs: 110, + MaxTs: 130, + }, + { + Path: "f4", + MinTs: 110, + MaxTs: 150, + }, + } + + writeFiles := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 100, + }, + { + Path: "f2", + MinTs: 100, + MaxTs: 100, + }, + { + Path: "f3", + MinTs: 110, + MaxTs: 130, + }, + { + Path: "f4", + MinTs: 110, + MaxTs: 150, + }, + } + batchCount := 0 + result := make(map[int][]*backuppb.DataFileInfo) + + client := restore.MockClient(nil) + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + defaultFiles, + writeFiles, + nil, + nil, + nil, + func( + ctx context.Context, + fs []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + result[batchCount] = fs + batchCount++ + return nil, nil + }, + ) + require.Nil(t, err) + require.Equal(t, len(result), 4) + require.Equal(t, result[0], defaultFiles[0:2]) + require.Equal(t, result[1], writeFiles[0:2]) + require.Equal(t, result[2], defaultFiles[2:]) + require.Equal(t, result[3], writeFiles[2:]) +} + +func TestRestoreMetaKVFilesWithBatchMethod5(t *testing.T) { + defaultFiles := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 100, + }, + { + Path: "f2", + MinTs: 100, + MaxTs: 100, + }, + { + Path: "f3", + MinTs: 110, + MaxTs: 130, + }, + { + Path: "f4", + MinTs: 110, + MaxTs: 150, + }, + } + + writeFiles := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 100, + }, + { + Path: "f2", + MinTs: 100, + MaxTs: 100, + }, + { + Path: "f3", + MinTs: 100, + MaxTs: 130, + }, + { + Path: "f4", + MinTs: 100, + MaxTs: 150, + }, + } + batchCount := 0 + result := make(map[int][]*backuppb.DataFileInfo) + + client := restore.MockClient(nil) + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + defaultFiles, + writeFiles, + nil, + nil, + nil, + func( + ctx context.Context, + fs []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + result[batchCount] = fs + batchCount++ + return nil, nil + }, + ) + require.Nil(t, err) + require.Equal(t, len(result), 4) + require.Equal(t, result[0], defaultFiles[0:2]) + require.Equal(t, result[1], writeFiles[0:]) + require.Equal(t, result[2], defaultFiles[2:]) + require.Equal(t, len(result[3]), 0) +} + +func TestRestoreMetaKVFilesWithBatchMethod6(t *testing.T) { + defaultFiles := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 120, + Length: 1, + }, + { + Path: "f2", + MinTs: 100, + MaxTs: 120, + Length: restore.MetaKVBatchSize, + }, + { + Path: "f3", + MinTs: 110, + MaxTs: 130, + Length: 1, + }, + { + Path: "f4", + MinTs: 140, + MaxTs: 150, + Length: 1, + }, + { + Path: "f5", + MinTs: 150, + MaxTs: 160, + Length: 1, + }, + } + + writeFiles := []*backuppb.DataFileInfo{ + { + Path: "f1", + MinTs: 100, + MaxTs: 120, + }, + { + Path: "f2", + MinTs: 100, + MaxTs: 120, + }, + { + Path: "f3", + MinTs: 110, + MaxTs: 140, + }, + { + Path: "f4", + MinTs: 120, + MaxTs: 150, + }, + { + Path: "f5", + MinTs: 140, + MaxTs: 160, + }, + } + + batchCount := 0 + result := make(map[int][]*backuppb.DataFileInfo) + resultKV := make(map[int]int) + + client := restore.MockClient(nil) + err := client.RestoreMetaKVFilesWithBatchMethod( + context.Background(), + defaultFiles, + writeFiles, + nil, + nil, + nil, + func( + ctx context.Context, + fs []*backuppb.DataFileInfo, + schemasReplace *stream.SchemasReplace, + entries []*restore.KvEntryWithTS, + filterTS uint64, + updateStats func(kvCount uint64, size uint64), + progressInc func(), + cf string, + ) ([]*restore.KvEntryWithTS, error) { + result[batchCount] = fs + t.Log(filterTS) + resultKV[batchCount] = len(entries) + batchCount++ + return make([]*restore.KvEntryWithTS, batchCount), nil + }, + ) + require.Nil(t, err) + require.Equal(t, len(result), 6) + require.Equal(t, result[0], defaultFiles[0:2]) + require.Equal(t, resultKV[0], 0) + require.Equal(t, result[1], writeFiles[0:2]) + require.Equal(t, resultKV[1], 0) + require.Equal(t, result[2], defaultFiles[2:3]) + require.Equal(t, resultKV[2], 1) + require.Equal(t, result[3], writeFiles[2:4]) + require.Equal(t, resultKV[3], 2) + require.Equal(t, result[4], defaultFiles[3:]) + require.Equal(t, resultKV[4], 3) + require.Equal(t, result[5], writeFiles[4:]) + require.Equal(t, resultKV[5], 4) +} + +func TestSortMetaKVFiles(t *testing.T) { + files := []*backuppb.DataFileInfo{ + { + Path: "f5", + MinTs: 110, + MaxTs: 150, + ResolvedTs: 120, + }, + { + Path: "f1", + MinTs: 100, + MaxTs: 100, + ResolvedTs: 80, + }, + { + Path: "f2", + MinTs: 100, + MaxTs: 100, + ResolvedTs: 90, + }, + { + Path: "f4", + MinTs: 110, + MaxTs: 130, + ResolvedTs: 120, + }, + { + Path: "f3", + MinTs: 105, + MaxTs: 130, + ResolvedTs: 100, + }, + } + + files = restore.SortMetaKVFiles(files) + require.Equal(t, len(files), 5) + require.Equal(t, files[0].Path, "f1") + require.Equal(t, files[1].Path, "f2") + require.Equal(t, files[2].Path, "f3") + require.Equal(t, files[3].Path, "f4") + require.Equal(t, files[4].Path, "f5") +} + +func TestApplyKVFilesWithSingelMethod(t *testing.T) { + var ( + totalKVCount int64 = 0 + totalSize uint64 = 0 + logs = make([]string, 0) + ) + ds := []*backuppb.DataFileInfo{ + { + Path: "log3", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Delete, + }, + { + Path: "log1", + NumberOfEntries: 5, + Length: 100, + Cf: stream.DefaultCF, + Type: backuppb.FileType_Put, + }, { + Path: "log2", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + }, + } + applyFunc := func( + files []*backuppb.DataFileInfo, + kvCount int64, + size uint64, + ) { + totalKVCount += kvCount + totalSize += size + for _, f := range files { + logs = append(logs, f.GetPath()) + } + } + + restore.ApplyKVFilesWithSingelMethod( + context.TODO(), + iter.FromSlice(ds), + applyFunc, + ) + + require.Equal(t, totalKVCount, int64(15)) + require.Equal(t, totalSize, uint64(300)) + require.Equal(t, logs, []string{"log1", "log2", "log3"}) +} + +func TestApplyKVFilesWithBatchMethod1(t *testing.T) { + var ( + runCount = 0 + batchCount int = 3 + batchSize uint64 = 1000 + totalKVCount int64 = 0 + logs = make([][]string, 0) + ) + ds := []*backuppb.DataFileInfo{ + { + Path: "log5", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Delete, + RegionId: 1, + }, { + Path: "log3", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, { + Path: "log4", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, { + Path: "log1", + NumberOfEntries: 5, + Length: 800, + Cf: stream.DefaultCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, + { + Path: "log2", + NumberOfEntries: 5, + Length: 200, + Cf: stream.DefaultCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, + } + applyFunc := func( + files []*backuppb.DataFileInfo, + kvCount int64, + size uint64, + ) { + runCount += 1 + totalKVCount += kvCount + log := make([]string, 0, len(files)) + for _, f := range files { + log = append(log, f.GetPath()) + } + logs = append(logs, log) + } + + restore.ApplyKVFilesWithBatchMethod( + context.TODO(), + iter.FromSlice(ds), + batchCount, + batchSize, + applyFunc, + ) + + require.Equal(t, runCount, 3) + require.Equal(t, totalKVCount, int64(25)) + require.Equal(t, + logs, + [][]string{ + {"log1", "log2"}, + {"log3", "log4"}, + {"log5"}, + }, + ) +} + +func TestApplyKVFilesWithBatchMethod2(t *testing.T) { + var ( + runCount = 0 + batchCount int = 2 + batchSize uint64 = 1500 + totalKVCount int64 = 0 + logs = make([][]string, 0) + ) + ds := []*backuppb.DataFileInfo{ + { + Path: "log1", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Delete, + RegionId: 1, + }, { + Path: "log2", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, { + Path: "log3", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, { + Path: "log4", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, { + Path: "log5", + NumberOfEntries: 5, + Length: 800, + Cf: stream.DefaultCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, + { + Path: "log6", + NumberOfEntries: 5, + Length: 200, + Cf: stream.DefaultCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, + } + applyFunc := func( + files []*backuppb.DataFileInfo, + kvCount int64, + size uint64, + ) { + runCount += 1 + totalKVCount += kvCount + log := make([]string, 0, len(files)) + for _, f := range files { + log = append(log, f.GetPath()) + } + logs = append(logs, log) + } + + restore.ApplyKVFilesWithBatchMethod( + context.TODO(), + iter.FromSlice(ds), + batchCount, + batchSize, + applyFunc, + ) + + require.Equal(t, runCount, 4) + require.Equal(t, totalKVCount, int64(30)) + require.Equal(t, + logs, + [][]string{ + {"log2", "log3"}, + {"log5", "log6"}, + {"log4"}, + {"log1"}, + }, + ) +} + +func TestApplyKVFilesWithBatchMethod3(t *testing.T) { + var ( + runCount = 0 + batchCount int = 2 + batchSize uint64 = 1500 + totalKVCount int64 = 0 + logs = make([][]string, 0) + ) + ds := []*backuppb.DataFileInfo{ + { + Path: "log1", + NumberOfEntries: 5, + Length: 2000, + Cf: stream.WriteCF, + Type: backuppb.FileType_Delete, + RegionId: 1, + }, { + Path: "log2", + NumberOfEntries: 5, + Length: 2000, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, { + Path: "log3", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + RegionId: 1, + }, { + Path: "log4", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + RegionId: 2, + }, { + Path: "log5", + NumberOfEntries: 5, + Length: 800, + Cf: stream.DefaultCF, + Type: backuppb.FileType_Put, + RegionId: 3, + }, + { + Path: "log6", + NumberOfEntries: 5, + Length: 200, + Cf: stream.DefaultCF, + Type: backuppb.FileType_Put, + RegionId: 3, + }, + } + applyFunc := func( + files []*backuppb.DataFileInfo, + kvCount int64, + size uint64, + ) { + runCount += 1 + totalKVCount += kvCount + log := make([]string, 0, len(files)) + for _, f := range files { + log = append(log, f.GetPath()) + } + logs = append(logs, log) + } + + restore.ApplyKVFilesWithBatchMethod( + context.TODO(), + iter.FromSlice(ds), + batchCount, + batchSize, + applyFunc, + ) + + require.Equal(t, runCount, 5) + require.Equal(t, totalKVCount, int64(30)) + require.Equal(t, + logs, + [][]string{ + {"log2"}, + {"log5", "log6"}, + {"log3"}, + {"log4"}, + {"log1"}, + }, + ) +} + +func TestApplyKVFilesWithBatchMethod4(t *testing.T) { + var ( + runCount = 0 + batchCount int = 2 + batchSize uint64 = 1500 + totalKVCount int64 = 0 + logs = make([][]string, 0) + ) + ds := []*backuppb.DataFileInfo{ + { + Path: "log1", + NumberOfEntries: 5, + Length: 2000, + Cf: stream.WriteCF, + Type: backuppb.FileType_Delete, + TableId: 1, + }, { + Path: "log2", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + TableId: 1, + }, { + Path: "log3", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + TableId: 2, + }, { + Path: "log4", + NumberOfEntries: 5, + Length: 100, + Cf: stream.WriteCF, + Type: backuppb.FileType_Put, + TableId: 1, + }, { + Path: "log5", + NumberOfEntries: 5, + Length: 100, + Cf: stream.DefaultCF, + Type: backuppb.FileType_Put, + TableId: 2, + }, + } + applyFunc := func( + files []*backuppb.DataFileInfo, + kvCount int64, + size uint64, + ) { + runCount += 1 + totalKVCount += kvCount + log := make([]string, 0, len(files)) + for _, f := range files { + log = append(log, f.GetPath()) + } + logs = append(logs, log) + } + + restore.ApplyKVFilesWithBatchMethod( + context.TODO(), + iter.FromSlice(ds), + batchCount, + batchSize, + applyFunc, + ) + + require.Equal(t, runCount, 4) + require.Equal(t, totalKVCount, int64(25)) + require.Equal(t, + logs, + [][]string{ + {"log2", "log4"}, + {"log5"}, + {"log3"}, + {"log1"}, + }, + ) +} + +func TestCheckNewCollationEnable(t *testing.T) { + caseList := []struct { + backupMeta *backuppb.BackupMeta + newCollationEnableInCluster string + CheckRequirements bool + isErr bool + }{ + { + backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: "True"}, + newCollationEnableInCluster: "True", + CheckRequirements: true, + isErr: false, + }, + { + backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: "True"}, + newCollationEnableInCluster: "False", + CheckRequirements: true, + isErr: true, + }, + { + backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: "False"}, + newCollationEnableInCluster: "True", + CheckRequirements: true, + isErr: true, + }, + { + backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: "False"}, + newCollationEnableInCluster: "false", + CheckRequirements: true, + isErr: false, + }, + { + backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: "False"}, + newCollationEnableInCluster: "True", + CheckRequirements: false, + isErr: true, + }, + { + backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: "True"}, + newCollationEnableInCluster: "False", + CheckRequirements: false, + isErr: true, + }, + { + backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: ""}, + newCollationEnableInCluster: "True", + CheckRequirements: false, + isErr: false, + }, + { + backupMeta: &backuppb.BackupMeta{NewCollationsEnabled: ""}, + newCollationEnableInCluster: "True", + CheckRequirements: true, + isErr: true, + }, + } + + for i, ca := range caseList { + g := &gluetidb.MockGlue{ + GlobalVars: map[string]string{"new_collation_enabled": ca.newCollationEnableInCluster}, + } + err := restore.CheckNewCollationEnable(ca.backupMeta.GetNewCollationsEnabled(), g, nil, ca.CheckRequirements) + + t.Logf("[%d] Got Error: %v\n", i, err) + if ca.isErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + } +} +>>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173)) diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index d089d50b64fe1..1eabae944b46e 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -264,12 +264,12 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig var newCollationEnable string err = g.UseOneShotSession(mgr.GetStorage(), !needDomain, func(se glue.Session) error { - newCollationEnable, err = se.GetGlobalVariable(tidbNewCollationEnabled) + newCollationEnable, err = se.GetGlobalVariable(utils.GetTidbNewCollationEnabled()) if err != nil { return errors.Trace(err) } log.Info("get new_collations_enabled_on_first_bootstrap config from system table", - zap.String(tidbNewCollationEnabled, newCollationEnable)) + zap.String(utils.GetTidbNewCollationEnabled(), newCollationEnable)) return nil }) if err != nil { diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 7deada25bd562..f774c60ab91de 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -88,7 +88,11 @@ const ( crypterAES192KeyLen = 24 crypterAES256KeyLen = 32 +<<<<<<< HEAD tidbNewCollationEnabled = "new_collation_enabled" +======= + flagFullBackupType = "type" +>>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173)) ) // TLSConfig is the common configuration for TLS connection. diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index b4dd4cd0e67b6..209152c59d26a 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -23,7 +23,11 @@ import ( "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/version" "github.com/pingcap/tidb/config" +<<<<<<< HEAD "github.com/pingcap/tidb/kv" +======= + "github.com/pingcap/tidb/util" +>>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173)) "github.com/pingcap/tidb/util/mathutil" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -330,6 +334,7 @@ func CheckRestoreDBAndTable(client *restore.Client, cfg *RestoreConfig) error { return nil } +<<<<<<< HEAD func CheckNewCollationEnable( backupNewCollationEnable string, g glue.Glue, @@ -367,6 +372,8 @@ func CheckNewCollationEnable( return nil } +======= +>>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173)) func isFullRestore(cmdName string) bool { return cmdName == FullRestoreCmd } @@ -439,7 +446,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf return errors.Trace(versionErr) } } - if err = CheckNewCollationEnable(backupMeta.GetNewCollationsEnabled(), g, mgr.GetStorage(), cfg.CheckRequirements); err != nil { + if err = restore.CheckNewCollationEnable(backupMeta.GetNewCollationsEnabled(), g, mgr.GetStorage(), cfg.CheckRequirements); err != nil { return errors.Trace(err) } diff --git a/br/pkg/utils/db.go b/br/pkg/utils/db.go index 346aca6157dbb..dfcdea0c5b462 100644 --- a/br/pkg/utils/db.go +++ b/br/pkg/utils/db.go @@ -7,6 +7,10 @@ import ( "database/sql" ) +const ( + tidbNewCollationEnabled = "new_collation_enabled" +) + var ( // check sql.DB and sql.Conn implement QueryExecutor and DBExecutor _ DBExecutor = &sql.DB{} @@ -30,3 +34,86 @@ type DBExecutor interface { StmtExecutor BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) } +<<<<<<< HEAD +======= + +// CheckLogBackupEnabled checks if LogBackup is enabled in cluster. +// this mainly used in three places. +// 1. GC worker resolve locks to scan more locks after safepoint. (every minute) +// 2. Add index skipping use lightning.(every add index ddl) +// 3. Telemetry of log backup feature usage (every 6 hours). +// NOTE: this result shouldn't be cached by caller. because it may change every time in one cluster. +func CheckLogBackupEnabled(ctx sessionctx.Context) bool { + executor, ok := ctx.(sqlexec.RestrictedSQLExecutor) + if !ok { + // shouldn't happen + log.Error("[backup] unable to translate executor from sessionctx") + return false + } + enabled, err := IsLogBackupEnabled(executor) + if err != nil { + // if it failed by any reason. we can simply return true this time. + // for GC worker it will scan more locks in one tick. + // for Add index it will skip using lightning this time. + // for Telemetry it will get a false positive usage count. + log.Warn("[backup] check log backup config failed, ignore it", zap.Error(err)) + return true + } + return enabled +} + +// IsLogBackupEnabled is used for br to check whether tikv has enabled log backup. +// we use `sqlexec.RestrictedSQLExecutor` as parameter because it's easy to mock. +// it should return error. +func IsLogBackupEnabled(ctx sqlexec.RestrictedSQLExecutor) (bool, error) { + valStr := "show config where name = 'log-backup.enable' and type = 'tikv'" + internalCtx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBR) + rows, fields, errSQL := ctx.ExecRestrictedSQL(internalCtx, nil, valStr) + if errSQL != nil { + return false, errSQL + } + if len(rows) == 0 { + // no rows mean not support log backup. + return false, nil + } + for _, row := range rows { + d := row.GetDatum(3, &fields[3].Column.FieldType) + value, errField := d.ToString() + if errField != nil { + return false, errField + } + if strings.ToLower(value) == "false" { + return false, nil + } + } + return true, nil +} + +// CheckLogBackupTaskExist increases the count of log backup task. +func LogBackupTaskCountInc() { + LogBackupTaskMutex.Lock() + logBackupTaskCount++ + LogBackupTaskMutex.Unlock() +} + +// CheckLogBackupTaskExist decreases the count of log backup task. +func LogBackupTaskCountDec() { + LogBackupTaskMutex.Lock() + logBackupTaskCount-- + LogBackupTaskMutex.Unlock() +} + +// CheckLogBackupTaskExist checks that whether log-backup is existed. +func CheckLogBackupTaskExist() bool { + return logBackupTaskCount > 0 +} + +// IsLogBackupInUse checks the log backup task existed. +func IsLogBackupInUse(ctx sessionctx.Context) bool { + return CheckLogBackupEnabled(ctx) && CheckLogBackupTaskExist() +} + +func GetTidbNewCollationEnabled() string { + return tidbNewCollationEnabled +} +>>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173)) From 69abddfb659db253b3d93143d106c27a50f2a40f Mon Sep 17 00:00:00 2001 From: MoCuishle28 Date: Thu, 24 Nov 2022 19:43:17 +0800 Subject: [PATCH 2/2] br: modify collate.newCollationEnabled according to the config of the cluster Signed-off-by: MoCuishle28 --- br/pkg/gluetidb/glue.go | 24 +- br/pkg/restore/BUILD.bazel | 181 ------ br/pkg/restore/client.go | 55 -- br/pkg/restore/client_test.go | 1047 +-------------------------------- br/pkg/task/common.go | 6 - br/pkg/task/restore.go | 46 -- br/pkg/utils/db.go | 79 --- 7 files changed, 3 insertions(+), 1435 deletions(-) delete mode 100644 br/pkg/restore/BUILD.bazel diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index 997747c296d8f..39bf953aba964 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -298,8 +298,6 @@ func (gs *tidbSession) showCreateDatabase(db *model.DBInfo) (string, error) { func (gs *tidbSession) showCreatePlacementPolicy(policy *model.PolicyInfo) string { return executor.ConstructResultOfShowCreatePlacementPolicy(policy) } -<<<<<<< HEAD -======= // mockSession is used for test. type mockSession struct { @@ -318,23 +316,6 @@ func (s *mockSession) Execute(ctx context.Context, sql string) error { } func (s *mockSession) ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error { - ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR) - rs, err := s.se.ExecuteInternal(ctx, sql, args...) - if err != nil { - return err - } - // Some of SQLs (like ADMIN RECOVER INDEX) may lazily take effect - // when we polling the result set. - // At least call `next` once for triggering theirs side effect. - // (Maybe we'd better drain all returned rows?) - if rs != nil { - //nolint: errcheck - defer rs.Close() - c := rs.NewChunk(nil) - if err := rs.Next(ctx, c); err != nil { - return nil - } - } return nil } @@ -351,13 +332,13 @@ func (s *mockSession) CreatePlacementPolicy(ctx context.Context, policy *model.P } // CreateTables implements glue.BatchCreateTableSession. -func (s *mockSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { +func (s *mockSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error { log.Fatal("unimplemented CreateDatabase for mock session") return nil } // CreateTable implements glue.Session. -func (s *mockSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { +func (s *mockSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error { log.Fatal("unimplemented CreateDatabase for mock session") return nil } @@ -430,4 +411,3 @@ func (m *MockGlue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func } return fn(glueSession) } ->>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173)) diff --git a/br/pkg/restore/BUILD.bazel b/br/pkg/restore/BUILD.bazel deleted file mode 100644 index 77c2fc2976570..0000000000000 --- a/br/pkg/restore/BUILD.bazel +++ /dev/null @@ -1,181 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "restore", - srcs = [ - "batcher.go", - "client.go", - "data.go", - "db.go", - "import.go", - "import_retry.go", - "log_client.go", - "merge.go", - "pipeline_items.go", - "range.go", - "rawkv_client.go", - "search.go", - "split.go", - "stream_metas.go", - "systable_restore.go", - "util.go", - ], - importpath = "github.com/pingcap/tidb/br/pkg/restore", - visibility = ["//visibility:public"], - deps = [ - "//br/pkg/backup", - "//br/pkg/checksum", - "//br/pkg/common", - "//br/pkg/conn", - "//br/pkg/conn/util", - "//br/pkg/errors", - "//br/pkg/glue", - "//br/pkg/logutil", - "//br/pkg/metautil", - "//br/pkg/pdutil", - "//br/pkg/redact", - "//br/pkg/restore/prealloc_table_id", - "//br/pkg/restore/split", - "//br/pkg/restore/tiflashrec", - "//br/pkg/rtree", - "//br/pkg/storage", - "//br/pkg/stream", - "//br/pkg/summary", - "//br/pkg/utils", - "//br/pkg/utils/iter", - "//br/pkg/version", - "//config", - "//ddl", - "//ddl/util", - "//domain", - "//kv", - "//meta", - "//parser/model", - "//parser/mysql", - "//sessionctx/variable", - "//statistics/handle", - "//store/pdtypes", - "//tablecodec", - "//util", - "//util/codec", - "//util/collate", - "//util/hack", - "//util/mathutil", - "//util/table-filter", - "@com_github_emirpasic_gods//maps/treemap", - "@com_github_go_sql_driver_mysql//:mysql", - "@com_github_google_uuid//:uuid", - "@com_github_opentracing_opentracing_go//:opentracing-go", - "@com_github_pingcap_errors//:errors", - "@com_github_pingcap_failpoint//:failpoint", - "@com_github_pingcap_kvproto//pkg/brpb", - "@com_github_pingcap_kvproto//pkg/errorpb", - "@com_github_pingcap_kvproto//pkg/import_sstpb", - "@com_github_pingcap_kvproto//pkg/kvrpcpb", - "@com_github_pingcap_kvproto//pkg/metapb", - "@com_github_pingcap_kvproto//pkg/pdpb", - "@com_github_pingcap_kvproto//pkg/recoverdatapb", - "@com_github_pingcap_log//:log", - "@com_github_tikv_client_go_v2//config", - "@com_github_tikv_client_go_v2//kv", - "@com_github_tikv_client_go_v2//oracle", - "@com_github_tikv_client_go_v2//rawkv", - "@com_github_tikv_client_go_v2//tikv", - "@com_github_tikv_client_go_v2//txnkv/rangetask", - "@com_github_tikv_pd_client//:client", - "@org_golang_google_grpc//:grpc", - "@org_golang_google_grpc//backoff", - "@org_golang_google_grpc//codes", - "@org_golang_google_grpc//credentials", - "@org_golang_google_grpc//keepalive", - "@org_golang_google_grpc//status", - "@org_golang_x_exp//slices", - "@org_golang_x_sync//errgroup", - "@org_uber_go_multierr//:multierr", - "@org_uber_go_zap//:zap", - "@org_uber_go_zap//zapcore", - ], -) - -go_test( - name = "restore_test", - timeout = "short", - srcs = [ - "batcher_test.go", - "client_test.go", - "data_test.go", - "db_test.go", - "import_retry_test.go", - "log_client_test.go", - "main_test.go", - "merge_fuzz_test.go", - "merge_test.go", - "range_test.go", - "rawkv_client_test.go", - "search_test.go", - "split_test.go", - "stream_metas_test.go", - "util_test.go", - ], - embed = [":restore"], - flaky = True, - race = "on", - shard_count = 20, - deps = [ - "//br/pkg/backup", - "//br/pkg/conn", - "//br/pkg/errors", - "//br/pkg/glue", - "//br/pkg/gluetidb", - "//br/pkg/logutil", - "//br/pkg/metautil", - "//br/pkg/mock", - "//br/pkg/pdutil", - "//br/pkg/restore/split", - "//br/pkg/restore/tiflashrec", - "//br/pkg/rtree", - "//br/pkg/storage", - "//br/pkg/stream", - "//br/pkg/utils", - "//br/pkg/utils/iter", - "//infoschema", - "//kv", - "//meta/autoid", - "//parser/model", - "//parser/mysql", - "//parser/types", - "//sessionctx/stmtctx", - "//store/pdtypes", - "//tablecodec", - "//testkit", - "//testkit/testsetup", - "//types", - "//util/codec", - "@com_github_fsouza_fake_gcs_server//fakestorage", - "@com_github_golang_protobuf//proto", - "@com_github_pingcap_errors//:errors", - "@com_github_pingcap_failpoint//:failpoint", - "@com_github_pingcap_kvproto//pkg/brpb", - "@com_github_pingcap_kvproto//pkg/encryptionpb", - "@com_github_pingcap_kvproto//pkg/errorpb", - "@com_github_pingcap_kvproto//pkg/import_sstpb", - "@com_github_pingcap_kvproto//pkg/metapb", - "@com_github_pingcap_kvproto//pkg/pdpb", - "@com_github_pingcap_kvproto//pkg/recoverdatapb", - "@com_github_pingcap_log//:log", - "@com_github_stretchr_testify//assert", - "@com_github_stretchr_testify//require", - "@com_github_tikv_client_go_v2//oracle", - "@com_github_tikv_client_go_v2//rawkv", - "@com_github_tikv_client_go_v2//testutils", - "@com_github_tikv_pd_client//:client", - "@org_golang_google_grpc//codes", - "@org_golang_google_grpc//keepalive", - "@org_golang_google_grpc//status", - "@org_golang_x_exp//slices", - "@org_uber_go_goleak//:goleak", - "@org_uber_go_multierr//:multierr", - "@org_uber_go_zap//:zap", - "@org_uber_go_zap//zapcore", - ], -) diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 2c535974956ad..db90d11db1d34 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -2017,60 +2017,6 @@ func (rc *Client) SaveSchemas( } return nil } -<<<<<<< HEAD -======= - -// InitFullClusterRestore init fullClusterRestore and set SkipGrantTable as needed -func (rc *Client) InitFullClusterRestore(explicitFilter bool) { - rc.fullClusterRestore = !explicitFilter && rc.IsFull() - - log.Info("full cluster restore", zap.Bool("value", rc.fullClusterRestore)) - - if rc.fullClusterRestore { - // have to skip grant table, in order to NotifyUpdatePrivilege - config.GetGlobalConfig().Security.SkipGrantTable = true - } -} - -func (rc *Client) IsFullClusterRestore() bool { - return rc.fullClusterRestore -} - -func (rc *Client) SetWithSysTable(withSysTable bool) { - rc.withSysTable = withSysTable -} - -// MockClient create a fake client used to test. -func MockClient(dbs map[string]*utils.Database) *Client { - return &Client{databases: dbs} -} - -// TidyOldSchemas produces schemas information. -func TidyOldSchemas(sr *stream.SchemasReplace) *backup.Schemas { - var schemaIsEmpty bool - schemas := backup.NewBackupSchemas() - - for _, dr := range sr.DbMap { - if dr.OldDBInfo == nil { - continue - } - - schemaIsEmpty = true - for _, tr := range dr.TableMap { - if tr.OldTableInfo == nil { - continue - } - schemas.AddSchema(dr.OldDBInfo, tr.OldTableInfo) - schemaIsEmpty = false - } - - // backup this empty schema if it has nothing table. - if schemaIsEmpty { - schemas.AddSchema(dr.OldDBInfo, nil) - } - } - return schemas -} func CheckNewCollationEnable( backupNewCollationEnable string, @@ -2115,4 +2061,3 @@ func CheckNewCollationEnable( log.Info("set new_collation_enabled", zap.Bool("new_collation_enabled", enabled)) return nil } ->>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173)) diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go index 76787c8b0028d..3f42c110f2e31 100644 --- a/br/pkg/restore/client_test.go +++ b/br/pkg/restore/client_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/br/pkg/gluetidb" "github.com/pingcap/tidb/br/pkg/metautil" @@ -240,1051 +241,6 @@ func TestPreCheckTableTiFlashReplicas(t *testing.T) { require.Nil(t, tables[i].Info.TiFlashReplica) } } -<<<<<<< HEAD -======= - -// Mock ImporterClient interface -type FakeImporterClient struct { - restore.ImporterClient -} - -// Record the stores that have communicated -type RecordStores struct { - mu sync.Mutex - stores []uint64 -} - -func NewRecordStores() RecordStores { - return RecordStores{stores: make([]uint64, 0)} -} - -func (r *RecordStores) put(id uint64) { - r.mu.Lock() - defer r.mu.Unlock() - r.stores = append(r.stores, id) -} - -func (r *RecordStores) sort() { - r.mu.Lock() - defer r.mu.Unlock() - slices.Sort(r.stores) -} - -func (r *RecordStores) len() int { - r.mu.Lock() - defer r.mu.Unlock() - return len(r.stores) -} - -func (r *RecordStores) get(i int) uint64 { - r.mu.Lock() - defer r.mu.Unlock() - return r.stores[i] -} - -func (r *RecordStores) toString() string { - r.mu.Lock() - defer r.mu.Unlock() - return fmt.Sprintf("%v", r.stores) -} - -var recordStores RecordStores - -const ( - SET_SPEED_LIMIT_ERROR = 999999 - WORKING_TIME = 100 -) - -func (fakeImportCli FakeImporterClient) SetDownloadSpeedLimit( - ctx context.Context, - storeID uint64, - req *import_sstpb.SetDownloadSpeedLimitRequest, -) (*import_sstpb.SetDownloadSpeedLimitResponse, error) { - if storeID == SET_SPEED_LIMIT_ERROR { - return nil, fmt.Errorf("storeID:%v ERROR", storeID) - } - - time.Sleep(WORKING_TIME * time.Millisecond) // simulate doing 100 ms work - recordStores.put(storeID) - return nil, nil -} - -func TestSetSpeedLimit(t *testing.T) { - mockStores := []*metapb.Store{ - {Id: 1}, - {Id: 2}, - {Id: 3}, - {Id: 4}, - {Id: 5}, - {Id: 6}, - {Id: 7}, - {Id: 8}, - {Id: 9}, - {Id: 10}, - } - - // 1. The cost of concurrent communication is expected to be less than the cost of serial communication. - client := restore.NewRestoreClient(fakePDClient{ - stores: mockStores, - }, nil, defaultKeepaliveCfg, false) - ctx := context.Background() - - recordStores = NewRecordStores() - start := time.Now() - err := restore.MockCallSetSpeedLimit(ctx, FakeImporterClient{}, client, 10) - cost := time.Since(start) - require.NoError(t, err) - - recordStores.sort() - t.Logf("Total Cost: %v\n", cost) - t.Logf("Has Communicated: %v\n", recordStores.toString()) - - serialCost := len(mockStores) * WORKING_TIME - require.Less(t, cost, time.Duration(serialCost)*time.Millisecond) - require.Equal(t, len(mockStores), recordStores.len()) - for i := 0; i < recordStores.len(); i++ { - require.Equal(t, mockStores[i].Id, recordStores.get(i)) - } - - // 2. Expect the number of communicated stores to be less than the length of the mockStore - // Because subsequent unstarted communications are aborted when an error is encountered. - recordStores = NewRecordStores() - mockStores[5].Id = SET_SPEED_LIMIT_ERROR // setting a fault store - client = restore.NewRestoreClient(fakePDClient{ - stores: mockStores, - }, nil, defaultKeepaliveCfg, false) - - // Concurrency needs to be less than the number of stores - err = restore.MockCallSetSpeedLimit(ctx, FakeImporterClient{}, client, 2) - require.Error(t, err) - t.Log(err) - - recordStores.sort() - sort.Slice(mockStores, func(i, j int) bool { return mockStores[i].Id < mockStores[j].Id }) - t.Logf("Has Communicated: %v\n", recordStores.toString()) - require.Less(t, recordStores.len(), len(mockStores)) - for i := 0; i < recordStores.len(); i++ { - require.Equal(t, mockStores[i].Id, recordStores.get(i)) - } -} - -func TestDeleteRangeQuery(t *testing.T) { - ctx := context.Background() - m := mc - mockStores := []*metapb.Store{ - { - Id: 1, - Labels: []*metapb.StoreLabel{ - { - Key: "engine", - Value: "tiflash", - }, - }, - }, - { - Id: 2, - Labels: []*metapb.StoreLabel{ - { - Key: "engine", - Value: "tiflash", - }, - }, - }, - } - - g := gluetidb.New() - client := restore.NewRestoreClient(fakePDClient{ - stores: mockStores, - }, nil, defaultKeepaliveCfg, false) - err := client.Init(g, m.Storage) - require.NoError(t, err) - - client.RunGCRowsLoader(ctx) - - client.InsertDeleteRangeForTable(2, []int64{3}) - client.InsertDeleteRangeForTable(4, []int64{5, 6}) - - elementID := int64(1) - client.InsertDeleteRangeForIndex(7, &elementID, 8, []int64{1}) - client.InsertDeleteRangeForIndex(9, &elementID, 10, []int64{1, 2}) - - querys := client.GetGCRows() - require.Equal(t, querys[0], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (2, 1, '748000000000000003', '748000000000000004', %[1]d)") - require.Equal(t, querys[1], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (4, 1, '748000000000000005', '748000000000000006', %[1]d),(4, 2, '748000000000000006', '748000000000000007', %[1]d)") - require.Equal(t, querys[2], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (7, 1, '7480000000000000085f698000000000000001', '7480000000000000085f698000000000000002', %[1]d)") - require.Equal(t, querys[3], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (9, 2, '74800000000000000a5f698000000000000001', '74800000000000000a5f698000000000000002', %[1]d),(9, 3, '74800000000000000a5f698000000000000002', '74800000000000000a5f698000000000000003', %[1]d)") -} - -func TestRestoreMetaKVFilesWithBatchMethod1(t *testing.T) { - files := []*backuppb.DataFileInfo{} - batchCount := 0 - - client := restore.MockClient(nil) - err := client.RestoreMetaKVFilesWithBatchMethod( - context.Background(), - files, - files, - nil, - nil, - nil, - func( - ctx context.Context, - defaultFiles []*backuppb.DataFileInfo, - schemasReplace *stream.SchemasReplace, - entries []*restore.KvEntryWithTS, - filterTS uint64, - updateStats func(kvCount uint64, size uint64), - progressInc func(), - cf string, - ) ([]*restore.KvEntryWithTS, error) { - batchCount++ - return nil, nil - }, - ) - require.Nil(t, err) - require.Equal(t, batchCount, 0) -} - -func TestRestoreMetaKVFilesWithBatchMethod2(t *testing.T) { - files := []*backuppb.DataFileInfo{ - { - Path: "f1", - MinTs: 100, - MaxTs: 120, - }, - } - batchCount := 0 - result := make(map[int][]*backuppb.DataFileInfo) - - client := restore.MockClient(nil) - err := client.RestoreMetaKVFilesWithBatchMethod( - context.Background(), - files, - nil, - nil, - nil, - nil, - func( - ctx context.Context, - fs []*backuppb.DataFileInfo, - schemasReplace *stream.SchemasReplace, - entries []*restore.KvEntryWithTS, - filterTS uint64, - updateStats func(kvCount uint64, size uint64), - progressInc func(), - cf string, - ) ([]*restore.KvEntryWithTS, error) { - if len(fs) > 0 { - result[batchCount] = fs - batchCount++ - } - return nil, nil - }, - ) - require.Nil(t, err) - require.Equal(t, batchCount, 1) - require.Equal(t, len(result), 1) - require.Equal(t, result[0], files) -} - -func TestRestoreMetaKVFilesWithBatchMethod3(t *testing.T) { - defaultFiles := []*backuppb.DataFileInfo{ - { - Path: "f1", - MinTs: 100, - MaxTs: 120, - }, - { - Path: "f2", - MinTs: 100, - MaxTs: 120, - }, - { - Path: "f3", - MinTs: 110, - MaxTs: 130, - }, - { - Path: "f4", - MinTs: 140, - MaxTs: 150, - }, - { - Path: "f5", - MinTs: 150, - MaxTs: 160, - }, - } - writeFiles := []*backuppb.DataFileInfo{ - { - Path: "f1", - MinTs: 100, - MaxTs: 120, - }, - { - Path: "f2", - MinTs: 100, - MaxTs: 120, - }, - { - Path: "f3", - MinTs: 110, - MaxTs: 130, - }, - { - Path: "f4", - MinTs: 135, - MaxTs: 150, - }, - { - Path: "f5", - MinTs: 150, - MaxTs: 160, - }, - } - - batchCount := 0 - result := make(map[int][]*backuppb.DataFileInfo) - resultKV := make(map[int]int) - - client := restore.MockClient(nil) - err := client.RestoreMetaKVFilesWithBatchMethod( - context.Background(), - defaultFiles, - writeFiles, - nil, - nil, - nil, - func( - ctx context.Context, - fs []*backuppb.DataFileInfo, - schemasReplace *stream.SchemasReplace, - entries []*restore.KvEntryWithTS, - filterTS uint64, - updateStats func(kvCount uint64, size uint64), - progressInc func(), - cf string, - ) ([]*restore.KvEntryWithTS, error) { - result[batchCount] = fs - t.Log(filterTS) - resultKV[batchCount] = len(entries) - batchCount++ - return make([]*restore.KvEntryWithTS, batchCount), nil - }, - ) - require.Nil(t, err) - require.Equal(t, len(result), 4) - require.Equal(t, result[0], defaultFiles[0:3]) - require.Equal(t, resultKV[0], 0) - require.Equal(t, result[1], writeFiles[0:4]) - require.Equal(t, resultKV[1], 0) - require.Equal(t, result[2], defaultFiles[3:]) - require.Equal(t, resultKV[2], 1) - require.Equal(t, result[3], writeFiles[4:]) - require.Equal(t, resultKV[3], 2) -} - -func TestRestoreMetaKVFilesWithBatchMethod4(t *testing.T) { - defaultFiles := []*backuppb.DataFileInfo{ - { - Path: "f1", - MinTs: 100, - MaxTs: 100, - }, - { - Path: "f2", - MinTs: 100, - MaxTs: 100, - }, - { - Path: "f3", - MinTs: 110, - MaxTs: 130, - }, - { - Path: "f4", - MinTs: 110, - MaxTs: 150, - }, - } - - writeFiles := []*backuppb.DataFileInfo{ - { - Path: "f1", - MinTs: 100, - MaxTs: 100, - }, - { - Path: "f2", - MinTs: 100, - MaxTs: 100, - }, - { - Path: "f3", - MinTs: 110, - MaxTs: 130, - }, - { - Path: "f4", - MinTs: 110, - MaxTs: 150, - }, - } - batchCount := 0 - result := make(map[int][]*backuppb.DataFileInfo) - - client := restore.MockClient(nil) - err := client.RestoreMetaKVFilesWithBatchMethod( - context.Background(), - defaultFiles, - writeFiles, - nil, - nil, - nil, - func( - ctx context.Context, - fs []*backuppb.DataFileInfo, - schemasReplace *stream.SchemasReplace, - entries []*restore.KvEntryWithTS, - filterTS uint64, - updateStats func(kvCount uint64, size uint64), - progressInc func(), - cf string, - ) ([]*restore.KvEntryWithTS, error) { - result[batchCount] = fs - batchCount++ - return nil, nil - }, - ) - require.Nil(t, err) - require.Equal(t, len(result), 4) - require.Equal(t, result[0], defaultFiles[0:2]) - require.Equal(t, result[1], writeFiles[0:2]) - require.Equal(t, result[2], defaultFiles[2:]) - require.Equal(t, result[3], writeFiles[2:]) -} - -func TestRestoreMetaKVFilesWithBatchMethod5(t *testing.T) { - defaultFiles := []*backuppb.DataFileInfo{ - { - Path: "f1", - MinTs: 100, - MaxTs: 100, - }, - { - Path: "f2", - MinTs: 100, - MaxTs: 100, - }, - { - Path: "f3", - MinTs: 110, - MaxTs: 130, - }, - { - Path: "f4", - MinTs: 110, - MaxTs: 150, - }, - } - - writeFiles := []*backuppb.DataFileInfo{ - { - Path: "f1", - MinTs: 100, - MaxTs: 100, - }, - { - Path: "f2", - MinTs: 100, - MaxTs: 100, - }, - { - Path: "f3", - MinTs: 100, - MaxTs: 130, - }, - { - Path: "f4", - MinTs: 100, - MaxTs: 150, - }, - } - batchCount := 0 - result := make(map[int][]*backuppb.DataFileInfo) - - client := restore.MockClient(nil) - err := client.RestoreMetaKVFilesWithBatchMethod( - context.Background(), - defaultFiles, - writeFiles, - nil, - nil, - nil, - func( - ctx context.Context, - fs []*backuppb.DataFileInfo, - schemasReplace *stream.SchemasReplace, - entries []*restore.KvEntryWithTS, - filterTS uint64, - updateStats func(kvCount uint64, size uint64), - progressInc func(), - cf string, - ) ([]*restore.KvEntryWithTS, error) { - result[batchCount] = fs - batchCount++ - return nil, nil - }, - ) - require.Nil(t, err) - require.Equal(t, len(result), 4) - require.Equal(t, result[0], defaultFiles[0:2]) - require.Equal(t, result[1], writeFiles[0:]) - require.Equal(t, result[2], defaultFiles[2:]) - require.Equal(t, len(result[3]), 0) -} - -func TestRestoreMetaKVFilesWithBatchMethod6(t *testing.T) { - defaultFiles := []*backuppb.DataFileInfo{ - { - Path: "f1", - MinTs: 100, - MaxTs: 120, - Length: 1, - }, - { - Path: "f2", - MinTs: 100, - MaxTs: 120, - Length: restore.MetaKVBatchSize, - }, - { - Path: "f3", - MinTs: 110, - MaxTs: 130, - Length: 1, - }, - { - Path: "f4", - MinTs: 140, - MaxTs: 150, - Length: 1, - }, - { - Path: "f5", - MinTs: 150, - MaxTs: 160, - Length: 1, - }, - } - - writeFiles := []*backuppb.DataFileInfo{ - { - Path: "f1", - MinTs: 100, - MaxTs: 120, - }, - { - Path: "f2", - MinTs: 100, - MaxTs: 120, - }, - { - Path: "f3", - MinTs: 110, - MaxTs: 140, - }, - { - Path: "f4", - MinTs: 120, - MaxTs: 150, - }, - { - Path: "f5", - MinTs: 140, - MaxTs: 160, - }, - } - - batchCount := 0 - result := make(map[int][]*backuppb.DataFileInfo) - resultKV := make(map[int]int) - - client := restore.MockClient(nil) - err := client.RestoreMetaKVFilesWithBatchMethod( - context.Background(), - defaultFiles, - writeFiles, - nil, - nil, - nil, - func( - ctx context.Context, - fs []*backuppb.DataFileInfo, - schemasReplace *stream.SchemasReplace, - entries []*restore.KvEntryWithTS, - filterTS uint64, - updateStats func(kvCount uint64, size uint64), - progressInc func(), - cf string, - ) ([]*restore.KvEntryWithTS, error) { - result[batchCount] = fs - t.Log(filterTS) - resultKV[batchCount] = len(entries) - batchCount++ - return make([]*restore.KvEntryWithTS, batchCount), nil - }, - ) - require.Nil(t, err) - require.Equal(t, len(result), 6) - require.Equal(t, result[0], defaultFiles[0:2]) - require.Equal(t, resultKV[0], 0) - require.Equal(t, result[1], writeFiles[0:2]) - require.Equal(t, resultKV[1], 0) - require.Equal(t, result[2], defaultFiles[2:3]) - require.Equal(t, resultKV[2], 1) - require.Equal(t, result[3], writeFiles[2:4]) - require.Equal(t, resultKV[3], 2) - require.Equal(t, result[4], defaultFiles[3:]) - require.Equal(t, resultKV[4], 3) - require.Equal(t, result[5], writeFiles[4:]) - require.Equal(t, resultKV[5], 4) -} - -func TestSortMetaKVFiles(t *testing.T) { - files := []*backuppb.DataFileInfo{ - { - Path: "f5", - MinTs: 110, - MaxTs: 150, - ResolvedTs: 120, - }, - { - Path: "f1", - MinTs: 100, - MaxTs: 100, - ResolvedTs: 80, - }, - { - Path: "f2", - MinTs: 100, - MaxTs: 100, - ResolvedTs: 90, - }, - { - Path: "f4", - MinTs: 110, - MaxTs: 130, - ResolvedTs: 120, - }, - { - Path: "f3", - MinTs: 105, - MaxTs: 130, - ResolvedTs: 100, - }, - } - - files = restore.SortMetaKVFiles(files) - require.Equal(t, len(files), 5) - require.Equal(t, files[0].Path, "f1") - require.Equal(t, files[1].Path, "f2") - require.Equal(t, files[2].Path, "f3") - require.Equal(t, files[3].Path, "f4") - require.Equal(t, files[4].Path, "f5") -} - -func TestApplyKVFilesWithSingelMethod(t *testing.T) { - var ( - totalKVCount int64 = 0 - totalSize uint64 = 0 - logs = make([]string, 0) - ) - ds := []*backuppb.DataFileInfo{ - { - Path: "log3", - NumberOfEntries: 5, - Length: 100, - Cf: stream.WriteCF, - Type: backuppb.FileType_Delete, - }, - { - Path: "log1", - NumberOfEntries: 5, - Length: 100, - Cf: stream.DefaultCF, - Type: backuppb.FileType_Put, - }, { - Path: "log2", - NumberOfEntries: 5, - Length: 100, - Cf: stream.WriteCF, - Type: backuppb.FileType_Put, - }, - } - applyFunc := func( - files []*backuppb.DataFileInfo, - kvCount int64, - size uint64, - ) { - totalKVCount += kvCount - totalSize += size - for _, f := range files { - logs = append(logs, f.GetPath()) - } - } - - restore.ApplyKVFilesWithSingelMethod( - context.TODO(), - iter.FromSlice(ds), - applyFunc, - ) - - require.Equal(t, totalKVCount, int64(15)) - require.Equal(t, totalSize, uint64(300)) - require.Equal(t, logs, []string{"log1", "log2", "log3"}) -} - -func TestApplyKVFilesWithBatchMethod1(t *testing.T) { - var ( - runCount = 0 - batchCount int = 3 - batchSize uint64 = 1000 - totalKVCount int64 = 0 - logs = make([][]string, 0) - ) - ds := []*backuppb.DataFileInfo{ - { - Path: "log5", - NumberOfEntries: 5, - Length: 100, - Cf: stream.WriteCF, - Type: backuppb.FileType_Delete, - RegionId: 1, - }, { - Path: "log3", - NumberOfEntries: 5, - Length: 100, - Cf: stream.WriteCF, - Type: backuppb.FileType_Put, - RegionId: 1, - }, { - Path: "log4", - NumberOfEntries: 5, - Length: 100, - Cf: stream.WriteCF, - Type: backuppb.FileType_Put, - RegionId: 1, - }, { - Path: "log1", - NumberOfEntries: 5, - Length: 800, - Cf: stream.DefaultCF, - Type: backuppb.FileType_Put, - RegionId: 1, - }, - { - Path: "log2", - NumberOfEntries: 5, - Length: 200, - Cf: stream.DefaultCF, - Type: backuppb.FileType_Put, - RegionId: 1, - }, - } - applyFunc := func( - files []*backuppb.DataFileInfo, - kvCount int64, - size uint64, - ) { - runCount += 1 - totalKVCount += kvCount - log := make([]string, 0, len(files)) - for _, f := range files { - log = append(log, f.GetPath()) - } - logs = append(logs, log) - } - - restore.ApplyKVFilesWithBatchMethod( - context.TODO(), - iter.FromSlice(ds), - batchCount, - batchSize, - applyFunc, - ) - - require.Equal(t, runCount, 3) - require.Equal(t, totalKVCount, int64(25)) - require.Equal(t, - logs, - [][]string{ - {"log1", "log2"}, - {"log3", "log4"}, - {"log5"}, - }, - ) -} - -func TestApplyKVFilesWithBatchMethod2(t *testing.T) { - var ( - runCount = 0 - batchCount int = 2 - batchSize uint64 = 1500 - totalKVCount int64 = 0 - logs = make([][]string, 0) - ) - ds := []*backuppb.DataFileInfo{ - { - Path: "log1", - NumberOfEntries: 5, - Length: 100, - Cf: stream.WriteCF, - Type: backuppb.FileType_Delete, - RegionId: 1, - }, { - Path: "log2", - NumberOfEntries: 5, - Length: 100, - Cf: stream.WriteCF, - Type: backuppb.FileType_Put, - RegionId: 1, - }, { - Path: "log3", - NumberOfEntries: 5, - Length: 100, - Cf: stream.WriteCF, - Type: backuppb.FileType_Put, - RegionId: 1, - }, { - Path: "log4", - NumberOfEntries: 5, - Length: 100, - Cf: stream.WriteCF, - Type: backuppb.FileType_Put, - RegionId: 1, - }, { - Path: "log5", - NumberOfEntries: 5, - Length: 800, - Cf: stream.DefaultCF, - Type: backuppb.FileType_Put, - RegionId: 1, - }, - { - Path: "log6", - NumberOfEntries: 5, - Length: 200, - Cf: stream.DefaultCF, - Type: backuppb.FileType_Put, - RegionId: 1, - }, - } - applyFunc := func( - files []*backuppb.DataFileInfo, - kvCount int64, - size uint64, - ) { - runCount += 1 - totalKVCount += kvCount - log := make([]string, 0, len(files)) - for _, f := range files { - log = append(log, f.GetPath()) - } - logs = append(logs, log) - } - - restore.ApplyKVFilesWithBatchMethod( - context.TODO(), - iter.FromSlice(ds), - batchCount, - batchSize, - applyFunc, - ) - - require.Equal(t, runCount, 4) - require.Equal(t, totalKVCount, int64(30)) - require.Equal(t, - logs, - [][]string{ - {"log2", "log3"}, - {"log5", "log6"}, - {"log4"}, - {"log1"}, - }, - ) -} - -func TestApplyKVFilesWithBatchMethod3(t *testing.T) { - var ( - runCount = 0 - batchCount int = 2 - batchSize uint64 = 1500 - totalKVCount int64 = 0 - logs = make([][]string, 0) - ) - ds := []*backuppb.DataFileInfo{ - { - Path: "log1", - NumberOfEntries: 5, - Length: 2000, - Cf: stream.WriteCF, - Type: backuppb.FileType_Delete, - RegionId: 1, - }, { - Path: "log2", - NumberOfEntries: 5, - Length: 2000, - Cf: stream.WriteCF, - Type: backuppb.FileType_Put, - RegionId: 1, - }, { - Path: "log3", - NumberOfEntries: 5, - Length: 100, - Cf: stream.WriteCF, - Type: backuppb.FileType_Put, - RegionId: 1, - }, { - Path: "log4", - NumberOfEntries: 5, - Length: 100, - Cf: stream.WriteCF, - Type: backuppb.FileType_Put, - RegionId: 2, - }, { - Path: "log5", - NumberOfEntries: 5, - Length: 800, - Cf: stream.DefaultCF, - Type: backuppb.FileType_Put, - RegionId: 3, - }, - { - Path: "log6", - NumberOfEntries: 5, - Length: 200, - Cf: stream.DefaultCF, - Type: backuppb.FileType_Put, - RegionId: 3, - }, - } - applyFunc := func( - files []*backuppb.DataFileInfo, - kvCount int64, - size uint64, - ) { - runCount += 1 - totalKVCount += kvCount - log := make([]string, 0, len(files)) - for _, f := range files { - log = append(log, f.GetPath()) - } - logs = append(logs, log) - } - - restore.ApplyKVFilesWithBatchMethod( - context.TODO(), - iter.FromSlice(ds), - batchCount, - batchSize, - applyFunc, - ) - - require.Equal(t, runCount, 5) - require.Equal(t, totalKVCount, int64(30)) - require.Equal(t, - logs, - [][]string{ - {"log2"}, - {"log5", "log6"}, - {"log3"}, - {"log4"}, - {"log1"}, - }, - ) -} - -func TestApplyKVFilesWithBatchMethod4(t *testing.T) { - var ( - runCount = 0 - batchCount int = 2 - batchSize uint64 = 1500 - totalKVCount int64 = 0 - logs = make([][]string, 0) - ) - ds := []*backuppb.DataFileInfo{ - { - Path: "log1", - NumberOfEntries: 5, - Length: 2000, - Cf: stream.WriteCF, - Type: backuppb.FileType_Delete, - TableId: 1, - }, { - Path: "log2", - NumberOfEntries: 5, - Length: 100, - Cf: stream.WriteCF, - Type: backuppb.FileType_Put, - TableId: 1, - }, { - Path: "log3", - NumberOfEntries: 5, - Length: 100, - Cf: stream.WriteCF, - Type: backuppb.FileType_Put, - TableId: 2, - }, { - Path: "log4", - NumberOfEntries: 5, - Length: 100, - Cf: stream.WriteCF, - Type: backuppb.FileType_Put, - TableId: 1, - }, { - Path: "log5", - NumberOfEntries: 5, - Length: 100, - Cf: stream.DefaultCF, - Type: backuppb.FileType_Put, - TableId: 2, - }, - } - applyFunc := func( - files []*backuppb.DataFileInfo, - kvCount int64, - size uint64, - ) { - runCount += 1 - totalKVCount += kvCount - log := make([]string, 0, len(files)) - for _, f := range files { - log = append(log, f.GetPath()) - } - logs = append(logs, log) - } - - restore.ApplyKVFilesWithBatchMethod( - context.TODO(), - iter.FromSlice(ds), - batchCount, - batchSize, - applyFunc, - ) - - require.Equal(t, runCount, 4) - require.Equal(t, totalKVCount, int64(25)) - require.Equal(t, - logs, - [][]string{ - {"log2", "log4"}, - {"log5"}, - {"log3"}, - {"log1"}, - }, - ) -} func TestCheckNewCollationEnable(t *testing.T) { caseList := []struct { @@ -1357,4 +313,3 @@ func TestCheckNewCollationEnable(t *testing.T) { } } } ->>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173)) diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index f774c60ab91de..534983ad5e831 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -87,12 +87,6 @@ const ( crypterAES128KeyLen = 16 crypterAES192KeyLen = 24 crypterAES256KeyLen = 32 - -<<<<<<< HEAD - tidbNewCollationEnabled = "new_collation_enabled" -======= - flagFullBackupType = "type" ->>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173)) ) // TLSConfig is the common configuration for TLS connection. diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 209152c59d26a..fc2f6616288e1 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -4,7 +4,6 @@ package task import ( "context" - "strings" "time" "github.com/opentracing/opentracing-go" @@ -23,11 +22,6 @@ import ( "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/version" "github.com/pingcap/tidb/config" -<<<<<<< HEAD - "github.com/pingcap/tidb/kv" -======= - "github.com/pingcap/tidb/util" ->>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173)) "github.com/pingcap/tidb/util/mathutil" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -334,46 +328,6 @@ func CheckRestoreDBAndTable(client *restore.Client, cfg *RestoreConfig) error { return nil } -<<<<<<< HEAD -func CheckNewCollationEnable( - backupNewCollationEnable string, - g glue.Glue, - storage kv.Storage, - CheckRequirements bool, -) error { - if backupNewCollationEnable == "" { - if CheckRequirements { - return errors.Annotatef(berrors.ErrUnknown, - "the config 'new_collations_enabled_on_first_bootstrap' not found in backupmeta. "+ - "you can use \"show config WHERE name='new_collations_enabled_on_first_bootstrap';\" to manually check the config. "+ - "if you ensure the config 'new_collations_enabled_on_first_bootstrap' in backup cluster is as same as restore cluster, "+ - "use --check-requirements=false to skip this check") - } else { - log.Warn("the config 'new_collations_enabled_on_first_bootstrap' is not in backupmeta") - return nil - } - } - - se, err := g.CreateSession(storage) - if err != nil { - return errors.Trace(err) - } - - newCollationEnable, err := se.GetGlobalVariable(tidbNewCollationEnabled) - if err != nil { - return errors.Trace(err) - } - - if !strings.EqualFold(backupNewCollationEnable, newCollationEnable) { - return errors.Annotatef(berrors.ErrUnknown, - "the config 'new_collations_enabled_on_first_bootstrap' not match, upstream:%v, downstream: %v", - backupNewCollationEnable, newCollationEnable) - } - return nil -} - -======= ->>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173)) func isFullRestore(cmdName string) bool { return cmdName == FullRestoreCmd } diff --git a/br/pkg/utils/db.go b/br/pkg/utils/db.go index dfcdea0c5b462..3c0a71d074dfe 100644 --- a/br/pkg/utils/db.go +++ b/br/pkg/utils/db.go @@ -34,86 +34,7 @@ type DBExecutor interface { StmtExecutor BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error) } -<<<<<<< HEAD -======= - -// CheckLogBackupEnabled checks if LogBackup is enabled in cluster. -// this mainly used in three places. -// 1. GC worker resolve locks to scan more locks after safepoint. (every minute) -// 2. Add index skipping use lightning.(every add index ddl) -// 3. Telemetry of log backup feature usage (every 6 hours). -// NOTE: this result shouldn't be cached by caller. because it may change every time in one cluster. -func CheckLogBackupEnabled(ctx sessionctx.Context) bool { - executor, ok := ctx.(sqlexec.RestrictedSQLExecutor) - if !ok { - // shouldn't happen - log.Error("[backup] unable to translate executor from sessionctx") - return false - } - enabled, err := IsLogBackupEnabled(executor) - if err != nil { - // if it failed by any reason. we can simply return true this time. - // for GC worker it will scan more locks in one tick. - // for Add index it will skip using lightning this time. - // for Telemetry it will get a false positive usage count. - log.Warn("[backup] check log backup config failed, ignore it", zap.Error(err)) - return true - } - return enabled -} - -// IsLogBackupEnabled is used for br to check whether tikv has enabled log backup. -// we use `sqlexec.RestrictedSQLExecutor` as parameter because it's easy to mock. -// it should return error. -func IsLogBackupEnabled(ctx sqlexec.RestrictedSQLExecutor) (bool, error) { - valStr := "show config where name = 'log-backup.enable' and type = 'tikv'" - internalCtx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBR) - rows, fields, errSQL := ctx.ExecRestrictedSQL(internalCtx, nil, valStr) - if errSQL != nil { - return false, errSQL - } - if len(rows) == 0 { - // no rows mean not support log backup. - return false, nil - } - for _, row := range rows { - d := row.GetDatum(3, &fields[3].Column.FieldType) - value, errField := d.ToString() - if errField != nil { - return false, errField - } - if strings.ToLower(value) == "false" { - return false, nil - } - } - return true, nil -} - -// CheckLogBackupTaskExist increases the count of log backup task. -func LogBackupTaskCountInc() { - LogBackupTaskMutex.Lock() - logBackupTaskCount++ - LogBackupTaskMutex.Unlock() -} - -// CheckLogBackupTaskExist decreases the count of log backup task. -func LogBackupTaskCountDec() { - LogBackupTaskMutex.Lock() - logBackupTaskCount-- - LogBackupTaskMutex.Unlock() -} - -// CheckLogBackupTaskExist checks that whether log-backup is existed. -func CheckLogBackupTaskExist() bool { - return logBackupTaskCount > 0 -} - -// IsLogBackupInUse checks the log backup task existed. -func IsLogBackupInUse(ctx sessionctx.Context) bool { - return CheckLogBackupEnabled(ctx) && CheckLogBackupTaskExist() -} func GetTidbNewCollationEnabled() string { return tidbNewCollationEnabled } ->>>>>>> 84703efd01 (br: modify collate.newCollationEnabled according to the config of the cluster (#39173))