Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stability: add sst-file-corruption case #382

Merged
merged 6 commits into from
Apr 15, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion tests/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ type OperatorActions interface {
CreateSecret(info *TidbClusterConfig) error
GetPodUIDMap(info *TidbClusterConfig) (map[string]types.UID, error)
GetNodeMap(info *TidbClusterConfig, component string) (map[string][]string, error)
TruncateSSTFileThenCheckFailover(info *TidbClusterConfig, tikvFailoverPeriod time.Duration) error
TruncateSSTFileThenCheckFailoverOrDie(info *TidbClusterConfig, tikvFailoverPeriod time.Duration)
CheckFailoverPending(info *TidbClusterConfig, faultPoint *time.Time) (bool, error)
CheckFailoverPendingOrDie(clusters []*TidbClusterConfig, faultPoint *time.Time)
CheckFailover(info *TidbClusterConfig, faultNode string) (bool, error)
Expand Down Expand Up @@ -1331,7 +1333,7 @@ func cloneOperatorRepo() error {
cmd := fmt.Sprintf("git clone https://github.com/pingcap/tidb-operator.git /tidb-operator")
glog.Info(cmd)
res, err := exec.Command("/bin/sh", "-c", cmd).CombinedOutput()
if err != nil {
if err != nil && !strings.Contains(string(res), "already exists") {
return fmt.Errorf("failed to clone tidb-operator repository: %v, %s", err, string(res))
}

Expand Down
3 changes: 3 additions & 0 deletions tests/cmd/stability/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ func main() {
// restore
backup.NewBackupCase(oa, clusterBackupFrom, clusterRestoreTo).RunOrDie()

// truncate a sst file and check failover
oa.TruncateSSTFileThenCheckFailoverOrDie(cluster1, 30*time.Minute)

// stop a node and failover automatically
fta := tests.NewFaultTriggerAction(cli, kubeCli, conf)
physicalNode, node, faultTime := fta.StopNodeOrDie()
Expand Down
120 changes: 120 additions & 0 deletions tests/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,140 @@ import (
"fmt"
"sort"
"strings"
"syscall"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/golang/glog"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap.com/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/label"
"github.com/pingcap/tidb-operator/tests/pkg/client"
"github.com/pingcap/tidb-operator/tests/pkg/util"
unioncli "github.com/pingcap/tidb-operator/tests/pkg/util/client"
"github.com/pingcap/tidb-operator/tests/pkg/util/ops"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
)

// TruncateSSTFileThenCheckFailover damages one TiKV store of the given cluster
// and waits until the TidbCluster status records an additional failure store.
//
// The procedure is:
//  1. read the TidbCluster object and PD's max-store-down-time setting;
//  2. pick a store whose state is Up;
//  3. send SIGTERM to pid 1 of that store's tikv container and wait until the
//     pod is Running again with a higher tikv restart count;
//  4. call tikvOps.TruncateSSTFile for the store; from its callback, send
//     SIGTERM once more and poll the TidbCluster until the number of failure
//     stores grows beyond the count observed before the kill.
//
// tikvFailoverPeriod is added to PD's max-store-down-time and a fixed
// five-minute margin to form the polling timeout of step 4.
//
// It returns the first error encountered, or an error when no Up store or no
// tikv container status can be found.
func (oa *operatorActions) TruncateSSTFileThenCheckFailover(info *TidbClusterConfig, tikvFailoverPeriod time.Duration) error {
	const failoverTimeout = 5 * time.Minute

	cli := unioncli.Union(oa.kubeCli, oa.cli)
	tikvOps := ops.TiKVOps{ClientOps: unioncli.ClientOps{Client: cli}}

	// Shared predicate used to locate the tikv container in a pod's statuses.
	isTiKVContainer := func(status corev1.ContainerStatus) bool {
		return status.Name == "tikv"
	}

	// fetch the latest tidb cluster
	tc, err := cli.PingcapV1alpha1().TidbClusters(info.Namespace).Get(info.ClusterName, metav1.GetOptions{})
	if err != nil {
		glog.Errorf("failed to get the cluster: ns=%s tc=%s err=%s", info.Namespace, info.ClusterName, err.Error())
		return err
	}

	// fetch the pd config; max-store-down-time is part of the failover deadline
	pdCfg, err := oa.pdControl.GetPDClient(tc).GetConfig()
	if err != nil {
		glog.Errorf("failed to get the pd config: tc=%s err=%s", info.ClusterName, err.Error())
		return err
	}
	maxStoreDownTime := pdCfg.Schedule.MaxStoreDownTime.Duration

	// find an up store
	var store v1alpha1.TiKVStore
	for _, v := range tc.Status.TiKV.Stores {
		if v.State != v1alpha1.TiKVStateUp {
			continue
		}
		store = v
		break
	}
	if len(store.ID) == 0 {
		glog.Errorf("failed to find an up store")
		return errors.New("no up store for truncating sst file")
	}
	glog.Infof("target store: id=%s pod=%s", store.ID, store.PodName)

	// record the current restart count of the tikv container
	podBeforeRestart, err := cli.CoreV1().Pods(info.Namespace).Get(store.PodName, metav1.GetOptions{})
	if err != nil {
		glog.Errorf("failed to get target pod: pod=%s err=%s", store.PodName, err.Error())
		return err
	}

	c := util.GetContainerStatusFromPod(podBeforeRestart, isTiKVContainer)
	if c == nil {
		glog.Errorf("failed to get container status from tikv pod")
		return errors.New("failed to get container status from tikv pod")
	}
	rc := c.RestartCount

	// restart tikv to ensure sst files exist
	// NOTE(review): presumably the restart makes TiKV persist data as sst files — confirm.
	err = tikvOps.KillProcess(info.Namespace, store.PodName, "tikv", 1, syscall.SIGTERM)
	if err != nil {
		glog.Errorf("kill tikv: pod=%s err=%s", store.PodName, err.Error())
		return err
	}

	err = tikvOps.WaitForPod(info.Namespace, store.PodName,
		func(pod *corev1.Pod, err error) (bool, error) {
			if pod == nil {
				// err may be nil too; %v avoids calling Error() on a nil error.
				glog.Warningf("pod is nil: err=%v", err)
				return false, nil
			}
			tikv := util.GetContainerStatusFromPod(pod, isTiKVContainer)
			restarted := pod.Status.Phase == corev1.PodRunning && tikv != nil && tikv.RestartCount > rc
			return restarted, nil
		})
	if err != nil {
		glog.Errorf("tikv process hasn't been restarted: err=%s", err.Error())
		return err
	}

	// truncate the sst file and wait for failover
	return tikvOps.TruncateSSTFile(ops.TruncateOptions{
		Namespace: info.Namespace,
		Cluster:   info.ClusterName,
		Store:     store.ID,
	}, func(sst string) error {
		// Snapshot the failure-store count before killing tikv so that a new
		// failure can be told apart from pre-existing ones.
		before, err := cli.PingcapV1alpha1().TidbClusters(info.Namespace).Get(info.ClusterName, metav1.GetOptions{})
		if err != nil {
			return err
		}
		failures := len(before.Status.TiKV.FailureStores)

		err = tikvOps.KillProcess(info.Namespace, store.PodName, "tikv", 1, syscall.SIGTERM)
		if err != nil {
			glog.Errorf("kill tikv: pod=%s err=%s", store.PodName, err.Error())
			return err
		}

		return tikvOps.WaitForTiDBCluster(info.Namespace, info.ClusterName,
			func(latest *v1alpha1.TidbCluster, err error) (bool, error) {
				if latest == nil {
					// Mirror the nil guard of the WaitForPod callback instead of
					// dereferencing a cluster object that could not be fetched.
					glog.Warningf("tidb cluster is nil: err=%v", err)
					return false, nil
				}
				current := len(latest.Status.TiKV.FailureStores)
				glog.Infof("check failure stores: current=%d before=%d", current, failures)
				return current > failures, nil
			}, unioncli.Poll(DefaultPollInterval, maxStoreDownTime+tikvFailoverPeriod+failoverTimeout))
	})
}

// TruncateSSTFileThenCheckFailoverOrDie runs TruncateSSTFileThenCheckFailover
// with the same arguments and panics with the returned error if it fails.
func (oa *operatorActions) TruncateSSTFileThenCheckFailoverOrDie(info *TidbClusterConfig, tikvFailoverPeriod time.Duration) {
	failure := oa.TruncateSSTFileThenCheckFailover(info, tikvFailoverPeriod)
	if failure == nil {
		return
	}
	panic(failure)
}

func (oa *operatorActions) CheckFailoverPending(info *TidbClusterConfig, faultPoint *time.Time) (bool, error) {
tc, err := oa.cli.PingcapV1alpha1().TidbClusters(info.Namespace).Get(info.ClusterName, metav1.GetOptions{})
if err != nil {
Expand Down
70 changes: 70 additions & 0 deletions tests/pkg/util/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2019 PingCAP, Inc.
zyguan marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package client

import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned"
"github.com/pingcap/tidb-operator/pkg/client/clientset/versioned/typed/pingcap.com/v1alpha1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

// Package-level connection settings. Both values are handed to
// clientcmd.BuildConfigFromFlags by NewOrDie and LoadConfig.
var (
// masterUrl is assigned by SetMasterURL.
masterUrl string
// kubeconfigPath is assigned by SetConfigPath.
kubeconfigPath string
)

// Client combines the standard Kubernetes clientset interface with access to
// the pingcap.com/v1alpha1 typed client.
type Client interface {
kubernetes.Interface
// PingcapV1alpha1 returns the typed client for the pingcap.com/v1alpha1 API group.
PingcapV1alpha1() v1alpha1.PingcapV1alpha1Interface
}

// Union builds a Client that serves Kubernetes calls through kube and
// PingcapV1alpha1 calls through tidb.
func Union(kube kubernetes.Interface, tidb versioned.Interface) Client {
	combined := new(client)
	combined.Interface = kube
	combined.pingcap = tidb
	return combined
}

// NewOrDie creates a Client from the package-level master URL and kubeconfig
// path. It panics if the REST config cannot be built; the underlying
// NewForConfigOrDie constructors are used for both clientsets.
func NewOrDie() Client {
	restCfg, buildErr := clientcmd.BuildConfigFromFlags(masterUrl, kubeconfigPath)
	if buildErr != nil {
		panic(buildErr)
	}

	kubeCli := kubernetes.NewForConfigOrDie(restCfg)
	tidbCli := versioned.NewForConfigOrDie(restCfg)
	return Union(kubeCli, tidbCli)
}

// client is the Client implementation returned by Union. Kubernetes methods
// come from the embedded kubernetes.Interface.
type client struct {
kubernetes.Interface
// pingcap backs the PingcapV1alpha1 method.
pingcap versioned.Interface
}

// PingcapV1alpha1 delegates to the wrapped versioned clientset and returns its
// pingcap.com/v1alpha1 typed client.
func (cli *client) PingcapV1alpha1() v1alpha1.PingcapV1alpha1Interface {
	typed := cli.pingcap.PingcapV1alpha1()
	return typed
}

// SetConfigPath stores the kubeconfig path that NewOrDie and LoadConfig pass to
// clientcmd.BuildConfigFromFlags.
func SetConfigPath(path string) {
kubeconfigPath = path
}

// SetMasterURL stores the master URL that NewOrDie and LoadConfig pass to
// clientcmd.BuildConfigFromFlags.
func SetMasterURL(url string) {
masterUrl = url
}

// LoadConfig builds a REST config from the package-level master URL and
// kubeconfig path. Any build error is returned wrapped with errors.Trace.
func LoadConfig() (*rest.Config, error) {
	restCfg, buildErr := clientcmd.BuildConfigFromFlags(masterUrl, kubeconfigPath)
	if buildErr != nil {
		return restCfg, errors.Trace(buildErr)
	}
	return restCfg, nil
}

// ClientOps embeds a Client and is the receiver for helper operations such as
// ExecWithOptions and KillProcess.
type ClientOps struct {
Client
}
117 changes: 117 additions & 0 deletions tests/pkg/util/client/exec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package client

import (
"bytes"
"io"
"net/url"
"strconv"
"strings"
"syscall"

"github.com/golang/glog"
"github.com/pingcap/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
)

// ExecOptions describes a command execution request for ExecWithOptions.
type ExecOptions struct {
// Command is sent as the Command field of the pod exec request.
Command []string

// Namespace, PodName and ContainerName identify the target container.
Namespace string
PodName string
ContainerName string

// Stdin, when non-nil, is streamed to the command and enables stdin on the request.
Stdin io.Reader
// CaptureStdout and CaptureStderr set the Stdout and Stderr flags of the exec request.
CaptureStdout bool
CaptureStderr bool
// If false, leading and trailing whitespace is trimmed from the returned
// stdout and stderr strings.
PreserveWhitespace bool
}

// ExecWithOptions runs options.Command inside the container identified by
// options.Namespace, options.PodName and options.ContainerName through the
// pod "exec" subresource.
//
// It returns the collected stdout, the collected stderr, and the error from
// loading the REST config or from streaming the command. Unless
// options.PreserveWhitespace is set, both outputs are passed through
// strings.TrimSpace before being returned.
func (cli *ClientOps) ExecWithOptions(options ExecOptions) (string, string, error) {
	glog.Infof("ExecWithOptions %+v", options)

	restCfg, err := LoadConfig()
	if err != nil {
		return "", "", err
	}

	const tty = false

	execParams := &corev1.PodExecOptions{
		Container: options.ContainerName,
		Command:   options.Command,
		Stdin:     options.Stdin != nil,
		Stdout:    options.CaptureStdout,
		Stderr:    options.CaptureStderr,
		TTY:       tty,
	}

	req := cli.CoreV1().RESTClient().Post()
	req = req.Resource("pods")
	req = req.Name(options.PodName)
	req = req.Namespace(options.Namespace)
	req = req.SubResource("exec")
	req = req.Param("container", options.ContainerName)
	req.VersionedParams(execParams, codec)

	var outBuf, errBuf bytes.Buffer
	streamErr := execute("POST", req.URL(), restCfg, options.Stdin, &outBuf, &errBuf, tty)

	outText, errText := outBuf.String(), errBuf.String()
	if !options.PreserveWhitespace {
		outText = strings.TrimSpace(outText)
		errText = strings.TrimSpace(errText)
	}
	return outText, errText, streamErr
}

// KillProcess executes `kill -<sig> <pid>` in the given container of the given
// pod, where <sig> is the numeric value of sig. The command's output is
// discarded; only the error from ExecWithOptions is returned.
func (cli *ClientOps) KillProcess(ns string, pod string, container string, pid int, sig syscall.Signal) error {
	signalFlag := "-" + strconv.Itoa(int(sig))
	target := strconv.Itoa(pid)

	request := ExecOptions{
		Command:       []string{"kill", signalFlag, target},
		Namespace:     ns,
		PodName:       pod,
		ContainerName: container,
		CaptureStderr: true,
		CaptureStdout: true,
	}

	_, _, err := cli.ExecWithOptions(request)
	return err
}

// execute opens an SPDY executor for the given method and URL and streams the
// supplied stdin/stdout/stderr through it. Errors from creating the executor
// or from streaming are returned wrapped with errors.Trace.
func execute(method string, url *url.URL, config *rest.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
	executor, err := remotecommand.NewSPDYExecutor(config, method, url)
	if err != nil {
		return errors.Trace(err)
	}

	streams := remotecommand.StreamOptions{
		Stdin:  stdin,
		Stdout: stdout,
		Stderr: stderr,
		Tty:    tty,
	}
	if streamErr := executor.Stream(streams); streamErr != nil {
		return errors.Trace(streamErr)
	}
	return nil
}

// scheme and codec are populated in init. codec is the parameter codec
// ExecWithOptions uses to encode corev1.PodExecOptions into the request.
var (
scheme *runtime.Scheme
codec runtime.ParameterCodec
)

// init creates the package scheme, registers the core/v1 types with it, and
// derives the parameter codec used by ExecWithOptions.
func init() {
	scheme = runtime.NewScheme()
	// AddToScheme returns an error; a scheme without the core/v1 types cannot
	// encode PodExecOptions, so fail loudly at start-up instead of ignoring it.
	if err := corev1.AddToScheme(scheme); err != nil {
		panic(err)
	}
	codec = runtime.NewParameterCodec(scheme)
}
Loading