Skip to content

Commit

Permalink
*: Auto refresh PD addrs for PDPlacementManager, PDLabelManager, …
Browse files Browse the repository at this point in the history
…`TiFlashPDPlacementManager` (#33909)

close #33908
  • Loading branch information
lcwangchao authored Apr 13, 2022
1 parent f5c2710 commit 14f4888
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 35 deletions.
32 changes: 13 additions & 19 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,15 +183,9 @@ func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func()
if err != nil {
return nil, err
}
if etcdCli != nil {
is.labelRuleManager = initLabelRuleManager(etcdCli.Endpoints())
is.placementManager = initPlacementManager(etcdCli.Endpoints())
is.tiflashPlacementManager = initTiFlashPlacementManager(etcdCli.Endpoints())
} else {
is.labelRuleManager = initLabelRuleManager([]string{})
is.placementManager = initPlacementManager([]string{})
is.tiflashPlacementManager = initTiFlashPlacementManager([]string{})
}
is.labelRuleManager = initLabelRuleManager(etcdCli)
is.placementManager = initPlacementManager(etcdCli)
is.tiflashPlacementManager = initTiFlashPlacementManager(etcdCli)
setGlobalInfoSyncer(is)
return is, nil
}
Expand All @@ -218,27 +212,27 @@ func (is *InfoSyncer) GetSessionManager() util2.SessionManager {
return is.manager
}

