From 91b056968d0968a8f150a1dbbfcadab204c3e61e Mon Sep 17 00:00:00 2001
From: Bogdan Kanivets
Date: Mon, 9 Jan 2023 10:58:38 -0800
Subject: [PATCH] tests linearizability: trigger snapshot related failpoints

Signed-off-by: Bogdan Kanivets
---
 tests/linearizability/failpoints.go           | 102 ++++++++++++++++++--
 tests/linearizability/linearizability_test.go |  13 +++
 2 files changed, 105 insertions(+), 10 deletions(-)

diff --git a/tests/linearizability/failpoints.go b/tests/linearizability/failpoints.go
index 0a76e382ed9..675e18b164b 100644
--- a/tests/linearizability/failpoints.go
+++ b/tests/linearizability/failpoints.go
@@ -29,7 +29,7 @@ import (
 )
 
 const (
-	triggerTimeout = 2 * time.Second
+	triggerTimeout = 5 * time.Second
 )
 
 var (
@@ -68,13 +68,16 @@ var (
 		BlackholePeerNetwork,
 		DelayPeerNetwork,
 	}}
+	RaftBeforeApplySnapPanic Failpoint = goPanicFailpoint{"raftBeforeApplySnap", triggerBlackholeUntilSnapshot, Follower}
+	RaftAfterApplySnapPanic  Failpoint = goPanicFailpoint{"raftAfterApplySnap", triggerBlackholeUntilSnapshot, Follower}
+	RaftAfterWALReleasePanic Failpoint = goPanicFailpoint{"raftAfterWALRelease", triggerBlackholeUntilSnapshot, Follower}
+	RaftBeforeSaveSnapPanic  Failpoint = goPanicFailpoint{"raftBeforeSaveSnap", triggerBlackholeUntilSnapshot, Follower}
+	RaftAfterSaveSnapPanic   Failpoint = goPanicFailpoint{"raftAfterSaveSnap", triggerBlackholeUntilSnapshot, Follower}
+	RandomSnapshotFailpoint  Failpoint = randomFailpoint{[]Failpoint{
+		RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic, RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic,
+	}}
 	// TODO: Figure out how to reliably trigger below failpoints and add them to RandomFailpoint
-	raftBeforeApplySnapPanic    Failpoint = goPanicFailpoint{"raftBeforeApplySnap", nil, AnyMember}
-	raftAfterApplySnapPanic     Failpoint = goPanicFailpoint{"raftAfterApplySnap", nil, AnyMember}
-	raftAfterWALReleasePanic    Failpoint = goPanicFailpoint{"raftAfterWALRelease", nil, AnyMember}
 	raftBeforeFollowerSendPanic Failpoint = goPanicFailpoint{"raftBeforeFollowerSend", nil, AnyMember}
-	raftBeforeSaveSnapPanic     Failpoint = goPanicFailpoint{"raftBeforeSaveSnap", nil, AnyMember}
-	raftAfterSaveSnapPanic      Failpoint = goPanicFailpoint{"raftAfterSaveSnap", nil, AnyMember}
 )
 
 type Failpoint interface {
@@ -119,7 +122,7 @@ func (f killFailpoint) Available(e2e.EtcdProcess) bool {
 
 type goPanicFailpoint struct {
 	failpoint string
-	trigger   func(ctx context.Context, member e2e.EtcdProcess) error
+	trigger   func(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error
 	target    failpointTarget
 }
 
@@ -128,6 +131,7 @@ type failpointTarget string
 const (
 	AnyMember failpointTarget = "AnyMember"
 	Leader    failpointTarget = "Leader"
+	Follower  failpointTarget = "Follower"
 )
 
 func (f goPanicFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
@@ -148,7 +152,7 @@ func (f goPanicFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Log
 	}
 	if f.trigger != nil {
 		lg.Info("Triggering gofailpoint", zap.String("failpoint", f.Name()))
-		err = f.trigger(triggerCtx, member)
+		err = f.trigger(t, triggerCtx, member, clus)
 		if err != nil {
 			lg.Info("gofailpoint trigger failed", zap.String("failpoint", f.Name()), zap.Error(err))
 		}
@@ -175,6 +179,10 @@ func (f goPanicFailpoint) pickMember(t *testing.T, clus *e2e.EtcdProcessCluster)
 		return clus.Procs[rand.Int()%len(clus.Procs)]
 	case Leader:
 		return clus.Procs[clus.WaitLeader(t)]
+	case Follower:
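+		// Assumes a cluster with more than one member: the process at
+		// (leader index + 1) modulo the cluster size is never the leader itself.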
+		return clus.Procs[(clus.WaitLeader(t)+1)%len(clus.Procs)]
 	default:
 		panic("unknown target")
 	}
 }
@@ -194,7 +200,7 @@ func (f goPanicFailpoint) Name() string {
 	return f.failpoint
 }
 
-func triggerDefrag(ctx context.Context, member e2e.EtcdProcess) error {
+func triggerDefrag(_ *testing.T, ctx context.Context, member e2e.EtcdProcess, _ *e2e.EtcdProcessCluster) error {
 	cc, err := clientv3.New(clientv3.Config{
 		Endpoints: member.EndpointsV3(),
 		Logger:    zap.NewNop(),
@@ -212,7 +218,7 @@ func triggerDefrag(ctx context.Context, member e2e.EtcdProcess) error {
 	return nil
 }
 
-func triggerCompact(ctx context.Context, member e2e.EtcdProcess) error {
+func triggerCompact(_ *testing.T, ctx context.Context, member e2e.EtcdProcess, _ *e2e.EtcdProcessCluster) error {
 	cc, err := clientv3.New(clientv3.Config{
 		Endpoints: member.EndpointsV3(),
 		Logger:    zap.NewNop(),
@@ -234,6 +240,80 @@ func triggerCompact(ctx context.Context, member e2e.EtcdProcess) error {
 	return nil
 }
 
+// latestRevisionForEndpoint returns the latest revision of the first endpoint in the client's endpoint list.
+func latestRevisionForEndpoint(ctx context.Context, c *clientv3.Client) (int64, error) {
+	cntx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
+	defer cancel()
+	resp, err := c.Status(cntx, c.Endpoints()[0])
+	if err != nil {
+		return 0, err
+	}
+	return resp.Header.Revision, nil
+}
+
+func triggerBlackholeUntilSnapshot(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error {
+	leader := clus.Procs[clus.WaitLeader(t)]
+	lc, err := clientv3.New(clientv3.Config{
+		Endpoints:            []string{leader.Config().ClientURL},
+		Logger:               zap.NewNop(),
+		DialKeepAliveTime:    1 * time.Millisecond,
+		DialKeepAliveTimeout: 5 * time.Millisecond,
+	})
+	if err != nil {
+		return err
+	}
+	defer lc.Close()
+
+	mc, err := clientv3.New(clientv3.Config{
+		Endpoints:            []string{member.Config().ClientURL},
+		Logger:               zap.NewNop(),
+		DialKeepAliveTime:    1 * time.Millisecond,
+		DialKeepAliveTimeout: 5 * time.Millisecond,
+	})
+	if err != nil {
+		return err
+	}
+	defer mc.Close()
+
+	proxy := member.PeerProxy()
+	// Blackholing prevents peers from using the streamWriters registered with the member,
+	// but some peer traffic still gets through because the member keeps 'pipeline' connections to its peers.
+	// TODO: find a way to stop all traffic
+	proxy.BlackholeTx()
+	proxy.BlackholeRx()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+		// Refresh revBlackholedMem on every iteration: it can still increase as the member processes changes that were received but not yet applied.
+		revBlackholedMem, err := latestRevisionForEndpoint(ctx, mc)
+		if err != nil {
+			return err
+		}
+		revLeader, err := latestRevisionForEndpoint(ctx, lc)
+		if err != nil {
+			return err
+		}
+		t.Logf("Leader: [%s], Member: [%s], revLeader: %d, revBlackholedMem: %d", leader.Config().Name, member.Config().Name, revLeader, revBlackholedMem)
+		// The blackholed member has to fall sufficiently far behind the leader to trigger a snapshot transfer.
+		// The leader must also have compacted past the member's latest revision inside EtcdServer.snapshot.
+		// That is why we wait for clus.Cfg.SnapshotCount (to trigger a snapshot) plus clus.Cfg.SnapshotCatchUpEntries (the compaction offset used by EtcdServer.snapshot).
+		if revLeader-revBlackholedMem > int64(clus.Cfg.SnapshotCount+clus.Cfg.SnapshotCatchUpEntries) {
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+
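+	// The leader should by now have compacted its raft log past the member's last
+	// applied revision, so restoring peer traffic should force a snapshot transfer.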
+	proxy.UnblackholeTx()
+	proxy.UnblackholeRx()
+	return nil
+}
+
 type randomFailpoint struct {
 	failpoints []Failpoint
 }
diff --git a/tests/linearizability/linearizability_test.go b/tests/linearizability/linearizability_test.go
index 92866020dca..7844461d69e 100644
--- a/tests/linearizability/linearizability_test.go
+++ b/tests/linearizability/linearizability_test.go
@@ -142,6 +142,19 @@ func TestLinearizability(t *testing.T) {
 				e2e.WithSnapshotCount(100),
 			),
 		},
+		// TODO: investigate periodic `Model is not linearizable` failures
+		// see https://github.com/etcd-io/etcd/pull/15104#issuecomment-1416371288
+		/*{
+			name:      "Snapshot",
+			failpoint: RandomSnapshotFailpoint,
+			traffic:   &HighTraffic,
+			config: *e2e.NewConfig(
+				e2e.WithGoFailEnabled(true),
+				e2e.WithSnapshotCount(100),
+				e2e.WithSnapshotCatchUpEntries(100),
+				e2e.WithPeerProxy(true),
+			),
+		},*/
 	}...)
 	for _, scenario := range scenarios {
 		if scenario.traffic == nil {