From b1ed365120742e378c1076c6cba94faad85bdd3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Fri, 30 Apr 2021 11:19:49 +0800 Subject: [PATCH] backup,restore: support backing up / restore system databases (#1048) --- cmd/br/backup.go | 2 +- cmd/br/cmd.go | 14 +++ cmd/br/restore.go | 4 +- errors.toml | 5 + pkg/backup/client.go | 2 +- pkg/backup/schema.go | 3 + pkg/backup/schema_test.go | 3 +- pkg/errors/errors.go | 1 + pkg/gluetidb/glue.go | 2 +- pkg/restore/systable_restore.go | 185 ++++++++++++++++++++++++++++++++ pkg/task/common.go | 4 +- pkg/task/restore.go | 4 + pkg/utils/schema.go | 18 ++++ tests/br_full_ddl/run.sh | 2 +- tests/br_full_index/run.sh | 4 +- tests/br_systables/run.sh | 53 +++++++++ tests/br_systables/workload | 12 +++ 17 files changed, 307 insertions(+), 11 deletions(-) create mode 100644 pkg/restore/systable_restore.go create mode 100644 tests/br_systables/run.sh create mode 100644 tests/br_systables/workload diff --git a/cmd/br/backup.go b/cmd/br/backup.go index 1f3626906..84bc32be1 100644 --- a/cmd/br/backup.go +++ b/cmd/br/backup.go @@ -109,7 +109,7 @@ func newFullBackupCommand() *cobra.Command { return runBackupCommand(command, "Full backup") }, } - task.DefineFilterFlags(command) + task.DefineFilterFlags(command, acceptAllTables) return command } diff --git a/cmd/br/cmd.go b/cmd/br/cmd.go index 60dd48540..54a89ff10 100644 --- a/cmd/br/cmd.go +++ b/cmd/br/cmd.go @@ -32,6 +32,20 @@ var ( hasLogFile uint64 tidbGlue = gluetidb.New() envLogToTermKey = "BR_LOG_TO_TERM" + + filterOutSysAndMemTables = []string{ + "*.*", + fmt.Sprintf("!%s.*", utils.TemporaryDBName("*")), + "!mysql.*", + "!sys.*", + "!INFORMATION_SCHEMA.*", + "!PERFORMANCE_SCHEMA.*", + "!METRICS_SCHEMA.*", + "!INSPECTION_SCHEMA.*", + } + acceptAllTables = []string{ + "*.*", + } ) const ( diff --git a/cmd/br/restore.go b/cmd/br/restore.go index 8ff4d7110..498be61f1 100644 --- a/cmd/br/restore.go +++ b/cmd/br/restore.go 
@@ -120,7 +120,7 @@ func newFullRestoreCommand() *cobra.Command { return runRestoreCommand(cmd, "Full restore") }, } - task.DefineFilterFlags(command) + task.DefineFilterFlags(command, filterOutSysAndMemTables) return command } @@ -159,7 +159,7 @@ func newLogRestoreCommand() *cobra.Command { return runLogRestoreCommand(cmd) }, } - task.DefineFilterFlags(command) + task.DefineFilterFlags(command, filterOutSysAndMemTables) task.DefineLogRestoreFlags(command) return command } diff --git a/errors.toml b/errors.toml index 7ff28559a..b90fa2da6 100644 --- a/errors.toml +++ b/errors.toml @@ -191,3 +191,8 @@ error = ''' failed to write and ingest ''' +["BR:Restore:ErrUnsupportedSysTable"] +error = ''' +the system table isn't supported for restoring yet +''' + diff --git a/pkg/backup/client.go b/pkg/backup/client.go index d3e9bb932..8de9133bd 100644 --- a/pkg/backup/client.go +++ b/pkg/backup/client.go @@ -302,7 +302,7 @@ func BuildBackupRangeAndSchema( for _, dbInfo := range dbs { // skip system databases - if util.IsMemOrSysDB(dbInfo.Name.L) { + if !tableFilter.MatchSchema(dbInfo.Name.O) || util.IsMemDB(dbInfo.Name.L) { continue } diff --git a/pkg/backup/schema.go b/pkg/backup/schema.go index 9e3ee85a8..c0d3f2e89 100644 --- a/pkg/backup/schema.go +++ b/pkg/backup/schema.go @@ -130,6 +130,9 @@ func (ss *Schemas) BackupSchemas( schemas := make([]*backuppb.Schema, 0, len(ss.schemas)) for name, schema := range ss.schemas { + if utils.IsSysDB(schema.dbInfo.Name.L) { + schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O) + } dbBytes, err := json.Marshal(schema.dbInfo) if err != nil { return nil, errors.Trace(err) diff --git a/pkg/backup/schema_test.go b/pkg/backup/schema_test.go index 1ae7b4bbc..c25bd8e53 100644 --- a/pkg/backup/schema_test.go +++ b/pkg/backup/schema_test.go @@ -73,7 +73,8 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) { c.Assert(backupSchemas, IsNil) // Empty database. 
- noFilter, err := filter.Parse([]string{"*.*"}) + // Filter out system tables manually. + noFilter, err := filter.Parse([]string{"*.*", "!mysql.*"}) c.Assert(err, IsNil) _, backupSchemas, err = backup.BuildBackupRangeAndSchema( s.mock.Storage, noFilter, math.MaxUint64) diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index ae93ca41f..1a096861b 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -43,6 +43,7 @@ var ( ErrRestoreInvalidRange = errors.Normalize("invalid restore range", errors.RFCCodeText("BR:Restore:ErrRestoreInvalidRange")) ErrRestoreWriteAndIngest = errors.Normalize("failed to write and ingest", errors.RFCCodeText("BR:Restore:ErrRestoreWriteAndIngest")) ErrRestoreSchemaNotExists = errors.Normalize("schema not exists", errors.RFCCodeText("BR:Restore:ErrRestoreSchemaNotExists")) + ErrUnsupportedSystemTable = errors.Normalize("the system table isn't supported for restoring yet", errors.RFCCodeText("BR:Restore:ErrUnsupportedSysTable")) // TODO maybe it belongs to PiTR. ErrRestoreRTsConstrain = errors.Normalize("resolved ts constrain violation", errors.RFCCodeText("BR:Restore:ErrRestoreResolvedTsConstrain")) diff --git a/pkg/gluetidb/glue.go b/pkg/gluetidb/glue.go index 2799291f4..2cb1cdc8e 100644 --- a/pkg/gluetidb/glue.go +++ b/pkg/gluetidb/glue.go @@ -105,7 +105,7 @@ func (g Glue) GetVersion() string { // Execute implements glue.Session. func (gs *tidbSession) Execute(ctx context.Context, sql string) error { - _, err := gs.se.Execute(ctx, sql) + _, err := gs.se.ExecuteInternal(ctx, sql) return errors.Trace(err) } diff --git a/pkg/restore/systable_restore.go b/pkg/restore/systable_restore.go new file mode 100644 index 000000000..81e19e92e --- /dev/null +++ b/pkg/restore/systable_restore.go @@ -0,0 +1,185 @@ +// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. 
+ +package restore + +import ( + "context" + "fmt" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" + filter "github.com/pingcap/tidb-tools/pkg/table-filter" + "go.uber.org/multierr" + "go.uber.org/zap" + + berrors "github.com/pingcap/br/pkg/errors" + "github.com/pingcap/br/pkg/logutil" + "github.com/pingcap/br/pkg/utils" +) + +var statsTables = map[string]struct{}{ + "stats_buckets": {}, + "stats_extended": {}, + "stats_feedback": {}, + "stats_fm_sketch": {}, + "stats_histograms": {}, + "stats_meta": {}, + "stats_top_n": {}, +} + +func isStatsTable(tableName string) bool { + _, ok := statsTables[tableName] + return ok +} + +// RestoreSystemSchemas restores the system schema(i.e. the `mysql` schema). +// Detail see https://github.com/pingcap/br/issues/679#issuecomment-762592254. +func (rc *Client) RestoreSystemSchemas(ctx context.Context, f filter.Filter) { + sysDB := mysql.SystemDB + + temporaryDB := utils.TemporaryDBName(sysDB) + defer rc.cleanTemporaryDatabase(ctx, sysDB) + + if !f.MatchSchema(temporaryDB.O) { + log.Debug("system database filtered out", zap.String("database", sysDB)) + return + } + originDatabase, ok := rc.databases[temporaryDB.O] + if !ok { + log.Info("system database not backed up, skipping", zap.String("database", sysDB)) + return + } + db, ok := rc.getDatabaseByName(sysDB) + if !ok { + // Or should we create the database here? 
+ log.Warn("target database not exist, aborting", zap.String("database", sysDB)) + return + } + + tablesRestored := make([]string, 0, len(originDatabase.Tables)) + for _, table := range originDatabase.Tables { + tableName := table.Info.Name + if f.MatchTable(sysDB, tableName.O) { + if err := rc.replaceTemporaryTableToSystable(ctx, tableName.L, db); err != nil { + logutil.WarnTerm("error during merging temporary tables into system tables", + logutil.ShortError(err), + zap.Stringer("table", tableName), + ) + } + } + tablesRestored = append(tablesRestored, tableName.L) + } + if err := rc.afterSystemTablesReplaced(ctx, tablesRestored); err != nil { + for _, e := range multierr.Errors(err) { + logutil.WarnTerm("error during reconfigurating the system tables", zap.String("database", sysDB), logutil.ShortError(e)) + } + } +} + +// database is a record of a database. +// It allows fast lookup of whether a table exists and of the database's temporary name. +type database struct { + ExistingTables map[string]*model.TableInfo + Name model.CIStr + TemporaryName model.CIStr +} + +// getDatabaseByName makes a record of a database from the info schema by its name. +func (rc *Client) getDatabaseByName(name string) (*database, bool) { + infoSchema := rc.dom.InfoSchema() + schema, ok := infoSchema.SchemaByName(model.NewCIStr(name)) + if !ok { + return nil, false + } + db := &database{ + ExistingTables: map[string]*model.TableInfo{}, + Name: model.NewCIStr(name), + TemporaryName: utils.TemporaryDBName(name), + } + for _, t := range schema.Tables { + db.ExistingTables[t.Name.L] = t + } + return db, true +} + +// afterSystemTablesReplaced does some extra work for special system tables. +// e.g. after inserting into the table mysql.user, we must execute `FLUSH PRIVILEGES` for it to take effect. 
+func (rc *Client) afterSystemTablesReplaced(ctx context.Context, tables []string) error { + var err error + for _, table := range tables { + switch { + case table == "user": + // We cannot execute `rc.dom.NotifyUpdatePrivilege` here, because there isn't + // sessionctx.Context provided by the glue. + // TODO: update the glue type and allow us to retrieve a session context from it. + err = multierr.Append(err, errors.Annotatef(berrors.ErrUnsupportedSystemTable, + "restored user info may not take effect, until you should execute `FLUSH PRIVILEGES` manually")) + } + } + return err +} + +// replaceTemporaryTableToSystable replaces the temporary table to real system table. +func (rc *Client) replaceTemporaryTableToSystable(ctx context.Context, tableName string, db *database) error { + execSQL := func(sql string) error { + // SQLs here only contain table name and database name, so there seems to be no need to redact them. + if err := rc.db.se.Execute(ctx, sql); err != nil { + log.Warn("failed to execute SQL restore system database", + zap.String("table", tableName), + zap.Stringer("database", db.Name), + zap.String("sql", sql), + zap.Error(err), + ) + return berrors.ErrUnknown.Wrap(err).GenWithStack("failed to execute %s", sql) + } + log.Info("successfully restore system database", + zap.String("table", tableName), + zap.Stringer("database", db.Name), + zap.String("sql", sql), + ) + return nil + } + + // The newly created tables have different table IDs from the original tables, + // hence the old statistics are invalid. + // + // TODO: + // 1 ) Rewrite the table IDs via `UPDATE _temporary_mysql.stats_xxx SET table_id = new_table_id WHERE table_id = old_table_id` + // BEFORE replacing into and then execute `rc.statsHandler.Update(rc.dom.InfoSchema())`. + // 1.5 ) (Optional) The UPDATE statement can be costly, so the whole system tables restore step could be placed into the restore pipeline. + // 2 ) Deprecate the original interface for backing up statistics. 
+ if isStatsTable(tableName) { + return berrors.ErrUnsupportedSystemTable.GenWithStack("restoring stats via `mysql` schema isn't support yet: " + + "the table ID is out-of-date and may corrupt existing statistics") + } + + if db.ExistingTables[tableName] != nil { + log.Info("table existing, using replace into for restore", + zap.String("table", tableName), + zap.Stringer("schema", db.Name)) + replaceIntoSQL := fmt.Sprintf("REPLACE INTO %s SELECT * FROM %s;", + utils.EncloseDBAndTable(db.Name.L, tableName), + utils.EncloseDBAndTable(db.TemporaryName.L, tableName)) + return execSQL(replaceIntoSQL) + } + + renameSQL := fmt.Sprintf("RENAME TABLE %s TO %s;", + utils.EncloseDBAndTable(db.TemporaryName.L, tableName), + utils.EncloseDBAndTable(db.Name.L, tableName), + ) + return execSQL(renameSQL) +} + +func (rc *Client) cleanTemporaryDatabase(ctx context.Context, originDB string) { + database := utils.TemporaryDBName(originDB) + log.Debug("dropping temporary database", zap.Stringer("database", database)) + sql := fmt.Sprintf("DROP DATABASE IF EXISTS %s", utils.EncloseName(database.L)) + if err := rc.db.se.Execute(ctx, sql); err != nil { + logutil.WarnTerm("failed to drop temporary database, it should be dropped manually", + zap.Stringer("database", database), + logutil.ShortError(err), + ) + } +} diff --git a/pkg/task/common.go b/pkg/task/common.go index c41e6216a..6d69ca69a 100644 --- a/pkg/task/common.go +++ b/pkg/task/common.go @@ -208,9 +208,9 @@ func DefineTableFlags(command *cobra.Command) { } // DefineFilterFlags defines the --filter and --case-sensitive flags for `full` subcommand. 
-func DefineFilterFlags(command *cobra.Command) { +func DefineFilterFlags(command *cobra.Command, defaultFilter []string) { flags := command.Flags() - flags.StringArrayP(flagFilter, "f", []string{"*.*"}, "select tables to process") + flags.StringArrayP(flagFilter, "f", defaultFilter, "select tables to process") flags.Bool(flagCaseSensitive, false, "whether the table names used in --filter should be case-sensitive") } diff --git a/pkg/task/restore.go b/pkg/task/restore.go index ffd1f4f03..f11f44b06 100644 --- a/pkg/task/restore.go +++ b/pkg/task/restore.go @@ -383,6 +383,10 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf return errors.Trace(err) } + // The cost of rename user table / replace into system table wouldn't be so high. + // So leave it out of the pipeline for easier implementation. + client.RestoreSystemSchemas(ctx, cfg.TableFilter) + // Set task summary to success status. summary.SetSuccessStatus(true) return nil diff --git a/pkg/utils/schema.go b/pkg/utils/schema.go index 77dd5c2d7..f1f45e0d0 100644 --- a/pkg/utils/schema.go +++ b/pkg/utils/schema.go @@ -4,12 +4,14 @@ package utils import ( "encoding/json" + "fmt" "strings" "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/log" "github.com/pingcap/parser/model" + "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/tablecodec" @@ -155,3 +157,19 @@ func ArchiveSize(meta *backuppb.BackupMeta) uint64 { func EncloseName(name string) string { return "`" + strings.ReplaceAll(name, "`", "``") + "`" } + +// EncloseDBAndTable formats the database and table name in sql. +func EncloseDBAndTable(database, table string) string { + return fmt.Sprintf("%s.%s", EncloseName(database), EncloseName(table)) +} + +// IsSysDB tests whether the database is system DB. +// Currently, the only system DB is mysql. 
+func IsSysDB(dbLowerName string) bool { + return dbLowerName == mysql.SystemDB +} + +// TemporaryDBName makes a 'private' database name. +func TemporaryDBName(db string) model.CIStr { + return model.NewCIStr("__TiDB_BR_Temporary_" + db) +} diff --git a/tests/br_full_ddl/run.sh b/tests/br_full_ddl/run.sh index c7c0a046d..1d4ae3e58 100755 --- a/tests/br_full_ddl/run.sh +++ b/tests/br_full_ddl/run.sh @@ -74,7 +74,7 @@ cluster_index_before_backup=$(run_sql "show variables like '%cluster%';" | awk ' run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 --log-file $LOG --ignore-stats=false || cat $LOG checksum_count=$(cat $LOG | grep "checksum success" | wc -l | xargs) -if [ "${checksum_count}" != "1" ];then +if [ "${checksum_count}" -lt "1" ];then echo "TEST: [$TEST_NAME] fail on fast checksum" echo $(cat $LOG | grep checksum) exit 1 diff --git a/tests/br_full_index/run.sh b/tests/br_full_index/run.sh index 2135e1c34..6d54f91f6 100755 --- a/tests/br_full_index/run.sh +++ b/tests/br_full_index/run.sh @@ -44,8 +44,8 @@ BR_LOG_TO_TERM=1 checksum_count=$(cat $LOG | grep "checksum success" | wc -l | xargs) -if [ "${checksum_count}" != "$DB_COUNT" ];then - echo "TEST: [$TEST_NAME] fail on fast checksum" +if [ "${checksum_count}" -lt "$DB_COUNT" ];then + echo "TEST: [$TEST_NAME] fail on fast checksum: required $DB_COUNT databases checked, but only ${checksum_count} dbs checked" echo $(cat $LOG | grep checksum) exit 1 fi diff --git a/tests/br_systables/run.sh b/tests/br_systables/run.sh new file mode 100644 index 000000000..250c1ec44 --- /dev/null +++ b/tests/br_systables/run.sh @@ -0,0 +1,53 @@ +#! /bin/bash + +set -eux + +backup_dir=$TEST_DIR/$TEST_NAME + +test_data="('TiDB'),('TiKV'),('TiFlash'),('TiSpark'),('TiCDC'),('TiPB'),('Rust'),('C++'),('Go'),('Haskell'),('Scala')" + +modify_systables() { + run_sql "CREATE USER 'Alyssa P. 
Hacker'@'%' IDENTIFIED BY 'password';" + run_sql "UPDATE mysql.tidb SET VARIABLE_VALUE = '1h' WHERE VARIABLE_NAME = 'tikv_gc_life_time';" + + run_sql "CREATE TABLE mysql.foo(pk int primary key auto_increment, field varchar(255));" + run_sql "CREATE TABLE mysql.bar(pk int primary key auto_increment, field varchar(255));" + + run_sql "INSERT INTO mysql.foo(field) VALUES $test_data" + run_sql "INSERT INTO mysql.bar(field) VALUES $test_data" + + go-ycsb load mysql -P tests/"$TEST_NAME"/workload \ + -p mysql.host="$TIDB_IP" \ + -p mysql.port="$TIDB_PORT" \ + -p mysql.user=root \ + -p mysql.db=mysql + + run_sql "ANALYZE TABLE mysql.usertable;" +} + +rollback_modify() { + run_sql "DROP TABLE mysql.foo;" + run_sql "DROP TABLE mysql.bar;" + run_sql "UPDATE mysql.tidb SET VARIABLE_VALUE = '10m' WHERE VARIABLE_NAME = 'tikv_gc_life_time';" + run_sql "DROP USER 'Alyssa P. Hacker';" + run_sql "DROP TABLE mysql.usertable;" +} + +check() { + run_sql "SELECT count(*) from mysql.foo;" | grep 11 + run_sql "SELECT count(*) from mysql.usertable;" | grep 1000 + run_sql "SHOW TABLES IN mysql;" | grep -v bar + run_sql "SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'" | grep "1h" + + # TODO remove this after supporting auto flush. + run_sql "FLUSH PRIVILEGES;" + run_sql "SELECT CURRENT_USER();" -u'Alyssa P. Hacker' -p'password' | grep 'Alyssa P. Hacker' + run_sql "SHOW DATABASES" | grep -v '__TiDB_BR_Temporary_' + # TODO check stats after supporting it. 
+} + +modify_systables +run_br backup full -s "local://$backup_dir" +rollback_modify +run_br restore full -f '*.*' -f '!mysql.bar' -s "local://$backup_dir" +check diff --git a/tests/br_systables/workload b/tests/br_systables/workload new file mode 100644 index 000000000..664fe7ee8 --- /dev/null +++ b/tests/br_systables/workload @@ -0,0 +1,12 @@ +recordcount=1000 +operationcount=0 +workload=core + +readallfields=true + +readproportion=0 +updateproportion=0 +scanproportion=0 +insertproportion=0 + +requestdistribution=uniform