Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: make pitr executor work #42944

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions errno/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1117,6 +1117,7 @@ const (
ErrResourceGroupSupportDisabled = 8250
ErrResourceGroupConfigUnavailable = 8251
ErrResourceGroupThrottled = 8252
ErrBRIEStreamFailed = 8253

// TiKV/PD/TiFlash errors.
ErrPDServerTimeout = 9001
Expand Down
1 change: 1 addition & 0 deletions errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrBRIERestoreFailed: mysql.Message("Restore failed: %s", nil),
ErrBRIEImportFailed: mysql.Message("Import failed: %s", nil),
ErrBRIEExportFailed: mysql.Message("Export failed: %s", nil),
ErrBRIEStreamFailed: mysql.Message("Stream failed: %s", nil),

ErrInvalidTableSample: mysql.Message("Invalid TABLESAMPLE: %s", nil),

Expand Down
86 changes: 85 additions & 1 deletion executor/brie.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package executor
import (
"bytes"
"context"
"github.com/pingcap/tidb/br/pkg/utils"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -232,6 +233,7 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema)
Checksum: true,
SendCreds: true,
LogProgress: true,

CipherInfo: backuppb.CipherInfo{
CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
},
Expand Down Expand Up @@ -304,6 +306,9 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema)
switch s.Kind {
case ast.BRIEKindBackup:
e.backupCfg = &task.BackupConfig{Config: cfg}
// Set some default flags
e.backupCfg.UseCheckpoint = true
e.backupCfg.IgnoreStats = true

for _, opt := range s.Options {
switch opt.Tp {
Expand All @@ -330,12 +335,65 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema)
}
}

case ast.BRIEKindRestore:
case ast.BRIEKindRestore, ast.BRIEKindRestorePIT:
e.restoreCfg = &task.RestoreConfig{Config: cfg}
for _, opt := range s.Options {
switch opt.Tp {
case ast.BRIEOptionOnline:
e.restoreCfg.Online = opt.UintValue != 0
case ast.BRIEOptionStartTS:
tso, err := b.parseTSString(opt.StrValue)
if err != nil {
b.err = err
return nil
}
e.restoreCfg.StartTS = tso
case ast.BRIEOptionRestoredTS:
tso, err := b.parseTSString(opt.StrValue)
if err != nil {
b.err = err
return nil
}
e.restoreCfg.RestoreTS = tso
case ast.BRIEOptionFullBackupStorage:
e.restoreCfg.FullBackupStorage = opt.StrValue
}
}

case ast.BRIEKindStreamStart, ast.BRIEKindStreamStatus, ast.BRIEKindStreamStop, ast.BRIEKindStreamPause, ast.BRIEKindStreamPurge, ast.BRIEKindStreamResume, ast.BRIEKindStreamMetaData:
e.streamCfg = &task.StreamConfig{Config: cfg}

for _, opt := range s.Options {
switch opt.Tp {
case ast.BRIEOptionStartTS:
tso, err := b.parseTSString(opt.StrValue)
if err != nil {
b.err = err
return nil
}
e.streamCfg.StartTS = tso

case ast.BRIEOptionEndTS:
tso, err := b.parseTSString(opt.StrValue)
if err != nil {
b.err = err
return nil
}
e.streamCfg.EndTS = tso

case ast.BRIEOptionUntilTS:
tso, err := b.parseTSString(opt.StrValue)
if err != nil {
b.err = err
return nil
}
e.streamCfg.Until = tso
case ast.BRIEOptionGCTTL:
if opt.IntValue <= 0 {
e.streamCfg.SafePointTTL = utils.DefaultStreamPauseSafePointTTL
} else {
e.streamCfg.SafePointTTL = opt.IntValue
}
}
}

Expand All @@ -353,6 +411,7 @@ type BRIEExec struct {

backupCfg *task.BackupConfig
restoreCfg *task.RestoreConfig
streamCfg *task.StreamConfig
info *brieTaskInfo
}

