Skip to content

Commit

Permalink
client/http: add func for jenkins test (#7516)
Browse files Browse the repository at this point in the history
ref #7300

Signed-off-by: husharp <jinhao.hu@pingcap.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
HuSharp and ti-chi-bot[bot] authored Dec 13, 2023
1 parent f71de23 commit 1eed494
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 1 deletion.
13 changes: 13 additions & 0 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ const (
store = "/pd/api/v1/store"
Stores = "/pd/api/v1/stores"
StatsRegion = "/pd/api/v1/stats/region"
membersPrefix = "/pd/api/v1/members"
leaderPrefix = "/pd/api/v1/leader"
transferLeader = "/pd/api/v1/leader/transfer"
// Config
Config = "/pd/api/v1/config"
ClusterVersion = "/pd/api/v1/config/cluster-version"
Expand Down Expand Up @@ -124,6 +127,16 @@ func StoreLabelByID(id uint64) string {
return fmt.Sprintf("%s/%d/label", store, id)
}

// LabelByStoreID returns the path of PD HTTP API to set store label.
func LabelByStoreID(storeID int64) string {
return fmt.Sprintf("%s/%d/label", store, storeID)
}

// TransferLeaderByID returns the path of PD HTTP API to transfer leader by ID.
func TransferLeaderByID(leaderID string) string {
return fmt.Sprintf("%s/%s", transferLeader, leaderID)
}

// ConfigWithTTLSeconds returns the config API with the TTL seconds parameter.
func ConfigWithTTLSeconds(ttlSeconds float64) string {
return fmt.Sprintf("%s?ttlSecond=%.0f", Config, ttlSeconds)
Expand Down
71 changes: 71 additions & 0 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
Expand Down Expand Up @@ -54,9 +55,16 @@ type Client interface {
GetHistoryHotRegions(context.Context, *HistoryHotRegionsRequest) (*HistoryHotRegions, error)
GetRegionStatusByKeyRange(context.Context, *KeyRange, bool) (*RegionStats, error)
GetStores(context.Context) (*StoresInfo, error)
SetStoreLabels(context.Context, int64, map[string]string) error
GetMembers(context.Context) (*MembersInfo, error)
GetLeader(context.Context) (*pdpb.Member, error)
TransferLeader(context.Context, string) error
/* Config-related interfaces */
GetScheduleConfig(context.Context) (map[string]interface{}, error)
SetScheduleConfig(context.Context, map[string]interface{}) error
/* Scheduler-related interfaces */
GetSchedulers(context.Context) ([]string, error)
CreateScheduler(ctx context.Context, name string, storeID uint64) error
/* Rule-related interfaces */
GetAllPlacementRuleBundles(context.Context) ([]*GroupBundle, error)
GetPlacementRuleBundleByGroup(context.Context, string) (*GroupBundle, error)
Expand Down Expand Up @@ -458,6 +466,44 @@ func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRan
return &regionStats, nil
}

// SetStoreLabels sets the labels of a store.
func (c *client) SetStoreLabels(ctx context.Context, storeID int64, storeLabels map[string]string) error {
jsonInput, err := json.Marshal(storeLabels)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx, "SetStoreLabel", LabelByStoreID(storeID),
http.MethodPost, bytes.NewBuffer(jsonInput), nil)
}

func (c *client) GetMembers(ctx context.Context) (*MembersInfo, error) {
var members MembersInfo
err := c.requestWithRetry(ctx,
"GetMembers", membersPrefix,
http.MethodGet, http.NoBody, &members)
if err != nil {
return nil, err
}
return &members, nil
}

// GetLeader gets the leader of PD cluster.
func (c *client) GetLeader(ctx context.Context) (*pdpb.Member, error) {
var leader pdpb.Member
err := c.requestWithRetry(ctx, "GetLeader", leaderPrefix,
http.MethodGet, http.NoBody, &leader)
if err != nil {
return nil, err
}
return &leader, nil
}

// TransferLeader transfers the PD leader.
func (c *client) TransferLeader(ctx context.Context, newLeader string) error {
return c.requestWithRetry(ctx, "TransferLeader", TransferLeaderByID(newLeader),
http.MethodPost, http.NoBody, nil)
}

// GetScheduleConfig gets the schedule configurations.
func (c *client) GetScheduleConfig(ctx context.Context) (map[string]interface{}, error) {
var config map[string]interface{}
Expand Down Expand Up @@ -662,6 +708,31 @@ func (c *client) PatchRegionLabelRules(ctx context.Context, labelRulePatch *Labe
http.MethodPatch, bytes.NewBuffer(labelRulePatchJSON), nil)
}

// GetSchedulers gets the schedulers from PD cluster.
func (c *client) GetSchedulers(ctx context.Context) ([]string, error) {
var schedulers []string
err := c.requestWithRetry(ctx, "GetSchedulers", Schedulers,
http.MethodGet, http.NoBody, &schedulers)
if err != nil {
return nil, err
}
return schedulers, nil
}

// CreateScheduler creates a scheduler to PD cluster.
func (c *client) CreateScheduler(ctx context.Context, name string, storeID uint64) error {
inputJSON, err := json.Marshal(map[string]interface{}{
"name": name,
"store_id": storeID,
})
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"CreateScheduler", Schedulers,
http.MethodPost, bytes.NewBuffer(inputJSON), nil)
}

// AccelerateSchedule accelerates the scheduling of the regions within the given key range.
// The keys in the key range should be encoded in the hex bytes format (without encoding to the UTF-8 bytes).
func (c *client) AccelerateSchedule(ctx context.Context, keyRange *KeyRange) error {
Expand Down
10 changes: 10 additions & 0 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/kvproto/pkg/pdpb"
)

// KeyRange defines a range of keys in bytes.
Expand Down Expand Up @@ -574,3 +575,12 @@ type LabelRulePatch struct {
SetRules []*LabelRule `json:"sets"`
DeleteRules []string `json:"deletes"`
}

// MembersInfo is PD members info returned from PD RESTful interface
// type Members map[string][]*pdpb.Member
type MembersInfo struct {
Header *pdpb.ResponseHeader `json:"header,omitempty"`
Members []*pdpb.Member `json:"members,omitempty"`
Leader *pdpb.Member `json:"leader,omitempty"`
EtcdLeader *pdpb.Member `json:"etcd_leader,omitempty"`
}
62 changes: 61 additions & 1 deletion tests/integrations/client/http_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (suite *httpClientTestSuite) SetupSuite() {
re := suite.Require()
var err error
suite.ctx, suite.cancelFunc = context.WithCancel(context.Background())
suite.cluster, err = tests.NewTestCluster(suite.ctx, 1)
suite.cluster, err = tests.NewTestCluster(suite.ctx, 2)
re.NoError(err)
err = suite.cluster.RunInitialServers()
re.NoError(err)
Expand Down Expand Up @@ -384,3 +384,63 @@ func (suite *httpClientTestSuite) TestScheduleConfig() {
re.Equal(float64(8), config["leader-schedule-limit"])
re.Equal(float64(2048), config["region-schedule-limit"])
}

func (suite *httpClientTestSuite) TestSchedulers() {
re := suite.Require()
schedulers, err := suite.client.GetSchedulers(suite.ctx)
re.NoError(err)
re.Len(schedulers, 0)

err = suite.client.CreateScheduler(suite.ctx, "evict-leader-scheduler", 1)
re.NoError(err)
schedulers, err = suite.client.GetSchedulers(suite.ctx)
re.NoError(err)
re.Len(schedulers, 1)
}

func (suite *httpClientTestSuite) TestSetStoreLabels() {
re := suite.Require()
resp, err := suite.client.GetStores(suite.ctx)
re.NoError(err)
setStore := resp.Stores[0]
re.Empty(setStore.Store.Labels, nil)
storeLabels := map[string]string{
"zone": "zone1",
}
err = suite.client.SetStoreLabels(suite.ctx, 1, storeLabels)
re.NoError(err)

resp, err = suite.client.GetStores(suite.ctx)
re.NoError(err)
for _, store := range resp.Stores {
if store.Store.ID == setStore.Store.ID {
for _, label := range store.Store.Labels {
re.Equal(label.Value, storeLabels[label.Key])
}
}
}
}

func (suite *httpClientTestSuite) TestTransferLeader() {
re := suite.Require()
members, err := suite.client.GetMembers(suite.ctx)
re.NoError(err)
re.Len(members.Members, 2)

oldLeader, err := suite.client.GetLeader(suite.ctx)
re.NoError(err)

// Transfer leader to another pd
for _, member := range members.Members {
if member.Name != oldLeader.Name {
err = suite.client.TransferLeader(suite.ctx, member.Name)
re.NoError(err)
break
}
}

newLeader := suite.cluster.WaitLeader()
re.NotEmpty(newLeader)
re.NoError(err)
re.NotEqual(oldLeader.Name, newLeader)
}

0 comments on commit 1eed494

Please sign in to comment.