diff --git a/tests/actions.go b/tests/actions.go
index 308bd6934ed..49f8f642b40 100644
--- a/tests/actions.go
+++ b/tests/actions.go
@@ -14,7 +14,6 @@
 package tests
 
 import (
-	"bytes"
 	"database/sql"
 	"encoding/json"
 	"fmt"
@@ -67,8 +66,8 @@ const (
 	DefaultPollTimeout  time.Duration = 10 * time.Minute
 	DefaultPollInterval time.Duration = 10 * time.Second
 	getBackupDirPodName               = "get-backup-dir"
-	grafanaUsername                   = "admin"
-	grafanaPassword                   = "admin"
+	grafanaUsername = "admin"
+	grafanaPassword = "admin"
 )
 
 type OperatorActions interface {
@@ -131,9 +130,12 @@ type TidbClusterInfo struct {
 	Args        map[string]string
 	blockWriter *blockwriter.BlockWriterCase
 	Monitor     bool
+	UserName         string
+	InitSecretName   string
+	BackupSecretName string
 }
 
-func (tc *TidbClusterInfo) HelmSetString() string {
+func (tc *TidbClusterInfo) HelmSetString(m map[string]string) string {
 
 	set := map[string]string{
 		"clusterName": tc.ClusterName,
@@ -145,9 +147,10 @@ func (tc *TidbClusterInfo) HelmSetString() string {
 		"pd.image":                tc.PDImage,
 		"tikv.image":              tc.TiKVImage,
 		"tidb.image":              tc.TiDBImage,
-		"tidb.passwordSecretName": "set-secret",
+		"tidb.passwordSecretName": tc.InitSecretName,
 		"tidb.initSql":            tc.InitSql,
 		"monitor.create":          strconv.FormatBool(tc.Monitor),
+		"secretName":              tc.BackupSecretName,
 	}
 
 	for k, v := range tc.Resources {
@@ -156,6 +159,9 @@
 	for k, v := range tc.Args {
 		set[k] = v
 	}
+	for k, v := range m {
+		set[k] = v
+	}
 
 	arr := make([]string, 0, len(set))
 	for k, v := range set {
@@ -224,8 +230,14 @@ func (oa *operatorActions) DeployTidbCluster(info *TidbClusterInfo) error {
 	defer func() {
 		glog.Infof("deploy tidb cluster end cluster[%s] namespace[%s]", info.ClusterName, info.Namespace)
 	}()
+
+	err := oa.CreateSecret(info)
+	if err != nil {
+		return fmt.Errorf("failed to create secret of cluster [%s]: %v", info.ClusterName, err)
+	}
+
 	cmd := fmt.Sprintf("helm install /charts/%s/tidb-cluster --name %s --namespace %s --set-string %s",
-		info.OperatorTag, info.ClusterName, info.Namespace, info.HelmSetString())
+		info.OperatorTag, info.ClusterName, info.Namespace, info.HelmSetString(nil))
 	if res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput(); err != nil {
 		return fmt.Errorf("failed to deploy tidbcluster: %s/%s, %v, %s",
 			info.Namespace, info.ClusterName, err, string(res))
@@ -393,7 +405,7 @@ func chartPath(name string, tag string) string {
 
 func (oa *operatorActions) ScaleTidbCluster(info *TidbClusterInfo) error {
 	cmd := fmt.Sprintf("helm upgrade %s %s --set-string %s",
-		info.ClusterName, chartPath("tidb-cluster", info.OperatorTag), info.HelmSetString())
+		info.ClusterName, chartPath("tidb-cluster", info.OperatorTag), info.HelmSetString(nil))
 	glog.Info("[SCALE] " + cmd)
 	res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
 	if err != nil {
@@ -404,7 +416,7 @@ func (oa *operatorActions) ScaleTidbCluster(info *TidbClusterInfo) error {
 
 func (oa *operatorActions) UpgradeTidbCluster(info *TidbClusterInfo) error {
 	cmd := fmt.Sprintf("helm upgrade %s %s --set-string %s",
-		info.ClusterName, chartPath("tidb-cluster", info.OperatorTag), info.HelmSetString())
+		info.ClusterName, chartPath("tidb-cluster", info.OperatorTag), info.HelmSetString(nil))
 	glog.Info("[UPGRADE] " + cmd)
 	res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
 	if err != nil {
@@ -1037,26 +1049,18 @@ func (oa *operatorActions) DeployAdHocBackup(info *TidbClusterInfo) error {
 		glog.Infof("deploy adhoc backup end cluster[%s] namespace[%s]", info.ClusterName, info.Namespace)
 	}()
 	sets := map[string]string{
-		"clusterName":  info.ClusterName,
 		"name":         info.BackupPVC,
 		"mode":         "backup",
 		"user":         "root",
 		"password":     info.Password,
 		"storage.size": "10Gi",
 	}
-	var buffer bytes.Buffer
-	for k, v := range sets {
-		set := fmt.Sprintf(" --set %s=%s", k, v)
-		_, err := buffer.WriteString(set)
-		if err != nil {
-			return err
-		}
-	}
-	setStr := buffer.String()
+	setString := info.HelmSetString(sets)
+
 	fullbackupName := fmt.Sprintf("%s-backup", info.ClusterName)
-	cmd := fmt.Sprintf("helm install -n %s --namespace %s /charts/%s/tidb-backup %s",
-		fullbackupName, info.Namespace, info.OperatorTag, setStr)
+	cmd := fmt.Sprintf("helm install -n %s --namespace %s /charts/%s/tidb-backup --set-string %s",
+		fullbackupName, info.Namespace, info.OperatorTag, setString)
 	glog.Infof("install adhoc deployment [%s]", cmd)
 	res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
 	if err != nil {
@@ -1101,26 +1105,18 @@ func (oa *operatorActions) Restore(from *TidbClusterInfo, to *TidbClusterInfo) error {
 		glog.Infof("deploy restore end cluster[%s] namespace[%s]", to.ClusterName, to.Namespace)
 	}()
 	sets := map[string]string{
-		"clusterName":  to.ClusterName,
 		"name":         to.BackupPVC,
 		"mode":         "restore",
 		"user":         "root",
 		"password":     to.Password,
 		"storage.size": "10Gi",
 	}
-	var buffer bytes.Buffer
-	for k, v := range sets {
-		set := fmt.Sprintf(" --set %s=%s", k, v)
-		_, err := buffer.WriteString(set)
-		if err != nil {
-			return err
-		}
-	}
-	setStr := buffer.String()
+	setString := to.HelmSetString(sets)
+
 	restoreName := fmt.Sprintf("%s-restore", from.ClusterName)
-	cmd := fmt.Sprintf("helm install -n %s --namespace %s /charts/%s/tidb-backup %s",
-		restoreName, to.Namespace, to.OperatorTag, setStr)
+	cmd := fmt.Sprintf("helm install -n %s --namespace %s /charts/%s/tidb-backup --set-string %s",
+		restoreName, to.Namespace, to.OperatorTag, setString)
 	glog.Infof("install restore [%s]", cmd)
 	res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
 	if err != nil {
@@ -1213,31 +1209,29 @@ func (info *TidbClusterInfo) QueryCount() (int, error) {
 }
 
 func (oa *operatorActions) CreateSecret(info *TidbClusterInfo) error {
-	initSecretName := "set-secret"
-	backupSecretName := "backup-secret"
 	initSecret := corev1.Secret{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      initSecretName,
+			Name:      info.InitSecretName,
 			Namespace: info.Namespace,
 		},
 		Data: map[string][]byte{
-			"root": []byte(info.Password),
+			info.UserName: []byte(info.Password),
 		},
 		Type: corev1.SecretTypeOpaque,
 	}
 
 	_, err := oa.kubeCli.CoreV1().Secrets(info.Namespace).Create(&initSecret)
 	if err != nil && !releaseIsExist(err) {
-		return err
+		return err
 	}
 
 	backupSecret := corev1.Secret{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      backupSecretName,
+			Name:      info.BackupSecretName,
 			Namespace: info.Namespace,
 		},
 		Data: map[string][]byte{
-			"user":     []byte("root"),
+			"user":     []byte(info.UserName),
 			"password": []byte(info.Password),
 		},
 		Type: corev1.SecretTypeOpaque,
@@ -1270,19 +1264,11 @@ func (oa *operatorActions) DeployScheduledBackup(info *TidbClusterInfo) error {
 		"scheduledBackup.schedule": cron,
 		"scheduledBackup.storage":  "10Gi",
 	}
-	var buffer bytes.Buffer
-	for k, v := range sets {
-		set := fmt.Sprintf(" --set %s=%s", k, v)
-		_, err := buffer.WriteString(set)
-		if err != nil {
-			return err
-		}
-	}
-	setStr := buffer.String()
+	setString := info.HelmSetString(sets)
 
-	cmd := fmt.Sprintf("helm upgrade %s /charts/%s/tidb-cluster %s",
-		info.ClusterName, info.OperatorTag, setStr)
+	cmd := fmt.Sprintf("helm upgrade %s /charts/%s/tidb-cluster --set-string %s",
+		info.ClusterName, info.OperatorTag, setString)
 
 	glog.Infof("scheduled-backup deploy [%s]", cmd)
 	res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
@@ -1437,10 +1423,12 @@ func (oa *operatorActions) getBackupDir(info *TidbClusterInfo) ([]string, error)
 	}
 
 	cmd := fmt.Sprintf("kubectl exec %s -n %s ls /data", getBackupDirPodName, info.Namespace)
-	glog.Infof(cmd)
+
+	time.Sleep(20 * time.Second)
+
 	res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
 	if err != nil {
-		glog.Errorf("cluster:[%s/%s] exec :%s failed,error:%v,result:%s", info.Namespace, info.ClusterName, cmd, err, res)
+		glog.Errorf("cluster:[%s/%s] exec :%s failed,error:%v,result:%s", info.Namespace, info.ClusterName, cmd, err, string(res))
 		return nil, err
 	}
@@ -1454,9 +1442,174 @@ func (info *TidbClusterInfo) FullName() string {
 }
 
 func (oa *operatorActions) DeployIncrementalBackup(from *TidbClusterInfo, to *TidbClusterInfo) error {
+	glog.Infof("begin to deploy incremental backup cluster[%s] namespace[%s]", from.ClusterName, from.Namespace)
+	defer func() {
+		glog.Infof("deploy incremental backup end cluster[%s] namespace[%s]", to.ClusterName, to.Namespace)
+	}()
+	sets := map[string]string{
+		"binlog.pump.create":            "true",
+		"binlog.drainer.destDBType":     "mysql",
+		"binlog.drainer.create":         "true",
+		"binlog.drainer.mysql.host":     fmt.Sprintf("%s-tidb.%s", to.ClusterName, to.Namespace),
+		"binlog.drainer.mysql.user":     "root",
+		"binlog.drainer.mysql.password": to.Password,
+		"binlog.drainer.mysql.port":     "4000",
+	}
+
+	setString := from.HelmSetString(sets)
+
+	cmd := fmt.Sprintf("helm upgrade %s /charts/%s/tidb-cluster --set-string %s",
+		from.ClusterName, from.OperatorTag, setString)
+	glog.Infof(cmd)
+	res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
+	if err != nil {
+		return fmt.Errorf("failed to launch incremental backup job: %v, %s", err, string(res))
+	}
 	return nil
 }
 
 func (oa *operatorActions) CheckIncrementalBackup(info *TidbClusterInfo) error {
+	glog.Infof("begin to check incremental backup cluster[%s] namespace[%s]", info.ClusterName, info.Namespace)
+	defer func() {
+		glog.Infof("check incremental backup end cluster[%s] namespace[%s]", info.ClusterName, info.Namespace)
+	}()
+
+	pumpStatefulSetName := fmt.Sprintf("%s-pump", info.ClusterName)
+	fn := func() (bool, error) {
+		pumpStatefulSet, err := oa.kubeCli.AppsV1().StatefulSets(info.Namespace).Get(pumpStatefulSetName, metav1.GetOptions{})
+		if err != nil {
+			glog.Errorf("failed to get statefulset %s, %v", pumpStatefulSetName, err)
+			return false, nil
+		}
+		if pumpStatefulSet.Status.Replicas != pumpStatefulSet.Status.ReadyReplicas {
+			glog.Errorf("pump replicas are not ready, please wait! %s", pumpStatefulSetName)
%s ", pumpStatefulSetName) + return false, nil + } + + listOps := metav1.ListOptions{ + LabelSelector: labels.SelectorFromSet( + pumpStatefulSet.Labels, + ).String(), + } + + pods, err := oa.kubeCli.CoreV1().Pods(info.Namespace).List(listOps) + if err != nil { + glog.Errorf("failed to get pods via pump labels %s ,%v", pumpStatefulSetName, err) + return false, nil + } + + for _, pod := range pods.Items { + if !oa.pumpHealth(info, pod.Spec.Hostname) { + glog.Errorf("some pods is not health %s ,%v", pumpStatefulSetName, err) + return false, nil + } + } + + drainerStatefulSetName := fmt.Sprintf("%s-drainer", info.ClusterName) + drainerStatefulSet, err := oa.kubeCli.AppsV1().StatefulSets(info.Namespace).Get(drainerStatefulSetName, metav1.GetOptions{}) + if err != nil { + glog.Errorf("failed to get jobs %s ,%v", pumpStatefulSetName, err) + return false, nil + } + if drainerStatefulSet.Status.Replicas != drainerStatefulSet.Status.ReadyReplicas { + glog.Errorf("drainer replicas is not ready, please wait ! %s ", pumpStatefulSetName) + return false, nil + } + + listOps = metav1.ListOptions{ + LabelSelector: labels.SelectorFromSet( + drainerStatefulSet.Labels, + ).String(), + } + + pods, err = oa.kubeCli.CoreV1().Pods(info.Namespace).List(listOps) + if err != nil { + return false, nil + } + for _, pod := range pods.Items { + if !oa.drainerHealth(info, pod.Spec.Hostname) { + return false, nil + } + } + + return true, nil + } + + err := wait.Poll(DefaultPollInterval, DefaultPollTimeout, fn) + if err != nil { + return fmt.Errorf("failed to launch scheduler backup job: %v", err) + } return nil + +} + +type pumpStatus struct { + StatusMap map[string]*nodeStatus +} + +type nodeStatus struct { + State string `json:"state"` +} + +func (oa *operatorActions) pumpHealth(info *TidbClusterInfo, hostName string) bool { + pumpHealthUrl := fmt.Sprintf("%s.%s-pump.%s:8250/status", hostName, info.ClusterName, info.Namespace) + res, err := http.Get(pumpHealthUrl) + if err != nil { + glog.Errorf("cluster:[%s] call %s failed,error:%v", info.ClusterName, pumpHealthUrl, err) + return false + } + if res.StatusCode >= 400 { + glog.Errorf("Error response %v", res.StatusCode) + return false + } + body, err := ioutil.ReadAll(res.Body) + if err != nil { + glog.Errorf("cluster:[%s] read response body failed,error:%v", info.ClusterName, err) + return false + } + healths := pumpStatus{} + err = json.Unmarshal(body, &healths) + if err != nil { + glog.Errorf("cluster:[%s] unmarshal failed,error:%v", info.ClusterName, err) + return false + } + for _, status := range healths.StatusMap { + if status.State != "online" { + glog.Errorf("cluster:[%s] pump's state is not online", info.ClusterName) + return false + } + } + return true +} + +type drainerStatus struct { + PumpPos map[string]int64 `json:"PumpPos"` + Synced bool `json:"Synced"` + LastTS int64 `json:"LastTS"` + TsMap string `json:"TsMap"` +} + +func (oa *operatorActions) drainerHealth(info *TidbClusterInfo, hostName string) bool { + drainerHealthUrl := fmt.Sprintf("%s.%s-drainer.%s:8249/status", hostName, info.ClusterName, info.Namespace) + res, err := http.Get(drainerHealthUrl) + if err != nil { + glog.Errorf("cluster:[%s] call %s failed,error:%v", info.ClusterName, drainerHealthUrl, err) + return false + } + if res.StatusCode >= 400 { + glog.Errorf("Error response %v", res.StatusCode) + return false + } + body, err := ioutil.ReadAll(res.Body) + if err != nil { + glog.Errorf("cluster:[%s] read response body failed,error:%v", info.ClusterName, err) + return false + } + healths := 
+	err = json.Unmarshal(body, &healths)
+	if err != nil {
+		glog.Errorf("cluster:[%s] unmarshal failed,error:%v", info.ClusterName, err)
+		return false
+	}
+	return len(healths.PumpPos) > 0 && healths.Synced
 }
diff --git a/tests/backup/backupcase.go b/tests/backup/backupcase.go
index dea051e9a4f..3510164e0b5 100644
--- a/tests/backup/backupcase.go
+++ b/tests/backup/backupcase.go
@@ -14,6 +14,9 @@
 package backup
 
 import (
+	"fmt"
+	"time"
+
 	"github.com/golang/glog"
 	"github.com/pingcap/tidb-operator/tests"
 )
@@ -78,5 +81,40 @@ func (bc *BackupCase) Run() error {
 		return err
 	}
 
+	err = bc.operator.DeployIncrementalBackup(bc.srcCluster, bc.desCluster)
+	if err != nil {
+		return err
+	}
+
+	err = bc.operator.CheckIncrementalBackup(bc.srcCluster)
+	if err != nil {
+		return err
+	}
+
+	glog.Infof("waiting 1 minute for binlog to work")
+	time.Sleep(1 * time.Minute)
+
+	glog.Infof("cluster[%s] begin insert data", bc.srcCluster.ClusterName)
+	go bc.operator.BeginInsertDataTo(bc.srcCluster)
+
+	time.Sleep(30 * time.Second)
+
+	glog.Infof("cluster[%s] stop insert data", bc.srcCluster.ClusterName)
+	bc.operator.StopInsertDataTo(bc.srcCluster)
+
+	time.Sleep(5 * time.Second)
+
+	srcCount, err := bc.srcCluster.QueryCount()
+	if err != nil {
+		return err
+	}
+	desCount, err := bc.desCluster.QueryCount()
+	if err != nil {
+		return err
+	}
+	if srcCount != desCount {
+		return fmt.Errorf("cluster:[%s] the src cluster data[%d] is not equal to the des cluster data[%d]", bc.srcCluster.FullName(), srcCount, desCount)
+	}
+
 	return nil
 }
diff --git a/tests/cmd/e2e/main.go b/tests/cmd/e2e/main.go
index 90f17408912..313c89c2510 100644
--- a/tests/cmd/e2e/main.go
+++ b/tests/cmd/e2e/main.go
@@ -95,6 +95,9 @@ func main() {
 		StorageClassName: "local-storage",
 		Password:         "admin",
 		InitSql:          initSql,
+		UserName:         "root",
+		InitSecretName:   "demo-set-secret",
+		BackupSecretName: "demo-backup-secret",
 		Resources: map[string]string{
 			"pd.resources.limits.cpu":    "1000m",
 			"pd.resources.limits.memory": "2Gi",
@@ -187,6 +190,9 @@ func main() {
 		StorageClassName: "local-storage",
 		Password:         "admin",
 		InitSql:          initSql,
+		UserName:         "root",
+		InitSecretName:   "demo2-set-secret",
+		BackupSecretName: "demo2-backup-secret",
 		Resources: map[string]string{
 			"pd.resources.limits.cpu":    "1000m",
 			"pd.resources.limits.memory": "2Gi",
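
A note on the refactor above: every helm invocation now funnels through `HelmSetString`, which layers the base settings, then `tc.Resources`, then `tc.Args`, then the per-call map `m`, so later maps win on key collisions. Below is a minimal, self-contained sketch of that merge order; the standalone function and sample maps are illustrative, and the comma join assumes the unchanged tail of `HelmSetString` produces the `k1=v1,k2=v2` form that helm's `--set-string` expects.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// helmSetString mirrors the override order used by HelmSetString in the
// diff: base < resources < args < perCall. Later maps win on collisions.
func helmSetString(base, resources, args, perCall map[string]string) string {
	set := map[string]string{}
	for _, m := range []map[string]string{base, resources, args, perCall} {
		for k, v := range m {
			set[k] = v
		}
	}
	arr := make([]string, 0, len(set))
	for k, v := range set {
		arr = append(arr, fmt.Sprintf("%s=%s", k, v))
	}
	sort.Strings(arr) // deterministic output for this example only
	return strings.Join(arr, ",")
}

func main() {
	base := map[string]string{"clusterName": "demo", "user": "root"}
	perCall := map[string]string{"mode": "backup", "user": "admin"} // overrides base "user"
	fmt.Println(helmSetString(base, nil, nil, perCall))
	// clusterName=demo,mode=backup,user=admin
}
```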
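Likewise, a quick standalone check of the `drainerStatus` decoding that `drainerHealth` depends on. The payload below is invented for illustration and shaped only by the struct tags in the diff; the final print mirrors the health predicate `len(healths.PumpPos) > 0 && healths.Synced`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// drainerStatus copies the struct added in the diff.
type drainerStatus struct {
	PumpPos map[string]int64 `json:"PumpPos"`
	Synced  bool             `json:"Synced"`
	LastTS  int64            `json:"LastTS"`
	TsMap   string           `json:"TsMap"`
}

func main() {
	// Hypothetical /status response; keys follow the struct tags above.
	body := []byte(`{"PumpPos":{"demo-pump-0":407623478},"Synced":true,"LastTS":407623478}`)
	healths := drainerStatus{}
	if err := json.Unmarshal(body, &healths); err != nil {
		fmt.Println("unmarshal failed:", err)
		return
	}
	fmt.Println(len(healths.PumpPos) > 0 && healths.Synced) // true
}
```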