Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br/operator: added some commands for managing migrations #56857

Merged
merged 5 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions br/cmd/br/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ func newOperatorCommand() *cobra.Command {
cmd.AddCommand(newPrepareForSnapshotBackupCommand(
"prepare-for-snapshot-backup",
"pause gc, schedulers and importing until the program exits, for snapshot backup."))
cmd.AddCommand(newBase64ifyCommand())
cmd.AddCommand(newListMigrationsCommand())
cmd.AddCommand(newMigrateToCommand())
return cmd
}

Expand All @@ -52,3 +55,57 @@ func newPrepareForSnapshotBackupCommand(use string, short string) *cobra.Command
operator.DefineFlagsForPrepareSnapBackup(cmd.Flags())
return cmd
}

func newBase64ifyCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "base64ify [-r] -s <storage>",
Short: "generate base64 for a storage. this may be passed to `tikv-ctl compact-log-backup`.",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
cfg := operator.Base64ifyConfig{}
if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
return err
}
ctx := GetDefaultContext()
return operator.Base64ify(ctx, cfg)
},
}
operator.DefineFlagsForBase64ifyConfig(cmd.Flags())
return cmd
}

func newListMigrationsCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "list-migrations",
Short: "list all migrations",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
cfg := operator.ListMigrationConfig{}
if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
return err
}
ctx := GetDefaultContext()
return operator.RunListMigrations(ctx, cfg)
},
}
operator.DefineFlagsForListMigrationConfig(cmd.Flags())
return cmd
}

func newMigrateToCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "migrate-to",
Short: "migrate to a specific version",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
cfg := operator.MigrateToConfig{}
if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
return err
}
ctx := GetDefaultContext()
return operator.RunMigrateTo(ctx, cfg)
},
}
operator.DefineFlagsForMigrateToConfig(cmd.Flags())
return cmd
}
10 changes: 9 additions & 1 deletion br/pkg/task/operator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,28 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "operator",
srcs = [
"cmd.go",
"base64ify.go",
"config.go",
"list_migration.go",
"migrate_to.go",
"prepare_snap.go",
],
importpath = "github.com/pingcap/tidb/br/pkg/task/operator",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/backup/prepare_snap",
"//br/pkg/errors",
"//br/pkg/glue",
"//br/pkg/logutil",
"//br/pkg/pdutil",
"//br/pkg/storage",
"//br/pkg/stream",
"//br/pkg/task",
"//br/pkg/utils",
"@com_github_fatih_color//:color",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_log//:log",
"@com_github_spf13_pflag//:pflag",
"@com_github_tikv_client_go_v2//tikv",
Expand Down
75 changes: 75 additions & 0 deletions br/pkg/task/operator/base64ify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package operator

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"os"

"github.com/fatih/color"
"github.com/pingcap/errors"
backup "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/tidb/br/pkg/storage"
)

func Base64ify(ctx context.Context, cfg Base64ifyConfig) error {
if cfg.Revert {
return runRevert(ctx, cfg)
}
return runEncode(ctx, cfg) // Assuming runEncode will be similarly modified to accept Base64ifyConfig
}

func runRevert(ctx context.Context, cfg Base64ifyConfig) error {
b64 := cfg.StorageURI
data, err := base64.RawStdEncoding.DecodeString(b64)
if err != nil {
return errors.Trace(err)
}

backend := backup.StorageBackend{}
if err := backend.Unmarshal(data); err != nil {
return errors.Trace(err)
}

s, err := storage.Create(ctx, &backend, false)
if err != nil {
return errors.Trace(err)
}
type Output struct {
Uri string `json:"uri"`
backup.StorageBackend
}
if err := backend.Unmarshal(data); err != nil {
return errors.Trace(err)
}

out := Output{
Uri: s.URI(),
StorageBackend: backend,
}
return json.NewEncoder(os.Stdout).Encode(out)
}

func runEncode(ctx context.Context, cfg Base64ifyConfig) error {
s, err := storage.ParseBackend(cfg.StorageURI, &cfg.BackendOptions)
if err != nil {
return err
}
if cfg.LoadCerd {
_, err := storage.New(ctx, s, &storage.ExternalStorageOptions{
SendCredentials: true,
})
if err != nil {
return err
}
fmt.Fprintln(os.Stderr, color.HiRedString("Credientials are encoded to the base64 string. DON'T share this with untrusted people!"))
}

sBytes, err := s.Marshal()
if err != nil {
return err
}
fmt.Println(base64.StdEncoding.EncodeToString(sBytes))
return nil
}
144 changes: 144 additions & 0 deletions br/pkg/task/operator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ package operator
import (
"time"

"github.com/pingcap/errors"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/task"
"github.com/spf13/pflag"
)
Expand Down Expand Up @@ -42,3 +45,144 @@ func (cfg *PauseGcConfig) ParseFromFlags(flags *pflag.FlagSet) error {

return nil
}

type Base64ifyConfig struct {
storage.BackendOptions
Revert bool
StorageURI string
LoadCerd bool
}

func DefineFlagsForBase64ifyConfig(flags *pflag.FlagSet) {
storage.DefineFlags(flags)
flags.BoolP("revert", "r", false, "Do the revert operation.")
flags.StringP("storage", "s", "", "The external storage input.")
flags.BoolP("load-creds", "c", false, "whether loading the credientials from current environment and marshal them to the base64 string. [!]")
}

