Skip to content

Commit

Permalink
Enable placement rules automatically (#2328) (#2340)
Browse files Browse the repository at this point in the history
  • Loading branch information
sre-bot authored Apr 29, 2020
1 parent 1cb1571 commit d545aa0
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 15 deletions.
12 changes: 1 addition & 11 deletions pkg/manager/member/pd_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,17 +741,7 @@ func getPDConfigMap(tc *v1alpha1.TidbCluster) (*corev1.ConfigMap, error) {
config.Dashboard.TiDBCertPath = path.Join(tidbClientCertPath, corev1.TLSCertKey)
config.Dashboard.TiDBKeyPath = path.Join(tidbClientCertPath, corev1.TLSPrivateKeyKey)
}
// TiFlash requires PD to enable the `replication.enable-placement-rules`
// Check detail in https://pingcap.com/docs/stable/reference/tiflash/deploy/
if tc.Spec.TiFlash != nil {
if config.Replication == nil {
config.Replication = &v1alpha1.PDReplicationConfig{}
}
if config.Replication.EnablePlacementRules == nil {
enable := true
config.Replication.EnablePlacementRules = &enable
}
}

confText, err := MarshalTOML(config)
if err != nil {
return nil, err
Expand Down
24 changes: 23 additions & 1 deletion pkg/manager/member/tiflash_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,36 @@ func (tfmm *tiflashMemberManager) Sync(tc *v1alpha1.TidbCluster) error {
return controller.RequeueErrorf("TidbCluster: [%s/%s], waiting for PD cluster running", ns, tcName)
}

err := tfmm.enablePlacementRules(tc)
if err != nil {
klog.Errorf("Enable placement rules failed, error: %v", err)
// No need to return err here, just continue to sync tiflash
}
// Sync TiFlash Headless Service
if err := tfmm.syncHeadlessService(tc); err != nil {
if err = tfmm.syncHeadlessService(tc); err != nil {
return err
}

return tfmm.syncStatefulSet(tc)
}

func (tfmm *tiflashMemberManager) enablePlacementRules(tc *v1alpha1.TidbCluster) error {
pdCli := controller.GetPDClient(tfmm.pdControl, tc)
config, err := pdCli.GetConfig()
if err != nil {
return err
}
if config.Replication.EnablePlacementRules != nil && (!*config.Replication.EnablePlacementRules) {
klog.Infof("Cluster %s/%s enable-placement-rules is %v, set it to true", tc.Namespace, tc.Name, *config.Replication.EnablePlacementRules)
enable := true
rep := pdapi.PDReplicationConfig{
EnablePlacementRules: &enable,
}
return pdCli.UpdateReplicationConfig(rep)
}
return nil
}

func (tfmm *tiflashMemberManager) syncHeadlessService(tc *v1alpha1.TidbCluster) error {
if tc.Spec.Paused {
klog.V(4).Infof("tiflash cluster %s/%s is paused, skip syncing for tiflash service", tc.GetNamespace(), tc.GetName())
Expand Down
39 changes: 36 additions & 3 deletions pkg/pdapi/pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ type PDClient interface {
// storeLabelsEqualNodeLabels compares store labels with node labels
// for historic reasons, PD stores TiKV labels as []*StoreLabel which is a key-value pair slice
SetStoreLabels(storeID uint64, labels map[string]string) (bool, error)
// UpdateReplicationConfig updates the replication config
UpdateReplicationConfig(config PDReplicationConfig) error
// DeleteStore deletes a TiKV store from cluster
DeleteStore(storeID uint64) error
// SetStoreState sets store to specified state.
Expand Down Expand Up @@ -185,6 +187,7 @@ var (
schedulersPrefix = "pd/api/v1/schedulers"
pdLeaderPrefix = "pd/api/v1/leader"
pdLeaderTransferPrefix = "pd/api/v1/leader/transfer"
pdReplicationPrefix = "pd/api/v1/config/replicate"
)

// pdClient is default implementation of PDClient
Expand Down Expand Up @@ -512,6 +515,24 @@ func (pc *pdClient) SetStoreLabels(storeID uint64, labels map[string]string) (bo
return false, fmt.Errorf("failed %v to set store labels: %v", res.StatusCode, err2)
}

func (pc *pdClient) UpdateReplicationConfig(config PDReplicationConfig) error {
apiURL := fmt.Sprintf("%s/%s", pc.url, pdReplicationPrefix)
data, err := json.Marshal(config)
if err != nil {
return err
}
res, err := pc.httpClient.Post(apiURL, "application/json", bytes.NewBuffer(data))
if err != nil {
return err
}
defer httputil.DeferClose(res.Body)
if res.StatusCode == http.StatusOK {
return nil
}
err = httputil.ReadErrorBody(res.Body)
return fmt.Errorf("failed %v to update replication: %v", res.StatusCode, err)
}

func (pc *pdClient) BeginEvictLeader(storeID uint64) error {
leaderEvictInfo := getLeaderEvictSchedulerInfo(storeID)
apiURL := fmt.Sprintf("%s/%s", pc.url, schedulersPrefix)
Expand Down Expand Up @@ -697,6 +718,7 @@ const (
DeleteMemberByIDActionType ActionType = "DeleteMemberByID"
DeleteMemberActionType ActionType = "DeleteMember "
SetStoreLabelsActionType ActionType = "SetStoreLabels"
UpdateReplicationActionType ActionType = "UpdateReplicationConfig"
BeginEvictLeaderActionType ActionType = "BeginEvictLeader"
EndEvictLeaderActionType ActionType = "EndEvictLeader"
GetEvictLeaderSchedulersActionType ActionType = "GetEvictLeaderSchedulers"
Expand All @@ -713,9 +735,10 @@ func (nfr *NotFoundReaction) Error() string {
}

type Action struct {
ID uint64
Name string
Labels map[string]string
ID uint64
Name string
Labels map[string]string
Replication PDReplicationConfig
}

type Reaction func(action *Action) (interface{}, error)
Expand Down Expand Up @@ -855,6 +878,16 @@ func (pc *FakePDClient) SetStoreLabels(storeID uint64, labels map[string]string)
return true, nil
}

// UpdateReplicationConfig updates the replication config
func (pc *FakePDClient) UpdateReplicationConfig(config PDReplicationConfig) error {
if reaction, ok := pc.reactions[UpdateReplicationActionType]; ok {
action := &Action{Replication: config}
_, err := reaction(action)
return err
}
return nil
}

func (pc *FakePDClient) BeginEvictLeader(storeID uint64) error {
if reaction, ok := pc.reactions[BeginEvictLeaderActionType]; ok {
action := &Action{ID: storeID}
Expand Down

0 comments on commit d545aa0

Please sign in to comment.