address comments
lichunzhu committed Aug 25, 2020
1 parent e2249f9 commit 161b862
Showing 17 changed files with 178 additions and 39 deletions.
65 changes: 65 additions & 0 deletions examples/dm/README.md
@@ -0,0 +1,65 @@
# A Basic DM cluster

> **Note:**
>
> This setup is for testing or demo purposes only and **IS NOT** applicable to critical environments. Refer to the [Documents](https://pingcap.com/docs/stable/tidb-in-kubernetes/deploy/prerequisites/) for a production setup.

The following steps will create a DM cluster.

**Prerequisites**:
- TiDB Operator `v1.2.0` or a later version is installed. [Doc](https://pingcap.com/docs/stable/tidb-in-kubernetes/deploy/tidb-operator/)
- A default `StorageClass` is configured, and there are enough PVs (by default, 6 PVs are required) of that storage class.

This can be verified with the following command:

```bash
> kubectl get storageclass
```

The output is similar to this:

```bash
NAME PROVISIONER AGE
standard (default) kubernetes.io/gce-pd 1d
gold kubernetes.io/gce-pd 1d
```

Alternatively, you can specify the storage class explicitly by modifying `dm-cluster.yaml`.

## Install

The following commands are assumed to be executed in this directory.

Install the cluster:

```bash
> kubectl -n <namespace> apply -f ./
```

Wait until the cluster Pods are ready:

```bash
> watch kubectl -n <namespace> get pod
```

## Explore

Explore the DM master interface:

```bash
> kubectl -n <namespace> port-forward svc/basic-dm-master 8261:8261 &>/tmp/pf-dm.log &
> dmctl --master-addr 127.0.0.1:8261 list-member
```

## Destroy

```bash
> kubectl -n <namespace> delete -f ./
```

The PVCs used by the DM cluster are not deleted in the above process; therefore, the PVs are not released either. You can delete the PVCs and release the PVs with the following command:

```bash
> kubectl -n <namespace> delete pvc -l app.kubernetes.io/instance=basic,app.kubernetes.io/managed-by=tidb-operator
```

13 changes: 5 additions & 8 deletions manifests/dm/dm-cluster.yaml → examples/dm/dm-cluster.yaml
@@ -1,6 +1,3 @@
# IT IS NOT SUITABLE FOR PRODUCTION USE.
# This YAML describes a basic TiDB cluster with minimum resource requirements,
# which should be able to run in any Kubernetes cluster with storage support.
apiVersion: pingcap.com/v1alpha1
kind: DMCluster
metadata:
@@ -10,20 +7,20 @@ spec:
version: v2.0.0-beta.2
pvReclaimPolicy: Delete
discovery:
address: "http://basic-discovery.demo.svc:10261"
address: "http://basic-discovery.demo:10261"
master:
baseImage: pingcap/dm
replicas: 3
# if storageClassName is not set, the default Storage Class of the Kubernetes cluster will be used
# storageClassName: local-storage
requests:
storage: "1Gi"
storageSize: "1Gi"
requests: {}
config: {}
worker:
baseImage: pingcap/dm
replicas: 3
# if storageClassName is not set, the default Storage Class of the Kubernetes cluster will be used
# storageClassName: local-storage
requests:
storage: "1Gi"
storageSize: "1Gi"
requests: {}
config: {}
3 changes: 3 additions & 0 deletions pkg/apis/pingcap/v1alpha1/defaulting/dmcluster.go
@@ -67,4 +67,7 @@ func setWorkerSpecDefault(dc *v1alpha1.DMCluster) {
dc.Spec.Worker.BaseImage = defaultWorkerImage
}
}
if dc.Spec.Worker.MaxFailoverCount == nil {
dc.Spec.Worker.MaxFailoverCount = pointer.Int32Ptr(3)
}
}
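
For context, this defaulting pattern relies on optional numeric fields being pointers, so an unset field is distinguishable from an explicit zero. Below is a minimal sketch of that pattern, assuming the hunk uses `pointer.Int32Ptr` from `k8s.io/utils/pointer`; the mirror struct is illustrative, not the operator's real `WorkerSpec`.

```go
package main

import (
	"fmt"

	"k8s.io/utils/pointer"
)

// workerSpec mirrors only the field touched by this hunk; the real
// v1alpha1.WorkerSpec carries many more fields.
type workerSpec struct {
	MaxFailoverCount *int32
}

// setWorkerDefaults applies the same nil-check pattern as setWorkerSpecDefault:
// a default is written only when the user left the field unset.
func setWorkerDefaults(w *workerSpec) {
	if w.MaxFailoverCount == nil {
		w.MaxFailoverCount = pointer.Int32Ptr(3)
	}
}

func main() {
	w := &workerSpec{}
	setWorkerDefaults(w)
	fmt.Println(*w.MaxFailoverCount) // 3

	// An explicit 0 disables failover, so it must not be overwritten.
	explicit := &workerSpec{MaxFailoverCount: pointer.Int32Ptr(0)}
	setWorkerDefaults(explicit)
	fmt.Println(*explicit.MaxFailoverCount) // 0
}
```
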
40 changes: 30 additions & 10 deletions pkg/apis/pingcap/v1alpha1/dmcluster.go
@@ -16,17 +16,25 @@ package v1alpha1
import (
"fmt"
"strings"

"github.com/pingcap/tidb-operator/pkg/label"
)

var latestImageVersion = "latest"

func (dc *DMCluster) Scheme() string {
if dc.IsTLSClusterEnabled() {
return "https"
}
return "http"
}

func (dc *DMCluster) Timezone() string {
tz := dc.Spec.Timezone
if tz == "" {
return defaultTimeZone
}
return tz
}

func (dc *DMCluster) IsTLSClusterEnabled() bool {
return dc.Spec.TLSCluster != nil && dc.Spec.TLSCluster.Enabled
}
@@ -44,6 +52,19 @@ func (dc *DMCluster) MasterAllMembersReady() bool {
return true
}

func (dc *DMCluster) WorkerAllMembersReady() bool {
if int(dc.WorkerStsDesiredReplicas()) != len(dc.Status.Worker.Members) {
return false
}

for _, member := range dc.Status.Worker.Members {
if member.Stage == "offline" {
return false
}
}
return true
}

func (dc *DMCluster) MasterStsDesiredReplicas() int32 {
return dc.Spec.Master.Replicas + int32(len(dc.Status.Master.FailureMembers))
}
@@ -57,13 +78,6 @@ func (dc *DMCluster) WorkerStsDesiredReplicas() int32 {
}

func (dc *DMCluster) GetInstanceName() string {
labels := dc.ObjectMeta.GetLabels()
// Keep backward compatibility for helm.
// This introduce a hidden danger that change this label will trigger rolling-update of most of the components
// TODO(aylei): disallow mutation of this label or adding this label with value other than the cluster name in ValidateUpdate()
if inst, ok := labels[label.InstanceLabelKey]; ok {
return inst
}
return dc.Name
}

@@ -76,6 +90,9 @@ func (dc *DMCluster) MasterImage() string {
if version == nil {
version = &dc.Spec.Version
}
if version == nil {
version = &latestImageVersion
}
image = fmt.Sprintf("%s:%s", baseImage, *version)
}
return image
@@ -90,6 +107,9 @@ func (dc *DMCluster) WorkerImage() string {
if version == nil {
version = &dc.Spec.Version
}
if version == nil {
version = &latestImageVersion
}
image = fmt.Sprintf("%s:%s", baseImage, *version)
}
return image
@@ -102,7 +122,7 @@ func (dc *DMCluster) MasterVersion() string {
return image[colonIdx+1:]
}

return "latest"
return latestImageVersion
}

func (dc *DMCluster) MasterUpgrading() bool {
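
The two new `if version == nil` guards make the image fallback chain explicit: the component-level version, then the cluster-wide `Spec.Version`, then `latest`. The sketch below is a simplified local function, not the operator's exported API; the real code works on `*string` fields of `ComponentSpec`.

```go
package main

import "fmt"

// resolveImage sketches the fallback used by MasterImage/WorkerImage:
// an explicitly configured image wins, otherwise baseImage is tagged with
// the component version, the cluster version, or finally "latest".
func resolveImage(image, baseImage string, componentVersion *string, clusterVersion string) string {
	if image != "" {
		return image // an explicitly configured image always wins
	}
	version := componentVersion
	if version == nil {
		version = &clusterVersion
	}
	if *version == "" {
		latest := "latest"
		version = &latest
	}
	return fmt.Sprintf("%s:%s", baseImage, *version)
}

func main() {
	fmt.Println(resolveImage("", "pingcap/dm", nil, "v2.0.0-beta.2")) // pingcap/dm:v2.0.0-beta.2
	fmt.Println(resolveImage("", "pingcap/dm", nil, ""))              // pingcap/dm:latest
}
```
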
3 changes: 2 additions & 1 deletion pkg/apis/pingcap/v1alpha1/register.go
@@ -51,7 +51,6 @@ func addKnownTypes(scheme *runtime.Scheme) error {
Scheme = scheme
scheme.AddKnownTypes(SchemeGroupVersion,
&TidbCluster{},
&DMCluster{},
&TidbClusterList{},
&Backup{},
&BackupList{},
@@ -71,6 +70,8 @@ func addKnownTypes(scheme *runtime.Scheme) error {
&TiDBGroupList{},
&TiKVGroup{},
&TiKVGroupList{},
&DMCluster{},
&DMClusterList{},
)

metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
16 changes: 16 additions & 0 deletions pkg/apis/pingcap/v1alpha1/types.go
@@ -1617,6 +1617,11 @@ type MasterSpec struct {
// +optional
StorageClassName *string `json:"storageClassName,omitempty"`

// StorageSize is the request storage size for dm-master.
// Defaults to "10Gi".
// +optional
StorageSize string `json:"storageSize,omitempty"`

// Subdirectory within the volume to store dm-master Data. By default, the data
// is stored in the root directory of volume which is mounted at
// /var/lib/dm-master.
@@ -1661,11 +1666,22 @@ type WorkerSpec struct {
// +optional
BaseImage string `json:"baseImage,omitempty"`

// MaxFailoverCount limits the max replicas that can be added in failover; 0 means no failover.
// Optional: Defaults to 3
// +kubebuilder:validation:Minimum=0
// +optional
MaxFailoverCount *int32 `json:"maxFailoverCount,omitempty"`

// The storageClassName of the persistent volume for dm-worker data storage.
// Defaults to Kubernetes default storage class.
// +optional
StorageClassName *string `json:"storageClassName,omitempty"`

// StorageSize is the request storage size for dm-worker.
// Defaults to "10Gi".
// +optional
StorageSize string `json:"storageSize,omitempty"`

// Subdirectory within the volume to store dm-worker Data. By default, the data
// is stored in the root directory of volume which is mounted at
// /var/lib/dm-worker.
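
The new `StorageSize` fields replace the storage request previously read from `ResourceRequirements.Requests` (see the yaml change above and the removed `validateRequestsStorage` calls below). A hedged sketch of how such a string could be turned into a PVC storage request follows; `storageRequest` is a hypothetical helper for illustration, not the operator's member-manager code.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// storageRequest turns a StorageSize string such as "1Gi" into a PVC storage
// request, falling back to a default when the field is left empty.
func storageRequest(storageSize, fallback string) (corev1.ResourceList, error) {
	if storageSize == "" {
		storageSize = fallback // e.g. the documented default "10Gi"
	}
	quantity, err := resource.ParseQuantity(storageSize)
	if err != nil {
		return nil, fmt.Errorf("cannot parse storage size %q: %v", storageSize, err)
	}
	return corev1.ResourceList{corev1.ResourceStorage: quantity}, nil
}

func main() {
	req, err := storageRequest("1Gi", "10Gi")
	if err != nil {
		panic(err)
	}
	q := req[corev1.ResourceStorage]
	fmt.Println(q.String()) // 1Gi
}
```
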
13 changes: 10 additions & 3 deletions pkg/apis/pingcap/v1alpha1/validation/validation.go
@@ -51,7 +51,7 @@ func ValidateDMCluster(dc *v1alpha1.DMCluster) field.ErrorList {
// validate metadata
fldPath := field.NewPath("metadata")
// validate metadata/annotations
allErrs = append(allErrs, validateAnnotations(dc.ObjectMeta.Annotations, fldPath.Child("annotations"))...)
allErrs = append(allErrs, validateDMAnnotations(dc.ObjectMeta.Annotations, fldPath.Child("annotations"))...)
// validate spec
allErrs = append(allErrs, validateDMClusterSpec(&dc.Spec, field.NewPath("spec"))...)
return allErrs
@@ -77,6 +77,15 @@ func validateAnnotations(anns map[string]string, fldPath *field.Path) field.ErrorList {
return allErrs
}

func validateDMAnnotations(anns map[string]string, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
allErrs = append(allErrs, apivalidation.ValidateAnnotations(anns, fldPath)...)
for _, key := range []string{label.AnnDMMasterDeleteSlots, label.AnnDMWorkerDeleteSlots} {
allErrs = append(allErrs, validateDeleteSlots(anns, key, fldPath.Child(key))...)
}
return allErrs
}

func validateTiDBClusterSpec(spec *v1alpha1.TidbClusterSpec, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
if spec.PD != nil {
@@ -223,14 +232,12 @@ func validateDMClusterSpec(spec *v1alpha1.DMClusterSpec, fldPath *field.Path) field.ErrorList {
func validateMasterSpec(spec *v1alpha1.MasterSpec, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
allErrs = append(allErrs, validateComponentSpec(&spec.ComponentSpec, fldPath)...)
allErrs = append(allErrs, validateRequestsStorage(spec.ResourceRequirements.Requests, fldPath)...)
return allErrs
}

func validateWorkerSpec(spec *v1alpha1.WorkerSpec, fldPath *field.Path) field.ErrorList {
allErrs := field.ErrorList{}
allErrs = append(allErrs, validateComponentSpec(&spec.ComponentSpec, fldPath)...)
allErrs = append(allErrs, validateRequestsStorage(spec.ResourceRequirements.Requests, fldPath)...)
return allErrs
}

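
`validateDeleteSlots` is reused here for the two new DM annotation keys. As a rough illustration of what such a validator has to accept, the annotation value is a JSON array of pod ordinals, e.g. `"[1,2]"`; the function below is an illustrative stand-in, not the operator's `validateDeleteSlots` implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// checkDeleteSlots accepts a delete-slots annotation value only when it is a
// JSON array of integer ordinals.
func checkDeleteSlots(value string) ([]int32, error) {
	var slots []int32
	if err := json.Unmarshal([]byte(value), &slots); err != nil {
		return nil, fmt.Errorf("value of delete-slots annotation must be a JSON array of int32: %v", err)
	}
	return slots, nil
}

func main() {
	slots, err := checkDeleteSlots("[1,3]")
	fmt.Println(slots, err) // [1 3] <nil>

	_, err = checkDeleteSlots("not-json")
	fmt.Println(err != nil) // true
}
```
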
2 changes: 1 addition & 1 deletion pkg/controller/controller_utils.go
@@ -325,7 +325,7 @@ func DMMasterPeerMemberName(clusterName string) string {

// DMWorkerPeerMemberName returns dm-worker peer service name
func DMWorkerPeerMemberName(clusterName string) string {
return fmt.Sprintf("%s-dm-worker-peer", clusterName)
return fmt.Sprintf("%s-dm-worker", clusterName)
}

// AnnProm adds annotations for prometheus scraping metrics
3 changes: 3 additions & 0 deletions pkg/controller/dmcluster/dm_cluster_condition_updater.go
@@ -60,6 +60,9 @@ func (u *dmClusterConditionUpdater) updateReadyCondition(dc *v1alpha1.DMCluster)
case !dc.MasterAllMembersReady():
reason = utildmcluster.MasterUnhealthy
message = "dm-master(s) are not healthy"
case !dc.WorkerAllMembersReady():
reason = utildmcluster.MasterUnhealthy
message = "some dm-worker(s) are not up yet"
default:
status = v1.ConditionTrue
reason = utildmcluster.Ready
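
With the new case, the Ready condition now also waits for dm-workers. The function below is a simplified stand-in for the decision order, not the operator's `dmClusterConditionUpdater`; note that, as in the diff, the worker case reuses the `MasterUnhealthy` reason, and the message for the ready case is illustrative.

```go
package main

import "fmt"

// readyCondition mirrors the switch added in updateReadyCondition: the cluster
// is only marked Ready once both dm-master and dm-worker members are ready.
func readyCondition(mastersReady, workersReady bool) (status, reason, message string) {
	switch {
	case !mastersReady:
		return "False", "MasterUnhealthy", "dm-master(s) are not healthy"
	case !workersReady:
		return "False", "MasterUnhealthy", "some dm-worker(s) are not up yet"
	default:
		// The Ready reason comes from utildmcluster.Ready; the message here is
		// illustrative only.
		return "True", "Ready", "DM cluster is fully up and running"
	}
}

func main() {
	fmt.Println(readyCondition(true, false)) // False MasterUnhealthy some dm-worker(s) are not up yet
	fmt.Println(readyCondition(true, true))  // True Ready DM cluster is fully up and running
}
```
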
7 changes: 4 additions & 3 deletions pkg/controller/dmcluster/dm_cluster_control.go
@@ -119,6 +119,7 @@ func (dcc *defaultDMClusterControl) validate(dc *v1alpha1.DMCluster) bool {
}

func (dcc *defaultDMClusterControl) updateDMCluster(dc *v1alpha1.DMCluster) error {
var errs []error
// TODO: implement reclaimPolicyManager
// syncing all PVs managed by operator's reclaim policy to Retain
// if err := dcc.reclaimPolicyManager.Sync(dc); err != nil {
@@ -155,7 +156,7 @@ func (dcc *defaultDMClusterControl) updateDMCluster(dc *v1alpha1.DMCluster) error {
// - scale out/in the dm-master cluster
// - failover the dm-master cluster
if err := dcc.masterMemberManager.Sync(dc); err != nil {
return err
errs = append(errs, err)
}

// works that should do to making the dm-worker cluster current state match the desired state:
@@ -167,7 +168,7 @@ func (dcc *defaultDMClusterControl) updateDMCluster(dc *v1alpha1.DMCluster) error {
// - scale out/in the dm-worker cluster
// - failover the dm-worker cluster
if err := dcc.workerMemberManager.Sync(dc); err != nil {
return err
errs = append(errs, err)
}

// TODO: syncing labels for dm: syncing the labels from Pod to PVC and PV, these labels include:
@@ -193,5 +194,5 @@ func (dcc *defaultDMClusterControl) updateDMCluster(dc *v1alpha1.DMCluster) error {
// syncing the some tidbcluster status attributes
// - sync tidbmonitor reference
// return dcc.tidbClusterStatusManager.Sync(dc)
return nil
return errorutils.NewAggregate(errs)
}
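
Switching from early returns to an error slice means a failing dm-master sync no longer prevents the dm-worker sync from running; `errorutils` here presumably aliases `k8s.io/apimachinery/pkg/util/errors`. A small sketch of the pattern under that assumption:

```go
package main

import (
	"errors"
	"fmt"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

// syncAll collects every sub-sync error instead of returning on the first
// failure, then reports them together.
func syncAll(syncs ...func() error) error {
	var errs []error
	for _, sync := range syncs {
		if err := sync(); err != nil {
			errs = append(errs, err)
		}
	}
	// NewAggregate returns nil when the slice is empty, so the happy path
	// still reports success.
	return utilerrors.NewAggregate(errs)
}

func main() {
	err := syncAll(
		func() error { return nil },
		func() error { return errors.New("dm-master sync failed") },
		func() error { return errors.New("dm-worker sync failed") },
	)
	fmt.Println(err) // [dm-master sync failed, dm-worker sync failed]
}
```
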
2 changes: 1 addition & 1 deletion pkg/controller/dmcluster/dm_cluster_controller.go
@@ -348,7 +348,7 @@ func (dcc *Controller) resolveDMClusterFromSet(namespace string, set *apps.State

// We can't look up by UID, so look up by Name and then verify UID.
// Don't even try to look up by Name if it's the wrong Kind.
if controllerRef.Kind != controller.ControllerKind.Kind {
if controllerRef.Kind != controller.DMControllerKind.Kind {
return nil
}
dc, err := dcc.dcLister.DMClusters(namespace).Get(controllerRef.Name)
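
This one-line fix matters because StatefulSets owned by a TidbCluster and by a DMCluster can live in the same cache; resolving an owner reference must compare against the kind this controller actually manages. A minimal illustration of that check (simplified; `controller.DMControllerKind` itself is not reproduced here):

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// ownedByKind reports whether an owner reference points at the kind a
// controller manages; resolveDMClusterFromSet bails out early when it does not.
func ownedByKind(ref *metav1.OwnerReference, want schema.GroupVersionKind) bool {
	return ref != nil && ref.Kind == want.Kind
}

func main() {
	dmKind := schema.GroupVersionKind{Group: "pingcap.com", Version: "v1alpha1", Kind: "DMCluster"}
	fmt.Println(ownedByKind(&metav1.OwnerReference{Kind: "DMCluster", Name: "basic"}, dmKind))   // true
	fmt.Println(ownedByKind(&metav1.OwnerReference{Kind: "TidbCluster", Name: "basic"}, dmKind)) // false
}
```
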
2 changes: 2 additions & 0 deletions pkg/label/label.go
@@ -106,6 +106,8 @@ const (
AnnTiFlashDeleteSlots = "tiflash.tidb.pingcap.com/delete-slots"
// DMMasterDeleteSlots is annotation key of dm-master delete slots.
AnnDMMasterDeleteSlots = "dm-master.tidb.pingcap.com/delete-slots"
// DMWorkerDeleteSlots is annotation key of dm-worker delete slots.
AnnDMWorkerDeleteSlots = "dm-worker.tidb.pingcap.com/delete-slots"

// AnnTiDBLastAutoScalingTimestamp is annotation key of tidbcluster to indicate the last timestamp for tidb auto-scaling
AnnTiDBLastAutoScalingTimestamp = "tidb.tidb.pingcap.com/last-autoscaling-timestamp"
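
For completeness, the new key is consumed the same way as the existing delete-slots annotations: the value is a JSON-encoded list of ordinals to remove when scaling in. A hedged usage sketch follows; the constant is copied here so the snippet is self-contained.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// AnnDMWorkerDeleteSlots copies the constant added to pkg/label/label.go so
// this snippet compiles on its own.
const AnnDMWorkerDeleteSlots = "dm-worker.tidb.pingcap.com/delete-slots"

func main() {
	// Request that ordinals 1 and 3 be removed on the next scale-in
	// (illustrative; the operator reads this annotation from the DMCluster
	// object's metadata).
	value, err := json.Marshal([]int32{1, 3})
	if err != nil {
		panic(err)
	}
	annotations := map[string]string{AnnDMWorkerDeleteSlots: string(value)}
	fmt.Println(annotations)
}
```
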