diff --git a/domain/infosync/info.go b/domain/infosync/info.go index bd44b41696f5b..04f7eb051e625 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -183,15 +183,9 @@ func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func() if err != nil { return nil, err } - if etcdCli != nil { - is.labelRuleManager = initLabelRuleManager(etcdCli.Endpoints()) - is.placementManager = initPlacementManager(etcdCli.Endpoints()) - is.tiflashPlacementManager = initTiFlashPlacementManager(etcdCli.Endpoints()) - } else { - is.labelRuleManager = initLabelRuleManager([]string{}) - is.placementManager = initPlacementManager([]string{}) - is.tiflashPlacementManager = initTiFlashPlacementManager([]string{}) - } + is.labelRuleManager = initLabelRuleManager(etcdCli) + is.placementManager = initPlacementManager(etcdCli) + is.tiflashPlacementManager = initTiFlashPlacementManager(etcdCli) setGlobalInfoSyncer(is) return is, nil } @@ -218,27 +212,27 @@ func (is *InfoSyncer) GetSessionManager() util2.SessionManager { return is.manager } -func initLabelRuleManager(addrs []string) LabelRuleManager { - if len(addrs) == 0 { +func initLabelRuleManager(etcdCli *clientv3.Client) LabelRuleManager { + if etcdCli == nil { return &mockLabelManager{labelRules: map[string][]byte{}} } - return &PDLabelManager{addrs: addrs} + return &PDLabelManager{etcdCli: etcdCli} } -func initPlacementManager(addrs []string) PlacementManager { - if len(addrs) == 0 { +func initPlacementManager(etcdCli *clientv3.Client) PlacementManager { + if etcdCli == nil { return &mockPlacementManager{} } - return &PDPlacementManager{addrs: addrs} + return &PDPlacementManager{etcdCli: etcdCli} } -func initTiFlashPlacementManager(addrs []string) TiFlashPlacementManager { - if len(addrs) == 0 { +func initTiFlashPlacementManager(etcdCli *clientv3.Client) TiFlashPlacementManager { + if etcdCli == nil { m := mockTiFlashPlacementManager{} return &m } - logutil.BgLogger().Warn("init TiFlashPlacementManager", zap.Strings("pd addrs", addrs)) - return &TiFlashPDPlacementManager{addrs: addrs} + logutil.BgLogger().Warn("init TiFlashPlacementManager", zap.Strings("pd addrs", etcdCli.Endpoints())) + return &TiFlashPDPlacementManager{etcdCli: etcdCli} } // GetMockTiFlash can only be used in tests to get MockTiFlash diff --git a/domain/infosync/label_manager.go b/domain/infosync/label_manager.go index 663d3f01976fc..c6a3ef98da268 100644 --- a/domain/infosync/label_manager.go +++ b/domain/infosync/label_manager.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/ddl/label" "github.com/pingcap/tidb/util/pdapi" + clientv3 "go.etcd.io/etcd/client/v3" ) // LabelRuleManager manages label rules @@ -35,7 +36,7 @@ type LabelRuleManager interface { // PDLabelManager manages rules with pd type PDLabelManager struct { - addrs []string + etcdCli *clientv3.Client } // PutLabelRule implements PutLabelRule @@ -44,7 +45,7 @@ func (lm *PDLabelManager) PutLabelRule(ctx context.Context, rule *label.Rule) er if err != nil { return err } - _, err = doRequest(ctx, lm.addrs, path.Join(pdapi.Config, "region-label", "rule"), "POST", bytes.NewReader(r)) + _, err = doRequest(ctx, lm.etcdCli.Endpoints(), path.Join(pdapi.Config, "region-label", "rule"), "POST", bytes.NewReader(r)) return err } @@ -55,14 +56,14 @@ func (lm *PDLabelManager) UpdateLabelRules(ctx context.Context, patch *label.Rul return err } - _, err = doRequest(ctx, lm.addrs, path.Join(pdapi.Config, "region-label", "rules"), "PATCH", bytes.NewReader(r)) + _, err = doRequest(ctx, lm.etcdCli.Endpoints(), path.Join(pdapi.Config, "region-label", "rules"), "PATCH", bytes.NewReader(r)) return err } // GetAllLabelRules implements GetAllLabelRules func (lm *PDLabelManager) GetAllLabelRules(ctx context.Context) ([]*label.Rule, error) { var rules []*label.Rule - res, err := doRequest(ctx, lm.addrs, path.Join(pdapi.Config, "region-label", "rules"), "GET", nil) + res, err := doRequest(ctx, lm.etcdCli.Endpoints(), path.Join(pdapi.Config, "region-label", "rules"), "GET", nil) if err == nil && res != nil { err = json.Unmarshal(res, &rules) @@ -78,7 +79,7 @@ func (lm *PDLabelManager) GetLabelRules(ctx context.Context, ruleIDs []string) ( } rules := []*label.Rule{} - res, err := doRequest(ctx, lm.addrs, path.Join(pdapi.Config, "region-label", "rules", "ids"), "GET", bytes.NewReader(ids)) + res, err := doRequest(ctx, lm.etcdCli.Endpoints(), path.Join(pdapi.Config, "region-label", "rules", "ids"), "GET", bytes.NewReader(ids)) if err == nil && res != nil { err = json.Unmarshal(res, &rules) diff --git a/domain/infosync/placement_manager.go b/domain/infosync/placement_manager.go index 7c4db7dcd61e3..0a36de70715a8 100644 --- a/domain/infosync/placement_manager.go +++ b/domain/infosync/placement_manager.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/util/pdapi" + clientv3 "go.etcd.io/etcd/client/v3" ) // PlacementManager manages placement settings @@ -37,13 +38,13 @@ type PlacementManager interface { // PDPlacementManager manages placement with pd type PDPlacementManager struct { - addrs []string + etcdCli *clientv3.Client } // GetRuleBundle is used to get one specific rule bundle from PD. func (m *PDPlacementManager) GetRuleBundle(ctx context.Context, name string) (*placement.Bundle, error) { bundle := &placement.Bundle{ID: name} - res, err := doRequest(ctx, m.addrs, path.Join(pdapi.Config, "placement-rule", name), "GET", nil) + res, err := doRequest(ctx, m.etcdCli.Endpoints(), path.Join(pdapi.Config, "placement-rule", name), "GET", nil) if err == nil && res != nil { err = json.Unmarshal(res, bundle) } @@ -53,7 +54,7 @@ func (m *PDPlacementManager) GetRuleBundle(ctx context.Context, name string) (*p // GetAllRuleBundles is used to get all rule bundles from PD. It is used to load full rules from PD while fullload infoschema. func (m *PDPlacementManager) GetAllRuleBundles(ctx context.Context) ([]*placement.Bundle, error) { var bundles []*placement.Bundle - res, err := doRequest(ctx, m.addrs, path.Join(pdapi.Config, "placement-rule"), "GET", nil) + res, err := doRequest(ctx, m.etcdCli.Endpoints(), path.Join(pdapi.Config, "placement-rule"), "GET", nil) if err == nil && res != nil { err = json.Unmarshal(res, &bundles) } @@ -71,7 +72,7 @@ func (m *PDPlacementManager) PutRuleBundles(ctx context.Context, bundles []*plac return err } - _, err = doRequest(ctx, m.addrs, path.Join(pdapi.Config, "placement-rule")+"?partial=true", "POST", bytes.NewReader(b)) + _, err = doRequest(ctx, m.etcdCli.Endpoints(), path.Join(pdapi.Config, "placement-rule")+"?partial=true", "POST", bytes.NewReader(b)) return err } diff --git a/domain/infosync/tiflash_manager.go b/domain/infosync/tiflash_manager.go index 4f655c3206df6..53c664091b6cc 100644 --- a/domain/infosync/tiflash_manager.go +++ b/domain/infosync/tiflash_manager.go @@ -37,6 +37,7 @@ import ( "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/pdapi" + clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -60,7 +61,7 @@ type TiFlashPlacementManager interface { // TiFlashPDPlacementManager manages placement with pd for TiFlash. type TiFlashPDPlacementManager struct { - addrs []string + etcdCli *clientv3.Client } // Close is called to close TiFlashPDPlacementManager. @@ -75,7 +76,7 @@ func (m *TiFlashPDPlacementManager) SetPlacementRule(ctx context.Context, rule p } j, _ := json.Marshal(rule) buf := bytes.NewBuffer(j) - res, err := doRequest(ctx, m.addrs, path.Join(pdapi.Config, "rule"), "POST", buf) + res, err := doRequest(ctx, m.etcdCli.Endpoints(), path.Join(pdapi.Config, "rule"), "POST", buf) if err != nil { return errors.Trace(err) } @@ -87,7 +88,7 @@ func (m *TiFlashPDPlacementManager) SetPlacementRule(ctx context.Context, rule p // DeletePlacementRule is to delete placement rule for certain group. func (m *TiFlashPDPlacementManager) DeletePlacementRule(ctx context.Context, group string, ruleID string) error { - res, err := doRequest(ctx, m.addrs, path.Join(pdapi.Config, "rule", group, ruleID), "DELETE", nil) + res, err := doRequest(ctx, m.etcdCli.Endpoints(), path.Join(pdapi.Config, "rule", group, ruleID), "DELETE", nil) if err != nil { return errors.Trace(err) } @@ -99,7 +100,7 @@ func (m *TiFlashPDPlacementManager) DeletePlacementRule(ctx context.Context, gro // GetGroupRules to get all placement rule in a certain group. func (m *TiFlashPDPlacementManager) GetGroupRules(ctx context.Context, group string) ([]placement.TiFlashRule, error) { - res, err := doRequest(ctx, m.addrs, path.Join(pdapi.Config, "rules", "group", group), "GET", nil) + res, err := doRequest(ctx, m.etcdCli.Endpoints(), path.Join(pdapi.Config, "rules", "group", group), "GET", nil) if err != nil { return nil, errors.Trace(err) } @@ -132,7 +133,7 @@ func (m *TiFlashPDPlacementManager) PostAccelerateSchedule(ctx context.Context, return errors.Trace(err) } buf := bytes.NewBuffer(j) - res, err := doRequest(ctx, m.addrs, "/pd/api/v1/regions/accelerate-schedule", "POST", buf) + res, err := doRequest(ctx, m.etcdCli.Endpoints(), "/pd/api/v1/regions/accelerate-schedule", "POST", buf) if err != nil { return errors.Trace(err) } @@ -152,7 +153,7 @@ func (m *TiFlashPDPlacementManager) GetPDRegionRecordStats(ctx context.Context, p := fmt.Sprintf("/pd/api/v1/stats/region?start_key=%s&end_key=%s", url.QueryEscape(string(startKey)), url.QueryEscape(string(endKey))) - res, err := doRequest(ctx, m.addrs, p, "GET", nil) + res, err := doRequest(ctx, m.etcdCli.Endpoints(), p, "GET", nil) if err != nil { return errors.Trace(err) } @@ -170,7 +171,7 @@ func (m *TiFlashPDPlacementManager) GetPDRegionRecordStats(ctx context.Context, // GetStoresStat gets the TiKV store information by accessing PD's api. func (m *TiFlashPDPlacementManager) GetStoresStat(ctx context.Context) (*helper.StoresStat, error) { var storesStat helper.StoresStat - res, err := doRequest(ctx, m.addrs, pdapi.Stores, "GET", nil) + res, err := doRequest(ctx, m.etcdCli.Endpoints(), pdapi.Stores, "GET", nil) if err != nil { return nil, errors.Trace(err) }