func initLabelRuleManager(addrs []string) LabelRuleManager {
if len(addrs) == 0 {
func initLabelRuleManager(etcdCli *clientv3.Client) LabelRuleManager {
if etcdCli == nil {
return &mockLabelManager{labelRules: map[string][]byte{}}
}
return &PDLabelManager{addrs: addrs}
return &PDLabelManager{etcdCli: etcdCli}
}

func initPlacementManager(addrs []string) PlacementManager {
if len(addrs) == 0 {
func initPlacementManager(etcdCli *clientv3.Client) PlacementManager {
if etcdCli == nil {
return &mockPlacementManager{}
}
return &PDPlacementManager{addrs: addrs}
return &PDPlacementManager{etcdCli: etcdCli}
}

func initTiFlashPlacementManager(addrs []string) TiFlashPlacementManager {
if len(addrs) == 0 {
func initTiFlashPlacementManager(etcdCli *clientv3.Client) TiFlashPlacementManager {
if etcdCli == nil {
m := mockTiFlashPlacementManager{}
return &m
}
logutil.BgLogger().Warn("init TiFlashPlacementManager", zap.Strings("pd addrs", addrs))
return &TiFlashPDPlacementManager{addrs: addrs}
logutil.BgLogger().Warn("init TiFlashPlacementManager", zap.Strings("pd addrs", etcdCli.Endpoints()))
return &TiFlashPDPlacementManager{etcdCli: etcdCli}
}

// GetMockTiFlash can only be used in tests to get MockTiFlash
Expand Down
11 changes: 6 additions & 5 deletions domain/infosync/label_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/pingcap/tidb/ddl/label"
"github.com/pingcap/tidb/util/pdapi"
clientv3 "go.etcd.io/etcd/client/v3"
)

// LabelRuleManager manages label rules
Expand All @@ -35,7 +36,7 @@ type LabelRuleManager interface {

// PDLabelManager manages rules with pd
type PDLabelManager struct {
addrs []string
etcdCli *clientv3.Client
}

// PutLabelRule implements PutLabelRule
Expand All @@ -44,7 +45,7 @@ func (lm *PDLabelManager) PutLabelRule(ctx context.Context, rule *label.Rule) er
if err != nil {
return err
}
_, err = doRequest(ctx, lm.addrs, path.Join(pdapi.Config, "region-label", "rule"), "POST", bytes.NewReader(r))
_, err = doRequest(ctx, lm.etcdCli.Endpoints(), path.Join(pdapi.Config, "region-label", "rule"), "POST", bytes.NewReader(r))
return err
}

Expand All @@ -55,14 +56,14 @@ func (lm *PDLabelManager) UpdateLabelRules(ctx context.Context, patch *label.Rul
return err
}

_, err = doRequest(ctx, lm.addrs, path.Join(pdapi.Config, "region-label", "rules"), "PATCH", bytes.NewReader(r))
_, err = doRequest(ctx, lm.etcdCli.Endpoints(), path.Join(pdapi.Config, "region-label", "rules"), "PATCH", bytes.NewReader(r))
return err
}

// GetAllLabelRules implements GetAllLabelRules
func (lm *PDLabelManager) GetAllLabelRules(ctx context.Context) ([]*label.Rule, error) {
var rules []*label.Rule
res, err := doRequest(ctx, lm.addrs, path.Join(pdapi.Config, "region-label", "rules"), "GET", nil)
res, err := doRequest(ctx, lm.etcdCli.Endpoints(), path.Join(pdapi.Config, "region-label", "rules"), "GET", nil)

if err == nil && res != nil {
err = json.Unmarshal(res, &rules)
Expand All @@ -78,7 +79,7 @@ func (lm *PDLabelManager) GetLabelRules(ctx context.Context, ruleIDs []string) (
}

rules := []*label.Rule{}
res, err := doRequest(ctx, lm.addrs, path.Join(pdapi.Config, "region-label", "rules", "ids"), "GET", bytes.NewReader(ids))
res, err := doRequest(ctx, lm.etcdCli.Endpoints(), path.Join(pdapi.Config, "region-label", "rules", "ids"), "GET", bytes.NewReader(ids))

if err == nil && res != nil {
err = json.Unmarshal(res, &rules)
Expand Down
9 changes: 5 additions & 4 deletions domain/infosync/placement_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/util/pdapi"
clientv3 "go.etcd.io/etcd/client/v3"
)

// PlacementManager manages placement settings
Expand All @@ -37,13 +38,13 @@ type PlacementManager interface {

// PDPlacementManager manages placement with pd
type PDPlacementManager struct {
addrs []string
etcdCli *clientv3.Client
}

// GetRuleBundle is used to get one specific rule bundle from PD.
func (m *PDPlacementManager) GetRuleBundle(ctx context.Context, name string) (*placement.Bundle, error) {
bundle := &placement.Bundle{ID: name}
res, err := doRequest(ctx, m.addrs, path.Join(pdapi.Config, "placement-rule", name), "GET", nil)
res, err := doRequest(ctx, m.etcdCli.Endpoints(), path.Join(pdapi.Config, "placement-rule", name), "GET", nil)
if err == nil && res != nil {
err = json.Unmarshal(res, bundle)
}
Expand All @@ -53,7 +54,7 @@ func (m *PDPlacementManager) GetRuleBundle(ctx context.Context, name string) (*p
// GetAllRuleBundles is used to get all rule bundles from PD. It is used to load full rules from PD while fullload infoschema.
func (m *PDPlacementManager) GetAllRuleBundles(ctx context.Context) ([]*placement.Bundle, error) {
var bundles []*placement.Bundle
res, err := doRequest(ctx, m.addrs, path.Join(pdapi.Config, "placement-rule"), "GET", nil)
res, err := doRequest(ctx, m.etcdCli.Endpoints(), path.Join(pdapi.Config, "placement-rule"), "GET", nil)
if err == nil && res != nil {
err = json.Unmarshal(res, &bundles)
}
Expand All @@ -71,7 +72,7 @@ func (m *PDPlacementManager) PutRuleBundles(ctx context.Context, bundles []*plac
return err
}

_, err = doRequest(ctx, m.addrs, path.Join(pdapi.Config, "placement-rule")+"?partial=true", "POST", bytes.NewReader(b))
_, err = doRequest(ctx, m.etcdCli.Endpoints(), path.Join(pdapi.Config, "placement-rule")+"?partial=true", "POST", bytes.NewReader(b))
return err
}

Expand Down
15 changes: 8 additions & 7 deletions domain/infosync/tiflash_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/pdapi"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

Expand All @@ -60,7 +61,7 @@ type TiFlashPlacementManager interface {

// TiFlashPDPlacementManager manages placement with pd for TiFlash.
type TiFlashPDPlacementManager struct {
addrs []string
etcdCli *clientv3.Client
}

// Close is called to close TiFlashPDPlacementManager.
Expand All @@ -75,7 +76,7 @@ func (m *TiFlashPDPlacementManager) SetPlacementRule(ctx context.Context, rule p
}
j, _ := json.Marshal(rule)
buf := bytes.NewBuffer(j)
res, err := doRequest(ctx, m.addrs, path.Join(pdapi.Config, "rule"), "POST", buf)
res, err := doRequest(ctx, m.etcdCli.Endpoints(), path.Join(pdapi.Config, "rule"), "POST", buf)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -87,7 +88,7 @@ func (m *TiFlashPDPlacementManager) SetPlacementRule(ctx context.Context, rule p

// DeletePlacementRule is to delete placement rule for certain group.
func (m *TiFlashPDPlacementManager) DeletePlacementRule(ctx context.Context, group string, ruleID string) error {
res, err := doRequest(ctx, m.addrs, path.Join(pdapi.Config, "rule", group, ruleID), "DELETE", nil)
res, err := doRequest(ctx, m.etcdCli.Endpoints(), path.Join(pdapi.Config, "rule", group, ruleID), "DELETE", nil)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -99,7 +100,7 @@ func (m *TiFlashPDPlacementManager) DeletePlacementRule(ctx context.Context, gro

// GetGroupRules to get all placement rule in a certain group.
func (m *TiFlashPDPlacementManager) GetGroupRules(ctx context.Context, group string) ([]placement.TiFlashRule, error) {
res, err := doRequest(ctx, m.addrs, path.Join(pdapi.Config, "rules", "group", group), "GET", nil)
res, err := doRequest(ctx, m.etcdCli.Endpoints(), path.Join(pdapi.Config, "rules", "group", group), "GET", nil)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -132,7 +133,7 @@ func (m *TiFlashPDPlacementManager) PostAccelerateSchedule(ctx context.Context,
return errors.Trace(err)
}
buf := bytes.NewBuffer(j)
res, err := doRequest(ctx, m.addrs, "/pd/api/v1/regions/accelerate-schedule", "POST", buf)
res, err := doRequest(ctx, m.etcdCli.Endpoints(), "/pd/api/v1/regions/accelerate-schedule", "POST", buf)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -152,7 +153,7 @@ func (m *TiFlashPDPlacementManager) GetPDRegionRecordStats(ctx context.Context,
p := fmt.Sprintf("/pd/api/v1/stats/region?start_key=%s&end_key=%s",
url.QueryEscape(string(startKey)),
url.QueryEscape(string(endKey)))
res, err := doRequest(ctx, m.addrs, p, "GET", nil)
res, err := doRequest(ctx, m.etcdCli.Endpoints(), p, "GET", nil)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -170,7 +171,7 @@ func (m *TiFlashPDPlacementManager) GetPDRegionRecordStats(ctx context.Context,
// GetStoresStat gets the TiKV store information by accessing PD's api.
func (m *TiFlashPDPlacementManager) GetStoresStat(ctx context.Context) (*helper.StoresStat, error) {
var storesStat helper.StoresStat
res, err := doRequest(ctx, m.addrs, pdapi.Stores, "GET", nil)
res, err := doRequest(ctx, m.etcdCli.Endpoints(), pdapi.Stores, "GET", nil)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down

0 comments on commit 14f4888

Please sign in to comment.