diff --git a/client/http/api.go b/client/http/api.go
index f744fd0c395..2153cd286e8 100644
--- a/client/http/api.go
+++ b/client/http/api.go
@@ -38,6 +38,9 @@ const (
 	store          = "/pd/api/v1/store"
 	Stores         = "/pd/api/v1/stores"
 	StatsRegion    = "/pd/api/v1/stats/region"
+	membersPrefix  = "/pd/api/v1/members"
+	leaderPrefix   = "/pd/api/v1/leader"
+	transferLeader = "/pd/api/v1/leader/transfer"
 	// Config
 	Config         = "/pd/api/v1/config"
 	ClusterVersion = "/pd/api/v1/config/cluster-version"
@@ -124,6 +127,16 @@ func StoreLabelByID(id uint64) string {
 	return fmt.Sprintf("%s/%d/label", store, id)
 }
 
+// LabelByStoreID returns the path of PD HTTP API to set the labels of a store.
+func LabelByStoreID(storeID int64) string {
+	return fmt.Sprintf("%s/%d/label", store, storeID)
+}
+
+// TransferLeaderByID returns the path of PD HTTP API to transfer the leader by ID.
+func TransferLeaderByID(leaderID string) string {
+	return fmt.Sprintf("%s/%s", transferLeader, leaderID)
+}
+
 // ConfigWithTTLSeconds returns the config API with the TTL seconds parameter.
 func ConfigWithTTLSeconds(ttlSeconds float64) string {
 	return fmt.Sprintf("%s?ttlSecond=%.0f", Config, ttlSeconds)
diff --git a/client/http/client.go b/client/http/client.go
index 613ebf33294..958c52489fb 100644
--- a/client/http/client.go
+++ b/client/http/client.go
@@ -26,6 +26,7 @@ import (
 	"time"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/log"
 	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/zap"
@@ -54,9 +55,16 @@ type Client interface {
 	GetHistoryHotRegions(context.Context, *HistoryHotRegionsRequest) (*HistoryHotRegions, error)
 	GetRegionStatusByKeyRange(context.Context, *KeyRange, bool) (*RegionStats, error)
 	GetStores(context.Context) (*StoresInfo, error)
+	SetStoreLabels(context.Context, int64, map[string]string) error
+	GetMembers(context.Context) (*MembersInfo, error)
+	GetLeader(context.Context) (*pdpb.Member, error)
+	TransferLeader(context.Context, string) error
 	/* Config-related interfaces */
 	GetScheduleConfig(context.Context) (map[string]interface{}, error)
 	SetScheduleConfig(context.Context, map[string]interface{}) error
+	/* Scheduler-related interfaces */
+	GetSchedulers(context.Context) ([]string, error)
+	CreateScheduler(context.Context, string, uint64) error
 	/* Rule-related interfaces */
 	GetAllPlacementRuleBundles(context.Context) ([]*GroupBundle, error)
 	GetPlacementRuleBundleByGroup(context.Context, string) (*GroupBundle, error)
@@ -458,6 +466,45 @@ func (c *client) GetRegionStatusByKeyRan
 	return &regionStats, nil
 }
 
+// SetStoreLabels sets the labels of a store.
+func (c *client) SetStoreLabels(ctx context.Context, storeID int64, storeLabels map[string]string) error {
+	jsonInput, err := json.Marshal(storeLabels)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	return c.requestWithRetry(ctx, "SetStoreLabels", LabelByStoreID(storeID),
+		http.MethodPost, bytes.NewBuffer(jsonInput), nil)
+}
+
+// GetMembers gets the members info of the PD cluster.
+func (c *client) GetMembers(ctx context.Context) (*MembersInfo, error) {
+	var members MembersInfo
+	err := c.requestWithRetry(ctx,
+		"GetMembers", membersPrefix,
+		http.MethodGet, http.NoBody, &members)
+	if err != nil {
+		return nil, err
+	}
+	return &members, nil
+}
+
+// GetLeader gets the leader of the PD cluster.
+func (c *client) GetLeader(ctx context.Context) (*pdpb.Member, error) {
+	var leader pdpb.Member
+	err := c.requestWithRetry(ctx, "GetLeader", leaderPrefix,
+		http.MethodGet, http.NoBody, &leader)
+	if err != nil {
+		return nil, err
+	}
+	return &leader, nil
+}
+
+// TransferLeader transfers the PD leader to the given member.
+func (c *client) TransferLeader(ctx context.Context, newLeader string) error {
+	return c.requestWithRetry(ctx, "TransferLeader", TransferLeaderByID(newLeader),
+		http.MethodPost, http.NoBody, nil)
+}
+
 // GetScheduleConfig gets the schedule configurations.
 func (c *client) GetScheduleConfig(ctx context.Context) (map[string]interface{}, error) {
 	var config map[string]interface{}
@@ -662,6 +709,31 @@ func (c *client) PatchRegionLabelRules(ctx context.Context, labelRulePatch *Labe
 	http.MethodPatch, bytes.NewBuffer(labelRulePatchJSON), nil)
 }
+
+// GetSchedulers gets the schedulers from the PD cluster.
+func (c *client) GetSchedulers(ctx context.Context) ([]string, error) {
+	var schedulers []string
+	err := c.requestWithRetry(ctx, "GetSchedulers", Schedulers,
+		http.MethodGet, http.NoBody, &schedulers)
+	if err != nil {
+		return nil, err
+	}
+	return schedulers, nil
+}
+
+// CreateScheduler creates a scheduler in the PD cluster.
+func (c *client) CreateScheduler(ctx context.Context, name string, storeID uint64) error {
+	inputJSON, err := json.Marshal(map[string]interface{}{
+		"name":     name,
+		"store_id": storeID,
+	})
+	if err != nil {
+		return errors.Trace(err)
+	}
+	return c.requestWithRetry(ctx,
+		"CreateScheduler", Schedulers,
+		http.MethodPost, bytes.NewBuffer(inputJSON), nil)
+}
+
 // AccelerateSchedule accelerates the scheduling of the regions within the given key range.
 // The keys in the key range should be encoded in the hex bytes format (without encoding to the UTF-8 bytes).
 func (c *client) AccelerateSchedule(ctx context.Context, keyRange *KeyRange) error {
diff --git a/client/http/types.go b/client/http/types.go
index 1d8db36d100..b05e8e0efba 100644
--- a/client/http/types.go
+++ b/client/http/types.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/pingcap/kvproto/pkg/encryptionpb"
+	"github.com/pingcap/kvproto/pkg/pdpb"
 )
 
 // KeyRange defines a range of keys in bytes.
@@ -574,3 +575,11 @@ type LabelRulePatch struct {
 	SetRules    []*LabelRule `json:"sets"`
 	DeleteRules []string     `json:"deletes"`
 }
+
+// MembersInfo is the PD members info returned from the PD RESTful interface.
+type MembersInfo struct {
+	Header     *pdpb.ResponseHeader `json:"header,omitempty"`
+	Members    []*pdpb.Member       `json:"members,omitempty"`
+	Leader     *pdpb.Member         `json:"leader,omitempty"`
+	EtcdLeader *pdpb.Member         `json:"etcd_leader,omitempty"`
+}
diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go
index 6c636d2a2a1..476b4d2f541 100644
--- a/tests/integrations/client/http_client_test.go
+++ b/tests/integrations/client/http_client_test.go
@@ -49,7 +49,7 @@ func (suite *httpClientTestSuite) SetupSuite() {
 	re := suite.Require()
 	var err error
 	suite.ctx, suite.cancelFunc = context.WithCancel(context.Background())
-	suite.cluster, err = tests.NewTestCluster(suite.ctx, 1)
+	suite.cluster, err = tests.NewTestCluster(suite.ctx, 2)
 	re.NoError(err)
 	err = suite.cluster.RunInitialServers()
 	re.NoError(err)
@@ -384,3 +384,62 @@ func (suite *httpClientTestSuite) TestScheduleConfig() {
 	re.Equal(float64(8), config["leader-schedule-limit"])
 	re.Equal(float64(2048), config["region-schedule-limit"])
 }
+
+func (suite *httpClientTestSuite) TestSchedulers() {
+	re := suite.Require()
+	schedulers, err := suite.client.GetSchedulers(suite.ctx)
+	re.NoError(err)
+	re.Len(schedulers, 0)
+
+	err = suite.client.CreateScheduler(suite.ctx, "evict-leader-scheduler", 1)
+	re.NoError(err)
+	schedulers, err = suite.client.GetSchedulers(suite.ctx)
+	re.NoError(err)
+	re.Len(schedulers, 1)
+}
+
+func (suite *httpClientTestSuite) TestSetStoreLabels() {
+	re := suite.Require()
+	resp, err := suite.client.GetStores(suite.ctx)
+	re.NoError(err)
+	setStore := resp.Stores[0]
+	re.Empty(setStore.Store.Labels)
+	storeLabels := map[string]string{
+		"zone": "zone1",
+	}
+	err = suite.client.SetStoreLabels(suite.ctx, setStore.Store.ID, storeLabels)
+	re.NoError(err)
+
+	resp, err = suite.client.GetStores(suite.ctx)
+	re.NoError(err)
+	for _, store := range resp.Stores {
+		if store.Store.ID == setStore.Store.ID {
+			for _, label := range store.Store.Labels {
+				re.Equal(storeLabels[label.Key], label.Value)
+			}
+		}
+	}
+}
+
+func (suite *httpClientTestSuite) TestTransferLeader() {
+	re := suite.Require()
+	members, err := suite.client.GetMembers(suite.ctx)
+	re.NoError(err)
+	re.Len(members.Members, 2)
+
+	oldLeader, err := suite.client.GetLeader(suite.ctx)
+	re.NoError(err)
+
+	// Transfer the leader to the other PD member.
+	for _, member := range members.Members {
+		if member.Name != oldLeader.Name {
+			err = suite.client.TransferLeader(suite.ctx, member.Name)
+			re.NoError(err)
+			break
+		}
+	}
+
+	newLeader := suite.cluster.WaitLeader()
+	re.NotEmpty(newLeader)
+	re.NotEqual(oldLeader.Name, newLeader)
+}
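Example usage (illustrative, not part of the diff): a minimal sketch of driving the new member and leader APIs from application code. The endpoint address is a placeholder, and the exact NewClient constructor signature may differ between versions of client/http, so treat the setup lines as assumptions rather than the canonical API.

package main

import (
	"context"
	"fmt"
	"log"

	pdhttp "github.com/tikv/pd/client/http"
)

func main() {
	// Assumed constructor shape; check client/http/client.go for the exact signature.
	client := pdhttp.NewClient([]string{"http://127.0.0.1:2379"})
	defer client.Close()

	ctx := context.Background()

	// Fetch the membership and the current leader via the new endpoints.
	members, err := client.GetMembers(ctx)
	if err != nil {
		log.Fatal(err)
	}
	leader, err := client.GetLeader(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Hand leadership to the first member that is not the current leader,
	// mirroring the loop in TestTransferLeader above.
	for _, member := range members.Members {
		if member.Name != leader.Name {
			if err := client.TransferLeader(ctx, member.Name); err != nil {
				log.Fatal(err)
			}
			fmt.Println("transferred leader to", member.Name)
			break
		}
	}
}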
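A companion sketch for the store-label and scheduler methods, using the same pdhttp import as above. The store ID and the evict-leader-scheduler name simply mirror the integration test, not a recommended production setup; substitute values from your own cluster.

// labelAndEvict labels store 1 and then registers an evict-leader scheduler
// for it, finally listing the schedulers that are now active.
func labelAndEvict(ctx context.Context, client pdhttp.Client) error {
	if err := client.SetStoreLabels(ctx, 1, map[string]string{"zone": "zone1"}); err != nil {
		return err
	}
	if err := client.CreateScheduler(ctx, "evict-leader-scheduler", 1); err != nil {
		return err
	}
	schedulers, err := client.GetSchedulers(ctx)
	if err != nil {
		return err
	}
	fmt.Println("active schedulers:", schedulers)
	return nil
}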