func (cfg *Base64ifyConfig) ParseFromFlags(flags *pflag.FlagSet) error {
var err error
err = cfg.BackendOptions.ParseFromFlags(flags)
if err != nil {
return err
}
cfg.Revert, err = flags.GetBool("revert")
if err != nil {
return err
}
cfg.StorageURI, err = flags.GetString("storage")
if err != nil {
return err
}
cfg.LoadCerd, err = flags.GetBool("load-creds")
if err != nil {
return err
}
return nil
}

type ListMigrationConfig struct {
storage.BackendOptions
StorageURI string
JSONOutput bool
}

func DefineFlagsForListMigrationConfig(flags *pflag.FlagSet) {
storage.DefineFlags(flags)
flags.StringP("storage", "s", "", "the external storage input.")
flags.Bool("json", false, "output the result in json format.")
}

func (cfg *ListMigrationConfig) ParseFromFlags(flags *pflag.FlagSet) error {
var err error
err = cfg.BackendOptions.ParseFromFlags(flags)
if err != nil {
return err
}
cfg.StorageURI, err = flags.GetString("storage")
if err != nil {
return err
}
cfg.JSONOutput, err = flags.GetBool("json")
if err != nil {
return err
}
return nil
}

type MigrateToConfig struct {
storage.BackendOptions
StorageURI string
Recent bool
MigrateTo int
Base bool

Yes bool
DryRun bool
}

const (
flagStorage = "storage"
flagRecent = "recent"
flagTo = "to"
flagBase = "base"
flagYes = "yes"
flagDryRun = "dry-run"
)

func DefineFlagsForMigrateToConfig(flags *pflag.FlagSet) {
storage.DefineFlags(flags)
flags.StringP(flagStorage, "s", "", "the external storage input.")
flags.Bool(flagRecent, true, "migrate to the most recent migration and BASE.")
flags.Int(flagTo, 0, "migrate all migrations from the BASE to the specified sequence number.")
flags.Bool(flagBase, false, "don't merge any migrations, just retry run pending operations in BASE.")
flags.BoolP(flagYes, "y", false, "skip all effect estimating and confirming. execute directly.")
flags.Bool(flagDryRun, false, "do not actually perform the migration, just print the effect.")
}

func (cfg *MigrateToConfig) ParseFromFlags(flags *pflag.FlagSet) error {
var err error
err = cfg.BackendOptions.ParseFromFlags(flags)
if err != nil {
return err
}
cfg.StorageURI, err = flags.GetString(flagStorage)
if err != nil {
return err
}
cfg.Recent, err = flags.GetBool(flagRecent)
if err != nil {
return err
}
cfg.MigrateTo, err = flags.GetInt(flagTo)
if err != nil {
return err
}
cfg.Base, err = flags.GetBool(flagBase)
if err != nil {
return err
}
cfg.Yes, err = flags.GetBool(flagYes)
if err != nil {
return err
}
cfg.DryRun, err = flags.GetBool(flagDryRun)
if err != nil {
return err
}
return nil
}

func (cfg *MigrateToConfig) Verify() error {
if cfg.Recent && cfg.MigrateTo != 0 {
return errors.Annotatef(berrors.ErrInvalidArgument,
"the --%s and --%s flag cannot be used at the same time",
flagRecent, flagTo)
}
if cfg.Base && (cfg.Recent || cfg.MigrateTo != 0) {
return errors.Annotatef(berrors.ErrInvalidArgument,
"the --%s and ( --%s or --%s ) flag cannot be used at the same time",
flagBase, flagTo, flagRecent)
}
return nil
}
53 changes: 53 additions & 0 deletions br/pkg/task/operator/list_migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package operator

import (
"context"
"encoding/json"
"fmt"
"os"

"github.com/fatih/color"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/stream"
)

// statusOK make a string like <green>●</green> <bold>{message}</bold>
func statusOK(message string) string {
return color.GreenString("●") + color.New(color.Bold).Sprintf(" %s", message)
}

func RunListMigrations(ctx context.Context, cfg ListMigrationConfig) error {
backend, err := storage.ParseBackend(cfg.StorageURI, &cfg.BackendOptions)
if err != nil {
return err
}
st, err := storage.Create(ctx, backend, false)
if err != nil {
return err
}
ext := stream.MigerationExtension(st)
migs, err := ext.Load(ctx)
if err != nil {
return err
}
if cfg.JSONOutput {
if err := json.NewEncoder(os.Stdout).Encode(migs); err != nil {
return err
}
} else {
console := glue.ConsoleOperations{ConsoleGlue: glue.StdIOGlue{}}
console.Println(statusOK(fmt.Sprintf("Total %d Migrations.", len(migs.Layers)+1)))
console.Printf("> BASE <\n")
tbl := console.CreateTable()
stream.AddMigrationToTable(migs.Base, tbl)
tbl.Print()
for _, t := range migs.Layers {
console.Printf("> %08d <\n", t.SeqNum)
tbl := console.CreateTable()
stream.AddMigrationToTable(&t.Content, tbl)
tbl.Print()
}
}
return nil
}
Loading
Loading