From 1e6f30f3898f70c709adf9c03730a549a275d9c9 Mon Sep 17 00:00:00 2001 From: Jian Zhang Date: Mon, 28 Mar 2022 17:48:47 +0800 Subject: [PATCH] [to #67] remove unused code related to restore Signed-off-by: Jian Zhang --- br/cmd/br/debug.go | 174 +------------- br/pkg/conn/conn.go | 21 -- br/pkg/glue/glue.go | 8 - br/pkg/gluetidb/glue.go | 123 ---------- br/pkg/gluetikv/glue.go | 11 - br/pkg/restore/client.go | 65 +----- br/pkg/restore/db.go | 292 ------------------------ br/pkg/restore/db_test.go | 119 ---------- br/pkg/restore/systable_restore.go | 217 ------------------ br/pkg/restore/util.go | 125 ----------- br/pkg/restore/util_test.go | 37 --- br/pkg/task/backup_raw.go | 4 +- br/pkg/task/common.go | 73 +----- br/pkg/task/restore_raw.go | 4 +- br/pkg/utils/schema.go | 98 -------- br/pkg/utils/schema_test.go | 350 ----------------------------- 16 files changed, 8 insertions(+), 1713 deletions(-) delete mode 100644 br/pkg/restore/db.go delete mode 100644 br/pkg/restore/db_test.go delete mode 100644 br/pkg/restore/systable_restore.go delete mode 100644 br/pkg/utils/schema_test.go diff --git a/br/cmd/br/debug.go b/br/cmd/br/debug.go index a033346b..ea908eec 100644 --- a/br/cmd/br/debug.go +++ b/br/cmd/br/debug.go @@ -3,31 +3,18 @@ package main import ( - "bytes" "context" - "crypto/sha256" - "encoding/hex" - "encoding/json" "path" "reflect" "github.com/gogo/protobuf/proto" "github.com/pingcap/errors" - backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/log" - "github.com/pingcap/tidb/parser/model" "github.com/spf13/cobra" - berrors "github.com/tikv/migration/br/pkg/errors" - "github.com/tikv/migration/br/pkg/logutil" "github.com/tikv/migration/br/pkg/metautil" - "github.com/tikv/migration/br/pkg/mock/mockid" - "github.com/tikv/migration/br/pkg/restore" - "github.com/tikv/migration/br/pkg/rtree" "github.com/tikv/migration/br/pkg/task" "github.com/tikv/migration/br/pkg/utils" "github.com/tikv/migration/br/pkg/version/build" - "go.uber.org/zap" ) // NewDebugCommand return a debug subcommand. @@ -64,80 +51,7 @@ func newCheckSumCommand() *cobra.Command { Short: "check the backup data", Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, _ []string) error { - ctx, cancel := context.WithCancel(GetDefaultContext()) - defer cancel() - - var cfg task.Config - if err := cfg.ParseFromFlags(cmd.Flags()); err != nil { - return errors.Trace(err) - } - - _, s, backupMeta, err := task.ReadBackupMeta(ctx, metautil.MetaFile, &cfg) - if err != nil { - return errors.Trace(err) - } - - reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo) - dbs, err := utils.LoadBackupTables(ctx, reader) - if err != nil { - return errors.Trace(err) - } - - for _, schema := range backupMeta.Schemas { - dbInfo := &model.DBInfo{} - err = json.Unmarshal(schema.Db, dbInfo) - if err != nil { - return errors.Trace(err) - } - tblInfo := &model.TableInfo{} - err = json.Unmarshal(schema.Table, tblInfo) - if err != nil { - return errors.Trace(err) - } - tbl := dbs[dbInfo.Name.String()].GetTable(tblInfo.Name.String()) - - var calCRC64 uint64 - var totalKVs uint64 - var totalBytes uint64 - for _, file := range tbl.Files { - calCRC64 ^= file.Crc64Xor - totalKVs += file.GetTotalKvs() - totalBytes += file.GetTotalBytes() - log.Info("file info", zap.Stringer("table", tblInfo.Name), - zap.String("file", file.GetName()), - zap.Uint64("crc64xor", file.GetCrc64Xor()), - zap.Uint64("totalKvs", file.GetTotalKvs()), - zap.Uint64("totalBytes", file.GetTotalBytes()), - zap.Uint64("startVersion", file.GetStartVersion()), - zap.Uint64("endVersion", file.GetEndVersion()), - logutil.Key("startKey", file.GetStartKey()), - logutil.Key("endKey", file.GetEndKey()), - ) - - var data []byte - data, err = s.ReadFile(ctx, file.Name) - if err != nil { - return errors.Trace(err) - } - s := sha256.Sum256(data) - if !bytes.Equal(s[:], file.Sha256) { - return errors.Annotatef(berrors.ErrBackupChecksumMismatch, ` -backup data checksum failed: %s may be changed -calculated sha256 is %s, -origin sha256 is %s`, - file.Name, hex.EncodeToString(s[:]), hex.EncodeToString(file.Sha256)) - } - } - log.Info("table info", zap.Stringer("table", tblInfo.Name), - zap.Uint64("CRC64", calCRC64), - zap.Uint64("totalKvs", totalKVs), - zap.Uint64("totalBytes", totalBytes), - zap.Uint64("schemaTotalKvs", schema.TotalKvs), - zap.Uint64("schemaTotalBytes", schema.TotalBytes), - zap.Uint64("schemaCRC64", schema.Crc64Xor)) - } - cmd.Println("backup data checksum succeed!") - return nil + return errors.Errorf("checksum is unsupported") }, } command.Hidden = true @@ -159,89 +73,7 @@ func newBackupMetaValidateCommand() *cobra.Command { Use: "validate", Short: "validate key range and rewrite rules of backupmeta", RunE: func(cmd *cobra.Command, _ []string) error { - ctx, cancel := context.WithCancel(GetDefaultContext()) - defer cancel() - - tableIDOffset, err := cmd.Flags().GetUint64("offset") - if err != nil { - return errors.Trace(err) - } - - var cfg task.Config - if err = cfg.ParseFromFlags(cmd.Flags()); err != nil { - return errors.Trace(err) - } - _, s, backupMeta, err := task.ReadBackupMeta(ctx, metautil.MetaFile, &cfg) - if err != nil { - log.Error("read backupmeta failed", zap.Error(err)) - return errors.Trace(err) - } - reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo) - dbs, err := utils.LoadBackupTables(ctx, reader) - if err != nil { - log.Error("load tables failed", zap.Error(err)) - return errors.Trace(err) - } - files := make([]*backuppb.File, 0) - tables := make([]*metautil.Table, 0) - for _, db := range dbs { - for _, table := range db.Tables { - files = append(files, table.Files...) - } - tables = append(tables, db.Tables...) - } - // Check if the ranges of files overlapped - rangeTree := rtree.NewRangeTree() - for _, file := range files { - if out := rangeTree.InsertRange(rtree.Range{ - StartKey: file.GetStartKey(), - EndKey: file.GetEndKey(), - }); out != nil { - log.Error( - "file ranges overlapped", - zap.Stringer("out", out), - logutil.File(file), - ) - } - } - - tableIDAllocator := mockid.NewIDAllocator() - // Advance table ID allocator to the offset. - for offset := uint64(0); offset < tableIDOffset; offset++ { - _, _ = tableIDAllocator.Alloc() // Ignore error - } - rewriteRules := &restore.RewriteRules{ - Data: make([]*import_sstpb.RewriteRule, 0), - } - tableIDMap := make(map[int64]int64) - // Simulate to create table - for _, table := range tables { - indexIDAllocator := mockid.NewIDAllocator() - newTable := new(model.TableInfo) - tableID, _ := tableIDAllocator.Alloc() - newTable.ID = int64(tableID) - newTable.Name = table.Info.Name - newTable.Indices = make([]*model.IndexInfo, len(table.Info.Indices)) - for i, indexInfo := range table.Info.Indices { - indexID, _ := indexIDAllocator.Alloc() - newTable.Indices[i] = &model.IndexInfo{ - ID: int64(indexID), - Name: indexInfo.Name, - } - } - rules := restore.GetRewriteRules(newTable, table.Info, 0) - rewriteRules.Data = append(rewriteRules.Data, rules.Data...) - tableIDMap[table.Info.ID] = int64(tableID) - } - // Validate rewrite rules - for _, file := range files { - err = restore.ValidateFileRewriteRule(file, rewriteRules) - if err != nil { - return errors.Trace(err) - } - } - cmd.Println("Check backupmeta done") - return nil + return errors.Errorf("validate is unsupported") }, } command.Flags().Uint64("offset", 0, "the offset of table id alloctor") @@ -376,7 +208,7 @@ func setPDConfigCommand() *cobra.Command { return errors.Trace(err) } - mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg), cfg.CheckRequirements, false) + mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg), cfg.CheckRequirements) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index 77daaebb..1ad0fda2 100755 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -16,7 +16,6 @@ import ( backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" - "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/txnkv/txnlock" @@ -104,7 +103,6 @@ func NewConnPool(cap int, newConn func(ctx context.Context) (*grpc.ClientConn, e type Mgr struct { *pdutil.PdController tlsConf *tls.Config - dom *domain.Domain storage kv.Storage // Used to access SQL related interfaces. tikvStore tikv.Storage // Used to access TiKV specific interfaces. grpcClis struct { @@ -222,7 +220,6 @@ func checkStoresAlive(ctx context.Context, // NewMgr creates a new Mgr. // -// Domain is optional for Backup, set `needDomain` to false to disable // initializing Domain. func NewMgr( ctx context.Context, @@ -233,7 +230,6 @@ func NewMgr( keepalive keepalive.ClientParameters, storeBehavior StoreBehavior, checkRequirements bool, - needDomain bool, ) (*Mgr, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("conn.NewMgr", opentracing.ChildOf(span.Context())) @@ -272,19 +268,10 @@ func NewMgr( return nil, berrors.ErrKVNotTiKV } - var dom *domain.Domain - if needDomain { - dom, err = g.GetDomain(storage) - if err != nil { - return nil, errors.Trace(err) - } - } - mgr := &Mgr{ PdController: controller, storage: storage, tikvStore: tikvStorage, - dom: dom, tlsConf: tlsConf, ownsStorage: g.OwnsStorage(), grpcClis: struct { @@ -418,11 +405,6 @@ func (mgr *Mgr) GetLockResolver() *txnlock.LockResolver { return mgr.tikvStore.GetLockResolver() } -// GetDomain returns a tikv storage. -func (mgr *Mgr) GetDomain() *domain.Domain { - return mgr.dom -} - // Close closes all client in Mgr. func (mgr *Mgr) Close() { mgr.grpcClis.mu.Lock() @@ -437,9 +419,6 @@ func (mgr *Mgr) Close() { // Gracefully shutdown domain so it does not affect other TiDB DDL. // Must close domain before closing storage, otherwise it gets stuck forever. if mgr.ownsStorage { - if mgr.dom != nil { - mgr.dom.Close() - } tikv.StoreShuttingDown(1) mgr.storage.Close() } diff --git a/br/pkg/glue/glue.go b/br/pkg/glue/glue.go index 7f2be30a..e21b526c 100644 --- a/br/pkg/glue/glue.go +++ b/br/pkg/glue/glue.go @@ -5,16 +5,12 @@ package glue import ( "context" - "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/parser/model" pd "github.com/tikv/pd/client" ) // Glue is an abstraction of TiDB function calls used in BR. type Glue interface { - GetDomain(store kv.Storage) (*domain.Domain, error) - CreateSession(store kv.Storage) (Session, error) Open(path string, option pd.SecurityOption) (kv.Storage, error) // OwnsStorage returns whether the storage returned by Open() is owned @@ -32,10 +28,6 @@ type Glue interface { // Session is an abstraction of the session.Session interface. type Session interface { - Execute(ctx context.Context, sql string) error - ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error - CreateDatabase(ctx context.Context, schema *model.DBInfo) error - CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error Close() } diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index 952de0d3..edb596d0 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -3,32 +3,16 @@ package gluetidb import ( - "bytes" "context" - "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/ddl" - "github.com/pingcap/tidb/domain" - "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/meta/autoid" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" - "github.com/pingcap/tidb/session" - "github.com/pingcap/tidb/sessionctx" "github.com/tikv/migration/br/pkg/glue" "github.com/tikv/migration/br/pkg/gluetikv" pd "github.com/tikv/pd/client" ) -const ( - defaultCapOfCreateTable = 512 - defaultCapOfCreateDatabase = 64 - brComment = `/*from(br)*/` -) - // New makes a new tidb glue. func New() Glue { log.Debug("enabling no register config") @@ -43,40 +27,6 @@ type Glue struct { tikvGlue gluetikv.Glue } -type tidbSession struct { - se session.Session -} - -// GetDomain implements glue.Glue. -func (Glue) GetDomain(store kv.Storage) (*domain.Domain, error) { - se, err := session.CreateSession(store) - if err != nil { - return nil, errors.Trace(err) - } - dom, err := session.GetDomain(store) - if err != nil { - return nil, errors.Trace(err) - } - // create stats handler for backup and restore. - err = dom.UpdateTableStatsLoop(se) - if err != nil { - return nil, errors.Trace(err) - } - return dom, nil -} - -// CreateSession implements glue.Glue. -func (Glue) CreateSession(store kv.Storage) (glue.Session, error) { - se, err := session.CreateSession(store) - if err != nil { - return nil, errors.Trace(err) - } - tiSession := &tidbSession{ - se: se, - } - return tiSession, nil -} - // Open implements glue.Glue. func (g Glue) Open(path string, option pd.SecurityOption) (kv.Storage, error) { return g.tikvGlue.Open(path, option) @@ -101,76 +51,3 @@ func (g Glue) Record(name string, value uint64) { func (g Glue) GetVersion() string { return g.tikvGlue.GetVersion() } - -// Execute implements glue.Session. -func (gs *tidbSession) Execute(ctx context.Context, sql string) error { - _, err := gs.se.ExecuteInternal(ctx, sql) - return errors.Trace(err) -} - -func (gs *tidbSession) ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error { - _, err := gs.se.ExecuteInternal(ctx, sql, args...) - return errors.Trace(err) -} - -// CreateDatabase implements glue.Session. -func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { - d := domain.GetDomain(gs.se).DDL() - query, err := gs.showCreateDatabase(schema) - if err != nil { - return errors.Trace(err) - } - gs.se.SetValue(sessionctx.QueryString, query) - schema = schema.Clone() - if len(schema.Charset) == 0 { - schema.Charset = mysql.DefaultCharset - } - return d.CreateSchemaWithInfo(gs.se, schema, ddl.OnExistIgnore, true) -} - -// CreateTable implements glue.Session. -func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error { - d := domain.GetDomain(gs.se).DDL() - query, err := gs.showCreateTable(table) - if err != nil { - return errors.Trace(err) - } - gs.se.SetValue(sessionctx.QueryString, query) - // Clone() does not clone partitions yet :( - table = table.Clone() - if table.Partition != nil { - newPartition := *table.Partition - newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...) - table.Partition = &newPartition - } - return d.CreateTableWithInfo(gs.se, dbName, table, ddl.OnExistIgnore, true) -} - -// Close implements glue.Session. -func (gs *tidbSession) Close() { - gs.se.Close() -} - -// showCreateTable shows the result of SHOW CREATE TABLE from a TableInfo. -func (gs *tidbSession) showCreateTable(tbl *model.TableInfo) (string, error) { - table := tbl.Clone() - table.AutoIncID = 0 - result := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateTable)) - // this can never fail. - _, _ = result.WriteString(brComment) - if err := executor.ConstructResultOfShowCreateTable(gs.se, tbl, autoid.Allocators{}, result); err != nil { - return "", errors.Trace(err) - } - return result.String(), nil -} - -// showCreateDatabase shows the result of SHOW CREATE DATABASE from a dbInfo. -func (gs *tidbSession) showCreateDatabase(db *model.DBInfo) (string, error) { - result := bytes.NewBuffer(make([]byte, 0, defaultCapOfCreateDatabase)) - // this can never fail. - _, _ = result.WriteString(brComment) - if err := executor.ConstructResultOfShowCreateDatabase(gs.se, db, true, result); err != nil { - return "", errors.Trace(err) - } - return result.String(), nil -} diff --git a/br/pkg/gluetikv/glue.go b/br/pkg/gluetikv/glue.go index 99b1b4a6..2299ecf5 100644 --- a/br/pkg/gluetikv/glue.go +++ b/br/pkg/gluetikv/glue.go @@ -6,7 +6,6 @@ import ( "context" "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/driver" "github.com/tikv/migration/br/pkg/glue" @@ -19,16 +18,6 @@ import ( // Glue is an implementation of glue.Glue that accesses only TiKV without TiDB. type Glue struct{} -// GetDomain implements glue.Glue. -func (Glue) GetDomain(store kv.Storage) (*domain.Domain, error) { - return nil, nil -} - -// CreateSession implements glue.Glue. -func (Glue) CreateSession(store kv.Storage) (glue.Session, error) { - return nil, nil -} - // Open implements glue.Glue. func (Glue) Open(path string, option pd.SecurityOption) (kv.Storage, error) { if option.CAPath != "" { diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 580dffec..e4f571d7 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -7,7 +7,6 @@ import ( "context" "crypto/tls" "encoding/hex" - "encoding/json" "fmt" "strconv" "strings" @@ -19,10 +18,8 @@ import ( "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" - "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" "github.com/tikv/client-go/v2/oracle" @@ -55,19 +52,8 @@ type Client struct { tlsConf *tls.Config keepaliveConf keepalive.ClientParameters - databases map[string]*utils.Database - ddlJobs []*model.Job backupMeta *backuppb.BackupMeta - // TODO Remove this field or replace it with a []*DB, - // since https://github.com/pingcap/br/pull/377 needs more DBs to speed up DDL execution. - // And for now, we must inject a pool of DBs to `Client.GoCreateTables`, otherwise there would be a race condition. - // This is dirty: why we need DBs from different sources? - // By replace it with a []*DB, we can remove the dirty parameter of `Client.GoCreateTable`, - // along with them in some private functions. - // Before you do it, you can firstly read discussions at - // https://github.com/pingcap/br/pull/377#discussion_r446594501, - // this probably isn't as easy as it seems like (however, not hard, too :D) - db *DB + rateLimit uint64 isOnline bool hasSpeedLimited bool @@ -79,12 +65,6 @@ type Client struct { backend *backuppb.StorageBackend switchModeInterval time.Duration switchCh chan struct{} - - // statHandler and dom are used for analyze table after restore. - // it will backup stats with #dump.DumpStatsToJSON - // and restore stats with #dump.LoadStatsFromJSON - statsHandler *handle.Handle - dom *domain.Domain } // NewRestoreClient returns a new RestoreClient. @@ -95,30 +75,12 @@ func NewRestoreClient( tlsConf *tls.Config, keepaliveConf keepalive.ClientParameters, ) (*Client, error) { - db, err := NewDB(g, store) - if err != nil { - return nil, errors.Trace(err) - } - dom, err := g.GetDomain(store) - if err != nil { - return nil, errors.Trace(err) - } - - var statsHandle *handle.Handle - // tikv.Glue will return nil, tidb.Glue will return available domain - if dom != nil { - statsHandle = dom.StatsHandle() - } - return &Client{ pdClient: pdClient, toolClient: NewSplitClient(pdClient, tlsConf), - db: db, tlsConf: tlsConf, keepaliveConf: keepaliveConf, switchCh: make(chan struct{}), - dom: dom, - statsHandler: statsHandle, }, nil } @@ -159,10 +121,6 @@ func (rc *Client) SetSwitchModeInterval(interval time.Duration) { // Close a client. func (rc *Client) Close() { - // rc.db can be nil in raw kv mode. - if rc.db != nil { - rc.db.Close() - } log.Info("Restore client closed") } @@ -174,28 +132,9 @@ func (rc *Client) InitBackupMeta( externalStorage storage.ExternalStorage, reader *metautil.MetaReader) error { if !backupMeta.IsRawKv { - databases, err := utils.LoadBackupTables(c, reader) - if err != nil { - return errors.Trace(err) - } - rc.databases = databases - - var ddlJobs []*model.Job - // ddls is the bytes of json.Marshal - ddls, err := reader.ReadDDLs(c) - if err != nil { - return errors.Trace(err) - } - if len(ddls) != 0 { - err = json.Unmarshal(ddls, &ddlJobs) - if err != nil { - return errors.Trace(err) - } - } - rc.ddlJobs = ddlJobs + return errors.Errorf("backup meta for non-rawkv is unsupported") } rc.backupMeta = backupMeta - log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs))) metaClient := NewSplitClient(rc.pdClient, rc.tlsConf) importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf) diff --git a/br/pkg/restore/db.go b/br/pkg/restore/db.go deleted file mode 100644 index 594cc75b..00000000 --- a/br/pkg/restore/db.go +++ /dev/null @@ -1,292 +0,0 @@ -// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. - -package restore - -import ( - "context" - "fmt" - "sort" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" - "github.com/tikv/migration/br/pkg/glue" - "github.com/tikv/migration/br/pkg/metautil" - "github.com/tikv/migration/br/pkg/utils" - "go.uber.org/zap" -) - -// DB is a TiDB instance, not thread-safe. -type DB struct { - se glue.Session -} - -type UniqueTableName struct { - DB string - Table string -} - -// NewDB returns a new DB. -func NewDB(g glue.Glue, store kv.Storage) (*DB, error) { - se, err := g.CreateSession(store) - if err != nil { - return nil, errors.Trace(err) - } - // The session may be nil in raw kv mode - if se == nil { - return nil, nil - } - // Set SQL mode to None for avoiding SQL compatibility problem - err = se.Execute(context.Background(), "set @@sql_mode=''") - if err != nil { - return nil, errors.Trace(err) - } - return &DB{ - se: se, - }, nil -} - -// ExecDDL executes the query of a ddl job. -func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { - var err error - tableInfo := ddlJob.BinlogInfo.TableInfo - dbInfo := ddlJob.BinlogInfo.DBInfo - switch ddlJob.Type { - case model.ActionCreateSchema: - err = db.se.CreateDatabase(ctx, dbInfo) - if err != nil { - log.Error("create database failed", zap.Stringer("db", dbInfo.Name), zap.Error(err)) - } - return errors.Trace(err) - case model.ActionCreateTable: - err = db.se.CreateTable(ctx, model.NewCIStr(ddlJob.SchemaName), tableInfo) - if err != nil { - log.Error("create table failed", - zap.Stringer("db", dbInfo.Name), - zap.Stringer("table", tableInfo.Name), - zap.Error(err)) - } - return errors.Trace(err) - } - - if tableInfo != nil { - switchDBSQL := fmt.Sprintf("use %s;", utils.EncloseName(ddlJob.SchemaName)) - err = db.se.Execute(ctx, switchDBSQL) - if err != nil { - log.Error("switch db failed", - zap.String("query", switchDBSQL), - zap.String("db", ddlJob.SchemaName), - zap.Error(err)) - return errors.Trace(err) - } - } - err = db.se.Execute(ctx, ddlJob.Query) - if err != nil { - log.Error("execute ddl query failed", - zap.String("query", ddlJob.Query), - zap.String("db", ddlJob.SchemaName), - zap.Int64("historySchemaVersion", ddlJob.BinlogInfo.SchemaVersion), - zap.Error(err)) - } - return errors.Trace(err) -} - -// UpdateStatsMeta update count and snapshot ts in mysql.stats_meta -func (db *DB) UpdateStatsMeta(ctx context.Context, tableID int64, restoreTS uint64, count uint64) error { - sysDB := mysql.SystemDB - statsMetaTbl := "stats_meta" - - // set restoreTS to snapshot and version which is used to update stats_meta - err := db.se.ExecuteInternal( - ctx, - "update %n.%n set snapshot = %?, version = %?, count = %? where table_id = %?", - sysDB, - statsMetaTbl, - restoreTS, - restoreTS, - count, - tableID, - ) - if err != nil { - log.Error("execute update sql failed", zap.Error(err)) - } - return nil -} - -// CreateDatabase executes a CREATE DATABASE SQL. -func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { - err := db.se.CreateDatabase(ctx, schema) - if err != nil { - log.Error("create database failed", zap.Stringer("db", schema.Name), zap.Error(err)) - } - return errors.Trace(err) -} - -// CreateTable executes a CREATE TABLE SQL. -func (db *DB) CreateTable(ctx context.Context, table *metautil.Table, ddlTables map[UniqueTableName]bool) error { - err := db.se.CreateTable(ctx, table.DB.Name, table.Info) - if err != nil { - log.Error("create table failed", - zap.Stringer("db", table.DB.Name), - zap.Stringer("table", table.Info.Name), - zap.Error(err)) - return errors.Trace(err) - } - - var restoreMetaSQL string - switch { - case table.Info.IsView(): - return nil - case table.Info.IsSequence(): - setValFormat := fmt.Sprintf("do setval(%s.%s, %%d);", - utils.EncloseName(table.DB.Name.O), - utils.EncloseName(table.Info.Name.O)) - if table.Info.Sequence.Cycle { - increment := table.Info.Sequence.Increment - // TiDB sequence's behaviour is designed to keep the same pace - // among all nodes within the same cluster. so we need restore round. - // Here is a hack way to trigger sequence cycle round > 0 according to - // https://github.com/pingcap/br/pull/242#issuecomment-631307978 - // TODO use sql to set cycle round - nextSeqSQL := fmt.Sprintf("do nextval(%s.%s);", - utils.EncloseName(table.DB.Name.O), - utils.EncloseName(table.Info.Name.O)) - var setValSQL string - if increment < 0 { - setValSQL = fmt.Sprintf(setValFormat, table.Info.Sequence.MinValue) - } else { - setValSQL = fmt.Sprintf(setValFormat, table.Info.Sequence.MaxValue) - } - err = db.se.Execute(ctx, setValSQL) - if err != nil { - log.Error("restore meta sql failed", - zap.String("query", setValSQL), - zap.Stringer("db", table.DB.Name), - zap.Stringer("table", table.Info.Name), - zap.Error(err)) - return errors.Trace(err) - } - - // trigger cycle round > 0 - err = db.se.Execute(ctx, nextSeqSQL) - if err != nil { - log.Error("restore meta sql failed", - zap.String("query", nextSeqSQL), - zap.Stringer("db", table.DB.Name), - zap.Stringer("table", table.Info.Name), - zap.Error(err)) - return errors.Trace(err) - } - } - restoreMetaSQL = fmt.Sprintf(setValFormat, table.Info.AutoIncID) - err = db.se.Execute(ctx, restoreMetaSQL) - if err != nil { - log.Error("restore meta sql failed", - zap.String("query", restoreMetaSQL), - zap.Stringer("db", table.DB.Name), - zap.Stringer("table", table.Info.Name), - zap.Error(err)) - return errors.Trace(err) - } - // only table exists in ddlJobs during incremental restoration should do alter after creation. - case ddlTables[UniqueTableName{table.DB.Name.String(), table.Info.Name.String()}]: - if utils.NeedAutoID(table.Info) { - restoreMetaSQL = fmt.Sprintf( - "alter table %s.%s auto_increment = %d;", - utils.EncloseName(table.DB.Name.O), - utils.EncloseName(table.Info.Name.O), - table.Info.AutoIncID) - } else if table.Info.PKIsHandle && table.Info.ContainsAutoRandomBits() { - restoreMetaSQL = fmt.Sprintf( - "alter table %s.%s auto_random_base = %d", - utils.EncloseName(table.DB.Name.O), - utils.EncloseName(table.Info.Name.O), - table.Info.AutoRandID) - } else { - log.Info("table exists in incremental ddl jobs, but don't need to be altered", - zap.Stringer("db", table.DB.Name), - zap.Stringer("table", table.Info.Name)) - return nil - } - err = db.se.Execute(ctx, restoreMetaSQL) - if err != nil { - log.Error("restore meta sql failed", - zap.String("query", restoreMetaSQL), - zap.Stringer("db", table.DB.Name), - zap.Stringer("table", table.Info.Name), - zap.Error(err)) - return errors.Trace(err) - } - } - return errors.Trace(err) -} - -// Close closes the connection. -func (db *DB) Close() { - db.se.Close() -} - -// FilterDDLJobs filters ddl jobs. -func FilterDDLJobs(allDDLJobs []*model.Job, tables []*metautil.Table) (ddlJobs []*model.Job) { - // Sort the ddl jobs by schema version in descending order. - sort.Slice(allDDLJobs, func(i, j int) bool { - return allDDLJobs[i].BinlogInfo.SchemaVersion > allDDLJobs[j].BinlogInfo.SchemaVersion - }) - dbs := getDatabases(tables) - for _, db := range dbs { - // These maps is for solving some corner case. - // e.g. let "t=2" indicates that the id of database "t" is 2, if the ddl execution sequence is: - // rename "a" to "b"(a=1) -> drop "b"(b=1) -> create "b"(b=2) -> rename "b" to "a"(a=2) - // Which we cannot find the "create" DDL by name and id directly. - // To cover †his case, we must find all names and ids the database/table ever had. - dbIDs := make(map[int64]bool) - dbIDs[db.ID] = true - dbNames := make(map[string]bool) - dbNames[db.Name.String()] = true - for _, job := range allDDLJobs { - if job.BinlogInfo.DBInfo != nil { - if dbIDs[job.SchemaID] || dbNames[job.BinlogInfo.DBInfo.Name.String()] { - ddlJobs = append(ddlJobs, job) - // The the jobs executed with the old id, like the step 2 in the example above. - dbIDs[job.SchemaID] = true - // For the jobs executed after rename, like the step 3 in the example above. - dbNames[job.BinlogInfo.DBInfo.Name.String()] = true - } - } - } - } - - for _, table := range tables { - tableIDs := make(map[int64]bool) - tableIDs[table.Info.ID] = true - tableNames := make(map[UniqueTableName]bool) - name := UniqueTableName{table.DB.Name.String(), table.Info.Name.String()} - tableNames[name] = true - for _, job := range allDDLJobs { - if job.BinlogInfo.TableInfo != nil { - name = UniqueTableName{job.SchemaName, job.BinlogInfo.TableInfo.Name.String()} - if tableIDs[job.TableID] || tableNames[name] { - ddlJobs = append(ddlJobs, job) - tableIDs[job.TableID] = true - // For truncate table, the id may be changed - tableIDs[job.BinlogInfo.TableInfo.ID] = true - tableNames[name] = true - } - } - } - } - return ddlJobs -} - -func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) { - dbIDs := make(map[int64]bool) - for _, table := range tables { - if !dbIDs[table.DB.ID] { - dbs = append(dbs, table.DB) - dbIDs[table.DB.ID] = true - } - } - return -} diff --git a/br/pkg/restore/db_test.go b/br/pkg/restore/db_test.go deleted file mode 100644 index babfc16c..00000000 --- a/br/pkg/restore/db_test.go +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. - -package restore_test - -import ( - "context" - "math" - "strconv" - "testing" - - "github.com/pingcap/tidb/meta/autoid" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/testkit" - "github.com/stretchr/testify/require" - "github.com/tikv/migration/br/pkg/gluetidb" - "github.com/tikv/migration/br/pkg/metautil" - "github.com/tikv/migration/br/pkg/mock" - "github.com/tikv/migration/br/pkg/restore" - "github.com/tikv/migration/br/pkg/storage" -) - -type testRestoreSchemaSuite struct { - mock *mock.Cluster - storage storage.ExternalStorage -} - -func createRestoreSchemaSuite(t *testing.T) (s *testRestoreSchemaSuite, clean func()) { - var err error - s = new(testRestoreSchemaSuite) - s.mock, err = mock.NewCluster() - require.NoError(t, err) - base := t.TempDir() - s.storage, err = storage.NewLocalStorage(base) - require.NoError(t, err) - require.NoError(t, s.mock.Start()) - clean = func() { - s.mock.Stop() - } - return -} - -func TestRestoreAutoIncID(t *testing.T) { - s, clean := createRestoreSchemaSuite(t) - defer clean() - tk := testkit.NewTestKit(t, s.mock.Storage) - tk.MustExec("use test") - tk.MustExec("set @@sql_mode=''") - tk.MustExec("drop table if exists `\"t\"`;") - // Test SQL Mode - tk.MustExec("create table `\"t\"` (" + - "a int not null," + - "time timestamp not null default '0000-00-00 00:00:00');", - ) - tk.MustExec("insert into `\"t\"` values (10, '0000-00-00 00:00:00');") - // Query the current AutoIncID - autoIncID, err := strconv.ParseUint(tk.MustQuery("admin show `\"t\"` next_row_id").Rows()[0][3].(string), 10, 64) - require.NoErrorf(t, err, "Error query auto inc id: %s", err) - // Get schemas of db and table - info, err := s.mock.Domain.GetSnapshotInfoSchema(math.MaxUint64) - require.NoErrorf(t, err, "Error get snapshot info schema: %s", err) - dbInfo, exists := info.SchemaByName(model.NewCIStr("test")) - require.Truef(t, exists, "Error get db info") - tableInfo, err := info.TableByName(model.NewCIStr("test"), model.NewCIStr("\"t\"")) - require.NoErrorf(t, err, "Error get table info: %s", err) - table := metautil.Table{ - Info: tableInfo.Meta(), - DB: dbInfo, - } - // Get the next AutoIncID - idAlloc := autoid.NewAllocator(s.mock.Storage, dbInfo.ID, table.Info.ID, false, autoid.RowIDAllocType) - globalAutoID, err := idAlloc.NextGlobalAutoID() - require.NoErrorf(t, err, "Error allocate next auto id") - require.Equal(t, uint64(globalAutoID), autoIncID) - // Alter AutoIncID to the next AutoIncID + 100 - table.Info.AutoIncID = globalAutoID + 100 - db, err := restore.NewDB(gluetidb.New(), s.mock.Storage) - require.NoErrorf(t, err, "Error create DB") - tk.MustExec("drop database if exists test;") - // Test empty collate value - table.DB.Charset = "utf8mb4" - table.DB.Collate = "" - err = db.CreateDatabase(context.Background(), table.DB) - require.NoErrorf(t, err, "Error create empty collate db: %s %s", err, s.mock.DSN) - tk.MustExec("drop database if exists test;") - // Test empty charset value - table.DB.Charset = "" - table.DB.Collate = "utf8mb4_bin" - err = db.CreateDatabase(context.Background(), table.DB) - require.NoErrorf(t, err, "Error create empty charset db: %s %s", err, s.mock.DSN) - uniqueMap := make(map[restore.UniqueTableName]bool) - err = db.CreateTable(context.Background(), &table, uniqueMap) - require.NoErrorf(t, err, "Error create table: %s %s", err, s.mock.DSN) - - tk.MustExec("use test") - autoIncID, err = strconv.ParseUint(tk.MustQuery("admin show `\"t\"` next_row_id").Rows()[0][3].(string), 10, 64) - require.NoErrorf(t, err, "Error query auto inc id: %s", err) - // Check if AutoIncID is altered successfully. - require.Equal(t, uint64(globalAutoID+100), autoIncID) - - // try again, failed due to table exists. - table.Info.AutoIncID = globalAutoID + 200 - err = db.CreateTable(context.Background(), &table, uniqueMap) - require.NoErrorf(t, err, "Got unexpected error when create table: %v", err) - // Check if AutoIncID is not altered. - autoIncID, err = strconv.ParseUint(tk.MustQuery("admin show `\"t\"` next_row_id").Rows()[0][3].(string), 10, 64) - require.NoErrorf(t, err, "Error query auto inc id: %s", err) - require.Equal(t, uint64(globalAutoID+100), autoIncID) - - // try again, success because we use alter sql in unique map. - table.Info.AutoIncID = globalAutoID + 300 - uniqueMap[restore.UniqueTableName{"test", "\"t\""}] = true - err = db.CreateTable(context.Background(), &table, uniqueMap) - require.NoErrorf(t, err, "Error create table: %s", err) - // Check if AutoIncID is altered to globalAutoID + 300. - autoIncID, err = strconv.ParseUint(tk.MustQuery("admin show `\"t\"` next_row_id").Rows()[0][3].(string), 10, 64) - require.NoErrorf(t, err, "Error query auto inc id: %s", err) - require.Equal(t, uint64(globalAutoID+300), autoIncID) - -} diff --git a/br/pkg/restore/systable_restore.go b/br/pkg/restore/systable_restore.go deleted file mode 100644 index 9205be76..00000000 --- a/br/pkg/restore/systable_restore.go +++ /dev/null @@ -1,217 +0,0 @@ -// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. - -package restore - -import ( - "context" - "fmt" - - "github.com/pingcap/errors" - "github.com/pingcap/log" - filter "github.com/pingcap/tidb-tools/pkg/table-filter" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" - berrors "github.com/tikv/migration/br/pkg/errors" - "github.com/tikv/migration/br/pkg/logutil" - "github.com/tikv/migration/br/pkg/utils" - "go.uber.org/multierr" - "go.uber.org/zap" -) - -var statsTables = map[string]struct{}{ - "stats_buckets": {}, - "stats_extended": {}, - "stats_feedback": {}, - "stats_fm_sketch": {}, - "stats_histograms": {}, - "stats_meta": {}, - "stats_top_n": {}, -} - -var unRecoverableTable = map[string]struct{}{ - // some variables in tidb (e.g. gc_safe_point) cannot be recovered. - "tidb": {}, - "global_variables": {}, - - // all user related tables cannot be recovered for now. - "column_stats_usage": {}, - "columns_priv": {}, - "db": {}, - "default_roles": {}, - "global_grants": {}, - "global_priv": {}, - "role_edges": {}, - "tables_priv": {}, - "user": {}, - "capture_plan_baselines_blacklist": {}, - // gc info don't need to recover. - "gc_delete_range": {}, - "gc_delete_range_done": {}, - - // schema_index_usage has table id need to be rewrite. - "schema_index_usage": {}, -} - -func isUnrecoverableTable(tableName string) bool { - _, ok := unRecoverableTable[tableName] - return ok -} - -func isStatsTable(tableName string) bool { - _, ok := statsTables[tableName] - return ok -} - -// RestoreSystemSchemas restores the system schema(i.e. the `mysql` schema). -// Detail see https://github.com/pingcap/br/issues/679#issuecomment-762592254. -func (rc *Client) RestoreSystemSchemas(ctx context.Context, f filter.Filter) { - sysDB := mysql.SystemDB - - temporaryDB := utils.TemporaryDBName(sysDB) - defer rc.cleanTemporaryDatabase(ctx, sysDB) - - if !f.MatchSchema(sysDB) { - log.Debug("system database filtered out", zap.String("database", sysDB)) - return - } - originDatabase, ok := rc.databases[temporaryDB.O] - if !ok { - log.Info("system database not backed up, skipping", zap.String("database", sysDB)) - return - } - db, ok := rc.getDatabaseByName(sysDB) - if !ok { - // Or should we create the database here? - log.Warn("target database not exist, aborting", zap.String("database", sysDB)) - return - } - - tablesRestored := make([]string, 0, len(originDatabase.Tables)) - for _, table := range originDatabase.Tables { - tableName := table.Info.Name - if f.MatchTable(sysDB, tableName.O) { - if err := rc.replaceTemporaryTableToSystable(ctx, tableName.L, db); err != nil { - log.Warn("error during merging temporary tables into system tables", - logutil.ShortError(err), - zap.Stringer("table", tableName), - ) - } - tablesRestored = append(tablesRestored, tableName.L) - } - } - if err := rc.afterSystemTablesReplaced(tablesRestored); err != nil { - for _, e := range multierr.Errors(err) { - log.Warn("error during reconfigurating the system tables", zap.String("database", sysDB), logutil.ShortError(e)) - } - } -} - -// database is a record of a database. -// For fast querying whether a table exists and the temporary database of it. -type database struct { - ExistingTables map[string]*model.TableInfo - Name model.CIStr - TemporaryName model.CIStr -} - -// getDatabaseByName make a record of a database from info schema by its name. -func (rc *Client) getDatabaseByName(name string) (*database, bool) { - infoSchema := rc.dom.InfoSchema() - schema, ok := infoSchema.SchemaByName(model.NewCIStr(name)) - if !ok { - return nil, false - } - db := &database{ - ExistingTables: map[string]*model.TableInfo{}, - Name: model.NewCIStr(name), - TemporaryName: utils.TemporaryDBName(name), - } - for _, t := range schema.Tables { - db.ExistingTables[t.Name.L] = t - } - return db, true -} - -// afterSystemTablesReplaced do some extra work for special system tables. -// e.g. after inserting to the table mysql.user, we must execute `FLUSH PRIVILEGES` to allow it take effect. -func (rc *Client) afterSystemTablesReplaced(tables []string) error { - var err error - for _, table := range tables { - switch { - case table == "user": - // We cannot execute `rc.dom.NotifyUpdatePrivilege` here, because there isn't - // sessionctx.Context provided by the glue. - // TODO: update the glue type and allow we retrieve a session context from it. - err = multierr.Append(err, errors.Annotatef(berrors.ErrUnsupportedSystemTable, - "restored user info may not take effect, until you should execute `FLUSH PRIVILEGES` manually")) - } - } - return err -} - -// replaceTemporaryTableToSystable replaces the temporary table to real system table. -func (rc *Client) replaceTemporaryTableToSystable(ctx context.Context, tableName string, db *database) error { - execSQL := func(sql string) error { - // SQLs here only contain table name and database name, seems it is no need to redact them. - if err := rc.db.se.Execute(ctx, sql); err != nil { - log.Warn("failed to execute SQL restore system database", - zap.String("table", tableName), - zap.Stringer("database", db.Name), - zap.String("sql", sql), - zap.Error(err), - ) - return berrors.ErrUnknown.Wrap(err).GenWithStack("failed to execute %s", sql) - } - log.Info("successfully restore system database", - zap.String("table", tableName), - zap.Stringer("database", db.Name), - zap.String("sql", sql), - ) - return nil - } - - // The newly created tables have different table IDs with original tables, - // hence the old statistics are invalid. - // - // TODO: - // 1 ) Rewrite the table IDs via `UPDATE _temporary_mysql.stats_xxx SET table_id = new_table_id WHERE table_id = old_table_id` - // BEFORE replacing into and then execute `rc.statsHandler.Update(rc.dom.InfoSchema())`. - // 1.5 ) (Optional) The UPDATE statement sometimes costs, the whole system tables restore step can be place into the restore pipeline. - // 2 ) Deprecate the origin interface for backing up statistics. - if isStatsTable(tableName) { - return berrors.ErrUnsupportedSystemTable.GenWithStack("restoring stats via `mysql` schema isn't support yet: " + - "the table ID is out-of-date and may corrupt existing statistics") - } - - if isUnrecoverableTable(tableName) { - return berrors.ErrUnsupportedSystemTable.GenWithStack("restoring unsupported `mysql` schema table") - } - - if db.ExistingTables[tableName] != nil { - log.Info("table existing, using replace into for restore", - zap.String("table", tableName), - zap.Stringer("schema", db.Name)) - replaceIntoSQL := fmt.Sprintf("REPLACE INTO %s SELECT * FROM %s;", - utils.EncloseDBAndTable(db.Name.L, tableName), - utils.EncloseDBAndTable(db.TemporaryName.L, tableName)) - return execSQL(replaceIntoSQL) - } - - renameSQL := fmt.Sprintf("RENAME TABLE %s TO %s;", - utils.EncloseDBAndTable(db.TemporaryName.L, tableName), - utils.EncloseDBAndTable(db.Name.L, tableName), - ) - return execSQL(renameSQL) -} - -func (rc *Client) cleanTemporaryDatabase(ctx context.Context, originDB string) { - database := utils.TemporaryDBName(originDB) - log.Debug("dropping temporary database", zap.Stringer("database", database)) - sql := fmt.Sprintf("DROP DATABASE IF EXISTS %s", utils.EncloseName(database.L)) - if err := rc.db.se.Execute(ctx, sql); err != nil { - logutil.WarnTerm("failed to drop temporary database, it should be dropped manually", - zap.Stringer("database", database), - logutil.ShortError(err), - ) - } -} diff --git a/br/pkg/restore/util.go b/br/pkg/restore/util.go index 9bccb3a8..5380de92 100644 --- a/br/pkg/restore/util.go +++ b/br/pkg/restore/util.go @@ -137,131 +137,6 @@ func GetSSTMetaFromFile( } } -// MakeDBPool makes a session pool with specficated size by sessionFactory. -func MakeDBPool(size uint, dbFactory func() (*DB, error)) ([]*DB, error) { - dbPool := make([]*DB, 0, size) - for i := uint(0); i < size; i++ { - db, e := dbFactory() - if e != nil { - return dbPool, e - } - dbPool = append(dbPool, db) - } - return dbPool, nil -} - -// EstimateRangeSize estimates the total range count by file. -func EstimateRangeSize(files []*backuppb.File) int { - result := 0 - for _, f := range files { - if strings.HasSuffix(f.GetName(), "_write.sst") { - result++ - } - } - return result -} - -// MapTableToFiles makes a map that mapping table ID to its backup files. -// aware that one file can and only can hold one table. -func MapTableToFiles(files []*backuppb.File) map[int64][]*backuppb.File { - result := map[int64][]*backuppb.File{} - for _, file := range files { - tableID := tablecodec.DecodeTableID(file.GetStartKey()) - tableEndID := tablecodec.DecodeTableID(file.GetEndKey()) - if tableID != tableEndID { - log.Panic("key range spread between many files.", - zap.String("file name", file.Name), - logutil.Key("startKey", file.StartKey), - logutil.Key("endKey", file.EndKey)) - } - if tableID == 0 { - log.Panic("invalid table key of file", - zap.String("file name", file.Name), - logutil.Key("startKey", file.StartKey), - logutil.Key("endKey", file.EndKey)) - } - result[tableID] = append(result[tableID], file) - } - return result -} - -// GoValidateFileRanges validate files by a stream of tables and yields -// tables with range. -func GoValidateFileRanges( - ctx context.Context, - tableStream <-chan CreatedTable, - fileOfTable map[int64][]*backuppb.File, - splitSizeBytes, splitKeyCount uint64, - errCh chan<- error, -) <-chan TableWithRange { - // Could we have a smaller outCh size? - outCh := make(chan TableWithRange, len(fileOfTable)) - go func() { - defer close(outCh) - defer log.Info("all range generated") - for { - select { - case <-ctx.Done(): - errCh <- ctx.Err() - return - case t, ok := <-tableStream: - if !ok { - return - } - files := fileOfTable[t.OldTable.Info.ID] - if partitions := t.OldTable.Info.Partition; partitions != nil { - log.Debug("table partition", - zap.Stringer("database", t.OldTable.DB.Name), - zap.Stringer("table", t.Table.Name), - zap.Any("partition info", partitions), - ) - for _, partition := range partitions.Definitions { - files = append(files, fileOfTable[partition.ID]...) - } - } - for _, file := range files { - err := ValidateFileRewriteRule(file, t.RewriteRule) - if err != nil { - errCh <- err - return - } - } - // Merge small ranges to reduce split and scatter regions. - ranges, stat, err := MergeFileRanges( - files, splitSizeBytes, splitKeyCount) - if err != nil { - errCh <- err - return - } - log.Info("merge and validate file", - zap.Stringer("database", t.OldTable.DB.Name), - zap.Stringer("table", t.Table.Name), - zap.Int("Files(total)", stat.TotalFiles), - zap.Int("File(write)", stat.TotalWriteCFFile), - zap.Int("File(default)", stat.TotalDefaultCFFile), - zap.Int("Region(total)", stat.TotalRegions), - zap.Int("Regoin(keys avg)", stat.RegionKeysAvg), - zap.Int("Region(bytes avg)", stat.RegionBytesAvg), - zap.Int("Merged(regions)", stat.MergedRegions), - zap.Int("Merged(keys avg)", stat.MergedRegionKeysAvg), - zap.Int("Merged(bytes avg)", stat.MergedRegionBytesAvg)) - - tableWithRange := TableWithRange{ - CreatedTable: t, - Range: ranges, - } - log.Debug("sending range info", - zap.Stringer("table", t.Table.Name), - zap.Int("files", len(files)), - zap.Int("range size", len(ranges)), - zap.Int("output channel size", len(outCh))) - outCh <- tableWithRange - } - } - }() - return outCh -} - // ValidateFileRewriteRule uses rewrite rules to validate the ranges of a file. func ValidateFileRewriteRule(file *backuppb.File, rewriteRules *RewriteRules) error { // Check if the start key has a matched rewrite key diff --git a/br/pkg/restore/util_test.go b/br/pkg/restore/util_test.go index 7edda201..7a2e2e83 100644 --- a/br/pkg/restore/util_test.go +++ b/br/pkg/restore/util_test.go @@ -53,43 +53,6 @@ func TestGetSSTMetaFromFile(t *testing.T) { require.Equal(t, "t2\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff", string(sstMeta.GetRange().GetEnd())) } -func TestMapTableToFiles(t *testing.T) { - filesOfTable1 := []*backuppb.File{ - { - Name: "table1-1.sst", - StartKey: tablecodec.EncodeTablePrefix(1), - EndKey: tablecodec.EncodeTablePrefix(1), - }, - { - Name: "table1-2.sst", - StartKey: tablecodec.EncodeTablePrefix(1), - EndKey: tablecodec.EncodeTablePrefix(1), - }, - { - Name: "table1-3.sst", - StartKey: tablecodec.EncodeTablePrefix(1), - EndKey: tablecodec.EncodeTablePrefix(1), - }, - } - filesOfTable2 := []*backuppb.File{ - { - Name: "table2-1.sst", - StartKey: tablecodec.EncodeTablePrefix(2), - EndKey: tablecodec.EncodeTablePrefix(2), - }, - { - Name: "table2-2.sst", - StartKey: tablecodec.EncodeTablePrefix(2), - EndKey: tablecodec.EncodeTablePrefix(2), - }, - } - - result := restore.MapTableToFiles(append(filesOfTable2, filesOfTable1...)) - - require.Equal(t, filesOfTable1, result[1]) - require.Equal(t, filesOfTable2, result[2]) -} - func TestValidateFileRewriteRule(t *testing.T) { rules := &restore.RewriteRules{ Data: []*import_sstpb.RewriteRule{{ diff --git a/br/pkg/task/backup_raw.go b/br/pkg/task/backup_raw.go index 4439ee1d..9502e39c 100644 --- a/br/pkg/task/backup_raw.go +++ b/br/pkg/task/backup_raw.go @@ -135,9 +135,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf if err != nil { return errors.Trace(err) } - // Backup raw does not need domain. - needDomain := false - mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain) + mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 2213bfc6..7a3956ee 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -29,7 +29,6 @@ import ( "github.com/tikv/migration/br/pkg/glue" "github.com/tikv/migration/br/pkg/metautil" "github.com/tikv/migration/br/pkg/storage" - "github.com/tikv/migration/br/pkg/utils" pd "github.com/tikv/pd/client" "go.etcd.io/etcd/pkg/transport" "go.uber.org/zap" @@ -52,16 +51,11 @@ const ( // flagKey is the name of TLS key flag. flagKey = "key" - flagDatabase = "db" - flagTable = "table" - flagChecksumConcurrency = "checksum-concurrency" flagRateLimit = "ratelimit" flagRateLimitUnit = "ratelimit-unit" flagConcurrency = "concurrency" flagChecksum = "checksum" - flagFilter = "filter" - flagCaseSensitive = "case-sensitive" flagRemoveTiFlash = "remove-tiflash" flagCheckRequirement = "check-requirements" flagSwitchModeInterval = "switch-mode-interval" @@ -155,12 +149,7 @@ type Config struct { // should be removed after TiDB upgrades the BR dependency. Filter filter.MySQLReplicationRules - TableFilter filter.Filter `json:"-" toml:"-"` SwitchModeInterval time.Duration `json:"switch-mode-interval" toml:"switch-mode-interval"` - // Schemas is a database name set, to check whether the restore database has been backup - Schemas map[string]struct{} - // Tables is a table name set, to check whether the restore table has been backup - Tables map[string]struct{} // GrpcKeepaliveTime is the interval of pinging the server. GRPCKeepaliveTime time.Duration `json:"grpc-keepalive-time" toml:"grpc-keepalive-time"` @@ -226,26 +215,6 @@ func DefineCommonFlags(flags *pflag.FlagSet) { storage.DefineFlags(flags) } -// DefineDatabaseFlags defines the required --db flag for `db` subcommand. -func DefineDatabaseFlags(command *cobra.Command) { - command.Flags().String(flagDatabase, "", "database name") - _ = command.MarkFlagRequired(flagDatabase) -} - -// DefineTableFlags defines the required --db and --table flags for `table` subcommand. -func DefineTableFlags(command *cobra.Command) { - DefineDatabaseFlags(command) - command.Flags().StringP(flagTable, "t", "", "table name") - _ = command.MarkFlagRequired(flagTable) -} - -// DefineFilterFlags defines the --filter and --case-sensitive flags for `full` subcommand. -func DefineFilterFlags(command *cobra.Command, defaultFilter []string) { - flags := command.Flags() - flags.StringArrayP(flagFilter, "f", defaultFilter, "select tables to process") - flags.Bool(flagCaseSensitive, false, "whether the table names used in --filter should be case-sensitive") -} - // ParseTLSTripleFromFlags parses the (ca, cert, key) triple from flags. func ParseTLSTripleFromFlags(flags *pflag.FlagSet) (ca, cert, key string, err error) { ca, err = flags.GetString(flagCA) @@ -403,45 +372,6 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error { } cfg.RateLimit = rateLimit * rateLimitUnit - cfg.Schemas = make(map[string]struct{}) - cfg.Tables = make(map[string]struct{}) - var caseSensitive bool - if filterFlag := flags.Lookup(flagFilter); filterFlag != nil { - var f filter.Filter - f, err = filter.Parse(filterFlag.Value.(pflag.SliceValue).GetSlice()) - if err != nil { - return errors.Trace(err) - } - cfg.TableFilter = f - caseSensitive, err = flags.GetBool(flagCaseSensitive) - if err != nil { - return errors.Trace(err) - } - } else if dbFlag := flags.Lookup(flagDatabase); dbFlag != nil { - db := dbFlag.Value.String() - if len(db) == 0 { - return errors.Annotate(berrors.ErrInvalidArgument, "empty database name is not allowed") - } - cfg.Schemas[utils.EncloseName(db)] = struct{}{} - if tblFlag := flags.Lookup(flagTable); tblFlag != nil { - tbl := tblFlag.Value.String() - if len(tbl) == 0 { - return errors.Annotate(berrors.ErrInvalidArgument, "empty table name is not allowed") - } - cfg.Tables[utils.EncloseDBAndTable(db, tbl)] = struct{}{} - cfg.TableFilter = filter.NewTablesFilter(filter.Table{ - Schema: db, - Name: tbl, - }) - } else { - cfg.TableFilter = filter.NewSchemasFilter(db) - } - } else { - cfg.TableFilter, _ = filter.Parse([]string{"*.*"}) - } - if !caseSensitive { - cfg.TableFilter = filter.CaseInsensitive(cfg.TableFilter) - } checkRequirements, err := flags.GetBool(flagCheckRequirement) if err != nil { return errors.Trace(err) @@ -502,7 +432,6 @@ func NewMgr(ctx context.Context, tlsConfig TLSConfig, keepalive keepalive.ClientParameters, checkRequirements bool, - needDomain bool, ) (*conn.Mgr, error) { var ( tlsConf *tls.Config @@ -527,7 +456,7 @@ func NewMgr(ctx context.Context, // Is it necessary to remove `StoreBehavior`? return conn.NewMgr( ctx, g, pdAddress, tlsConf, securityOption, keepalive, conn.SkipTiFlash, - checkRequirements, needDomain, + checkRequirements, ) } diff --git a/br/pkg/task/restore_raw.go b/br/pkg/task/restore_raw.go index 481487a8..38ec6eeb 100644 --- a/br/pkg/task/restore_raw.go +++ b/br/pkg/task/restore_raw.go @@ -63,9 +63,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR ctx, cancel := context.WithCancel(c) defer cancel() - // Restore raw does not need domain. - needDomain := false - mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements, needDomain) + mgr, err := NewMgr(ctx, g, cfg.PD, cfg.TLS, GetKeepalive(&cfg.Config), cfg.CheckRequirements) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/utils/schema.go b/br/pkg/utils/schema.go index 57b69c6b..49087863 100644 --- a/br/pkg/utils/schema.go +++ b/br/pkg/utils/schema.go @@ -3,89 +3,10 @@ package utils import ( - "context" "fmt" "strings" - - "github.com/pingcap/errors" - backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/parser/mysql" - "github.com/tikv/migration/br/pkg/metautil" ) -// temporaryDBNamePrefix is the prefix name of system db, e.g. mysql system db will be rename to __TiDB_BR_Temporary_mysql -const temporaryDBNamePrefix = "__TiDB_BR_Temporary_" - -// NeedAutoID checks whether the table needs backing up with an autoid. -func NeedAutoID(tblInfo *model.TableInfo) bool { - hasRowID := !tblInfo.PKIsHandle && !tblInfo.IsCommonHandle - hasAutoIncID := tblInfo.GetAutoIncrementColInfo() != nil - return hasRowID || hasAutoIncID -} - -// Database wraps the schema and tables of a database. -type Database struct { - Info *model.DBInfo - Tables []*metautil.Table -} - -// GetTable returns a table of the database by name. -func (db *Database) GetTable(name string) *metautil.Table { - for _, table := range db.Tables { - if table.Info.Name.String() == name { - return table - } - } - return nil -} - -// LoadBackupTables loads schemas from BackupMeta. -func LoadBackupTables(ctx context.Context, reader *metautil.MetaReader) (map[string]*Database, error) { - ch := make(chan *metautil.Table) - errCh := make(chan error) - go func() { - if err := reader.ReadSchemasFiles(ctx, ch); err != nil { - errCh <- errors.Trace(err) - } - close(ch) - }() - - databases := make(map[string]*Database) - for { - select { - case <-ctx.Done(): - return nil, ctx.Err() - case err := <-errCh: - return nil, errors.Trace(err) - case table, ok := <-ch: - if !ok { - close(errCh) - return databases, nil - } - dbName := table.DB.Name.String() - db, ok := databases[dbName] - if !ok { - db = &Database{ - Info: table.DB, - Tables: make([]*metautil.Table, 0), - } - databases[dbName] = db - } - db.Tables = append(db.Tables, table) - } - } -} - -// ArchiveSize returns the total size of the backup archive. -func ArchiveSize(meta *backuppb.BackupMeta) uint64 { - total := uint64(meta.Size()) - for _, file := range meta.Files { - total += file.Size_ - } - return total -} - // EncloseName formats name in sql. func EncloseName(name string) string { return "`" + strings.ReplaceAll(name, "`", "``") + "`" @@ -95,22 +16,3 @@ func EncloseName(name string) string { func EncloseDBAndTable(database, table string) string { return fmt.Sprintf("%s.%s", EncloseName(database), EncloseName(table)) } - -// IsSysDB tests whether the database is system DB. -// Currently, the only system DB is mysql. -func IsSysDB(dbLowerName string) bool { - return dbLowerName == mysql.SystemDB -} - -// TemporaryDBName makes a 'private' database name. -func TemporaryDBName(db string) model.CIStr { - return model.NewCIStr(temporaryDBNamePrefix + db) -} - -// GetSysDBName get the original name of system DB -func GetSysDBName(tempDB model.CIStr) (string, bool) { - if ok := strings.HasPrefix(tempDB.O, temporaryDBNamePrefix); !ok { - return tempDB.O, false - } - return tempDB.O[len(temporaryDBNamePrefix):], true -} diff --git a/br/pkg/utils/schema_test.go b/br/pkg/utils/schema_test.go deleted file mode 100644 index b57389fa..00000000 --- a/br/pkg/utils/schema_test.go +++ /dev/null @@ -1,350 +0,0 @@ -// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. - -package utils - -import ( - "context" - "encoding/json" - "fmt" - "testing" - - "github.com/golang/protobuf/proto" - backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/kvproto/pkg/encryptionpb" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/statistics/handle" - "github.com/pingcap/tidb/tablecodec" - "github.com/stretchr/testify/require" - "github.com/tikv/migration/br/pkg/metautil" - "github.com/tikv/migration/br/pkg/storage" -) - -func mockBackupMeta(mockSchemas []*backuppb.Schema, mockFiles []*backuppb.File) *backuppb.BackupMeta { - return &backuppb.BackupMeta{ - Files: mockFiles, - Schemas: mockSchemas, - } -} - -func TestLoadBackupMeta(t *testing.T) { - testDir := t.TempDir() - store, err := storage.NewLocalStorage(testDir) - require.NoError(t, err) - - tblName := model.NewCIStr("t1") - dbName := model.NewCIStr("test") - tblID := int64(123) - mockTbl := &model.TableInfo{ - ID: tblID, - Name: tblName, - } - mockStats := handle.JSONTable{ - DatabaseName: dbName.String(), - TableName: tblName.String(), - } - mockDB := model.DBInfo{ - ID: 1, - Name: dbName, - Tables: []*model.TableInfo{ - mockTbl, - }, - } - dbBytes, err := json.Marshal(mockDB) - require.NoError(t, err) - tblBytes, err := json.Marshal(mockTbl) - require.NoError(t, err) - statsBytes, err := json.Marshal(mockStats) - require.NoError(t, err) - - mockSchemas := []*backuppb.Schema{ - { - Db: dbBytes, - Table: tblBytes, - Stats: statsBytes, - }, - } - - mockFiles := []*backuppb.File{ - // should include 1.sst - { - Name: "1.sst", - StartKey: tablecodec.EncodeRowKey(tblID, []byte("a")), - EndKey: tablecodec.EncodeRowKey(tblID+1, []byte("a")), - }, - // shouldn't include 2.sst - { - Name: "2.sst", - StartKey: tablecodec.EncodeRowKey(tblID-1, []byte("a")), - EndKey: tablecodec.EncodeRowKey(tblID, []byte("a")), - }, - } - - meta := mockBackupMeta(mockSchemas, mockFiles) - data, err := proto.Marshal(meta) - require.NoError(t, err) - - ctx := context.Background() - err = store.WriteFile(ctx, metautil.MetaFile, data) - require.NoError(t, err) - - dbs, err := LoadBackupTables( - ctx, - metautil.NewMetaReader( - meta, - store, - &backuppb.CipherInfo{ - CipherType: encryptionpb.EncryptionMethod_PLAINTEXT, - }), - ) - tbl := dbs[dbName.String()].GetTable(tblName.String()) - require.NoError(t, err) - require.Len(t, tbl.Files, 1) - require.Equal(t, "1.sst", tbl.Files[0].Name) -} - -func TestLoadBackupMetaPartionTable(t *testing.T) { - testDir := t.TempDir() - store, err := storage.NewLocalStorage(testDir) - require.NoError(t, err) - - tblName := model.NewCIStr("t1") - dbName := model.NewCIStr("test") - tblID := int64(123) - partID1 := int64(124) - partID2 := int64(125) - mockTbl := &model.TableInfo{ - ID: tblID, - Name: tblName, - Partition: &model.PartitionInfo{ - Definitions: []model.PartitionDefinition{ - {ID: partID1}, - {ID: partID2}, - }, - }, - } - mockStats := handle.JSONTable{ - DatabaseName: dbName.String(), - TableName: tblName.String(), - } - mockDB := model.DBInfo{ - ID: 1, - Name: dbName, - Tables: []*model.TableInfo{ - mockTbl, - }, - } - dbBytes, err := json.Marshal(mockDB) - require.NoError(t, err) - tblBytes, err := json.Marshal(mockTbl) - require.NoError(t, err) - statsBytes, err := json.Marshal(mockStats) - require.NoError(t, err) - - mockSchemas := []*backuppb.Schema{ - { - Db: dbBytes, - Table: tblBytes, - Stats: statsBytes, - }, - } - - mockFiles := []*backuppb.File{ - // should include 1.sst - 3.sst - { - Name: "1.sst", - StartKey: tablecodec.EncodeRowKey(partID1, []byte("a")), - EndKey: tablecodec.EncodeRowKey(partID1, []byte("b")), - }, - { - Name: "2.sst", - StartKey: tablecodec.EncodeRowKey(partID1, []byte("b")), - EndKey: tablecodec.EncodeRowKey(partID2, []byte("a")), - }, - { - Name: "3.sst", - StartKey: tablecodec.EncodeRowKey(partID2, []byte("a")), - EndKey: tablecodec.EncodeRowKey(partID2+1, []byte("b")), - }, - // shouldn't include 4.sst - { - Name: "4.sst", - StartKey: tablecodec.EncodeRowKey(tblID-1, []byte("a")), - EndKey: tablecodec.EncodeRowKey(tblID, []byte("a")), - }, - } - - meta := mockBackupMeta(mockSchemas, mockFiles) - - data, err := proto.Marshal(meta) - require.NoError(t, err) - - ctx := context.Background() - err = store.WriteFile(ctx, metautil.MetaFile, data) - require.NoError(t, err) - - dbs, err := LoadBackupTables( - ctx, - metautil.NewMetaReader( - meta, - store, - &backuppb.CipherInfo{ - CipherType: encryptionpb.EncryptionMethod_PLAINTEXT, - }, - ), - ) - tbl := dbs[dbName.String()].GetTable(tblName.String()) - require.NoError(t, err) - require.Len(t, tbl.Files, 3) - contains := func(name string) bool { - for i := range tbl.Files { - if tbl.Files[i].Name == name { - return true - } - } - return false - } - require.True(t, contains("1.sst")) - require.True(t, contains("2.sst")) - require.True(t, contains("3.sst")) -} - -func buildTableAndFiles(name string, tableID, fileCount int) (*model.TableInfo, []*backuppb.File) { - tblName := model.NewCIStr(name) - tblID := int64(tableID) - mockTbl := &model.TableInfo{ - ID: tblID, - Name: tblName, - } - - mockFiles := make([]*backuppb.File, 0, fileCount) - for i := 0; i < fileCount; i++ { - mockFiles = append(mockFiles, &backuppb.File{ - Name: fmt.Sprintf("%d-%d.sst", tableID, i), - StartKey: tablecodec.EncodeRowKey(tblID, []byte(fmt.Sprintf("%09d", i))), - EndKey: tablecodec.EncodeRowKey(tblID, []byte(fmt.Sprintf("%09d", i+1))), - }) - } - return mockTbl, mockFiles -} - -func buildBenchmarkBackupmeta(b *testing.B, dbName string, tableCount, fileCountPerTable int) *backuppb.BackupMeta { - mockFiles := make([]*backuppb.File, 0, tableCount*fileCountPerTable) - mockSchemas := make([]*backuppb.Schema, 0, tableCount) - for i := 1; i <= tableCount; i++ { - mockTbl, files := buildTableAndFiles(fmt.Sprintf("mock%d", i), i, fileCountPerTable) - mockFiles = append(mockFiles, files...) - - mockDB := model.DBInfo{ - ID: 1, - Name: model.NewCIStr(dbName), - Tables: []*model.TableInfo{ - mockTbl, - }, - } - dbBytes, err := json.Marshal(mockDB) - require.NoError(b, err) - tblBytes, err := json.Marshal(mockTbl) - require.NoError(b, err) - mockSchemas = append(mockSchemas, &backuppb.Schema{ - Db: dbBytes, - Table: tblBytes, - }) - } - return mockBackupMeta(mockSchemas, mockFiles) -} - -func BenchmarkLoadBackupMeta64(b *testing.B) { - testDir := b.TempDir() - store, err := storage.NewLocalStorage(testDir) - require.NoError(b, err) - - meta := buildBenchmarkBackupmeta(b, "bench", 64, 64) - b.ResetTimer() - for i := 0; i < b.N; i++ { - data, err := proto.Marshal(meta) - require.NoError(b, err) - - ctx := context.Background() - err = store.WriteFile(ctx, metautil.MetaFile, data) - require.NoError(b, err) - - dbs, err := LoadBackupTables( - ctx, - metautil.NewMetaReader( - meta, - store, - &backuppb.CipherInfo{ - CipherType: encryptionpb.EncryptionMethod_PLAINTEXT, - }, - ), - ) - require.NoError(b, err) - require.Len(b, dbs, 1) - require.Contains(b, dbs, "bench") - require.Len(b, dbs["bench"].Tables, 64) - } -} - -func BenchmarkLoadBackupMeta1024(b *testing.B) { - testDir := b.TempDir() - store, err := storage.NewLocalStorage(testDir) - require.NoError(b, err) - - meta := buildBenchmarkBackupmeta(b, "bench", 1024, 64) - b.ResetTimer() - for i := 0; i < b.N; i++ { - data, err := proto.Marshal(meta) - require.NoError(b, err) - - ctx := context.Background() - err = store.WriteFile(ctx, metautil.MetaFile, data) - require.NoError(b, err) - - dbs, err := LoadBackupTables( - ctx, - metautil.NewMetaReader( - meta, - store, - &backuppb.CipherInfo{ - CipherType: encryptionpb.EncryptionMethod_PLAINTEXT, - }, - ), - ) - require.NoError(b, err) - require.Len(b, dbs, 1) - require.Contains(b, dbs, "bench") - require.Len(b, dbs["bench"].Tables, 1024) - } -} - -func BenchmarkLoadBackupMeta10240(b *testing.B) { - testDir := b.TempDir() - store, err := storage.NewLocalStorage(testDir) - require.NoError(b, err) - - meta := buildBenchmarkBackupmeta(b, "bench", 10240, 64) - b.ResetTimer() - for i := 0; i < b.N; i++ { - data, err := proto.Marshal(meta) - require.NoError(b, err) - - ctx := context.Background() - err = store.WriteFile(ctx, metautil.MetaFile, data) - require.NoError(b, err) - - dbs, err := LoadBackupTables( - ctx, - metautil.NewMetaReader( - meta, - store, - &backuppb.CipherInfo{ - CipherType: encryptionpb.EncryptionMethod_PLAINTEXT, - }, - ), - ) - require.NoError(b, err) - require.Len(b, dbs, 1) - require.Contains(b, dbs, "bench") - require.Len(b, dbs["bench"].Tables, 10240) - } -}