Skip to content

Commit

Permalink
Reproduce etcd-io#15271
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Feb 13, 2023
1 parent b4dfa9d commit c7f33dc
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 72 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/linearizability-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
esac
- name: test-linearizability
run: |
EXPECT_DEBUG=true GO_TEST_FLAGS='-v --count ${{ inputs.count }} --timeout ${{ inputs.testTimeout }} --failfast --run TestLinearizability' RESULTS_DIR=/tmp/linearizability make test-linearizability
EXPECT_DEBUG=true GO_TEST_FLAGS='-v --count ${{ inputs.count }} --timeout ${{ inputs.testTimeout }} --failfast --run TestLinearizability/Snapshot' RESULTS_DIR=/tmp/linearizability make test-linearizability
- uses: actions/upload-artifact@v2
if: always()
with:
Expand Down
2 changes: 1 addition & 1 deletion tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
if cfg.WatchProcessNotifyInterval != 0 {
args = append(args, "--experimental-watch-progress-notify-interval", cfg.WatchProcessNotifyInterval.String())
}
if cfg.SnapshotCatchUpEntries > 0 {
if cfg.SnapshotCatchUpEntries > 0 && cfg.SnapshotCatchUpEntries != 10000 {
args = append(args, "--experimental-snapshot-catchup-entries", fmt.Sprintf("%d", cfg.SnapshotCatchUpEntries))
}
envVars := map[string]string{}
Expand Down
140 changes: 84 additions & 56 deletions tests/linearizability/failpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

const (
triggerTimeout = 5 * time.Second
triggerTimeout = 20 * time.Second
)

var (
Expand All @@ -54,6 +54,7 @@ var (
CompactAfterCommitBatchPanic Failpoint = goPanicFailpoint{"compactAfterCommitBatch", triggerCompact, AnyMember}
RaftBeforeLeaderSendPanic Failpoint = goPanicFailpoint{"raftBeforeLeaderSend", nil, Leader}
BlackholePeerNetwork Failpoint = blackholePeerNetworkFailpoint{duration: time.Second}
BlackholeUntilSnapshot Failpoint = blackholePeerNetworkFailpoint{waitTillSnapshot: true}
DelayPeerNetwork Failpoint = delayPeerNetworkFailpoint{duration: time.Second, baseLatency: 75 * time.Millisecond, randomizedLatency: 50 * time.Millisecond}
oneNodeClusterFailpoints = []Failpoint{
KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic,
Expand All @@ -77,7 +78,12 @@ var (
RaftBeforeSaveSnapPanic Failpoint = goPanicFailpoint{"raftBeforeSaveSnap", triggerBlackholeUntilSnapshot, Follower}
RaftAfterSaveSnapPanic Failpoint = goPanicFailpoint{"raftAfterSaveSnap", triggerBlackholeUntilSnapshot, Follower}
RandomSnapshotFailpoint Failpoint = randomFailpoint{[]Failpoint{
RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic, RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic,
//RaftBeforeApplySnapPanic,
//RaftAfterApplySnapPanic,
//RaftAfterWALReleasePanic,
//RaftBeforeSaveSnapPanic,
//RaftAfterSaveSnapPanic,
BlackholeUntilSnapshot,
}}
)

Expand Down Expand Up @@ -252,62 +258,17 @@ func latestRevisionForEndpoint(ctx context.Context, c *clientv3.Client) (int64,
return resp.Header.Revision, err
}

func triggerBlackholeUntilSnapshot(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error {
leader := clus.Procs[clus.WaitLeader(t)]
lc, err := clientv3.New(clientv3.Config{
Endpoints: []string{leader.Config().ClientURL},
Logger: zap.NewNop(),
DialKeepAliveTime: 1 * time.Millisecond,
DialKeepAliveTimeout: 5 * time.Millisecond,
})
if err != nil {
return err
}
defer lc.Close()

mc, err := clientv3.New(clientv3.Config{
Endpoints: []string{member.Config().ClientURL},
Logger: zap.NewNop(),
DialKeepAliveTime: 1 * time.Millisecond,
DialKeepAliveTimeout: 5 * time.Millisecond,
})
if err != nil {
return err
}
defer mc.Close()

func triggerBlackholeUntilSnapshot(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) (err error) {
proxy := member.PeerProxy()

// Blackholing will cause peers to not be able to use streamWriters registered with member
// but peer traffic is still possible because member has 'pipeline' with peers
// TODO: find a way to stop all traffic
t.Logf("Blackholing traffic from and to member %q", member.Config().Name)
proxy.BlackholeTx()
proxy.BlackholeRx()

for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Have to refresh revBlackholedMem. It can still increase as member processes changes that are received but not yet applied.
revBlackholedMem, err := latestRevisionForEndpoint(ctx, mc)
if err != nil {
return err
}
revLeader, err := latestRevisionForEndpoint(ctx, lc)
if err != nil {
return err
}
t.Logf("Leader: [%s], Member: [%s], revLeader: %d, revBlackholedMem: %d", leader.Config().Name, member.Config().Name, revLeader, revBlackholedMem)
// Blackholed member has to be sufficiently behind to trigger snapshot transfer.
// Need to make sure leader compacted latest revBlackholedMem inside EtcdServer.snapshot.
// That's why we wait for clus.Cfg.SnapshotCount (to trigger snapshot) + clus.Cfg.SnapshotCatchUpEntries (EtcdServer.snapshot compaction offset)
if revLeader-revBlackholedMem > int64(clus.Cfg.SnapshotCount+clus.Cfg.SnapshotCatchUpEntries) {
break
}
time.Sleep(100 * time.Millisecond)
}

err = waitTillSnapshot(ctx, t, clus, member)
t.Logf("Traffic restored from and to member %q", member.Config().Name)
proxy.UnblackholeTx()
proxy.UnblackholeRx()
return nil
Expand Down Expand Up @@ -344,20 +305,87 @@ func (f randomFailpoint) Available(e2e.EtcdProcess) bool {
}

type blackholePeerNetworkFailpoint struct {
duration time.Duration
waitTillSnapshot bool
duration time.Duration
}

func (f blackholePeerNetworkFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
func (f blackholePeerNetworkFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) (err error) {
member := clus.Procs[rand.Int()%len(clus.Procs)]
proxy := member.PeerProxy()

// Blackholing will cause peers to not be able to use streamWriters registered with member
// but peer traffic is still possible because member has 'pipeline' with peers
// TODO: find a way to stop all traffic
lg.Info("Blackholing traffic from and to member", zap.String("member", member.Config().Name))
proxy.BlackholeTx()
proxy.BlackholeRx()
lg.Info("Blackholing traffic from and to member", zap.String("member", member.Config().Name))
time.Sleep(f.duration)
if f.waitTillSnapshot {
err = waitTillSnapshot(ctx, t, clus, member)
} else {
time.Sleep(f.duration)
}
lg.Info("Traffic restored from and to member", zap.String("member", member.Config().Name))
proxy.UnblackholeTx()
proxy.UnblackholeRx()
return err
}

func waitTillSnapshot(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, member e2e.EtcdProcess) error {
endpoints := clus.EndpointsV3()
i := 0
for _, endpoint := range endpoints {
if endpoint == member.Config().ClientURL {
continue
}
endpoints[i] = endpoint
i++
}
endpoints = endpoints[:i]
lc, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
Logger: zap.NewNop(),
DialKeepAliveTime: 1 * time.Millisecond,
DialKeepAliveTimeout: 5 * time.Millisecond,
})
if err != nil {
return err
}
defer lc.Close()

mc, err := clientv3.New(clientv3.Config{
Endpoints: []string{member.Config().ClientURL},
Logger: zap.NewNop(),
DialKeepAliveTime: 1 * time.Millisecond,
DialKeepAliveTimeout: 5 * time.Millisecond,
})
if err != nil {
return err
}
defer mc.Close()
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Have to refresh revBlackholedMem. It can still increase as member processes changes that are received but not yet applied.
revBlackholedMem, err := latestRevisionForEndpoint(ctx, mc)
if err != nil {
return err
}
clusterRevision, err := latestRevisionForEndpoint(ctx, lc)
if err != nil {
return err
}
t.Logf("clusterRevision: %d, memberRevision: %d", clusterRevision, revBlackholedMem)
// Blackholed member has to be sufficiently behind to trigger snapshot transfer.
// Need to make sure leader compacted latest revBlackholedMem inside EtcdServer.snapshot.
// That's why we wait for clus.Cfg.SnapshotCount (to trigger snapshot) + clus.Cfg.SnapshotCatchUpEntries (EtcdServer.snapshot compaction offset)
if clusterRevision-revBlackholedMem > int64(clus.Cfg.SnapshotCount+clus.Cfg.SnapshotCatchUpEntries) {
break
}
time.Sleep(100 * time.Millisecond)
}
return nil
}

Expand Down
29 changes: 15 additions & 14 deletions tests/linearizability/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ var (
largePutSize: 32769,
leaseTTL: DefaultLeaseTTL,
writes: []requestChance{
{operation: Put, chance: 90},
{operation: LargePut, chance: 5},
{operation: Put, chance: 100},
},
},
}
Expand Down Expand Up @@ -181,8 +180,15 @@ func TestLinearizability(t *testing.T) {
validateWatchResponses(t, watchResponses, watchProgressNotifyEnabled)
longestHistory, remainingEvents := watchEventHistory(watchResponses)
validateEventsMatch(t, longestHistory, remainingEvents)

operations = patchOperationBasedOnWatchEvents(operations, longestHistory)
checkOperationsAndPersistResults(t, lg, operations, clus)
path, err := testResultsDirectory(t)
if err != nil {
t.Error(err)
}
persistWatchResponses(t, lg, path, watchResponses)
persistWatchEvents(t, lg, path, append(remainingEvents, longestHistory))
checkOperationsAndPersistResults(t, lg, operations, clus, path)
})
}
}
Expand Down Expand Up @@ -400,17 +406,12 @@ func validateEventsMatch(t *testing.T, longestHistory []watchEvent, other [][]wa
length := len(other[i])
// We compare prefix of watch events, as we are not guaranteed to collect all events from each node.
if diff := cmp.Diff(longestHistory[:length], other[i][:length], cmpopts.IgnoreFields(watchEvent{}, "Time")); diff != "" {
t.Errorf("Events in watches do not match, %s", diff)
t.Errorf("Events in watches do not match")
}
}
}

