dm-operator/: support start a new dm cluster with dm-masters and dm-workers #3146

Merged: 40 commits, Aug 26, 2020

Changes from 2 commits

Commits (40)
08a8ae4
add dmclusters CRD
lichunzhu Aug 5, 2020
5cb82ac
resolve conflicts
lichunzhu Aug 5, 2020
bcf58ea
Merge branch 'master' into defineDMSpec
lichunzhu Aug 5, 2020
55a6365
address comment
lichunzhu Aug 6, 2020
0aae8c0
Merge branch 'master' into defineDMSpec
lichunzhu Aug 6, 2020
f146be1
address comments
lichunzhu Aug 6, 2020
94c427b
Merge branch 'defineDMSpec' of https://github.com/lichunzhu/tidb-oper…
lichunzhu Aug 6, 2020
4f10a51
delete monitor ref
lichunzhu Aug 6, 2020
a020492
generate dmcluster client
lichunzhu Aug 6, 2020
3edfaa2
address comments
lichunzhu Aug 6, 2020
74aca39
Merge branch 'master' into defineDMSpec
lichunzhu Aug 7, 2020
2c1bec5
address comment
lichunzhu Aug 7, 2020
fbe26f3
tmp commit
lichunzhu Aug 7, 2020
d85a9fc
resolve conflict
lichunzhu Aug 7, 2020
a9da15f
merge master
lichunzhu Aug 11, 2020
ba0f518
remove dm package
lichunzhu Aug 12, 2020
7a51c07
fix bugs
lichunzhu Aug 12, 2020
3ec6c86
fix bug
lichunzhu Aug 12, 2020
8122c71
support start dm-master and dm-worker in cluster
lichunzhu Aug 18, 2020
a38bb0e
fix some bugs
lichunzhu Aug 19, 2020
1e7efd8
merge master branch and resolve conflicts
lichunzhu Aug 19, 2020
850035c
fix ut
lichunzhu Aug 20, 2020
cd3f5b4
fix dm-master start
lichunzhu Aug 24, 2020
f4a641d
fix dm-worker start bug
lichunzhu Aug 24, 2020
5a51cbe
Merge branch 'master' of https://github.com/pingcap/tidb-operator int…
lichunzhu Aug 25, 2020
be4fd5e
add more column info
lichunzhu Aug 25, 2020
1e4f2ac
Merge branch 'master' of https://github.com/pingcap/tidb-operator int…
lichunzhu Aug 25, 2020
daf81f7
Merge branch 'master' into supportStartDMCluster
lichunzhu Aug 25, 2020
f7c70bd
fix ut
lichunzhu Aug 25, 2020
08f4695
fix verify
lichunzhu Aug 25, 2020
e2249f9
fix verify again
lichunzhu Aug 25, 2020
161b862
address comments
lichunzhu Aug 25, 2020
35a69d3
Merge branch 'master' into supportStartDMCluster
lichunzhu Aug 25, 2020
d3da808
regenerate code
lichunzhu Aug 25, 2020
8e52fbb
address comments
lichunzhu Aug 25, 2020
cefe7b8
Merge branch 'master' into supportStartDMCluster
lichunzhu Aug 25, 2020
f1b6029
fix import cycle problem
lichunzhu Aug 26, 2020
7726669
Merge branch 'supportStartDMCluster' of https://github.com/lichunzhu/…
lichunzhu Aug 26, 2020
84a6309
fix check
lichunzhu Aug 26, 2020
5ab015d
address comments
lichunzhu Aug 26, 2020
6 changes: 3 additions & 3 deletions examples/dm/README.md
@@ -8,7 +8,7 @@ The following steps will create a DM cluster.

**Prerequisites**:
- Has TiDB operator `v1.2.0` or higher version installed. [Doc](https://pingcap.com/docs/stable/tidb-in-kubernetes/deploy/tidb-operator/)
- Has default `StorageClass` configured, and there are enough PVs (by default, 6 PVs are required) of that storageClass:
- Has default `StorageClass` configured, and there are enough PVs (by default, 2 PVs are required) of that storageClass:

This can be verified with the following command:

@@ -24,7 +24,7 @@ The following steps will create a DM cluster.
gold kubernetes.io/gce-pd 1d
```

Alternatively, you could specify the storageClass explicitly by modifying `tidb-cluster.yaml`.
Alternatively, you could specify the storageClass explicitly by modifying `dm-cluster.yaml`.

## Install

@@ -47,7 +47,7 @@ watch kubectl -n <namespace> get pod
Explore the DM master interface:

```bash
> kubectl -n <namespace> port-forward svc/basic-dm-master 8261:8261 &>/tmp/pf-dm.log &
> kubectl -n <namespace> port-forward svc/basic-dm-master 8261:8261
> dmctl --master-addr 127.0.0.1:8261 list-member
```

4 changes: 2 additions & 2 deletions examples/dm/dm-cluster.yaml
@@ -10,15 +10,15 @@ spec:
address: "http://basic-discovery.demo:10261"
master:
baseImage: pingcap/dm
replicas: 3
replicas: 1
# if storageClassName is not set, the default Storage Class of the Kubernetes cluster will be used
# storageClassName: local-storage
storageSize: "1Gi"
requests: {}
config: {}
worker:
baseImage: pingcap/dm
replicas: 3
replicas: 1
# if storageClassName is not set, the default Storage Class of the Kubernetes cluster will be used
# storageClassName: local-storage
storageSize: "1Gi"
7 changes: 0 additions & 7 deletions pkg/apis/pingcap/v1alpha1/defaulting/dmcluster.go
@@ -16,7 +16,6 @@ package defaulting
import (
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/utils/pointer"
)

const (
@@ -56,9 +55,6 @@ func setMasterSpecDefault(dc *v1alpha1.DMCluster) {
dc.Spec.Master.BaseImage = defaultMasterImage
}
}
if dc.Spec.Master.MaxFailoverCount == nil {
dc.Spec.Master.MaxFailoverCount = pointer.Int32Ptr(3)
}
}

func setWorkerSpecDefault(dc *v1alpha1.DMCluster) {
@@ -67,7 +63,4 @@ func setWorkerSpecDefault(dc *v1alpha1.DMCluster) {
dc.Spec.Worker.BaseImage = defaultWorkerImage
}
}
if dc.Spec.Worker.MaxFailoverCount == nil {
dc.Spec.Worker.MaxFailoverCount = pointer.Int32Ptr(3)
}
}
43 changes: 16 additions & 27 deletions pkg/apis/pingcap/v1alpha1/dmcluster.go
@@ -18,8 +18,6 @@ import (
"strings"
)

var latestImageVersion = "latest"

func (dc *DMCluster) Scheme() string {
if dc.IsTLSClusterEnabled() {
return "https"
@@ -69,6 +67,7 @@ func (dc *DMCluster) MasterStsDesiredReplicas() int32 {
return dc.Spec.Master.Replicas + int32(len(dc.Status.Master.FailureMembers))
}

// TODO: support fail-over
func (dc *DMCluster) WorkerStsDesiredReplicas() int32 {
if dc.Spec.Worker == nil {
return 0
@@ -82,35 +81,25 @@ func (dc *DMCluster) GetInstanceName() string {
}

func (dc *DMCluster) MasterImage() string {
image := dc.Spec.Master.Image
baseImage := dc.Spec.Master.BaseImage
// base image takes higher priority
if baseImage != "" {
version := dc.Spec.Master.Version
if version == nil {
version = &dc.Spec.Version
}
if version == nil {
version = &latestImageVersion
}
image = fmt.Sprintf("%s:%s", baseImage, *version)
image := dc.Spec.Master.BaseImage
version := dc.Spec.Master.Version
if version == nil {
version = &dc.Spec.Version
}
if *version != "" {
image = fmt.Sprintf("%s:%s", image, *version)
}
return image
}

func (dc *DMCluster) WorkerImage() string {
image := dc.Spec.Worker.Image
baseImage := dc.Spec.Worker.BaseImage
// base image takes higher priority
if baseImage != "" {
version := dc.Spec.Worker.Version
if version == nil {
version = &dc.Spec.Version
}
if version == nil {
version = &latestImageVersion
}
image = fmt.Sprintf("%s:%s", baseImage, *version)
image := dc.Spec.Worker.BaseImage
version := dc.Spec.Worker.Version
if version == nil {
version = &dc.Spec.Version
}
if *version != "" {
image = fmt.Sprintf("%s:%s", image, *version)
}
return image
}
@@ -122,7 +111,7 @@ func (dc *DMCluster) MasterVersion() string {
return image[colonIdx+1:]
}

return latestImageVersion
return "latest"
}

func (dc *DMCluster) MasterUpgrading() bool {
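The rewrite above drops the `Spec.Master.Image` and `latestImageVersion` fallbacks, so the image tag is resolved purely from `BaseImage` plus the component-level or cluster-level version. A minimal standalone sketch of that resolution order (the helper name and version strings below are illustrative, not part of the operator):

```go
package main

import "fmt"

// resolveImage mirrors the precedence in the rewritten MasterImage/WorkerImage:
// a component-level version wins over the cluster-level one, and an empty
// version leaves the base image untagged.
func resolveImage(baseImage string, componentVersion *string, clusterVersion string) string {
	version := componentVersion
	if version == nil {
		version = &clusterVersion
	}
	image := baseImage
	if *version != "" {
		image = fmt.Sprintf("%s:%s", image, *version)
	}
	return image
}

func main() {
	v := "v2.0.0-rc.2"
	fmt.Println(resolveImage("pingcap/dm", &v, "v2.0.0")) // pingcap/dm:v2.0.0-rc.2
	fmt.Println(resolveImage("pingcap/dm", nil, "v2.0.0")) // pingcap/dm:v2.0.0
	fmt.Println(resolveImage("pingcap/dm", nil, ""))       // pingcap/dm
}
```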
7 changes: 6 additions & 1 deletion pkg/controller/controller_utils.go
@@ -323,9 +323,14 @@ func DMMasterPeerMemberName(clusterName string) string {
return fmt.Sprintf("%s-dm-master-peer", clusterName)
}

// DMWorkerMemberName returns dm-worker member name
func DMWorkerMemberName(clusterName string) string {
return fmt.Sprintf("%s-dm-worker", clusterName)
}

// DMWorkerPeerMemberName returns dm-worker peer service name
func DMWorkerPeerMemberName(clusterName string) string {
return fmt.Sprintf("%s-dm-worker", clusterName)
return fmt.Sprintf("%s-dm-worker-peer", clusterName)
}

// AnnProm adds annotations for prometheus scraping metrics
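For a hypothetical DMCluster named `basic`, these helpers now yield distinct names for the dm-worker StatefulSet and its peer service; a small sketch (the cluster name is made up for illustration):

```go
package main

import "fmt"

func main() {
	clusterName := "basic" // illustrative cluster name

	// Name shapes produced by the helpers in the hunk above.
	fmt.Printf("%s-dm-master-peer\n", clusterName) // DMMasterPeerMemberName -> dm-master peer service
	fmt.Printf("%s-dm-worker\n", clusterName)      // DMWorkerMemberName     -> dm-worker StatefulSet/ConfigMap
	fmt.Printf("%s-dm-worker-peer\n", clusterName) // DMWorkerPeerMemberName -> dm-worker peer service
}
```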
8 changes: 8 additions & 0 deletions pkg/controller/dmcluster/dm_cluster_control.go
@@ -45,6 +45,7 @@ func NewDefaultDMClusterControl(
//metaManager manager.DMManager,
//orphanPodsCleaner member.OrphanPodsCleaner,
pvcCleaner member.PVCCleanerInterface,
pvcResizer member.PVCResizerInterface,
//podRestarter member.PodRestarter,
conditionUpdater DMClusterConditionUpdater,
recorder record.EventRecorder) ControlInterface {
@@ -57,6 +58,7 @@
//orphanPodsCleaner,
pvcCleaner,
//podRestarter,
pvcResizer,
conditionUpdater,
recorder,
}
@@ -71,6 +73,7 @@ type defaultDMClusterControl struct {
//orphanPodsCleaner member.OrphanPodsCleaner
pvcCleaner member.PVCCleanerInterface
//podRestarter member.PodRestarter
pvcResizer member.PVCResizerInterface
conditionUpdater DMClusterConditionUpdater
recorder record.EventRecorder
}
@@ -194,5 +197,10 @@ func (dcc *defaultDMClusterControl) updateDMCluster(dc *v1alpha1.DMCluster) erro
// syncing the some tidbcluster status attributes
// - sync tidbmonitor reference
// return dcc.tidbClusterStatusManager.Sync(dc)

// resize PVC if necessary
if err := dcc.pvcResizer.ResizeDM(dc); err != nil {
errs = append(errs, err)
}
return errorutils.NewAggregate(errs)
}
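Like the other sync steps in `updateDMCluster`, the PVC resize error is appended to `errs` rather than returned immediately, so one failing step does not mask the rest. A rough sketch of that aggregation pattern (the step closures are invented for illustration):

```go
package main

import (
	"fmt"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

func main() {
	var errs []error

	// Invented stand-ins for sync steps such as member sync and pvcResizer.ResizeDM.
	steps := []func() error{
		func() error { return nil },
		func() error { return fmt.Errorf("resize PVC: requested size smaller than current") },
	}
	for _, step := range steps {
		if err := step(); err != nil {
			errs = append(errs, err) // collect instead of returning early
		}
	}

	// NewAggregate returns nil for an empty slice, so callers treat the result
	// like an ordinary error. (Assumed to be the package the controller
	// imports as errorutils.)
	fmt.Println(utilerrors.NewAggregate(errs))
}
```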
23 changes: 23 additions & 0 deletions pkg/controller/dmcluster/dm_cluster_controller.go
@@ -17,6 +17,7 @@ import (
"fmt"
"time"

"github.com/Masterminds/semver"
perrors "github.com/pingcap/errors"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
@@ -25,6 +26,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/pingcap/tidb-operator/pkg/dmapi"
mm "github.com/pingcap/tidb-operator/pkg/manager/member"

apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -85,6 +87,7 @@ func NewController(
pvcInformer := kubeInformerFactory.Core().V1().PersistentVolumeClaims()
pvInformer := kubeInformerFactory.Core().V1().PersistentVolumes()
podInformer := kubeInformerFactory.Core().V1().Pods()
scInformer := kubeInformerFactory.Storage().V1().StorageClasses()
//nodeInformer := kubeInformerFactory.Core().V1().Nodes()
//secretInformer := kubeInformerFactory.Core().V1().Secrets()

@@ -150,6 +153,11 @@ func NewController(
pvInformer.Lister(),
pvControl,
),
mm.NewPVCResizer(
kubeCli,
pvcInformer,
scInformer,
),
//mm.NewDMClusterStatusManager(kubeCli, cli, scalerInformer.Lister(), tikvGroupInformer.Lister()),
//podRestarter,
&dmClusterConditionUpdater{},
@@ -245,6 +253,12 @@ func (dcc *Controller) sync(key string) error {
if err != nil {
return err
}
clusterVersionLT2, err := clusterVersionLessThan2(dc.MasterVersion())
if err != nil {
klog.V(4).Infof("cluster version: %s is not semantic versioning compatible", dc.MasterVersion())
} else if clusterVersionLT2 {
return fmt.Errorf("dm-operator only supports to deploy dm-2.0")
}

return dcc.syncDMCluster(dc.DeepCopy())
}
@@ -362,3 +376,12 @@ func (dcc *Controller) resolveDMClusterFromSet(namespace string, set *apps.State
}
return dc
}

func clusterVersionLessThan2(version string) (bool, error) {
v, err := semver.NewVersion(version)
if err != nil {
return true, err
}

return v.Major() < 2, nil
}
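A standalone sketch of how that version gate behaves for a few illustrative tags; `semver.NewVersion` accepts an optional leading `v`, and tags it cannot parse are only logged by `sync`, not rejected:

```go
package main

import (
	"fmt"

	"github.com/Masterminds/semver"
)

// Same shape as clusterVersionLessThan2 above.
func clusterVersionLessThan2(version string) (bool, error) {
	v, err := semver.NewVersion(version)
	if err != nil {
		return true, err
	}
	return v.Major() < 2, nil
}

func main() {
	for _, ver := range []string{"v2.0.0-rc.2", "v1.0.6", "nightly"} { // illustrative tags
		lt2, err := clusterVersionLessThan2(ver)
		if err != nil {
			fmt.Printf("%s: not semver (%v); sync only logs this\n", ver, err)
			continue
		}
		fmt.Printf("%s: rejected as pre-2.0 = %v\n", ver, lt2)
	}
}
```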
4 changes: 3 additions & 1 deletion pkg/dmapi/master_control.go
@@ -19,6 +19,8 @@ import (
"net/http"
"sync"

"github.com/pingcap/tidb-operator/pkg/controller"

"github.com/pingcap/tidb-operator/pkg/pdapi"

"github.com/pingcap/tidb-operator/pkg/util"
@@ -82,7 +84,7 @@ func masterClientKey(scheme string, namespace Namespace, clusterName string) str

// MasterClientURL builds the url of master client
func MasterClientURL(namespace Namespace, clusterName string, scheme string) string {
return fmt.Sprintf("%s://%s-dm-master.%s:8261", scheme, clusterName, string(namespace))
return fmt.Sprintf("%s://%s.%s:8261", scheme, controller.DMMasterMemberName(clusterName), string(namespace))
}
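For a hypothetical cluster `basic` in namespace `demo`, the rewritten builder produces the same address as before; it just derives the `-dm-master` suffix from `controller.DMMasterMemberName` instead of hard-coding it. A sketch under that assumption:

```go
package main

import "fmt"

// masterClientURL mirrors the rewritten MasterClientURL, assuming
// DMMasterMemberName(clusterName) returns "<cluster>-dm-master".
func masterClientURL(namespace, clusterName, scheme string) string {
	memberName := fmt.Sprintf("%s-dm-master", clusterName) // stand-in for controller.DMMasterMemberName
	return fmt.Sprintf("%s://%s.%s:8261", scheme, memberName, namespace)
}

func main() {
	fmt.Println(masterClientURL("demo", "basic", "http")) // http://basic-dm-master.demo:8261
}
```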

// FakeMasterControl implements a fake version of MasterControlInterface.
9 changes: 1 addition & 8 deletions pkg/manager/member/dm_master_member_manager.go
@@ -85,13 +85,6 @@ func NewMasterMemberManager(masterControl dmapi.MasterControlInterface,
}

func (mmm *masterMemberManager) Sync(dc *v1alpha1.DMCluster) error {
clusterVersionLT2, err := clusterVersionLessThan2(dc.MasterVersion())
if err != nil {
klog.V(4).Infof("cluster version: %s is not semantic versioning compatible", dc.MasterVersion())
} else if clusterVersionLT2 {
return fmt.Errorf("dm-operator only supports to deploy dm-2.0")
}

// Sync dm-master Service
if err := mmm.syncMasterServiceForDMCluster(dc); err != nil {
return err
@@ -324,7 +317,7 @@ func (mmm *masterMemberManager) syncDMClusterStatus(dc *v1alpha1.DMCluster, set
}
name := master.Name
if len(name) == 0 {
klog.Warningf("dm-master member: [%s] doesn't have a name, and can't get it from clientUrls: [%s], dm-master Info: [%v] in [%s/%s]",
klog.Warningf("dm-master member: [%s] doesn't have a name, clientUrls: [%s], dm-master Info: [%#v] in [%s/%s]",
id, master.ClientURLs, master, ns, dcName)
continue
}
13 changes: 4 additions & 9 deletions pkg/manager/member/dm_worker_member_manager.go
@@ -172,9 +172,9 @@ func (wmm *workerMemberManager) syncWorkerStatefulSetForDMCluster(dc *v1alpha1.D
ns := dc.GetNamespace()
dcName := dc.GetName()

oldStsTmp, err := wmm.setLister.StatefulSets(ns).Get(controller.DMWorkerPeerMemberName(dcName))
oldStsTmp, err := wmm.setLister.StatefulSets(ns).Get(controller.DMWorkerMemberName(dcName))
if err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("syncWorkerStatefulSetForDMCluster: failed to get sts %s for cluster %s/%s, error: %s", controller.DMWorkerPeerMemberName(dcName), ns, dcName, err)
return fmt.Errorf("syncWorkerStatefulSetForDMCluster: failed to get sts %s for cluster %s/%s, error: %s", controller.DMWorkerMemberName(dcName), ns, dcName, err)
}

stsNotExist := errors.IsNotFound(err)
@@ -211,11 +211,6 @@ func (wmm *workerMemberManager) syncWorkerStatefulSetForDMCluster(dc *v1alpha1.D
return nil
}

if dc.MasterUpgrading() {
klog.Warningf("dm-master is upgrading, skipping upgrade dm-worker")
return nil
}

return updateStatefulSet(wmm.setControl, dc, newSts, oldSts)
}

@@ -384,7 +379,7 @@ func getNewWorkerSetForDMCluster(dc *v1alpha1.DMCluster, cm *corev1.ConfigMap) (
}

workerLabel := label.NewDM().Instance(instanceName).DMWorker()
setName := controller.DMWorkerPeerMemberName(dcName)
setName := controller.DMWorkerMemberName(dcName)
podAnnotations := CombineAnnotations(controller.AnnProm(8262), baseWorkerSpec.Annotations())
stsAnnotations := getStsAnnotations(dc.Annotations, label.DMWorkerLabelVal)

@@ -521,7 +516,7 @@ func getWorkerConfigMap(dc *v1alpha1.DMCluster) (*corev1.ConfigMap, error) {
workerLabel := label.NewDM().Instance(instanceName).DMWorker().Labels()
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: controller.DMWorkerPeerMemberName(dc.Name),
Name: controller.DMWorkerMemberName(dc.Name),
Namespace: dc.Namespace,
Labels: workerLabel,
OwnerReferences: []metav1.OwnerReference{controller.GetDMOwnerRef(dc)},