Expand Down Expand Up @@ -397,11 +456,32 @@ func (e *BRIEExec) Next(ctx context.Context, req *chunk.Chunk) error {
e.info.execTime = types.CurrentTime(mysql.TypeDatetime)
glue := &tidbGlueSession{se: e.ctx, progress: progress, info: e.info}

// use 'pitr' as the default task-name for backup log related tasks
switch e.info.kind {
case ast.BRIEKindStreamStop, ast.BRIEKindStreamStatus, ast.BRIEKindStreamStart, ast.BRIEKindStreamResume, ast.BRIEKindStreamPause:
e.streamCfg.TaskName = "pitr"
default:
}

switch e.info.kind {
case ast.BRIEKindBackup:
err = handleBRIEError(task.RunBackup(taskCtx, glue, "Backup", e.backupCfg), exeerrors.ErrBRIEBackupFailed)
case ast.BRIEKindRestore:
err = handleBRIEError(task.RunRestore(taskCtx, glue, "Restore", e.restoreCfg), exeerrors.ErrBRIERestoreFailed)
case ast.BRIEKindStreamStart:
err = handleBRIEError(task.RunStreamStart(taskCtx, glue, "Start Backup log", e.streamCfg), exeerrors.ErrBRIEStreamFailed)
case ast.BRIEKindStreamPurge:
err = handleBRIEError(task.RunStreamTruncate(taskCtx, glue, "Purge Backup Log", e.streamCfg), exeerrors.ErrBRIEStreamFailed)
case ast.BRIEKindStreamPause:
err = handleBRIEError(task.RunStreamPause(taskCtx, glue, "Pause Backup Log", e.streamCfg), exeerrors.ErrBRIEStreamFailed)
case ast.BRIEKindStreamStop:
err = handleBRIEError(task.RunStreamStop(taskCtx, glue, "Stop Backup Log", e.streamCfg), exeerrors.ErrBRIEStreamFailed)
case ast.BRIEKindStreamResume:
err = handleBRIEError(task.RunStreamResume(taskCtx, glue, "Resume Backup Log", e.streamCfg), exeerrors.ErrBRIEStreamFailed)
case ast.BRIEKindStreamStatus:
err = handleBRIEError(task.RunStreamStatus(taskCtx, glue, "Resume Backup Log", e.streamCfg), exeerrors.ErrBRIEStreamFailed)
case ast.BRIEKindRestorePIT:
err = handleBRIEError(task.RunStreamRestore(taskCtx, glue, "RESTORE PiTR", e.restoreCfg), exeerrors.ErrBRIEStreamFailed)
default:
err = errors.Errorf("unsupported BRIE statement kind: %s", e.info.kind)
}
Expand All @@ -424,6 +504,10 @@ func (e *BRIEExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.AppendUint64(3, e.info.restoreTS)
req.AppendTime(4, e.info.queueTime)
req.AppendTime(5, e.info.execTime)
default:
req.AppendUint64(2, e.info.backupTS)
req.AppendTime(3, e.info.queueTime)
req.AppendTime(4, e.info.execTime)
}
e.info = nil
return nil
Expand Down
39 changes: 17 additions & 22 deletions parser/ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3116,7 +3116,6 @@ type BRIEOptionType uint16

const (
BRIEKindBackup BRIEKind = iota
BRIEKindCancelJob
BRIEKindStreamStart
BRIEKindStreamMetaData
BRIEKindStreamStatus
Expand All @@ -3126,8 +3125,6 @@ const (
BRIEKindStreamPurge
BRIEKindRestore
BRIEKindRestorePIT
BRIEKindShowJob
BRIEKindShowQuery
BRIEKindShowBackupMeta
// common BRIE options
BRIEOptionRateLimit BRIEOptionType = iota + 1
Expand All @@ -3136,6 +3133,7 @@ const (
BRIEOptionSendCreds
BRIEOptionCheckpoint
BRIEOptionStartTS
BRIEOptionEndTS
BRIEOptionUntilTS
// backup options
BRIEOptionBackupTimeAgo
Expand Down Expand Up @@ -3183,29 +3181,23 @@ func (kind BRIEKind) String() string {
case BRIEKindRestore:
return "RESTORE"
case BRIEKindStreamStart:
return "BACKUP LOGS"
return "BACKUP INCREMENTAL START"
case BRIEKindStreamStop:
return "STOP BACKUP LOGS"
return "BACKUP INCREMENTAL STOP"
case BRIEKindStreamPause:
return "PAUSE BACKUP LOGS"
return "BACKUP INCREMENTAL PAUSE"
case BRIEKindStreamResume:
return "RESUME BACKUP LOGS"
return "BACKUP INCREMENTAL RESUME"
case BRIEKindStreamStatus:
return "SHOW BACKUP LOGS STATUS"
case BRIEKindStreamMetaData:
return "SHOW BACKUP LOGS METADATA"
return "BACKUP INCREMENTAL STATUS"
case BRIEKindStreamPurge:
return "PURGE BACKUP LOGS"
case BRIEKindRestorePIT:
return "RESTORE POINT"
case BRIEKindShowJob:
return "SHOW BR JOB"
case BRIEKindShowQuery:
return "SHOW BR JOB QUERY"
case BRIEKindCancelJob:
return "CANCEL BR JOB"
return "BACKUP INCREMENTAL TRUNCATE"
case BRIEKindStreamMetaData:
return "BACKUP INCREMENTAL METADATA"
case BRIEKindShowBackupMeta:
return "SHOW BACKUP METADATA"
case BRIEKindRestorePIT:
return "RESTORE POINT"
default:
return ""
}
Expand Down Expand Up @@ -3263,6 +3255,8 @@ func (kind BRIEOptionType) String() string {
return "RESTORED_TS"
case BRIEOptionStartTS:
return "START_TS"
case BRIEOptionEndTS:
return "END_TS"
case BRIEOptionUntilTS:
return "UNTIL_TS"
case BRIEOptionGCTTL:
Expand All @@ -3289,13 +3283,14 @@ type BRIEOption struct {
Tp BRIEOptionType
StrValue string
UintValue uint64
IntValue int64
}

func (opt *BRIEOption) Restore(ctx *format.RestoreCtx) error {
ctx.WriteKeyWord(opt.Tp.String())
ctx.WritePlain(" = ")
switch opt.Tp {
case BRIEOptionBackupTS, BRIEOptionLastBackupTS, BRIEOptionBackend, BRIEOptionOnDuplicate, BRIEOptionTiKVImporter, BRIEOptionCSVDelimiter, BRIEOptionCSVNull, BRIEOptionCSVSeparator, BRIEOptionFullBackupStorage, BRIEOptionRestoredTS, BRIEOptionStartTS, BRIEOptionUntilTS, BRIEOptionGCTTL:
case BRIEOptionBackupTS, BRIEOptionLastBackupTS, BRIEOptionBackend, BRIEOptionOnDuplicate, BRIEOptionTiKVImporter, BRIEOptionCSVDelimiter, BRIEOptionCSVNull, BRIEOptionCSVSeparator, BRIEOptionFullBackupStorage, BRIEOptionRestoredTS, BRIEOptionStartTS, BRIEOptionEndTS, BRIEOptionUntilTS:
ctx.WriteString(opt.StrValue)
case BRIEOptionBackupTimeAgo:
ctx.WritePlainf("%d ", opt.UintValue/1000)
Expand All @@ -3314,6 +3309,8 @@ func (opt *BRIEOption) Restore(ctx *format.RestoreCtx) error {
case BRIEOptionChecksum, BRIEOptionAnalyze:
// BACKUP/RESTORE doesn't support OPTIONAL value for now, should warn at executor
ctx.WriteKeyWord(BRIEOptionLevel(opt.UintValue).String())
case BRIEOptionGCTTL:
ctx.WritePlainf("%d", opt.IntValue)
default:
ctx.WritePlainf("%d", opt.UintValue)
}
Expand Down Expand Up @@ -3384,8 +3381,6 @@ func (n *BRIEStmt) Restore(ctx *format.RestoreCtx) error {
ctx.WriteKeyWord(" FROM ")
ctx.WriteString(n.Storage)
}
case BRIEKindCancelJob, BRIEKindShowJob, BRIEKindShowQuery:
ctx.WritePlainf(" %d", n.JobID)
case BRIEKindStreamStart:
ctx.WriteKeyWord(" TO ")
ctx.WriteString(n.Storage)
Expand Down
2 changes: 1 addition & 1 deletion parser/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ var tokenMap = map[string]int{
"ENCLOSED": enclosed,
"ENCRYPTION": encryption,
"END": end,
"END_TS": endTS,
"ENFORCED": enforced,
"ENGINE": engine,
"ENGINES": engines,
Expand Down Expand Up @@ -590,7 +591,6 @@ var tokenMap = map[string]int{
"PROFILES": profiles,
"PROXY": proxy,
"PUMP": pump,
"PURGE": purge,
"QUARTER": quarter,
"QUERIES": queries,
"QUERY": query,
Expand Down
Loading