Skip to content

Commit

Permalink
Cancel watch if cluster not healthy before or after injecting failpoi…
Browse files Browse the repository at this point in the history
…nts.

Signed-off-by: James Blair <mail@jamesblair.net>
  • Loading branch information
jmhbnz authored and serathius committed Apr 4, 2023
1 parent 05e2910 commit 1227754
Showing 1 changed file with 46 additions and 1 deletion.
47 changes: 46 additions & 1 deletion tests/robustness/failpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"go.uber.org/zap"
healthpb "google.golang.org/grpc/health/grpc_health_v1"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/tests/v3/framework/e2e"
Expand Down Expand Up @@ -98,18 +99,62 @@ func triggerFailpoints(ctx context.Context, t *testing.T, lg *zap.Logger, clus *
}
for successes < config.count && failures < config.retries {
time.Sleep(config.waitBetweenTriggers)
lg.Info("Triggering failpoint\n", zap.String("failpoint", config.failpoint.Name()))

lg.Info("Verifying cluster health before failpoint", zap.String("failpoint", config.failpoint.Name()))
if err = verifyClusterHealth(ctx, t, clus); err != nil {
t.Errorf("failed to verify cluster health before failpoint injection, err: %v", err)
return
}

lg.Info("Triggering failpoint", zap.String("failpoint", config.failpoint.Name()))
err = config.failpoint.Trigger(ctx, t, lg, clus)
if err != nil {
lg.Info("Failed to trigger failpoint", zap.String("failpoint", config.failpoint.Name()), zap.Error(err))
failures++
continue
}

lg.Info("Verifying cluster health after failpoint", zap.String("failpoint", config.failpoint.Name()))
if err = verifyClusterHealth(ctx, t, clus); err != nil {
t.Errorf("failed to verify cluster health after failpoint injection, err: %v", err)
return
}

successes++
}
if successes < config.count || failures >= config.retries {
t.Errorf("failed to trigger failpoints enough times, err: %v", err)
}

return
}

func verifyClusterHealth(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster) error {
for i := 0; i < len(clus.Procs); i++ {
clusterClient, err := clientv3.New(clientv3.Config{
Endpoints: clus.Procs[i].EndpointsGRPC(),
Logger: zap.NewNop(),
DialKeepAliveTime: 1 * time.Millisecond,
DialKeepAliveTimeout: 5 * time.Millisecond,
})
if err != nil {
return fmt.Errorf("Error creating client for cluster %s: %v", clus.Procs[i].Config().Name, err)
}
defer clusterClient.Close()

cli := healthpb.NewHealthClient(clusterClient.ActiveConnection())
resp, err := cli.Check(ctx, &healthpb.HealthCheckRequest{})
if err != nil {
return fmt.Errorf("Error checking member %s health: %v", clus.Procs[i].Config().Name, err)
}
if resp.Status != healthpb.HealthCheckResponse_SERVING {
return fmt.Errorf("Member %s health status expected %s, got %s",
clus.Procs[i].Config().Name,
healthpb.HealthCheckResponse_SERVING,
resp.Status)
}
}
return nil
}

type FailpointConfig struct {
Expand Down

0 comments on commit 1227754

Please sign in to comment.