Skip to content

Commit

Permalink
workloadrepo: make sure WORKLOAD_SCHEMA is ignored by BR (pingcap#58878)
Browse files Browse the repository at this point in the history
  • Loading branch information
bb7133 authored Mar 3, 2025
1 parent af93fff commit c73ae58
Show file tree
Hide file tree
Showing 16 changed files with 57 additions and 27 deletions.
3 changes: 2 additions & 1 deletion br/pkg/restore/snap_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/mysql"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/redact"
kvutil "github.com/tikv/client-go/v2/util"
Expand Down Expand Up @@ -755,7 +756,7 @@ func (rc *SnapClient) GetDatabaseMap() map[int64]*metautil.Database {
// HasBackedUpSysDB whether we have backed up system tables
// br backs system tables up since 5.1.0
func (rc *SnapClient) HasBackedUpSysDB() bool {
sysDBs := []string{"mysql", "sys"}
sysDBs := []string{mysql.SystemDB, mysql.SysDB, mysql.WorkloadSchema}
for _, db := range sysDBs {
temporaryDB := utils.TemporaryDBName(db)
_, backedUp := rc.databases[temporaryDB.O]
Expand Down
9 changes: 8 additions & 1 deletion br/pkg/restore/snap_client/systable_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,14 @@ var unRecoverableTable = map[string]map[string]struct{}{
},
}

var unRecoverableSchema = map[string]struct{}{
mysql.WorkloadSchema: {},
}

func isUnrecoverableTable(schemaName string, tableName string) bool {
if _, ok := unRecoverableSchema[schemaName]; ok {
return true
}
tableMap, ok := unRecoverableTable[schemaName]
if !ok {
return false
Expand Down Expand Up @@ -159,7 +166,7 @@ func isPlanReplayerTables(schemaName string, tableName string) bool {
// RestoreSystemSchemas restores the system schema(i.e. the `mysql` schema).
// Detail see https://github.com/pingcap/br/issues/679#issuecomment-762592254.
func (rc *SnapClient) RestoreSystemSchemas(ctx context.Context, f filter.Filter) (rerr error) {
sysDBs := []string{mysql.SystemDB, mysql.SysDB}
sysDBs := []string{mysql.SystemDB, mysql.SysDB, mysql.WorkloadSchema}
for _, sysDB := range sysDBs {
err := rc.restoreSystemSchema(ctx, f, sysDB)
if err != nil {
Expand Down
6 changes: 2 additions & 4 deletions br/pkg/utils/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ func IsTemplateSysDB(dbname ast.CIStr) bool {

// IsSysDB tests whether the database is system DB.
// Currently, both `mysql` and `sys` are system DB.
func IsSysDB(dbName string) bool {
// just in case
dbLowerName := strings.ToLower(dbName)
return dbLowerName == mysql.SystemDB || dbLowerName == mysql.SysDB
func IsSysDB(dbLowerName string) bool {
return dbLowerName == mysql.SystemDB || dbLowerName == mysql.SysDB || dbLowerName == mysql.WorkloadSchema
}

// TemporaryDBName makes a 'private' database name.
Expand Down
13 changes: 13 additions & 0 deletions br/tests/br_systables/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ modify_systables() {
-p mysql.db=mysql

run_sql "ANALYZE TABLE mysql.usertable;"

# enable workload schema
run_sql "SET GLOBAL tidb_workload_repository_dest = 'table';"
sleep 5
run_sql "ADMIN CREATE WORKLOAD SNAPSHOT;"
# disable workload schema
run_sql "SET GLOBAL tidb_workload_repository_dest = '';"
}

add_user() {
Expand Down Expand Up @@ -53,6 +60,8 @@ rollback_modify() {
# FIXME don't check the user table until we support restore user correctly.
# run_sql "DROP USER 'Alyssa P. Hacker';"
run_sql "DROP TABLE mysql.usertable;"

run_sql "DROP DATABASE IF EXISTS workload_schema;"
}

check() {
Expand All @@ -62,6 +71,10 @@ check() {
# we cannot let user overwrite `mysql.tidb` through br in any time.
run_sql "SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'" | awk '/1h/{exit 1}'

run_sql "SELECT SCHEMA_NAME FROM information_schema.schemata;"
# workload_schema schema should not be recovered
check_not_contains "workload_schema"

# FIXME don't check the user table until we support restore user correctly.
# TODO remove this after supporting auto flush.
# run_sql "FLUSH PRIVILEGES;"
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4067,7 +4067,7 @@ var systemTables = map[string]struct{}{
}

func isUndroppableTable(schema, table string) bool {
if schema == "workload_schema" {
if schema == mysql.WorkloadSchema {
return true
}
if schema != mysql.SystemDB {
Expand Down
4 changes: 3 additions & 1 deletion pkg/parser/mysql/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ const (
AuthLDAPSASL = "authentication_ldap_sasl"
)

// MySQL database and tables.
// System database and tables that mostly inherited from MySQL.
const (
// SystemDB is the name of system database.
SystemDB = "mysql"
Expand Down Expand Up @@ -225,6 +225,8 @@ const (
DefaultRoleTable = "default_roles"
// PasswordHistoryTable is the table in system db contains password history.
PasswordHistoryTable = "password_history"
// WorkloadSchema is the name of workload repository database.
WorkloadSchema = "workload_schema"
)

// MySQL type maximum length.
Expand Down
1 change: 1 addition & 0 deletions pkg/util/filter/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/util/filter",
visibility = ["//visibility:public"],
deps = [
"//pkg/parser/mysql",
"//pkg/util/table-filter",
"//pkg/util/table-rule-selector",
"@com_github_pingcap_errors//:errors",
Expand Down
5 changes: 5 additions & 0 deletions pkg/util/filter/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package filter

import (
"strings"

"github.com/pingcap/tidb/pkg/parser/mysql"
)

var (
Expand All @@ -29,6 +31,8 @@ var (
MetricSchemaName = "METRICS_SCHEMA"
// InspectionSchemaName is the `INSPECTION_SCHEMA` database name
InspectionSchemaName = "INSPECTION_SCHEMA"
// WorkloadSchemaName is the `WORKLOAD_SCHEMA` database name
WorkloadSchemaName = strings.ToUpper(mysql.WorkloadSchema)
)

// IsSystemSchema checks whether schema is system schema or not.
Expand All @@ -39,6 +43,7 @@ func IsSystemSchema(schema string) bool {
case DMHeartbeatSchema, // do not create table in it manually
"SYS", // https://dev.mysql.com/doc/refman/8.0/en/sys-schema.html
"MYSQL", // the name of system database.
WorkloadSchemaName,
InformationSchemaName,
InspectionSchemaName,
PerformanceSchemaName,
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func IsMemDB(dbLowerName string) bool {

// IsSysDB checks whether dbLowerName is system database.
func IsSysDB(dbLowerName string) bool {
return dbLowerName == mysql.SystemDB || dbLowerName == mysql.SysDB
return dbLowerName == mysql.SystemDB || dbLowerName == mysql.SysDB || dbLowerName == mysql.WorkloadSchema
}

// IsSystemView is similar to IsMemOrSyDB, but does not include the mysql schema
Expand Down
1 change: 1 addition & 0 deletions pkg/util/workloadrepo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ go_test(
"//pkg/kv",
"//pkg/owner",
"//pkg/parser/ast",
"//pkg/parser/mysql",
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/testkit",
Expand Down
4 changes: 1 addition & 3 deletions pkg/util/workloadrepo/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,11 @@ const (
defSnapshotInterval = 3600
defRententionDays = 7

// WorkloadSchema is the name of database for workloadrepo worker.
WorkloadSchema = "WORKLOAD_SCHEMA"
histSnapshotsTable = "HIST_SNAPSHOTS"
)

var (
workloadSchemaCIStr = ast.NewCIStr(WorkloadSchema)
workloadSchemaCIStr = ast.NewCIStr(mysql.WorkloadSchema)
zeroTime = time.Time{}

errWrongValueForVar = dbterror.ClassUtil.NewStd(errno.ErrWrongValueForVar)
Expand Down
5 changes: 3 additions & 2 deletions pkg/util/workloadrepo/housekeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/util/logutil"
Expand All @@ -48,7 +49,7 @@ func createPartition(ctx context.Context, is infoschema.InfoSchema, tbl *reposit
tbInfo := tbSchema.Meta()

sb := &strings.Builder{}
sqlescape.MustFormatSQL(sb, "ALTER TABLE %n.%n ADD PARTITION (", WorkloadSchema, tbl.destTable)
sqlescape.MustFormatSQL(sb, "ALTER TABLE %n.%n ADD PARTITION (", mysql.WorkloadSchema, tbl.destTable)
skip, err := generatePartitionRanges(sb, tbInfo, now)
if err != nil {
logutil.BgLogger().Info("workload repository cannot generate partition definitions", zap.String("table", tbl.destTable), zap.NamedError("err", err))
Expand Down Expand Up @@ -98,7 +99,7 @@ func dropOldPartition(ctx context.Context, is infoschema.InfoSchema,
}
sb := &strings.Builder{}
sqlescape.MustFormatSQL(sb, "ALTER TABLE %n.%n DROP PARTITION %n",
WorkloadSchema, tbl.destTable, pt.Name.L)
mysql.WorkloadSchema, tbl.destTable, pt.Name.L)
_, err = execRetry(ctx, sess, sb.String())
if err != nil {
return fmt.Errorf("workload repository cannot drop partition (%s) on '%s': %v", pt.Name.L, tbl.destTable, err)
Expand Down
5 changes: 3 additions & 2 deletions pkg/util/workloadrepo/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/logutil"
Expand Down Expand Up @@ -92,7 +93,7 @@ func (w *worker) getSnapID(ctx context.Context) (uint64, error) {
func upsertHistSnapshot(ctx context.Context, sctx sessionctx.Context, snapID uint64) error {
// TODO: fill DB_VER, WR_VER
snapshotsInsert := sqlescape.MustEscapeSQL("INSERT INTO %n.%n (`BEGIN_TIME`, `SNAP_ID`) VALUES (now(), %%?) ON DUPLICATE KEY UPDATE `BEGIN_TIME` = now()",
WorkloadSchema, histSnapshotsTable)
mysql.WorkloadSchema, histSnapshotsTable)
_, err := runQuery(ctx, sctx, snapshotsInsert, snapID)
return err
}
Expand All @@ -107,7 +108,7 @@ func (w *worker) updateHistSnapshot(ctx context.Context, snapID uint64, errs []e
nerr = err.Error()
}

snapshotsUpdate := sqlescape.MustEscapeSQL("UPDATE %n.%n SET `END_TIME` = now(), `ERROR` = COALESCE(CONCAT(ERROR, %%?), ERROR, %%?) WHERE `SNAP_ID` = %%?", WorkloadSchema, histSnapshotsTable)
snapshotsUpdate := sqlescape.MustEscapeSQL("UPDATE %n.%n SET `END_TIME` = now(), `ERROR` = COALESCE(CONCAT(ERROR, %%?), ERROR, %%?) WHERE `SNAP_ID` = %%?", mysql.WorkloadSchema, histSnapshotsTable)
_, err := runQuery(ctx, sctx, snapshotsUpdate, nerr, nerr, snapID)
return err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/util/workloadrepo/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/util/slice"
Expand All @@ -40,7 +41,7 @@ func buildCreateQuery(ctx context.Context, sess sessionctx.Context, rt *reposito
}

sb := &strings.Builder{}
sqlescape.MustFormatSQL(sb, "CREATE TABLE IF NOT EXISTS %n.%n (", WorkloadSchema, rt.destTable)
sqlescape.MustFormatSQL(sb, "CREATE TABLE IF NOT EXISTS %n.%n (", mysql.WorkloadSchema, rt.destTable)
if rt.tableType == snapshotTable {
fmt.Fprintf(sb, "`SNAP_ID` INT UNSIGNED NOT NULL, ")
}
Expand Down Expand Up @@ -68,7 +69,7 @@ func buildInsertQuery(ctx context.Context, sess sessionctx.Context, rt *reposito
}

sb := &strings.Builder{}
sqlescape.MustFormatSQL(sb, "INSERT %n.%n (", WorkloadSchema, rt.destTable)
sqlescape.MustFormatSQL(sb, "INSERT %n.%n (", mysql.WorkloadSchema, rt.destTable)

if rt.tableType == snapshotTable {
fmt.Fprint(sb, "`SNAP_ID`, ")
Expand Down Expand Up @@ -105,7 +106,7 @@ func (w *worker) createAllTables(ctx context.Context, now time.Time) error {
defer w.sesspool.Put(_sessctx)
is := sess.GetDomainInfoSchema().(infoschema.InfoSchema)
if !is.SchemaExists(workloadSchemaCIStr) {
_, err := execRetry(ctx, sess, "create database if not exists "+WorkloadSchema)
_, err := execRetry(ctx, sess, "create database if not exists "+mysql.WorkloadSchema)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/util/workloadrepo/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/pkg/executor"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/owner"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
Expand Down Expand Up @@ -77,7 +78,7 @@ var workloadTables = []repositoryTable{
DB_VER JSON NULL COMMENT 'Versions of TiDB, TiKV, PD at the moment',
WR_VER int unsigned NULL COMMENT 'Version to identify the compatibility of workload schema between releases.',
SOURCE VARCHAR(20) NULL COMMENT 'The program that initializes the snaphost. ',
ERROR TEXT DEFAULT NULL COMMENT 'extra messages are written if anything happens to block that snapshots.')`, WorkloadSchema, histSnapshotsTable),
ERROR TEXT DEFAULT NULL COMMENT 'extra messages are written if anything happens to block that snapshots.')`, mysql.WorkloadSchema, histSnapshotsTable),
"",
},
{"INFORMATION_SCHEMA", "TIDB_INDEX_USAGE", snapshotTable, "", "", "", ""},
Expand Down Expand Up @@ -138,7 +139,7 @@ func takeSnapshot(ctx context.Context) error {
snapID, err := workerCtx.takeSnapshot(ctx)
if err != nil {
logutil.BgLogger().Info("workload repository manual snapshot failed", zap.String("owner", workerCtx.instanceID), zap.NamedError("err", err))
return errCouldNotStartSnapshot.GenWithStackByArgs()
return errCouldNotStartSnapshot.GenWithStackByArgs(err)
}

logutil.BgLogger().Info("workload repository ran manual snapshot", zap.String("owner", workerCtx.instanceID), zap.Uint64("snapID", snapID))
Expand Down
12 changes: 6 additions & 6 deletions pkg/util/workloadrepo/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/owner"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/testkit"
Expand Down Expand Up @@ -141,7 +142,7 @@ func waitForTables(ctx context.Context, t *testing.T, wrk *worker, now time.Time
func TestRaceToCreateTablesWorker(t *testing.T) {
ctx, store, dom, addr := setupDomainAndContext(t)

_, ok := dom.InfoSchema().SchemaByName(ast.NewCIStr("workload_schema"))
_, ok := dom.InfoSchema().SchemaByName(workloadSchemaCIStr)
require.False(t, ok)

wrk1 := setupWorker(ctx, t, addr, dom, "worker1", true)
Expand Down Expand Up @@ -240,7 +241,6 @@ func TestMultipleWorker(t *testing.T) {
require.Eventually(t, func() bool {
return wrk1.owner.IsOwner()
}, time.Minute, time.Second)

// start worker 2 again
require.NoError(t, wrk2.setRepositoryDest(ctx, "table"))
eventuallyWithLock(t, wrk2, func() bool { return wrk2.owner != nil })
Expand All @@ -256,7 +256,7 @@ func TestGlobalWorker(t *testing.T) {
ctx, store, dom, addr := setupDomainAndContext(t)
tk := testkit.NewTestKit(t, store)

_, ok := dom.InfoSchema().SchemaByName(ast.NewCIStr("workload_schema"))
_, ok := dom.InfoSchema().SchemaByName(workloadSchemaCIStr)
require.False(t, ok)

wrk := setupWorker(ctx, t, addr, dom, "worker", false)
Expand All @@ -276,7 +276,7 @@ func TestAdminWorkloadRepo(t *testing.T) {
ctx, store, dom, addr := setupDomainAndContext(t)
tk := testkit.NewTestKit(t, store)

_, ok := dom.InfoSchema().SchemaByName(ast.NewCIStr("workload_schema"))
_, ok := dom.InfoSchema().SchemaByName(workloadSchemaCIStr)
require.False(t, ok)

wrk := setupWorker(ctx, t, addr, dom, "worker", false)
Expand Down Expand Up @@ -320,7 +320,7 @@ func validateDate(t *testing.T, row []any, idx int, lastRowTs time.Time, maxSecs
}

func SamplingTimingWorker(t *testing.T, tk *testkit.TestKit, lastRowTs time.Time, cnt int, maxSecs int) time.Time {
rows := getRows(t, tk, cnt, maxSecs, "select instance_id, ts from "+WorkloadSchema+".hist_memory_usage where ts > '"+lastRowTs.Format("2006-01-02 15:04:05")+"' order by ts asc")
rows := getRows(t, tk, cnt, maxSecs, "select instance_id, ts from "+mysql.WorkloadSchema+".hist_memory_usage where ts > '"+lastRowTs.Format("2006-01-02 15:04:05")+"' order by ts asc")

for _, row := range rows {
// check that the instance_id is correct
Expand Down Expand Up @@ -362,7 +362,7 @@ func findMatchingRowForSnapshot(t *testing.T, rowidx int, snapRows [][]any, row
}

func SnapshotTimingWorker(t *testing.T, tk *testkit.TestKit, lastRowTs time.Time, lastSnapID int, cnt int, maxSecs int) (time.Time, int) {
rows := getRows(t, tk, cnt, maxSecs, "select snap_id, begin_time from "+WorkloadSchema+"."+histSnapshotsTable+" where begin_time > '"+lastRowTs.Format("2006-01-02 15:04:05")+"' order by begin_time asc")
rows := getRows(t, tk, cnt, maxSecs, "select snap_id, begin_time from "+mysql.WorkloadSchema+"."+histSnapshotsTable+" where begin_time > '"+lastRowTs.Format("2006-01-02 15:04:05")+"' order by begin_time asc")

// We want to get all rows if we are starting from 0.
snapWhere := ""
Expand Down

0 comments on commit c73ae58

Please sign in to comment.