
Commit

Merge branch 'release-6.5' into cherry-pick-58211-to-release-6.5
RidRisR committed Feb 18, 2025
2 parents 1ed0d87 + cd63397 commit 253337e
Showing 203 changed files with 4,130 additions and 900 deletions.
1 change: 1 addition & 0 deletions .bazelversion
@@ -0,0 +1 @@
7.4.1
4 changes: 2 additions & 2 deletions DEPS.bzl
@@ -3603,8 +3603,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:0YcirnuxtXC9eQRb231im1M5w/n7JFuOo0IgE/K9ffM=",
version = "v2.0.4-0.20241125064444-5f59e4e34c62",
sum = "h1:YGhrcemIm3lAQ79g8Lb7aF/PhW6e7M5owperqKdLrb8=",
version = "v2.0.4-0.20250217042910-ad4338d3d0f9",
)
go_repository(
name = "com_github_tikv_pd_client",
1 change: 1 addition & 0 deletions bindinfo/BUILD.bazel
@@ -37,6 +37,7 @@ go_library(
"//util/stmtsummary",
"//util/table-filter",
"//util/timeutil",
"@com_github_pingcap_errors//:errors",
"@org_golang_x_exp//maps",
"@org_uber_go_zap//:zap",
],
9 changes: 8 additions & 1 deletion bindinfo/bind_record.go
@@ -18,6 +18,7 @@ import (
"time"
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/sessionctx"
@@ -161,7 +162,13 @@ func (br *BindRecord) FindBinding(hint string) *Binding {

// prepareHints builds ID and Hint for BindRecord. If sctx is not nil, we check if
// the BindSQL is still valid.
func (br *BindRecord) prepareHints(sctx sessionctx.Context) error {
func (br *BindRecord) prepareHints(sctx sessionctx.Context) (rerr error) {
defer func() {
if r := recover(); r != nil {
rerr = errors.Errorf("panic when preparing hints for binding panic: %v", r)
}
}()

p := parser.New()
for i, bind := range br.Bindings {
if (bind.Hint != nil && bind.ID != "") || bind.Status == deleted {
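The prepareHints change above guards the whole function with a named return value so that a panic inside parsing or hint generation is surfaced as an error instead of crashing the session. A minimal standalone sketch of that defer/recover pattern (illustrative function, not part of this commit):

package main

import (
	"fmt"

	"github.com/pingcap/errors"
)

// doWork converts any panic raised after the deferred closure is set up
// into the named return value rerr, mirroring the prepareHints hunk above.
func doWork() (rerr error) {
	defer func() {
		if r := recover(); r != nil {
			rerr = errors.Errorf("panic while doing work: %v", r)
		}
	}()
	panic("boom")
}

func main() {
	fmt.Println(doWork()) // the panic comes back as a normal error
}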
10 changes: 9 additions & 1 deletion br/cmd/br/backup.go
@@ -20,10 +20,11 @@ import (

func runBackupCommand(command *cobra.Command, cmdName string) error {
cfg := task.BackupConfig{Config: task.Config{LogProgress: HasLogFile()}}
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
if err := cfg.ParseFromFlags(command.Flags(), false); err != nil {
command.SilenceUsage = false
return errors.Trace(err)
}
overrideDefaultBackupConfigIfNeeded(&cfg, command)

ctx := GetDefaultContext()
if cfg.EnableOpenTracing {
@@ -165,3 +166,10 @@ func newRawBackupCommand() *cobra.Command {
task.DefineRawBackupFlags(command)
return command
}

func overrideDefaultBackupConfigIfNeeded(config *task.BackupConfig, cmd *cobra.Command) {
// override only if flag not set by user
if !cmd.Flags().Changed(task.FlagChecksum) {
config.Checksum = false
}
}
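The new overrideDefaultBackupConfigIfNeeded helper relies on cobra's Flags().Changed(), which reports whether the user passed the flag explicitly, so the effective default can be flipped without overriding an explicit --checksum. A minimal sketch of that pattern (hypothetical command and flag wiring, not BR's actual setup):

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	cmd := &cobra.Command{
		Use: "demo",
		RunE: func(cmd *cobra.Command, _ []string) error {
			checksum, err := cmd.Flags().GetBool("checksum")
			if err != nil {
				return err
			}
			// Changed() is true only when --checksum was given on the
			// command line, so the default can be overridden safely.
			if !cmd.Flags().Changed("checksum") {
				checksum = false
			}
			fmt.Println("checksum enabled:", checksum)
			return nil
		},
	}
	cmd.Flags().Bool("checksum", true, "run checksum after backup")
	_ = cmd.Execute()
}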
6 changes: 4 additions & 2 deletions br/cmd/br/cmd.go
@@ -79,8 +79,8 @@ func timestampLogFileName() string {
return filepath.Join(os.TempDir(), time.Now().Format("br.log.2006-01-02T15.04.05Z0700"))
}

// AddFlags adds flags to the given cmd.
func AddFlags(cmd *cobra.Command) {
// DefineCommonFlags defines the common flags for all BR cmd operation.
func DefineCommonFlags(cmd *cobra.Command) {
cmd.Version = build.Info()
cmd.Flags().BoolP(flagVersion, flagVersionShort, false, "Display version information about BR")
cmd.SetVersionTemplate("{{printf \"%s\" .Version}}\n")
@@ -97,6 +97,8 @@ func AddFlags(cmd *cobra.Command) {
"Set whether to redact sensitive info in log")
cmd.PersistentFlags().String(FlagStatusAddr, "",
"Set the HTTP listening address for the status report service. Set to empty string to disable")

// defines BR task common flags, this is shared by cmd and sql(brie)
task.DefineCommonFlags(cmd.PersistentFlags())

cmd.PersistentFlags().StringP(FlagSlowLogFile, "", "",
2 changes: 1 addition & 1 deletion br/cmd/br/main.go
@@ -20,7 +20,7 @@ func main() {
TraverseChildren: true,
SilenceUsage: true,
}
AddFlags(rootCmd)
DefineCommonFlags(rootCmd)
SetDefaultContext(ctx)
rootCmd.AddCommand(
NewDebugCommand(),
8 changes: 4 additions & 4 deletions br/cmd/br/restore.go
@@ -22,7 +22,7 @@ import (

func runRestoreCommand(command *cobra.Command, cmdName string) error {
cfg := task.RestoreConfig{Config: task.Config{LogProgress: HasLogFile()}}
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
if err := cfg.ParseFromFlags(command.Flags(), false); err != nil {
command.SilenceUsage = false
return errors.Trace(err)
}
@@ -72,12 +72,12 @@ func printWorkaroundOnFullRestoreError(command *cobra.Command, err error) {
fmt.Println("#######################################################################")
switch {
case errors.ErrorEqual(err, berrors.ErrRestoreNotFreshCluster):
fmt.Println("# the target cluster is not fresh, br cannot restore system tables.")
fmt.Println("# the target cluster is not fresh, cannot restore.")
fmt.Println("# you can drop existing databases and tables and start restore again")
case errors.ErrorEqual(err, berrors.ErrRestoreIncompatibleSys):
fmt.Println("# the target cluster is not compatible with the backup data,")
fmt.Println("# br cannot restore system tables.")
fmt.Println("# you can remove 'with-sys-table' flag to skip restoring system tables")
}
fmt.Println("# you can remove 'with-sys-table' flag to skip restoring system tables")
fmt.Println("#######################################################################")
}

2 changes: 2 additions & 0 deletions br/cmd/br/stream.go
@@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/br/pkg/trace"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version/build"
"github.com/pingcap/tidb/session"
"github.com/spf13/cobra"
"sourcegraph.com/sourcegraph/appdash"
)
@@ -38,6 +39,7 @@ func NewStreamCommand() *cobra.Command {
build.LogInfo(build.BR)
utils.LogEnvVariables()
task.LogArguments(c)
session.DisableStats4Test()
return nil
},
}
3 changes: 3 additions & 0 deletions br/pkg/backup/client.go
@@ -1001,6 +1001,9 @@ func (bc *Client) findRegionLeader(ctx context.Context, key []byte, isRawKv bool
// in order to find the correct region.
key = codec.EncodeBytesExt([]byte{}, key, isRawKv)
for i := 1; i < 100; i++ {
if ctx.Err() != nil {
return nil, ctx.Err()
}
// better backoff.
region, err := bc.mgr.GetPDClient().GetRegion(ctx, key)
if err != nil || region == nil {
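The findRegionLeader hunk adds a ctx.Err() check at the top of each iteration so the loop stops polling PD as soon as the caller's context is cancelled or times out. A generic sketch of that guard in a retry loop (placeholder fetch function, not the BR client):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryFetch retries fetch up to maxRetries times, but returns early
// once the context is cancelled or its deadline passes.
func retryFetch(ctx context.Context, maxRetries int, fetch func() (string, error)) (string, error) {
	for i := 0; i < maxRetries; i++ {
		if err := ctx.Err(); err != nil {
			return "", err
		}
		if v, err := fetch(); err == nil {
			return v, nil
		}
		time.Sleep(50 * time.Millisecond) // fixed backoff, for illustration only
	}
	return "", fmt.Errorf("exhausted %d retries", maxRetries)
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
	defer cancel()
	_, err := retryFetch(ctx, 100, func() (string, error) { return "", errors.New("leader not found") })
	fmt.Println(err) // context deadline exceeded, not 100 failed attempts
}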
2 changes: 1 addition & 1 deletion br/pkg/backup/push.go
@@ -130,7 +130,7 @@ func (push *pushDown) pushBackup(
}
failpoint.Inject("backup-timeout-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint backup-timeout-error injected.", zap.String("msg", msg))
logutil.CL(ctx).Info("failpoint backup-timeout-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
4 changes: 2 additions & 2 deletions br/pkg/backup/schema.go
@@ -111,7 +111,7 @@ func (ss *Schemas) BackupSchemas(
}

var checksum *checkpoint.ChecksumItem
var exists bool = false
var exists = false
if ss.checkpointChecksum != nil && schema.tableInfo != nil {
checksum, exists = ss.checkpointChecksum[schema.tableInfo.ID]
}
@@ -153,7 +153,7 @@ func (ss *Schemas) BackupSchemas(
zap.Uint64("Crc64Xor", schema.crc64xor),
zap.Uint64("TotalKvs", schema.totalKvs),
zap.Uint64("TotalBytes", schema.totalBytes),
zap.Duration("calculate-take", calculateCost),
zap.Duration("TimeTaken", calculateCost),
zap.Duration("flush-take", flushCost))
}
}
1 change: 1 addition & 0 deletions br/pkg/errors/errors.go
@@ -68,6 +68,7 @@ var (
ErrRestoreIncompatibleSys = errors.Normalize("incompatible system table", errors.RFCCodeText("BR:Restore:ErrRestoreIncompatibleSys"))
ErrUnsupportedSystemTable = errors.Normalize("the system table isn't supported for restoring yet", errors.RFCCodeText("BR:Restore:ErrUnsupportedSysTable"))
ErrDatabasesAlreadyExisted = errors.Normalize("databases already existed in restored cluster", errors.RFCCodeText("BR:Restore:ErrDatabasesAlreadyExisted"))
ErrTablesAlreadyExisted = errors.Normalize("tables already existed in restored cluster", errors.RFCCodeText("BR:Restore:ErrTablesAlreadyExisted"))

// ErrStreamLogTaskExist is the error when stream log task already exists, because of supporting single task currently.
ErrStreamLogTaskExist = errors.Normalize("stream task already exists", errors.RFCCodeText("BR:Stream:ErrStreamLogTaskExist"))
10 changes: 10 additions & 0 deletions br/pkg/glue/glue.go
@@ -13,6 +13,13 @@ import (
pd "github.com/tikv/pd/client"
)

type GlueClient int

const (
ClientCLP GlueClient = iota
ClientSql
)

// Glue is an abstraction of TiDB function calls used in BR.
type Glue interface {
GetDomain(store kv.Storage) (*domain.Domain, error)
@@ -36,6 +43,9 @@ type Glue interface {
// we can close domain as soon as possible.
// and we must reuse the exists session and don't close it in SQL backup job.
UseOneShotSession(store kv.Storage, closeDomain bool, fn func(se Session) error) error

// GetClient returns the client type of the glue
GetClient() GlueClient
}

// Session is an abstraction of the session.Session interface.
8 changes: 8 additions & 0 deletions br/pkg/gluetidb/glue.go
@@ -143,6 +143,10 @@ func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue
return nil
}

func (Glue) GetClient() glue.GlueClient {
return glue.ClientCLP
}

// GetSessionCtx implements glue.Glue
func (gs *tidbSession) GetSessionCtx() sessionctx.Context {
return gs.se
@@ -349,3 +353,7 @@ func (m *MockGlue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func
}
return fn(glueSession)
}

func (m *MockGlue) GetClient() glue.GlueClient {
return glue.ClientCLP
}
4 changes: 4 additions & 0 deletions br/pkg/gluetikv/glue.go
@@ -73,3 +73,7 @@ func (Glue) GetVersion() string {
func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error {
return nil
}

func (Glue) GetClient() glue.GlueClient {
return glue.ClientCLP
}
15 changes: 7 additions & 8 deletions br/pkg/lightning/backend/kv/sql2kv.go
@@ -61,6 +61,7 @@ type autoIDConverter func(int64) int64

type tableKVEncoder struct {
tbl table.Table
columns []*table.Column
autoRandomColID int64
se *session
recordCache []types.Datum
@@ -120,6 +121,7 @@ func NewTableKVEncoder(

return &tableKVEncoder{
tbl: tbl,
columns: cols,
autoRandomColID: autoRandomColID,
se: se,
genCols: genCols,
@@ -380,7 +382,6 @@ func (kvcodec *tableKVEncoder) Encode(
// when generating error such as mysql.ErrDataOutOfRange, the data will be part of the error, causing the buf
// unable to release. So we truncate the warnings here.
defer kvcodec.se.vars.StmtCtx.TruncateWarnings(0)
cols := kvcodec.tbl.Cols()

var value types.Datum
var err error
@@ -390,11 +391,11 @@
if kvcodec.recordCache != nil {
record = kvcodec.recordCache
} else {
record = make([]types.Datum, 0, len(cols)+1)
record = make([]types.Datum, 0, len(kvcodec.columns)+1)
}

meta := kvcodec.tbl.Meta()
for i, col := range cols {
for i, col := range kvcodec.columns {
var theDatum *types.Datum = nil
j := columnPermutation[i]
if j >= 0 && j < len(row) {
@@ -424,7 +425,7 @@

if common.TableHasAutoRowID(meta) {
rowValue := rowID
j := columnPermutation[len(cols)]
j := columnPermutation[len(kvcodec.columns)]
if j >= 0 && j < len(row) {
value, err = table.CastValue(kvcodec.se, row[j], ExtraHandleColumnInfo, false, false)
rowValue = value.GetInt64()
@@ -443,7 +444,7 @@
}

if len(kvcodec.genCols) > 0 {
if errCol, err := evaluateGeneratedColumns(kvcodec.se, record, cols, kvcodec.genCols); err != nil {
if errCol, err := evaluateGeneratedColumns(kvcodec.se, record, kvcodec.columns, kvcodec.genCols); err != nil {
return nil, logEvalGenExprFailed(logger, row, errCol, err)
}
}
@@ -494,11 +495,9 @@ func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDa
err error
)

cols := kvcodec.tbl.Cols()

// Since this method is only called when iterating the columns in the `Encode()` method,
// we can assume that the `colIndex` always have a valid input
col := cols[colIndex]
col := kvcodec.columns[colIndex]

isBadNullValue := false
if inputDatum != nil {
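The sql2kv.go change snapshots tbl.Cols() into the encoder once at construction and reuses the cached slice for every row, instead of re-deriving it inside Encode and getActualDatum. A simplified sketch of that construct-once, reuse-per-row pattern (toy types, not Lightning's real encoder):

package main

import "fmt"

type column struct{ name string }

type table struct{ cols []*column }

// Cols stands in for a lookup that is wasteful to repeat per row.
func (t *table) Cols() []*column { return t.cols }

type encoder struct {
	tbl     *table
	columns []*column // cached at construction, reused for every Encode call
}

func newEncoder(t *table) *encoder {
	return &encoder{tbl: t, columns: t.Cols()}
}

func (e *encoder) encode(row []string) {
	for i, col := range e.columns { // no per-row Cols() lookup
		fmt.Printf("%s=%s ", col.name, row[i])
	}
	fmt.Println()
}

func main() {
	enc := newEncoder(&table{cols: []*column{{name: "id"}, {name: "v"}}})
	enc.encode([]string{"1", "a"})
}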
5 changes: 4 additions & 1 deletion br/pkg/lightning/backend/local/local.go
@@ -89,7 +89,9 @@ const (
// maxWriteAndIngestRetryTimes is the max retry times for write and ingest.
// A large retry times is for tolerating tikv cluster failures.
maxWriteAndIngestRetryTimes = 30
maxRetryBackoffTime = 30 * time.Second
// Unlimited RPC receive message size for TiKV importer
unlimitedRPCRecvMsgSize = math.MaxInt32
maxRetryBackoffTime = 30 * time.Second

gRPCKeepAliveTime = 10 * time.Minute
gRPCKeepAliveTimeout = 5 * time.Minute
@@ -168,6 +170,7 @@ func (f *importClientFactoryImpl) makeConn(ctx context.Context, storeID uint64)
ctx,
addr,
opt,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(unlimitedRPCRecvMsgSize)),
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bfConf}),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: gRPCKeepAliveTime,
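The local.go hunk lifts the gRPC per-call receive limit (the default is about 4 MiB) on connections built by the import client factory, so large responses from TiKV are not rejected. A minimal sketch of passing that dial option (placeholder address and insecure credentials for brevity; the real code reuses BR's TLS options and DialContext):

package main

import (
	"log"
	"math"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	conn, err := grpc.Dial(
		"127.0.0.1:20160", // placeholder store address
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		// Raise the receive limit for every call made on this connection.
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}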
2 changes: 1 addition & 1 deletion br/pkg/lightning/common/retry.go
@@ -122,7 +122,7 @@ func isSingleRetryableError(err error) bool {
case *mysql.MySQLError:
switch nerr.Number {
// ErrLockDeadlock can retry to commit while meet deadlock
case tmysql.ErrUnknown, tmysql.ErrLockDeadlock, tmysql.ErrWriteConflict, tmysql.ErrWriteConflictInTiDB,
case tmysql.ErrUnknown, tmysql.ErrLockDeadlock, tmysql.ErrLockWaitTimeout, tmysql.ErrWriteConflict, tmysql.ErrWriteConflictInTiDB,
tmysql.ErrPDServerTimeout, tmysql.ErrTiKVServerTimeout, tmysql.ErrTiKVServerBusy, tmysql.ErrResolveLockTimeout,
tmysql.ErrRegionUnavailable, tmysql.ErrInfoSchemaExpired, tmysql.ErrInfoSchemaChanged, tmysql.ErrTxnRetryable:
return true
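The retry.go change adds ErrLockWaitTimeout to the MySQL error numbers Lightning considers retryable. A small sketch of classifying a driver error by its number (partial allow-list for illustration, not the full list used above):

package main

import (
	"fmt"

	"github.com/go-sql-driver/mysql"
	tmysql "github.com/pingcap/tidb/parser/mysql"
)

// isRetryable reports whether err is a MySQL error whose number is in a
// (partial) allow-list of transient failures worth retrying.
func isRetryable(err error) bool {
	merr, ok := err.(*mysql.MySQLError)
	if !ok {
		return false
	}
	switch merr.Number {
	case tmysql.ErrLockDeadlock, tmysql.ErrLockWaitTimeout, tmysql.ErrWriteConflict:
		return true
	}
	return false
}

func main() {
	err := &mysql.MySQLError{Number: tmysql.ErrLockWaitTimeout, Message: "Lock wait timeout exceeded"}
	fmt.Println(isRetryable(err)) // true after this change
}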
1 change: 1 addition & 0 deletions br/pkg/lightning/config/BUILD.bazel
@@ -16,6 +16,7 @@ go_library(
"//br/pkg/lightning/log",
"//br/pkg/version/build",
"//config",
"//parser/ast",
"//parser/mysql",
"//util",
"//util/table-filter",
11 changes: 11 additions & 0 deletions br/pkg/lightning/config/config.go
@@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
tidbcfg "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util"
filter "github.com/pingcap/tidb/util/table-filter"
@@ -162,6 +163,16 @@ func (cfg *Config) String() string {
return string(bytes)
}

// Redact redacts the sensitive information.
func (cfg *Config) Redact() string {
originDir := cfg.Mydumper.SourceDir
defer func() {
cfg.Mydumper.SourceDir = originDir
}()
cfg.Mydumper.SourceDir = ast.RedactURL(cfg.Mydumper.SourceDir)
return cfg.String()
}

func (cfg *Config) ToTLS() (*common.TLS, error) {
hostPort := net.JoinHostPort(cfg.TiDB.Host, strconv.Itoa(cfg.TiDB.StatusPort))
return common.NewTLS(
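Config.Redact above temporarily swaps the Mydumper source directory for a redacted URL, serializes the config, then restores the original value via defer, so the in-memory config is left untouched. A simplified sketch of that swap-and-restore approach (the redact helper below is a toy stand-in for ast.RedactURL, masking only credential-style query parameters):

package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

type Config struct {
	SourceDir string `json:"source-dir"`
}

// redactURL masks credential-looking query parameters; it is a toy
// stand-in for the parser's ast.RedactURL used in the real change.
func redactURL(s string) string {
	u, err := url.Parse(s)
	if err != nil {
		return s
	}
	q := u.Query()
	for _, k := range []string{"access-key", "secret-access-key", "access_key", "secret_access_key"} {
		if q.Has(k) {
			q.Set(k, "xxxxxx")
		}
	}
	u.RawQuery = q.Encode()
	return u.String()
}

// Redact serializes the config with the source dir masked, then restores
// the original value before returning.
func (c *Config) Redact() string {
	origin := c.SourceDir
	defer func() { c.SourceDir = origin }()
	c.SourceDir = redactURL(c.SourceDir)
	out, _ := json.Marshal(c)
	return string(out)
}

func main() {
	c := &Config{SourceDir: "s3://bucket/file?access-key=123"}
	fmt.Println(c.Redact())  // source dir printed with access-key masked
	fmt.Println(c.SourceDir) // original value restored
}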
27 changes: 27 additions & 0 deletions br/pkg/lightning/config/config_test.go
@@ -1019,3 +1019,30 @@ func TestCreateSeveralConfigsWithDifferentFilters(t *testing.T) {
))
require.True(t, common.StringSliceEqual(config.GetDefaultFilter(), originalDefaultCfg))
}

func TestRedactConfig(t *testing.T) {
tests := []struct {
origin string
redact string
}{
{"", ""},
{":", ":"},
{"~/file", "~/file"},
{"gs://bucket/file", "gs://bucket/file"},
{"gs://bucket/file?access-key=123", "gs://bucket/file?access-key=123"},
{"gs://bucket/file?secret-access-key=123", "gs://bucket/file?secret-access-key=123"},
{"s3://bucket/file", "s3://bucket/file"},
{"s3://bucket/file?other-key=123", "s3://bucket/file?other-key=123"},
{"s3://bucket/file?access-key=123", "s3://bucket/file?access-key=xxxxxx"},
{"s3://bucket/file?secret-access-key=123", "s3://bucket/file?secret-access-key=xxxxxx"},
{"s3://bucket/file?access_key=123", "s3://bucket/file?access_key=xxxxxx"},
{"s3://bucket/file?secret_access_key=123", "s3://bucket/file?secret_access_key=xxxxxx"},
}
for _, tt := range tests {
cfg := config.NewConfig()
cfg.Mydumper.SourceDir = tt.origin

require.Contains(t, cfg.Redact(), tt.redact)
require.Contains(t, cfg.String(), tt.origin)
}
}