Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#46726
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
lance6716 authored and ti-chi-bot committed Sep 7, 2023
1 parent 4350f8d commit dd323fa
Show file tree
Hide file tree
Showing 22 changed files with 334 additions and 36 deletions.
6 changes: 6 additions & 0 deletions br/pkg/lightning/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,13 @@ func InitLogger(cfg *Config, _ string) error {
tidbLogCfg := logutil.LogConfig{}
// Disable annoying TiDB Log.
// TODO: some error logs outputs randomly, we need to fix them in TiDB.
<<<<<<< HEAD
tidbLogCfg.Level = "fatal"
=======
// this LEVEL only affects SlowQueryLogger, later ReplaceGlobals will overwrite it.
tidbLogCfg.Level = "debug"
// this also init GRPCLogger, controlled by GRPC_DEBUG env.
>>>>>>> 41d1ec0267e (lightning: always get latest PD leader when access PD after initialized (#46726))
err := logutil.InitLogger(&tidbLogCfg)
if err != nil {
return errors.Trace(err)
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/lightning/restore/check_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ func TestCheckCSVHeader(t *testing.T) {
dbMetas,
preInfoGetter,
nil,
nil,
)
preInfoGetter.dbInfosCache = rc.dbInfos
err = rc.checkCSVHeader(ctx)
Expand Down Expand Up @@ -465,6 +466,7 @@ func TestCheckTableEmpty(t *testing.T) {
dbMetas,
preInfoGetter,
nil,
nil,
)

rc := &Controller{
Expand Down Expand Up @@ -622,6 +624,7 @@ func TestLocalResource(t *testing.T) {
nil,
preInfoGetter,
nil,
nil,
)
rc := &Controller{
cfg: cfg,
Expand Down
44 changes: 39 additions & 5 deletions br/pkg/lightning/restore/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,31 @@ func WithPrecheckKey(ctx context.Context, key precheckContextKey, val any) conte
}

type PrecheckItemBuilder struct {
<<<<<<< HEAD:br/pkg/lightning/restore/precheck.go
cfg *config.Config
dbMetas []*mydump.MDDatabaseMeta
preInfoGetter PreRestoreInfoGetter
checkpointsDB checkpoints.DB
}

func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, pdCli pd.Client, opts ...ropts.PrecheckItemBuilderOption) (*PrecheckItemBuilder, error) {
=======
cfg *config.Config
dbMetas []*mydump.MDDatabaseMeta
preInfoGetter PreImportInfoGetter
checkpointsDB checkpoints.DB
pdLeaderAddrGetter func() string
}

// NewPrecheckItemBuilderFromConfig creates a new PrecheckItemBuilder from config
// pdCli **must not** be nil for local backend
func NewPrecheckItemBuilderFromConfig(
ctx context.Context,
cfg *config.Config,
pdCli pd.Client,
opts ...ropts.PrecheckItemBuilderOption,
) (*PrecheckItemBuilder, error) {
>>>>>>> 41d1ec0267e (lightning: always get latest PD leader when access PD after initialized (#46726)):br/pkg/lightning/importer/precheck.go
var gerr error
builderCfg := new(ropts.PrecheckItemBuilderConfig)
for _, o := range opts {
Expand Down Expand Up @@ -98,20 +116,29 @@ func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, p
if err != nil {
return nil, errors.Trace(err)
}
return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb), gerr
return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb, pdCli), gerr
}

func NewPrecheckItemBuilder(
cfg *config.Config,
dbMetas []*mydump.MDDatabaseMeta,
preInfoGetter PreRestoreInfoGetter,
checkpointsDB checkpoints.DB,
pdCli pd.Client,
) *PrecheckItemBuilder {
leaderAddrGetter := func() string {
return cfg.TiDB.PdAddr
}
// in tests we may not have a pdCli
if pdCli != nil {
leaderAddrGetter = pdCli.GetLeaderAddr
}
return &PrecheckItemBuilder{
cfg: cfg,
dbMetas: dbMetas,
preInfoGetter: preInfoGetter,
checkpointsDB: checkpointsDB,
cfg: cfg,
dbMetas: dbMetas,
preInfoGetter: preInfoGetter,
checkpointsDB: checkpointsDB,
pdLeaderAddrGetter: leaderAddrGetter,
}
}

Expand Down Expand Up @@ -139,10 +166,17 @@ func (b *PrecheckItemBuilder) BuildPrecheckItem(checkID CheckItemID) (PrecheckIt
return NewClusterVersionCheckItem(b.preInfoGetter, b.dbMetas), nil
case CheckLocalDiskPlacement:
return NewLocalDiskPlacementCheckItem(b.cfg), nil
<<<<<<< HEAD:br/pkg/lightning/restore/precheck.go
case CheckLocalTempKVDir:
return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter), nil
case CheckTargetUsingCDCPITR:
return NewCDCPITRCheckItem(b.cfg), nil
=======
case precheck.CheckLocalTempKVDir:
return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter, b.dbMetas), nil
case precheck.CheckTargetUsingCDCPITR:
return NewCDCPITRCheckItem(b.cfg, b.pdLeaderAddrGetter), nil
>>>>>>> 41d1ec0267e (lightning: always get latest PD leader when access PD after initialized (#46726)):br/pkg/lightning/importer/precheck.go
default:
return nil, errors.Errorf("unsupported check item: %v", checkID)
}
Expand Down
24 changes: 17 additions & 7 deletions br/pkg/lightning/restore/precheck_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,17 +681,23 @@ func (ci *checkpointCheckItem) checkpointIsValid(ctx context.Context, tableInfo
// CDCPITRCheckItem check downstream has enabled CDC or PiTR. It's exposed to let
// caller override the Instruction message.
type CDCPITRCheckItem struct {
cfg *config.Config
Instruction string
cfg *config.Config
Instruction string
leaderAddrGetter func() string
// used in test
etcdCli *clientv3.Client
}

// NewCDCPITRCheckItem creates a checker to check downstream has enabled CDC or PiTR.
<<<<<<< HEAD:br/pkg/lightning/restore/precheck_impl.go
func NewCDCPITRCheckItem(cfg *config.Config) PrecheckItem {
=======
func NewCDCPITRCheckItem(cfg *config.Config, leaderAddrGetter func() string) precheck.Checker {
>>>>>>> 41d1ec0267e (lightning: always get latest PD leader when access PD after initialized (#46726)):br/pkg/lightning/importer/precheck_impl.go
return &CDCPITRCheckItem{
cfg: cfg,
Instruction: "local backend is not compatible with them. Please switch to tidb backend then try again.",
cfg: cfg,
Instruction: "local backend is not compatible with them. Please switch to tidb backend then try again.",
leaderAddrGetter: leaderAddrGetter,
}
}

Expand All @@ -700,7 +706,11 @@ func (ci *CDCPITRCheckItem) GetCheckItemID() CheckItemID {
return CheckTargetUsingCDCPITR
}

func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client, error) {
func dialEtcdWithCfg(
ctx context.Context,
cfg *config.Config,
leaderAddr string,
) (*clientv3.Client, error) {
cfg2, err := cfg.ToTLS()
if err != nil {
return nil, err
Expand All @@ -709,7 +719,7 @@ func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client,

return clientv3.New(clientv3.Config{
TLS: tlsConfig,
Endpoints: []string{cfg.TiDB.PdAddr},
Endpoints: []string{leaderAddr},
AutoSyncInterval: 30 * time.Second,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
Expand Down Expand Up @@ -740,7 +750,7 @@ func (ci *CDCPITRCheckItem) Check(ctx context.Context) (*CheckResult, error) {

if ci.etcdCli == nil {
var err error
ci.etcdCli, err = dialEtcdWithCfg(ctx, ci.cfg)
ci.etcdCli, err = dialEtcdWithCfg(ctx, ci.cfg, ci.leaderAddrGetter())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/precheck_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() {
Backend: config.BackendLocal,
},
}
ci := NewCDCPITRCheckItem(cfg)
ci := NewCDCPITRCheckItem(cfg, nil)
checker := ci.(*CDCPITRCheckItem)
checker.etcdCli = testEtcdCluster.RandClient()
result, err := ci.Check(ctx)
Expand Down
17 changes: 17 additions & 0 deletions br/pkg/lightning/restore/precheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestPrecheckBuilderBasic(t *testing.T) {

preInfoGetter, err := NewPreRestoreInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil)
require.NoError(t, err)
<<<<<<< HEAD:br/pkg/lightning/restore/precheck_test.go
theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil)
for _, checkItemID := range []CheckItemID{
CheckLargeDataFile,
Expand All @@ -45,6 +46,22 @@ func TestPrecheckBuilderBasic(t *testing.T) {
CheckTargetClusterVersion,
CheckLocalDiskPlacement,
CheckLocalTempKVDir,
=======
theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil, nil)
for _, checkItemID := range []precheck.CheckItemID{
precheck.CheckLargeDataFile,
precheck.CheckSourcePermission,
precheck.CheckTargetTableEmpty,
precheck.CheckSourceSchemaValid,
precheck.CheckCheckpoints,
precheck.CheckCSVHeader,
precheck.CheckTargetClusterSize,
precheck.CheckTargetClusterEmptyRegion,
precheck.CheckTargetClusterRegionDist,
precheck.CheckTargetClusterVersion,
precheck.CheckLocalDiskPlacement,
precheck.CheckLocalTempKVDir,
>>>>>>> 41d1ec0267e (lightning: always get latest PD leader when access PD after initialized (#46726)):br/pkg/lightning/importer/precheck_test.go
} {
theChecker, err := theCheckBuilder.BuildPrecheckItem(checkItemID)
require.NoError(t, err)
Expand Down
46 changes: 43 additions & 3 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func NewRestoreControllerWithPauser(
}

preCheckBuilder := NewPrecheckItemBuilder(
cfg, p.DBMetas, preInfoGetter, cpdb,
cfg, p.DBMetas, preInfoGetter, cpdb, pdCli,
)

rc := &Controller{
Expand Down Expand Up @@ -462,6 +462,8 @@ func (rc *Controller) Close() {
}

func (rc *Controller) Run(ctx context.Context) error {
failpoint.Inject("beforeRun", func() {})

opts := []func(context.Context) error{
rc.setGlobalVariables,
rc.restoreSchema,
Expand Down Expand Up @@ -1351,7 +1353,7 @@ const (

func (rc *Controller) keepPauseGCForDupeRes(ctx context.Context) (<-chan struct{}, error) {
tlsOpt := rc.tls.ToPDSecurityOption()
pdCli, err := pd.NewClientWithContext(ctx, []string{rc.cfg.TiDB.PdAddr}, tlsOpt)
pdCli, err := pd.NewClientWithContext(ctx, []string{rc.pdCli.GetLeaderAddr()}, tlsOpt)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -1512,8 +1514,17 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
}

// Disable GC because TiDB enables GC already.

currentLeaderAddr := rc.pdCli.GetLeaderAddr()
// remove URL scheme
currentLeaderAddr = strings.TrimPrefix(currentLeaderAddr, "http://")
currentLeaderAddr = strings.TrimPrefix(currentLeaderAddr, "https://")
kvStore, err = driver.TiKVDriver{}.OpenWithOptions(
<<<<<<< HEAD:br/pkg/lightning/restore/restore.go
fmt.Sprintf("tikv://%s?disableGC=true", rc.cfg.TiDB.PdAddr),
=======
fmt.Sprintf("tikv://%s?disableGC=true&keyspaceName=%s", currentLeaderAddr, rc.keyspaceName),
>>>>>>> 41d1ec0267e (lightning: always get latest PD leader when access PD after initialized (#46726)):br/pkg/lightning/importer/import.go
driver.WithSecurity(rc.tls.ToTiKVSecurityConfig()),
)
if err != nil {
Expand Down Expand Up @@ -1691,6 +1702,35 @@ func (rc *Controller) restoreTables(ctx context.Context) (finalErr error) {
return nil
}

<<<<<<< HEAD:br/pkg/lightning/restore/restore.go
=======
func (rc *Controller) registerTaskToPD(ctx context.Context) (undo func(), _ error) {
etcdCli, err := dialEtcdWithCfg(ctx, rc.cfg, rc.pdCli.GetLeaderAddr())
if err != nil {
return nil, errors.Trace(err)
}

register := utils.NewTaskRegister(etcdCli, utils.RegisterLightning, fmt.Sprintf("lightning-%s", uuid.New()))

undo = func() {
closeCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := register.Close(closeCtx); err != nil {
log.L().Warn("failed to unregister task", zap.Error(err))
}
if err := etcdCli.Close(); err != nil {
log.L().Warn("failed to close etcd client", zap.Error(err))
}
}
if err := register.RegisterTask(ctx); err != nil {
undo()
return nil, errors.Trace(err)
}
return undo, nil
}

>>>>>>> 41d1ec0267e (lightning: always get latest PD leader when access PD after initialized (#46726)):br/pkg/lightning/importer/import.go
func addExtendDataForCheckpoint(
ctx context.Context,
cfg *config.Config,
Expand Down Expand Up @@ -2114,7 +2154,7 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
rc.status.TotalFileSize.Store(estimatedSizeResult.SizeWithoutIndex)
}
if isLocalBackend(rc.cfg) {
pdController, err := pdutil.NewPdController(ctx, rc.cfg.TiDB.PdAddr,
pdController, err := pdutil.NewPdController(ctx, rc.pdCli.GetLeaderAddr(),
rc.tls.TLSConfig(), rc.tls.ToPDSecurityOption())
if err != nil {
return common.NormalizeOrWrapErr(common.ErrCreatePDClient, err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func TestPreCheckFailed(t *testing.T) {
dbMetas: make([]*mydump.MDDatabaseMeta, 0),
}
cpdb := panicCheckpointDB{}
theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb)
theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb, nil)
ctl := &Controller{
cfg: cfg,
saveCpCh: make(chan saveCp),
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/restore/table_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,7 +1130,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource() {
targetInfoGetter: targetInfoGetter,
srcStorage: mockStore,
}
theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil)
theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil, nil)
rc := &Controller{
cfg: cfg,
tls: tls,
Expand Down Expand Up @@ -1294,7 +1294,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() {
targetInfoGetter: targetInfoGetter,
dbMetas: dbMetas,
}
theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB())
theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB(), nil)
rc := &Controller{
cfg: cfg,
tls: tls,
Expand Down Expand Up @@ -1390,7 +1390,7 @@ func (s *tableRestoreSuite) TestCheckHasLargeCSV() {
for _, ca := range cases {
template := NewSimpleTemplate()
cfg := &config.Config{Mydumper: config.MydumperRuntime{StrictFormat: ca.strictFormat}}
theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil)
theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil, nil)
rc := &Controller{
cfg: cfg,
checkTemplate: template,
Expand Down
Loading

0 comments on commit dd323fa

Please sign in to comment.