diff --git a/tests/actions.go b/tests/actions.go
index ddbc3774b6c..35d6d521815 100644
--- a/tests/actions.go
+++ b/tests/actions.go
@@ -107,6 +107,8 @@ type OperatorActions interface {
 	CreateSecret(info *TidbClusterConfig) error
 	GetPodUIDMap(info *TidbClusterConfig) (map[string]types.UID, error)
 	GetNodeMap(info *TidbClusterConfig, component string) (map[string][]string, error)
+	TruncateSSTFileThenCheckFailover(info *TidbClusterConfig, tikvFailoverPeriod time.Duration) error
+	TruncateSSTFileThenCheckFailoverOrDie(info *TidbClusterConfig, tikvFailoverPeriod time.Duration)
 	CheckFailoverPending(info *TidbClusterConfig, faultPoint *time.Time) (bool, error)
 	CheckFailoverPendingOrDie(clusters []*TidbClusterConfig, faultPoint *time.Time)
 	CheckFailover(info *TidbClusterConfig, faultNode string) (bool, error)
@@ -1331,7 +1333,7 @@ func cloneOperatorRepo() error {
 	cmd := fmt.Sprintf("git clone https://github.com/pingcap/tidb-operator.git /tidb-operator")
 	glog.Info(cmd)
 	res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
-	if err != nil {
+	if err != nil && !strings.Contains(string(res), "already exists") {
 		return fmt.Errorf("failed to clone tidb-operator repository: %v, %s", err, string(res))
 	}
 
diff --git a/tests/cmd/stability/main.go b/tests/cmd/stability/main.go
index e26177b88d1..df2f5b32d1d 100644
--- a/tests/cmd/stability/main.go
+++ b/tests/cmd/stability/main.go
@@ -214,5 +214,8 @@ func main() {
 		oa.CheckTidbClusterStatusOrDie(cluster)
 	}
 
+	// truncate a sst file and check failover
+	oa.TruncateSSTFileThenCheckFailoverOrDie(cluster1, 5*time.Minute)
+
 	glog.Infof("\nFinished.")
 }
diff --git a/tests/failover.go b/tests/failover.go
index 59aa87cce44..20d7538ce4a 100644
--- a/tests/failover.go
+++ b/tests/failover.go
@@ -4,13 +4,17 @@ import (
 	"fmt"
 	"sort"
 	"strings"
+	"syscall"
 	"time"
 
 	_ "github.com/go-sql-driver/mysql"
 	"github.com/golang/glog"
+	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1"
 	"github.com/pingcap/tidb-operator/pkg/label"
 	"github.com/pingcap/tidb-operator/tests/pkg/client"
+	"github.com/pingcap/tidb-operator/tests/pkg/ops"
+	"github.com/pingcap/tidb-operator/tests/pkg/util"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
@@ -18,6 +22,153 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 )
 
+// TruncateSSTFileThenCheckFailover truncates an sst file of one up TiKV store
+// and then waits until the TidbCluster status reports the failover. It first
+// restarts the store's tikv container by sending SIGTERM to pid 1, truncates
+// an sst file, kills the process again, and finally polls the TidbCluster
+// until more failure stores than before are recorded and the number of up
+// stores is no lower than it was at the start. The poll timeout is PD's
+// max-store-down-time plus tikvFailoverPeriod plus a fixed five minutes.
+func (oa *operatorActions) TruncateSSTFileThenCheckFailover(info *TidbClusterConfig, tikvFailoverPeriod time.Duration) error {
+	const failoverTimeout = 5 * time.Minute
+
+	cli := client.Union(oa.kubeCli, oa.cli)
+	tikvOps := ops.TiKVOps{ClientOps: ops.ClientOps{Client: cli}}
+
+	// checkout latest tidb cluster
+	tc, err := cli.PingcapV1alpha1().TidbClusters(info.Namespace).Get(info.ClusterName, metav1.GetOptions{})
+	if err != nil {
+		glog.Errorf("failed to get the cluster: ns=%s tc=%s err=%s", info.Namespace, info.ClusterName, err.Error())
+		return err
+	}
+	countUpStores := func(tc *v1alpha1.TidbCluster) int {
+		cnt := 0
+		for _, s := range tc.Status.TiKV.Stores {
+			if s.State == v1alpha1.TiKVStateUp {
+				cnt++
+			}
+		}
+		return cnt
+	}
+
+	origFailures := len(tc.Status.TiKV.FailureStores)
+	origUps := countUpStores(tc)
+
+	// checkout pd config
+	pdCfg, err := oa.pdControl.GetPDClient(tc).GetConfig()
+	if err != nil {
+		glog.Errorf("failed to get the pd config: tc=%s err=%s", info.ClusterName, err.Error())
+		return err
+	}
+	maxStoreDownTime := pdCfg.Schedule.MaxStoreDownTime.Duration
+	glog.Infof("failover config: maxStoreDownTime=%v tikvFailoverPeriod=%v", maxStoreDownTime, tikvFailoverPeriod)
+
+	// find an up store
+	var store v1alpha1.TiKVStore
+	for _, v := range tc.Status.TiKV.Stores {
+		if v.State != v1alpha1.TiKVStateUp {
+			continue
+		}
+		store = v
+		break
+	}
+	if len(store.ID) == 0 {
+		glog.Errorf("failed to find an up store")
+		return errors.New("no up store for truncating sst file")
+	}
+	glog.Infof("target store: id=%s pod=%s", store.ID, store.PodName)
+
+	// checkout pod status
+	podBeforeRestart, err := cli.CoreV1().Pods(info.Namespace).Get(store.PodName, metav1.GetOptions{})
+	if err != nil {
+		glog.Errorf("failed to get target pod: pod=%s err=%s", store.PodName, err.Error())
+		return err
+	}
+
+	var rc int32
+	if c := util.GetContainerStatusFromPod(podBeforeRestart, func(status corev1.ContainerStatus) bool {
+		return status.Name == "tikv"
+	}); c != nil {
+		rc = c.RestartCount
+	} else {
+		glog.Errorf("failed to get container status from tikv pod")
+		return errors.New("failed to get container status from tikv pod")
+	}
+
+	// restart tikv to ensure sst files
+	err = tikvOps.KillProcess(info.Namespace, store.PodName, "tikv", 1, syscall.SIGTERM)
+	if err != nil {
+		glog.Errorf("kill tikv: pod=%s err=%s", store.PodName, err.Error())
+		return err
+	}
+
+	err = tikvOps.PollPod(info.Namespace, store.PodName,
+		func(pod *corev1.Pod, err error) (bool, error) {
+			if err != nil || pod == nil {
+				glog.Warningf("failed to get pod: pod=%s err=%v", store.PodName, err)
+				return false, nil
+			}
+			tikv := util.GetContainerStatusFromPod(pod, func(status corev1.ContainerStatus) bool {
+				return status.Name == "tikv"
+			})
+
+			if pod.Status.Phase == corev1.PodRunning && tikv != nil && tikv.RestartCount > rc {
+				return true, nil
+			}
+			return false, nil
+		})
+	if err != nil {
+		glog.Errorf("tikv process hasn't been restarted: err=%s", err.Error())
+		return err
+	}
+
+	// truncate the sst file and wait for failover
+	err = tikvOps.TruncateSSTFile(ops.TruncateOptions{
+		Namespace: info.Namespace,
+		Cluster:   info.ClusterName,
+		Store:     store.ID,
+	})
+	if err != nil {
+		glog.Errorf("truncate sst file: store=%s err=%s", store.ID, err.Error())
+		return err
+	}
+
+	// make tikv crash
+	err = tikvOps.KillProcess(info.Namespace, store.PodName, "tikv", 1, syscall.SIGTERM)
+	if err != nil {
+		glog.Errorf("kill tikv: pod=%s err=%s", store.PodName, err.Error())
+		return err
+	}
+
+	tikvOps.SetPoll(DefaultPollInterval, maxStoreDownTime+tikvFailoverPeriod+failoverTimeout)
+
+	return tikvOps.PollTiDBCluster(info.Namespace, info.ClusterName,
+		func(tc *v1alpha1.TidbCluster, err error) (bool, error) {
+			if err != nil || tc == nil {
+				glog.Warningf("failed to get the cluster: tc=%s err=%v", info.ClusterName, err)
+				return false, nil
+			}
+			glog.Infof("check failure stores: current=%d origin=%d", len(tc.Status.TiKV.FailureStores), origFailures)
+			if len(tc.Status.TiKV.FailureStores) <= origFailures {
+				return false, nil
+			}
+			ups := countUpStores(tc)
+			glog.Infof("check up stores: current=%d origin=%d", ups, origUps)
+			if ups < origUps {
+				return false, nil
+			}
+			return true, nil
+		})
+}
+
+// TruncateSSTFileThenCheckFailoverOrDie calls TruncateSSTFileThenCheckFailover
+// and panics if it returns an error.
+func (oa *operatorActions) TruncateSSTFileThenCheckFailoverOrDie(info *TidbClusterConfig, tikvFailoverPeriod time.Duration) {
+	if err := oa.TruncateSSTFileThenCheckFailover(info, tikvFailoverPeriod); err != nil {
+		panic(err)
+	}
+}
+
 func (oa *operatorActions) CheckFailoverPending(info *TidbClusterConfig, faultPoint *time.Time) (bool, error) {
 	tc, err := oa.cli.PingcapV1alpha1().TidbClusters(info.Namespace).Get(info.ClusterName, metav1.GetOptions{})
 	if err != nil {
diff --git a/tests/pkg/client/client.go b/tests/pkg/client/client.go
index ec2caa63c78..9c9e6f4354e 100644
--- a/tests/pkg/client/client.go
+++ b/tests/pkg/client/client.go
@@ -3,9 +3,12 @@ package client
 import (
 	"time"
 
+	"github.com/juju/errors"
 	"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
+	"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned/typed/pingcap.com/v1alpha1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
 )
 
 func NewCliOrDie() (versioned.Interface, kubernetes.Interface) {
@@ -27,3 +30,57 @@ func NewCliOrDie() (versioned.Interface, kubernetes.Interface) {
 
 	return cli, kubeCli
 }
+
+var (
+	masterURL      string
+	kubeconfigPath string
+)
+
+// Client combines the Kubernetes clientset with the PingCAP v1alpha1 client.
+type Client interface {
+	kubernetes.Interface
+	PingcapV1alpha1() v1alpha1.PingcapV1alpha1Interface
+}
+
+// Union returns a Client backed by the given Kubernetes and PingCAP clientsets.
+func Union(kube kubernetes.Interface, tidb versioned.Interface) Client {
+	return &client{Interface: kube, pingcap: tidb}
+}
+
+// NewOrDie builds a Client from the configured master URL and kubeconfig path,
+// panicking if the configuration cannot be built.
+func NewOrDie() Client {
+	cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfigPath)
+	if err != nil {
+		panic(err)
+	}
+	return Union(kubernetes.NewForConfigOrDie(cfg), versioned.NewForConfigOrDie(cfg))
+}
+
+// client is the Client implementation returned by Union.
+type client struct {
+	kubernetes.Interface
+	pingcap versioned.Interface
+}
+
+// PingcapV1alpha1 returns the PingCAP v1alpha1 client of the wrapped clientset.
+func (cli *client) PingcapV1alpha1() v1alpha1.PingcapV1alpha1Interface {
+	return cli.pingcap.PingcapV1alpha1()
+}
+
+// SetConfigPath sets the kubeconfig path used by NewOrDie and LoadConfig.
+func SetConfigPath(path string) {
+	kubeconfigPath = path
+}
+
+// SetMasterURL sets the master URL used by NewOrDie and LoadConfig.
+func SetMasterURL(url string) {
+	masterURL = url
+}
+
+// LoadConfig builds a rest.Config from the configured master URL and
+// kubeconfig path.
+func LoadConfig() (*rest.Config, error) {
+	cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfigPath)
+	return cfg, errors.Trace(err)
+}
diff --git a/tests/pkg/ops/common.go b/tests/pkg/ops/common.go
new file mode 100644
index 00000000000..69239a81137
--- /dev/null
+++ b/tests/pkg/ops/common.go
@@ -0,0 +1,96 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package ops
+
+import (
+	"time"
+
+	"github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1"
+	"github.com/pingcap/tidb-operator/tests/pkg/client"
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+)
+
+// Default poll interval and timeout used when ClientOps has none set.
+const (
+	DefaultPollInterval = 10 * time.Second
+	DefaultPollTimeout  = 10 * time.Minute
+)
+
+// PollFn is the function signature shared by wait.Poll and wait.PollImmediate.
+type PollFn func(time.Duration, time.Duration, wait.ConditionFunc) error
+
+// ClientOps wraps a client.Client with helpers for polling cluster resources.
+// A nil PollInterval or PollTimeout falls back to the package default.
+type ClientOps struct {
+	client.Client
+
+	PollInterval *time.Duration
+	PollTimeout  *time.Duration
+}
+
+// pollArgs returns the effective interval and timeout together with cond, in
+// the argument order expected by wait.Poll and wait.PollImmediate.
+func (cli *ClientOps) pollArgs(cond wait.ConditionFunc) (time.Duration, time.Duration, wait.ConditionFunc) {
+	interval := DefaultPollInterval
+	if cli.PollInterval != nil {
+		interval = *cli.PollInterval
+	}
+	timeout := DefaultPollTimeout
+	if cli.PollTimeout != nil {
+		timeout = *cli.PollTimeout
+	}
+	return interval, timeout, cond
+}
+
+// SetPoll overrides the poll interval and timeout used by subsequent polls.
+func (cli *ClientOps) SetPoll(interval time.Duration, timeout time.Duration) {
+	cli.PollInterval = &interval
+	cli.PollTimeout = &timeout
+}
+
+// Poll runs cond with wait.Poll using the configured interval and timeout.
+func (cli *ClientOps) Poll(cond wait.ConditionFunc) error {
+	return wait.Poll(cli.pollArgs(cond))
+}
+
+// PollImmediate is like Poll but uses wait.PollImmediate.
+func (cli *ClientOps) PollImmediate(cond wait.ConditionFunc) error {
+	return wait.PollImmediate(cli.pollArgs(cond))
+}
+
+// PollPod fetches the named pod on every tick and passes the result of the
+// Get call, including its error, to cond.
+func (cli *ClientOps) PollPod(ns string, name string, cond func(po *corev1.Pod, err error) (bool, error)) error {
+	return cli.Poll(func() (done bool, err error) {
+		return cond(cli.CoreV1().Pods(ns).Get(name, metav1.GetOptions{}))
+	})
+}
+
+// PollStatefulSet fetches the named StatefulSet on every tick and passes the
+// result of the Get call, including its error, to cond.
+func (cli *ClientOps) PollStatefulSet(ns string, name string, cond func(ss *appsv1.StatefulSet, err error) (bool, error)) error {
+	return cli.Poll(func() (done bool, err error) {
+		return cond(cli.AppsV1().StatefulSets(ns).Get(name, metav1.GetOptions{}))
+	})
+}
+
+// PollTiDBCluster fetches the named TidbCluster on every tick and passes the
+// result of the Get call, including its error, to cond.
+func (cli *ClientOps) PollTiDBCluster(ns string, name string, cond func(tc *v1alpha1.TidbCluster, err error) (bool, error)) error {
+	return cli.Poll(func() (done bool, err error) {
+		return cond(cli.PingcapV1alpha1().TidbClusters(ns).Get(name, metav1.GetOptions{}))
+	})
+}
diff --git a/tests/pkg/ops/exec.go b/tests/pkg/ops/exec.go
new file mode 100644
index 00000000000..a904d9697a5
--- /dev/null
+++ b/tests/pkg/ops/exec.go
@@ -0,0 +1,125 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package ops
+
+import (
+	"bytes"
+	"io"
+	"net/url"
+	"strconv"
+	"strings"
+	"syscall"
+
+	"github.com/golang/glog"
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb-operator/tests/pkg/client"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/remotecommand"
+)
+
+// ExecOptions holds the parameters passed to ExecWithOptions.
+type ExecOptions struct {
+	Command []string
+
+	Namespace     string
+	PodName       string
+	ContainerName string
+
+	Stdin         io.Reader
+	CaptureStdout bool
+	CaptureStderr bool
+	// If false, leading and trailing whitespace in std{err,out} is trimmed.
+	PreserveWhitespace bool
+}
+
+// ExecWithOptions executes a command in the specified container,
+// returning stdout, stderr and error. `options` allows for
+// additional parameters to be passed.
+func (cli *ClientOps) ExecWithOptions(options ExecOptions) (string, string, error) {
+	glog.Infof("ExecWithOptions %+v", options)
+
+	config, err := client.LoadConfig()
+	if err != nil {
+		return "", "", err
+	}
+
+	const tty = false
+
+	req := cli.CoreV1().RESTClient().Post().
+		Resource("pods").
+		Name(options.PodName).
+		Namespace(options.Namespace).
+		SubResource("exec").
+		Param("container", options.ContainerName)
+	req.VersionedParams(&corev1.PodExecOptions{
+		Container: options.ContainerName,
+		Command:   options.Command,
+		Stdin:     options.Stdin != nil,
+		Stdout:    options.CaptureStdout,
+		Stderr:    options.CaptureStderr,
+		TTY:       tty,
+	}, codec)
+
+	var stdout, stderr bytes.Buffer
+	err = execute("POST", req.URL(), config, options.Stdin, &stdout, &stderr, tty)
+
+	if options.PreserveWhitespace {
+		return stdout.String(), stderr.String(), err
+	}
+	return strings.TrimSpace(stdout.String()), strings.TrimSpace(stderr.String()), err
+}
+
+// KillProcess runs `kill -<sig> <pid>` inside the given container of the pod
+// and returns the error of the exec call; the command output is discarded.
+func (cli *ClientOps) KillProcess(ns string, pod string, container string, pid int, sig syscall.Signal) error {
+	_, _, err := cli.ExecWithOptions(ExecOptions{
+		Command:       []string{"kill", "-" + strconv.Itoa(int(sig)), strconv.Itoa(pid)},
+		Namespace:     ns,
+		PodName:       pod,
+		ContainerName: container,
+		CaptureStderr: true,
+		CaptureStdout: true,
+	})
+	return err
+}
+
+// execute opens an SPDY executor for the given request URL and streams the
+// command's stdin, stdout and stderr through the supplied reader and writers.
+func execute(method string, url *url.URL, config *rest.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
+	exec, err := remotecommand.NewSPDYExecutor(config, method, url)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	return errors.Trace(exec.Stream(remotecommand.StreamOptions{
+		Stdin:  stdin,
+		Stdout: stdout,
+		Stderr: stderr,
+		Tty:    tty,
+	}))
+}
+
+// scheme and codec are used to encode PodExecOptions as request parameters.
+var (
+	scheme *runtime.Scheme
+	codec  runtime.ParameterCodec
+)
+
+func init() {
+	scheme = runtime.NewScheme()
+	if err := corev1.AddToScheme(scheme); err != nil {
+		panic(err)
+	}
+	codec = runtime.NewParameterCodec(scheme)
+}
diff --git a/tests/pkg/ops/tikv.go b/tests/pkg/ops/tikv.go
new file mode 100644
index 00000000000..0e06e264cd7
--- /dev/null
+++ b/tests/pkg/ops/tikv.go
@@ -0,0 +1,101 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package ops
+
+import (
+	"strings"
+
+	"github.com/golang/glog"
+	"github.com/pingcap/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// TruncateOptions identifies the TiKV store whose sst file should be truncated.
+type TruncateOptions struct {
+	Namespace string
+	Cluster   string
+	Store     string
+}
+
+// TiKVOps provides TiKV-specific helpers on top of ClientOps.
+type TiKVOps struct {
+	ClientOps
+}
+
+// TruncateSSTFile truncates one sst file of the store named by opts to zero
+// length. It looks up the store's pod in the TidbCluster status, lists the sst
+// files under /var/lib/tikv/db in the tikv container, picks one that has no
+// ".save" backup yet, copies it to "<file>.save" and then truncates it.
+func (ops *TiKVOps) TruncateSSTFile(opts TruncateOptions) error {
+	glog.Infof("truncate sst option: %+v", opts)
+
+	tc, err := ops.PingcapV1alpha1().TidbClusters(opts.Namespace).Get(opts.Cluster, metav1.GetOptions{})
+	if err != nil {
+		return errors.Trace(err)
+	}
+	store, ok := tc.Status.TiKV.Stores[opts.Store]
+	if !ok {
+		return errors.New("no such store")
+	}
+
+	exec := func(cmd ...string) (string, string, error) {
+		return ops.ExecWithOptions(ExecOptions{
+			Command:       cmd,
+			Namespace:     opts.Namespace,
+			PodName:       store.PodName,
+			ContainerName: "tikv",
+			CaptureStderr: true,
+			CaptureStdout: true,
+		})
+	}
+
+	stdout, stderr, err := exec("find", "/var/lib/tikv/db", "-name", "*.sst", "-o", "-name", "*.save")
+	if err != nil {
+		glog.Errorf("list sst files: stderr=%s err=%s", stderr, err.Error())
+		return errors.Annotate(err, "list sst files")
+	}
+
+	sstCandidates := make(map[string]bool)
+
+	for _, f := range strings.Split(stdout, "\n") {
+		f = strings.TrimSpace(f)
+		if len(f) > 0 {
+			sstCandidates[f] = true
+		}
+	}
+
+	sst := ""
+	for k := range sstCandidates {
+		if strings.HasSuffix(k, ".sst") && !sstCandidates[k+".save"] {
+			sst = k
+			break
+		}
+	}
+	if len(sst) == 0 {
+		return errors.New("cannot find a sst file")
+	}
+
+	_, stderr, err = exec("cp", sst, sst+".save")
+	if err != nil {
+		glog.Errorf("backup sst file: stderr=%s err=%s", stderr, err.Error())
+		return errors.Annotate(err, "backup sst file")
+	}
+
+	_, stderr, err = exec("truncate", "-s", "0", sst)
+	if err != nil {
+		glog.Errorf("truncate sst file: stderr=%s err=%s", stderr, err.Error())
+		return errors.Annotate(err, "truncate sst file")
+	}
+
+	return nil
+}
diff --git a/tests/pkg/util/misc.go b/tests/pkg/util/misc.go
new file mode 100644
index 00000000000..0540924bb78
--- /dev/null
+++ b/tests/pkg/util/misc.go
@@ -0,0 +1,32 @@
+// Copyright 2019 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package util
+
+import (
+	corev1 "k8s.io/api/core/v1"
+)
+
+// GetContainerStatusFromPod returns a pointer to a copy of the first container
+// status in pod for which cond returns true, or nil if pod is nil or no status
+// matches.
+func GetContainerStatusFromPod(pod *corev1.Pod, cond func(corev1.ContainerStatus) bool) *corev1.ContainerStatus {
+	if pod == nil {
+		return nil
+	}
+	for _, c := range pod.Status.ContainerStatuses {
+		if cond(c) {
+			return &c
+		}
+	}
+	return nil
+}