func checkOperationsAndPersistResults(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, clus *e2e.EtcdProcessCluster) {
path, err := testResultsDirectory(t)
if err != nil {
t.Error(err)
}

func checkOperationsAndPersistResults(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, clus *e2e.EtcdProcessCluster, testResultsPath string) {
linearizable, info := porcupine.CheckOperationsVerbose(model.Etcd, operations, 5*time.Minute)
if linearizable == porcupine.Illegal {
t.Error("Model is not linearizable")
Expand All @@ -419,13 +420,13 @@ func checkOperationsAndPersistResults(t *testing.T, lg *zap.Logger, operations [
t.Error("Linearization timed out")
}
if linearizable != porcupine.Ok {
persistOperationHistory(t, lg, path, operations)
persistMemberDataDir(t, lg, clus, path)
persistOperationHistory(t, lg, testResultsPath, operations)
persistMemberDataDir(t, lg, clus, testResultsPath)
}

visualizationPath := filepath.Join(path, "history.html")
visualizationPath := filepath.Join(testResultsPath, "history.html")
lg.Info("Saving visualization", zap.String("path", visualizationPath))
err = porcupine.VisualizePath(model.Etcd, info, visualizationPath)
err := porcupine.VisualizePath(model.Etcd, info, visualizationPath)
if err != nil {
t.Errorf("Failed to visualize, err: %v", err)
}
Expand Down
45 changes: 45 additions & 0 deletions tests/linearizability/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ package linearizability

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -153,3 +157,44 @@ type watchEvent struct {
Revision int64
Time time.Time
}

func persistWatchResponses(t *testing.T, lg *zap.Logger, path string, responses [][]watchResponse) {
for i, resps := range responses {
watchFilePath := filepath.Join(path, fmt.Sprintf("watch-responses-%d.json", i))
lg.Info("Saving watch responses", zap.String("path", watchFilePath))
file, err := os.OpenFile(watchFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {
t.Errorf("Failed to save watch history: %v", err)
return
}
defer file.Close()
encoder := json.NewEncoder(file)
for _, resp := range resps {
err := encoder.Encode(resp)
if err != nil {
t.Errorf("Failed to encode response: %v", err)
}
}
}
}

func persistWatchEvents(t *testing.T, lg *zap.Logger, path string, events [][]watchEvent) {
for i, evs := range events {
eventsFilePath := filepath.Join(path, fmt.Sprintf("watch-events-%d.json", i))
lg.Info("Saving watch events", zap.String("path", eventsFilePath))
file, err := os.OpenFile(eventsFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {
t.Errorf("Failed to save watch history: %v", err)
return
}
defer file.Close()
encoder := json.NewEncoder(file)
for _, event := range evs {
event.Time = time.Time{}
err := encoder.Encode(event)
if err != nil {
t.Errorf("Failed to encode response: %v", err)
}
}
}
}

0 comments on commit c7f33dc

Please sign in to comment.