Skip to content

Commit

Permalink
stability: add sst-file-corruption case (#382)
Browse files Browse the repository at this point in the history
* stability: add a demo sst-corruption case
  • Loading branch information
zyguan authored and weekface committed Apr 15, 2019
1 parent 3a60132 commit c683ac2
Show file tree
Hide file tree
Showing 8 changed files with 510 additions and 1 deletion.
4 changes: 3 additions & 1 deletion tests/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ type OperatorActions interface {
CreateSecret(info *TidbClusterConfig) error
GetPodUIDMap(info *TidbClusterConfig) (map[string]types.UID, error)
GetNodeMap(info *TidbClusterConfig, component string) (map[string][]string, error)
TruncateSSTFileThenCheckFailover(info *TidbClusterConfig, tikvFailoverPeriod time.Duration) error
TruncateSSTFileThenCheckFailoverOrDie(info *TidbClusterConfig, tikvFailoverPeriod time.Duration)
CheckFailoverPending(info *TidbClusterConfig, faultPoint *time.Time) (bool, error)
CheckFailoverPendingOrDie(clusters []*TidbClusterConfig, faultPoint *time.Time)
CheckFailover(info *TidbClusterConfig, faultNode string) (bool, error)
Expand Down Expand Up @@ -1331,7 +1333,7 @@ func cloneOperatorRepo() error {
cmd := fmt.Sprintf("git clone https://github.com/pingcap/tidb-operator.git /tidb-operator")
glog.Info(cmd)
res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
if err != nil {
if err != nil && !strings.Contains(string(res), "already exists") {
return fmt.Errorf("failed to clone tidb-operator repository: %v, %s", err, string(res))
}

Expand Down
3 changes: 3 additions & 0 deletions tests/cmd/stability/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,5 +214,8 @@ func main() {
oa.CheckTidbClusterStatusOrDie(cluster)
}

// truncate an sst file and check failover
oa.TruncateSSTFileThenCheckFailoverOrDie(cluster1, 5*time.Minute)

glog.Infof("\nFinished.")
}
135 changes: 135 additions & 0 deletions tests/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,155 @@ import (
"fmt"
"sort"
"strings"
"syscall"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/golang/glog"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/label"
"github.com/pingcap/tidb-operator/tests/pkg/client"
"github.com/pingcap/tidb-operator/tests/pkg/ops"
"github.com/pingcap/tidb-operator/tests/pkg/util"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
)

// TruncateSSTFileThenCheckFailover damages one TiKV store by truncating an SST
// file and then waits until the cluster status shows that failover happened.
//
// The sequence is:
//  1. pick the first store whose state is Up in the TidbCluster status;
//  2. signal the tikv process with SIGTERM and wait until the "tikv"
//     container's restart count has increased and the pod is Running again;
//  3. truncate an SST file of that store and signal tikv once more;
//  4. poll the TidbCluster until the number of failure stores exceeds the
//     original number and the number of Up stores is at least the original.
//
// The final poll times out after PD's max-store-down-time plus
// tikvFailoverPeriod plus a fixed five minute margin. Any error from the
// intermediate steps, or the poll timing out, is returned to the caller.
func (oa *operatorActions) TruncateSSTFileThenCheckFailover(info *TidbClusterConfig, tikvFailoverPeriod time.Duration) error {
	const failoverTimeout = 5 * time.Minute

	cli := client.Union(oa.kubeCli, oa.cli)
	tikvOps := ops.TiKVOps{ClientOps: ops.ClientOps{Client: cli}}

	// Fetch the latest TidbCluster object to get a baseline of store states.
	tc, err := cli.PingcapV1alpha1().TidbClusters(info.Namespace).Get(info.ClusterName, metav1.GetOptions{})
	if err != nil {
		glog.Errorf("failed to get the cluster: ns=%s tc=%s err=%s", info.Namespace, info.ClusterName, err.Error())
		return err
	}
	countUpStores := func(tc *v1alpha1.TidbCluster) int {
		cnt := 0
		for _, s := range tc.Status.TiKV.Stores {
			if s.State == v1alpha1.TiKVStateUp {
				cnt++
			}
		}
		return cnt
	}

	origFailures := len(tc.Status.TiKV.FailureStores)
	origUps := countUpStores(tc)

	// Read the PD config: max-store-down-time contributes to the poll timeout.
	pdCfg, err := oa.pdControl.GetPDClient(tc).GetConfig()
	if err != nil {
		glog.Errorf("failed to get the pd config: tc=%s err=%s", info.ClusterName, err.Error())
		return err
	}
	maxStoreDownTime := pdCfg.Schedule.MaxStoreDownTime.Duration
	glog.Infof("failover config: maxStoreDownTime=%v tikvFailoverPeriod=%v", maxStoreDownTime, tikvFailoverPeriod)

	// Find an up store to use as the corruption target.
	var store v1alpha1.TiKVStore
	for _, v := range tc.Status.TiKV.Stores {
		if v.State != v1alpha1.TiKVStateUp {
			continue
		}
		store = v
		break
	}
	if len(store.ID) == 0 {
		glog.Errorf("failed to find an up store")
		return errors.New("no up store for truncating sst file")
	}
	glog.Infof("target store: id=%s pod=%s", store.ID, store.PodName)

	// Record the current restart count of the tikv container so that the
	// restart triggered below can be detected.
	podBeforeRestart, err := cli.CoreV1().Pods(info.Namespace).Get(store.PodName, metav1.GetOptions{})
	if err != nil {
		glog.Errorf("failed to get target pod: pod=%s err=%s", store.PodName, err.Error())
		return err
	}

	isTiKVContainer := func(status corev1.ContainerStatus) bool {
		return status.Name == "tikv"
	}

	c := util.GetContainerStatusFromPod(podBeforeRestart, isTiKVContainer)
	if c == nil {
		glog.Errorf("failed to get container status from tikv pod")
		return errors.New("failed to get container status from tikv pod")
	}
	rc := c.RestartCount

	// Restart tikv to ensure sst files.
	// NOTE(review): the literal 1 is presumably the PID inside the container;
	// confirm against ops.TiKVOps.KillProcess.
	err = tikvOps.KillProcess(info.Namespace, store.PodName, "tikv", 1, syscall.SIGTERM)
	if err != nil {
		glog.Errorf("kill tikv: pod=%s err=%s", store.PodName, err.Error())
		return err
	}

	err = tikvOps.PollPod(info.Namespace, store.PodName,
		func(pod *corev1.Pod, err error) (bool, error) {
			// A failed Get is treated as transient: log it and keep polling.
			// %v is used because err may be nil when only pod is nil.
			if err != nil || pod == nil {
				glog.Warningf("failed to get pod: pod=%s err=%v", store.PodName, err)
				return false, nil
			}
			tikv := util.GetContainerStatusFromPod(pod, isTiKVContainer)

			if pod.Status.Phase == corev1.PodRunning && tikv != nil && tikv.RestartCount > rc {
				return true, nil
			}
			return false, nil
		})
	if err != nil {
		glog.Errorf("tikv process hasn't been restarted: err=%s", err.Error())
		return err
	}

	// Truncate the sst file. The result must be checked: if the truncation
	// failed there is nothing to fail over from and waiting would be pointless.
	err = tikvOps.TruncateSSTFile(ops.TruncateOptions{
		Namespace: info.Namespace,
		Cluster:   info.ClusterName,
		Store:     store.ID,
	})
	if err != nil {
		glog.Errorf("truncate sst file: store=%s err=%s", store.ID, err.Error())
		return err
	}

	// Make tikv crash.
	err = tikvOps.KillProcess(info.Namespace, store.PodName, "tikv", 1, syscall.SIGTERM)
	if err != nil {
		glog.Errorf("kill tikv: pod=%s err=%s", store.PodName, err.Error())
		return err
	}

	tikvOps.SetPoll(DefaultPollInterval, maxStoreDownTime+tikvFailoverPeriod+failoverTimeout)

	return tikvOps.PollTiDBCluster(info.Namespace, info.ClusterName,
		func(tc *v1alpha1.TidbCluster, err error) (bool, error) {
			// Do not inspect tc when the Get failed; retry on the next tick.
			if err != nil || tc == nil {
				glog.Warningf("failed to get the cluster: ns=%s tc=%s err=%v", info.Namespace, info.ClusterName, err)
				return false, nil
			}
			glog.Infof("check failure stores: current=%d origin=%d", len(tc.Status.TiKV.FailureStores), origFailures)
			if len(tc.Status.TiKV.FailureStores) <= origFailures {
				return false, nil
			}
			ups := countUpStores(tc)
			glog.Infof("check up stores: current=%d origin=%d", ups, origUps)
			if ups < origUps {
				return false, nil
			}
			return true, nil
		})
}

// TruncateSSTFileThenCheckFailoverOrDie runs TruncateSSTFileThenCheckFailover
// and panics with the returned error if the check does not succeed.
func (oa *operatorActions) TruncateSSTFileThenCheckFailoverOrDie(info *TidbClusterConfig, tikvFailoverPeriod time.Duration) {
	err := oa.TruncateSSTFileThenCheckFailover(info, tikvFailoverPeriod)
	if err == nil {
		return
	}
	panic(err)
}

func (oa *operatorActions) CheckFailoverPending(info *TidbClusterConfig, faultPoint *time.Time) (bool, error) {
tc, err := oa.cli.PingcapV1alpha1().TidbClusters(info.Namespace).Get(info.ClusterName, metav1.GetOptions{})
if err != nil {
Expand Down
47 changes: 47 additions & 0 deletions tests/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package client
import (
"time"

"github.com/juju/errors"
"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned/typed/pingcap.com/v1alpha1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

func NewCliOrDie() (versioned.Interface, kubernetes.Interface) {
Expand All @@ -27,3 +30,47 @@ func NewCliOrDie() (versioned.Interface, kubernetes.Interface) {

return cli, kubeCli
}

// Package-level connection settings. Both values are passed to
// clientcmd.BuildConfigFromFlags by NewOrDie and LoadConfig.
var (
// masterUrl is assigned by SetMasterURL.
masterUrl string
// kubeconfigPath is assigned by SetConfigPath.
kubeconfigPath string
)

// Client is a single handle that exposes the whole Kubernetes clientset
// interface together with the PingCAP v1alpha1 typed client.
type Client interface {
kubernetes.Interface
// PingcapV1alpha1 returns the typed client for the pingcap.com/v1alpha1 API.
PingcapV1alpha1() v1alpha1.PingcapV1alpha1Interface
}

// Union wraps an existing Kubernetes clientset and an existing PingCAP
// clientset into one value that satisfies Client.
func Union(kube kubernetes.Interface, tidb versioned.Interface) Client {
	combined := new(client)
	combined.Interface = kube
	combined.pingcap = tidb
	return combined
}

// NewOrDie builds a rest config from the package-level master URL and
// kubeconfig path and returns a Client backed by freshly created Kubernetes
// and PingCAP clientsets. It panics if the config cannot be built; the
// NewForConfigOrDie constructors are expected to panic on their own failures.
func NewOrDie() Client {
	restCfg, buildErr := clientcmd.BuildConfigFromFlags(masterUrl, kubeconfigPath)
	if buildErr != nil {
		panic(buildErr)
	}
	kubeCli := kubernetes.NewForConfigOrDie(restCfg)
	pingcapCli := versioned.NewForConfigOrDie(restCfg)
	return Union(kubeCli, pingcapCli)
}

// client is the Client implementation returned by Union. The embedded
// kubernetes.Interface supplies all Kubernetes methods; pingcap backs the
// PingcapV1alpha1 method.
type client struct {
kubernetes.Interface
pingcap versioned.Interface
}

// PingcapV1alpha1 forwards to the wrapped PingCAP clientset and returns its
// v1alpha1 typed client.
func (cli *client) PingcapV1alpha1() v1alpha1.PingcapV1alpha1Interface {
	typed := cli.pingcap.PingcapV1alpha1()
	return typed
}

// SetConfigPath stores the kubeconfig path that later NewOrDie and LoadConfig
// calls pass to clientcmd.BuildConfigFromFlags.
func SetConfigPath(path string) {
kubeconfigPath = path
}

// SetMasterURL stores the master URL that later NewOrDie and LoadConfig calls
// pass to clientcmd.BuildConfigFromFlags.
func SetMasterURL(url string) {
masterUrl = url
}

// LoadConfig builds a rest config from the package-level master URL and
// kubeconfig path. The error, if any, is passed through errors.Trace before
// being returned alongside whatever config value the builder produced.
func LoadConfig() (*rest.Config, error) {
	restCfg, buildErr := clientcmd.BuildConfigFromFlags(masterUrl, kubeconfigPath)
	return restCfg, errors.Trace(buildErr)
}
81 changes: 81 additions & 0 deletions tests/pkg/ops/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package ops

import (
"time"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1"
"github.com/pingcap/tidb-operator/tests/pkg/client"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
)

// Defaults used by ClientOps.pollArgs when the corresponding ClientOps field
// is nil.
const (
// DefaultPollInterval is the default delay between two condition checks.
DefaultPollInterval = 10 * time.Second
// DefaultPollTimeout is the default upper bound on a whole poll.
DefaultPollTimeout = 10 * time.Minute
)

// PollFn is the signature of a polling function taking two durations and a
// condition, in the same order as the values returned by ClientOps.pollArgs
// (interval, timeout, condition).
type PollFn func(time.Duration, time.Duration, wait.ConditionFunc) error

// ClientOps couples a client.Client with optional polling settings and offers
// polling helpers built on top of them.
type ClientOps struct {
client.Client

// PollInterval overrides DefaultPollInterval when non-nil.
PollInterval *time.Duration
// PollTimeout overrides DefaultPollTimeout when non-nil.
PollTimeout *time.Duration
}

// pollArgs resolves the effective interval and timeout (falling back to the
// package defaults for nil fields) and returns them together with cond, in
// the argument order expected by wait.Poll and wait.PollImmediate.
func (cli *ClientOps) pollArgs(cond wait.ConditionFunc) (time.Duration, time.Duration, wait.ConditionFunc) {
	every, limit := DefaultPollInterval, DefaultPollTimeout
	if p := cli.PollInterval; p != nil {
		every = *p
	}
	if p := cli.PollTimeout; p != nil {
		limit = *p
	}
	return every, limit, cond
}

// SetPoll overrides both the polling interval and the polling timeout used by
// subsequent Poll* calls on this ClientOps.
func (cli *ClientOps) SetPoll(interval time.Duration, timeout time.Duration) {
	cli.PollInterval, cli.PollTimeout = &interval, &timeout
}

// Poll runs cond through wait.Poll using the interval and timeout resolved by
// pollArgs.
func (cli *ClientOps) Poll(cond wait.ConditionFunc) error {
	interval, timeout, fn := cli.pollArgs(cond)
	return wait.Poll(interval, timeout, fn)
}

// PollImmediate runs cond through wait.PollImmediate using the interval and
// timeout resolved by pollArgs.
func (cli *ClientOps) PollImmediate(cond wait.ConditionFunc) error {
	interval, timeout, fn := cli.pollArgs(cond)
	return wait.PollImmediate(interval, timeout, fn)
}

// PollPod repeatedly fetches the pod ns/name and hands the result of the Get
// call (object and error) to cond until cond reports done, returns an error,
// or the poll times out.
func (cli *ClientOps) PollPod(ns string, name string, cond func(po *corev1.Pod, err error) (bool, error)) error {
	check := func() (bool, error) {
		pod, getErr := cli.CoreV1().Pods(ns).Get(name, metav1.GetOptions{})
		return cond(pod, getErr)
	}
	return cli.Poll(check)
}

// PollStatefulSet repeatedly fetches the StatefulSet ns/name and hands the
// result of the Get call (object and error) to cond until cond reports done,
// returns an error, or the poll times out.
func (cli *ClientOps) PollStatefulSet(ns string, name string, cond func(ss *appsv1.StatefulSet, err error) (bool, error)) error {
	check := func() (bool, error) {
		set, getErr := cli.AppsV1().StatefulSets(ns).Get(name, metav1.GetOptions{})
		return cond(set, getErr)
	}
	return cli.Poll(check)
}

// PollTiDBCluster repeatedly fetches the TidbCluster ns/name and hands the
// result of the Get call (object and error) to cond until cond reports done,
// returns an error, or the poll times out.
func (cli *ClientOps) PollTiDBCluster(ns string, name string, cond func(tc *v1alpha1.TidbCluster, err error) (bool, error)) error {
	check := func() (bool, error) {
		cluster, getErr := cli.PingcapV1alpha1().TidbClusters(ns).Get(name, metav1.GetOptions{})
		return cond(cluster, getErr)
	}
	return cli.Poll(check)
}
Loading

0 comments on commit c683ac2

Please sign in to comment.