Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dm-operator: support discovery dm-master service in current discovery service #3098

Merged
merged 36 commits into from
Aug 19, 2020
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
08a8ae4
add dmclusters CRD
lichunzhu Aug 5, 2020
5cb82ac
resolve conflicts
lichunzhu Aug 5, 2020
bcf58ea
Merge branch 'master' into defineDMSpec
lichunzhu Aug 5, 2020
55a6365
address comment
lichunzhu Aug 6, 2020
0aae8c0
Merge branch 'master' into defineDMSpec
lichunzhu Aug 6, 2020
f146be1
address comments
lichunzhu Aug 6, 2020
94c427b
Merge branch 'defineDMSpec' of https://github.com/lichunzhu/tidb-oper…
lichunzhu Aug 6, 2020
4f10a51
delete monitor ref
lichunzhu Aug 6, 2020
a020492
generate dmcluster client
lichunzhu Aug 6, 2020
3edfaa2
address comments
lichunzhu Aug 6, 2020
74aca39
Merge branch 'master' into defineDMSpec
lichunzhu Aug 7, 2020
2c1bec5
address comment
lichunzhu Aug 7, 2020
fbe26f3
tmp commit
lichunzhu Aug 7, 2020
d85a9fc
resolve conflict
lichunzhu Aug 7, 2020
a9da15f
merge master
lichunzhu Aug 11, 2020
ba0f518
remove dm package
lichunzhu Aug 12, 2020
7a51c07
fix bugs
lichunzhu Aug 12, 2020
3ec6c86
fix bug
lichunzhu Aug 12, 2020
609e797
address comment
lichunzhu Aug 13, 2020
ac8348c
Merge branch 'master' into discoverySupportDM
lichunzhu Aug 13, 2020
e3699ca
Merge branch 'master' into discoverySupportDM
lichunzhu Aug 13, 2020
2c7e525
fix bug
lichunzhu Aug 13, 2020
a254750
Merge branch 'master' into discoverySupportDM
DanielZhangQD Aug 18, 2020
3325710
Merge branch 'master' into discoverySupportDM
lonng Aug 18, 2020
61fd15c
address comments
lichunzhu Aug 18, 2020
8424f32
Merge branch 'discoverySupportDM' of https://github.com/lichunzhu/tid…
lichunzhu Aug 18, 2020
c15a013
Merge branch 'master' into discoverySupportDM
lichunzhu Aug 18, 2020
546deb3
address comments
lichunzhu Aug 19, 2020
4ec300e
Merge branch 'master' into discoverySupportDM
lichunzhu Aug 19, 2020
421e9b8
fix bug
lichunzhu Aug 19, 2020
e87c28f
Merge branch 'discoverySupportDM' of https://github.com/lichunzhu/tid…
lichunzhu Aug 19, 2020
0416231
fix ut
lichunzhu Aug 19, 2020
0d4430e
Merge branch 'master' into discoverySupportDM
DanielZhangQD Aug 19, 2020
fa05a1b
address comments
lichunzhu Aug 19, 2020
f505d4c
Merge branch 'discoverySupportDM' of https://github.com/lichunzhu/tid…
lichunzhu Aug 19, 2020
b7b0c31
fix bugs and unit tests
lichunzhu Aug 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/discovery/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
"github.com/pingcap/tidb-operator/pkg/discovery/server"
"github.com/pingcap/tidb-operator/pkg/dmapi"
"github.com/pingcap/tidb-operator/pkg/pdapi"
"github.com/pingcap/tidb-operator/pkg/version"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -87,7 +88,7 @@ func main() {
go wait.Forever(func() {
addr := fmt.Sprintf("0.0.0.0:%d", port)
klog.Infof("starting TiDB Discovery server, listening on %s", addr)
discoveryServer := server.NewServer(pdapi.NewDefaultPDControl(kubeCli), cli, kubeCli)
discoveryServer := server.NewServer(pdapi.NewDefaultPDControl(kubeCli), dmapi.NewDefaultMasterControl(kubeCli), cli, kubeCli)
discoveryServer.ListenAndServe(addr)
}, 5*time.Second)
go wait.Forever(func() {
Expand Down
25 changes: 25 additions & 0 deletions pkg/apis/pingcap/v1alpha1/dmcluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package v1alpha1

func (dc *DMCluster) Scheme() string {
if dc.IsTLSClusterEnabled() {
return "https"
}
return "http"
}

func (dc *DMCluster) IsTLSClusterEnabled() bool {
return dc.Spec.TLSCluster != nil && dc.Spec.TLSCluster.Enabled
}
78 changes: 69 additions & 9 deletions pkg/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,26 @@ import (
"sync"

"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
"github.com/pingcap/tidb-operator/pkg/dmapi"
"github.com/pingcap/tidb-operator/pkg/pdapi"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog"
)

// TiDBDiscovery helps new PD member to discover all other members in cluster bootstrap phase.
// TiDBDiscovery helps new PD and dm-master member to discover all other members in cluster bootstrap phase.
type TiDBDiscovery interface {
Discover(string) (string, error)
DiscoverDM(string) (string, error)
}

type tidbDiscovery struct {
cli versioned.Interface
lock sync.Mutex
clusters map[string]*clusterInfo
pdControl pdapi.PDControlInterface
cli versioned.Interface
lock sync.Mutex
clusters map[string]*clusterInfo
dmClusters map[string]*clusterInfo
pdControl pdapi.PDControlInterface
masterControl dmapi.MasterControlInterface
}

type clusterInfo struct {
Expand All @@ -44,11 +48,13 @@ type clusterInfo struct {
}

// NewTiDBDiscovery returns a TiDBDiscovery
func NewTiDBDiscovery(pdControl pdapi.PDControlInterface, cli versioned.Interface, kubeCli kubernetes.Interface) TiDBDiscovery {
func NewTiDBDiscovery(pdControl pdapi.PDControlInterface, masterControl dmapi.MasterControlInterface, cli versioned.Interface, kubeCli kubernetes.Interface) TiDBDiscovery {
return &tidbDiscovery{
cli: cli,
pdControl: pdControl,
clusters: map[string]*clusterInfo{},
cli: cli,
pdControl: pdControl,
masterControl: masterControl,
clusters: map[string]*clusterInfo{},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initialize dmClusters here?

dmClusters: map[string]*clusterInfo{},
}
}

Expand Down Expand Up @@ -114,3 +120,57 @@ func (td *tidbDiscovery) Discover(advertisePeerUrl string) (string, error) {
delete(currentCluster.peers, podName)
return fmt.Sprintf("--join=%s", strings.Join(membersArr, ",")), nil
}

func (td *tidbDiscovery) DiscoverDM(advertisePeerUrl string) (string, error) {
td.lock.Lock()
defer td.lock.Unlock()

if advertisePeerUrl == "" {
return "", fmt.Errorf("dm advertisePeerUrl is empty")
}
klog.Infof("dm advertisePeerUrl is: %s", advertisePeerUrl)
strArr := strings.Split(advertisePeerUrl, ".")
if len(strArr) != 2 {
return "", fmt.Errorf("dm advertisePeerUrl format is wrong: %s", advertisePeerUrl)
}

podName, peerServiceName := strArr[0], strArr[1]
dcName := strings.TrimSuffix(peerServiceName, "-dm-master-peer")
ns := os.Getenv("MY_POD_NAMESPACE")

dc, err := td.cli.PingcapV1alpha1().DMClusters(ns).Get(dcName, metav1.GetOptions{})
DanielZhangQD marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return "", err
}
keyName := fmt.Sprintf("%s/%s", ns, dcName)
// TODO: the replicas should be the total replicas of dm master sets.
replicas := dc.Spec.Master.Replicas

currentCluster := td.dmClusters[keyName]
if currentCluster == nil || currentCluster.resourceVersion != dc.ResourceVersion {
td.dmClusters[keyName] = &clusterInfo{
resourceVersion: dc.ResourceVersion,
peers: map[string]struct{}{},
}
}
currentCluster = td.dmClusters[keyName]
currentCluster.peers[podName] = struct{}{}

if len(currentCluster.peers) == int(replicas) {
delete(currentCluster.peers, podName)
return fmt.Sprintf("--initial-cluster=%s=%s://%s", podName, dc.Scheme(), advertisePeerUrl), nil
}

masterClient := td.masterControl.GetMasterClient(dmapi.Namespace(dc.GetNamespace()), dc.GetName(), dc.IsTLSClusterEnabled())
mastersInfos, err := masterClient.GetMasters()
if err != nil {
return "", err
}

mastersArr := make([]string, 0)
for _, master := range mastersInfos {
mastersArr = append(mastersArr, master.ClientURLs[0])
DanielZhangQD marked this conversation as resolved.
Show resolved Hide resolved
}
delete(currentCluster.peers, podName)
return fmt.Sprintf("--join=%s", strings.Join(mastersArr, ",")), nil
}
Loading