Skip to content

Commit

Permalink
dm-operator: support discovery dm-master service in current discovery…
Browse files Browse the repository at this point in the history
… service (#3098)

* add dmclusters CRD

Co-authored-by: DanielZhangQD <36026334+DanielZhangQD@users.noreply.github.com>
Co-authored-by: Lonng <heng@lonng.org>
  • Loading branch information
3 people authored Aug 19, 2020
1 parent 89a7393 commit 10941eb
Show file tree
Hide file tree
Showing 10 changed files with 786 additions and 18 deletions.
3 changes: 2 additions & 1 deletion cmd/discovery/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
"github.com/pingcap/tidb-operator/pkg/discovery/server"
"github.com/pingcap/tidb-operator/pkg/dmapi"
"github.com/pingcap/tidb-operator/pkg/pdapi"
"github.com/pingcap/tidb-operator/pkg/version"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -87,7 +88,7 @@ func main() {
go wait.Forever(func() {
addr := fmt.Sprintf("0.0.0.0:%d", port)
klog.Infof("starting TiDB Discovery server, listening on %s", addr)
discoveryServer := server.NewServer(pdapi.NewDefaultPDControl(kubeCli), cli, kubeCli)
discoveryServer := server.NewServer(pdapi.NewDefaultPDControl(kubeCli), dmapi.NewDefaultMasterControl(kubeCli), cli, kubeCli)
discoveryServer.ListenAndServe(addr)
}, 5*time.Second)
go wait.Forever(func() {
Expand Down
25 changes: 25 additions & 0 deletions pkg/apis/pingcap/v1alpha1/dmcluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package v1alpha1

func (dc *DMCluster) Scheme() string {
if dc.IsTLSClusterEnabled() {
return "https"
}
return "http"
}

func (dc *DMCluster) IsTLSClusterEnabled() bool {
return dc.Spec.TLSCluster != nil && dc.Spec.TLSCluster.Enabled
}
83 changes: 74 additions & 9 deletions pkg/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,26 @@ import (
"sync"

"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
"github.com/pingcap/tidb-operator/pkg/dmapi"
"github.com/pingcap/tidb-operator/pkg/pdapi"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"
)

// TiDBDiscovery helps new PD member to discover all other members in cluster bootstrap phase.
// TiDBDiscovery helps new PD and dm-master member to discover all other members in cluster bootstrap phase.
type TiDBDiscovery interface {
Discover(string) (string, error)
DiscoverDM(string) (string, error)
}

type tidbDiscovery struct {
cli versioned.Interface
lock sync.Mutex
clusters map[string]*clusterInfo
pdControl pdapi.PDControlInterface
cli versioned.Interface
lock sync.Mutex
clusters map[string]*clusterInfo
dmClusters map[string]*clusterInfo
pdControl pdapi.PDControlInterface
masterControl dmapi.MasterControlInterface
}

type clusterInfo struct {
Expand All @@ -44,11 +48,13 @@ type clusterInfo struct {
}

// NewTiDBDiscovery returns a TiDBDiscovery
func NewTiDBDiscovery(pdControl pdapi.PDControlInterface, cli versioned.Interface, kubeCli kubernetes.Interface) TiDBDiscovery {
func NewTiDBDiscovery(pdControl pdapi.PDControlInterface, masterControl dmapi.MasterControlInterface, cli versioned.Interface, kubeCli kubernetes.Interface) TiDBDiscovery {
return &tidbDiscovery{
cli: cli,
pdControl: pdControl,
clusters: map[string]*clusterInfo{},
cli: cli,
pdControl: pdControl,
masterControl: masterControl,
clusters: map[string]*clusterInfo{},
dmClusters: map[string]*clusterInfo{},
}
}

Expand Down Expand Up @@ -114,3 +120,62 @@ func (td *tidbDiscovery) Discover(advertisePeerUrl string) (string, error) {
delete(currentCluster.peers, podName)
return fmt.Sprintf("--join=%s", strings.Join(membersArr, ",")), nil
}

func (td *tidbDiscovery) DiscoverDM(advertisePeerUrl string) (string, error) {
td.lock.Lock()
defer td.lock.Unlock()

if advertisePeerUrl == "" {
return "", fmt.Errorf("dm advertisePeerUrl is empty")
}
klog.Infof("dm advertisePeerUrl is: %s", advertisePeerUrl)
strArr := strings.Split(advertisePeerUrl, ".")
if len(strArr) != 2 {
return "", fmt.Errorf("dm advertisePeerUrl format is wrong: %s", advertisePeerUrl)
}

podName, peerServiceNameWithPort := strArr[0], strArr[1]
strArr = strings.Split(peerServiceNameWithPort, ":")
if len(strArr) != 2 {
return "", fmt.Errorf("dm advertisePeerUrl format is wrong: %s", advertisePeerUrl)
}
peerServiceName := strArr[0]
dcName := strings.TrimSuffix(peerServiceName, "-dm-master-peer")
ns := os.Getenv("MY_POD_NAMESPACE")

dc, err := td.cli.PingcapV1alpha1().DMClusters(ns).Get(dcName, metav1.GetOptions{})
if err != nil {
return "", err
}
keyName := fmt.Sprintf("%s/%s", ns, dcName)
// TODO: the replicas should be the total replicas of dm master sets.
replicas := dc.Spec.Master.Replicas

currentCluster := td.dmClusters[keyName]
if currentCluster == nil || currentCluster.resourceVersion != dc.ResourceVersion {
td.dmClusters[keyName] = &clusterInfo{
resourceVersion: dc.ResourceVersion,
peers: map[string]struct{}{},
}
}
currentCluster = td.dmClusters[keyName]
currentCluster.peers[podName] = struct{}{}

if len(currentCluster.peers) == int(replicas) {
delete(currentCluster.peers, podName)
return fmt.Sprintf("--initial-cluster=%s=%s://%s", podName, dc.Scheme(), advertisePeerUrl), nil
}

masterClient := td.masterControl.GetMasterClient(dmapi.Namespace(dc.GetNamespace()), dc.GetName(), dc.IsTLSClusterEnabled())
mastersInfos, err := masterClient.GetMasters()
if err != nil {
return "", err
}

mastersArr := make([]string, 0)
for _, master := range mastersInfos {
mastersArr = append(mastersArr, master.ClientURLs[0])
}
delete(currentCluster.peers, podName)
return fmt.Sprintf("--join=%s", strings.Join(mastersArr, ",")), nil
}
Loading

0 comments on commit 10941eb

Please sign in to comment.