diff --git a/.github/workflows/linearizability-template.yaml b/.github/workflows/linearizability-template.yaml index 5f97a0010c85..5540fba3a5f5 100644 --- a/.github/workflows/linearizability-template.yaml +++ b/.github/workflows/linearizability-template.yaml @@ -45,7 +45,7 @@ jobs: esac - name: test-linearizability run: | - EXPECT_DEBUG=true GO_TEST_FLAGS='-v --count ${{ inputs.count }} --timeout ${{ inputs.testTimeout }} --failfast --run TestLinearizability' RESULTS_DIR=/tmp/linearizability make test-linearizability + EXPECT_DEBUG=true GO_TEST_FLAGS='-v --count ${{ inputs.count }} --timeout ${{ inputs.testTimeout }} --failfast --run TestLinearizability/Snapshot' RESULTS_DIR=/tmp/linearizability make test-linearizability - uses: actions/upload-artifact@v2 if: always() with: diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 442921e60295..83503999c8df 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -581,7 +581,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in if cfg.WatchProcessNotifyInterval != 0 { args = append(args, "--experimental-watch-progress-notify-interval", cfg.WatchProcessNotifyInterval.String()) } - if cfg.SnapshotCatchUpEntries > 0 { + if cfg.SnapshotCatchUpEntries > 0 && cfg.SnapshotCatchUpEntries != 10000 { args = append(args, "--experimental-snapshot-catchup-entries", fmt.Sprintf("%d", cfg.SnapshotCatchUpEntries)) } envVars := map[string]string{} diff --git a/tests/linearizability/failpoints.go b/tests/linearizability/failpoints.go index 1792c173e7e8..a2217d6380da 100644 --- a/tests/linearizability/failpoints.go +++ b/tests/linearizability/failpoints.go @@ -29,7 +29,7 @@ import ( ) const ( - triggerTimeout = 5 * time.Second + triggerTimeout = 20 * time.Second ) var ( @@ -54,6 +54,7 @@ var ( CompactAfterCommitBatchPanic Failpoint = goPanicFailpoint{"compactAfterCommitBatch", triggerCompact, AnyMember} RaftBeforeLeaderSendPanic Failpoint = goPanicFailpoint{"raftBeforeLeaderSend", nil, Leader} BlackholePeerNetwork Failpoint = blackholePeerNetworkFailpoint{duration: time.Second} + BlackholeUntilSnapshot Failpoint = blackholePeerNetworkFailpoint{waitTillSnapshot: true} DelayPeerNetwork Failpoint = delayPeerNetworkFailpoint{duration: time.Second, baseLatency: 75 * time.Millisecond, randomizedLatency: 50 * time.Millisecond} oneNodeClusterFailpoints = []Failpoint{ KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, @@ -77,7 +78,12 @@ var ( RaftBeforeSaveSnapPanic Failpoint = goPanicFailpoint{"raftBeforeSaveSnap", triggerBlackholeUntilSnapshot, Follower} RaftAfterSaveSnapPanic Failpoint = goPanicFailpoint{"raftAfterSaveSnap", triggerBlackholeUntilSnapshot, Follower} RandomSnapshotFailpoint Failpoint = randomFailpoint{[]Failpoint{ - RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic, RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, + //RaftBeforeApplySnapPanic, + //RaftAfterApplySnapPanic, + //RaftAfterWALReleasePanic, + //RaftBeforeSaveSnapPanic, + //RaftAfterSaveSnapPanic, + BlackholeUntilSnapshot, }} ) @@ -252,62 +258,17 @@ func latestRevisionForEndpoint(ctx context.Context, c *clientv3.Client) (int64, return resp.Header.Revision, err } -func triggerBlackholeUntilSnapshot(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) error { - leader := clus.Procs[clus.WaitLeader(t)] - lc, err := clientv3.New(clientv3.Config{ - Endpoints: []string{leader.Config().ClientURL}, - Logger: zap.NewNop(), - DialKeepAliveTime: 1 * time.Millisecond, - DialKeepAliveTimeout: 5 * time.Millisecond, - }) - if err != nil { - return err - } - defer lc.Close() - - mc, err := clientv3.New(clientv3.Config{ - Endpoints: []string{member.Config().ClientURL}, - Logger: zap.NewNop(), - DialKeepAliveTime: 1 * time.Millisecond, - DialKeepAliveTimeout: 5 * time.Millisecond, - }) - if err != nil { - return err - } - defer mc.Close() - +func triggerBlackholeUntilSnapshot(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster) (err error) { proxy := member.PeerProxy() + // Blackholing will cause peers to not be able to use streamWriters registered with member // but peer traffic is still possible because member has 'pipeline' with peers // TODO: find a way to stop all traffic + t.Logf("Blackholing traffic from and to member %q", member.Config().Name) proxy.BlackholeTx() proxy.BlackholeRx() - - for { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - // Have to refresh revBlackholedMem. It can still increase as member processes changes that are received but not yet applied. - revBlackholedMem, err := latestRevisionForEndpoint(ctx, mc) - if err != nil { - return err - } - revLeader, err := latestRevisionForEndpoint(ctx, lc) - if err != nil { - return err - } - t.Logf("Leader: [%s], Member: [%s], revLeader: %d, revBlackholedMem: %d", leader.Config().Name, member.Config().Name, revLeader, revBlackholedMem) - // Blackholed member has to be sufficiently behind to trigger snapshot transfer. - // Need to make sure leader compacted latest revBlackholedMem inside EtcdServer.snapshot. - // That's why we wait for clus.Cfg.SnapshotCount (to trigger snapshot) + clus.Cfg.SnapshotCatchUpEntries (EtcdServer.snapshot compaction offset) - if revLeader-revBlackholedMem > int64(clus.Cfg.SnapshotCount+clus.Cfg.SnapshotCatchUpEntries) { - break - } - time.Sleep(100 * time.Millisecond) - } - + err = waitTillSnapshot(ctx, t, clus, member) + t.Logf("Traffic restored from and to member %q", member.Config().Name) proxy.UnblackholeTx() proxy.UnblackholeRx() return nil @@ -344,20 +305,87 @@ func (f randomFailpoint) Available(e2e.EtcdProcess) bool { } type blackholePeerNetworkFailpoint struct { - duration time.Duration + waitTillSnapshot bool + duration time.Duration } -func (f blackholePeerNetworkFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error { +func (f blackholePeerNetworkFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) (err error) { member := clus.Procs[rand.Int()%len(clus.Procs)] proxy := member.PeerProxy() + // Blackholing will cause peers to not be able to use streamWriters registered with member + // but peer traffic is still possible because member has 'pipeline' with peers + // TODO: find a way to stop all traffic + lg.Info("Blackholing traffic from and to member", zap.String("member", member.Config().Name)) proxy.BlackholeTx() proxy.BlackholeRx() - lg.Info("Blackholing traffic from and to member", zap.String("member", member.Config().Name)) - time.Sleep(f.duration) + if f.waitTillSnapshot { + err = waitTillSnapshot(ctx, t, clus, member) + } else { + time.Sleep(f.duration) + } lg.Info("Traffic restored from and to member", zap.String("member", member.Config().Name)) proxy.UnblackholeTx() proxy.UnblackholeRx() + return err +} + +func waitTillSnapshot(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessCluster, member e2e.EtcdProcess) error { + endpoints := clus.EndpointsV3() + i := 0 + for _, endpoint := range endpoints { + if endpoint == member.Config().ClientURL { + continue + } + endpoints[i] = endpoint + i++ + } + endpoints = endpoints[:i] + lc, err := clientv3.New(clientv3.Config{ + Endpoints: endpoints, + Logger: zap.NewNop(), + DialKeepAliveTime: 1 * time.Millisecond, + DialKeepAliveTimeout: 5 * time.Millisecond, + }) + if err != nil { + return err + } + defer lc.Close() + + mc, err := clientv3.New(clientv3.Config{ + Endpoints: []string{member.Config().ClientURL}, + Logger: zap.NewNop(), + DialKeepAliveTime: 1 * time.Millisecond, + DialKeepAliveTimeout: 5 * time.Millisecond, + }) + if err != nil { + return err + } + defer mc.Close() + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + // Have to refresh revBlackholedMem. It can still increase as member processes changes that are received but not yet applied. + revBlackholedMem, err := latestRevisionForEndpoint(ctx, mc) + if err != nil { + return err + } + clusterRevision, err := latestRevisionForEndpoint(ctx, lc) + if err != nil { + return err + } + t.Logf("clusterRevision: %d, memberRevision: %d", clusterRevision, revBlackholedMem) + // Blackholed member has to be sufficiently behind to trigger snapshot transfer. + // Need to make sure leader compacted latest revBlackholedMem inside EtcdServer.snapshot. + // That's why we wait for clus.Cfg.SnapshotCount (to trigger snapshot) + clus.Cfg.SnapshotCatchUpEntries (EtcdServer.snapshot compaction offset) + if clusterRevision-revBlackholedMem > int64(clus.Cfg.SnapshotCount+clus.Cfg.SnapshotCatchUpEntries) { + break + } + time.Sleep(100 * time.Millisecond) + } return nil } diff --git a/tests/linearizability/linearizability_test.go b/tests/linearizability/linearizability_test.go index aa2791542a19..5e1e87ce79c9 100644 --- a/tests/linearizability/linearizability_test.go +++ b/tests/linearizability/linearizability_test.go @@ -73,8 +73,7 @@ var ( largePutSize: 32769, leaseTTL: DefaultLeaseTTL, writes: []requestChance{ - {operation: Put, chance: 90}, - {operation: LargePut, chance: 5}, + {operation: Put, chance: 100}, }, }, } @@ -181,8 +180,15 @@ func TestLinearizability(t *testing.T) { validateWatchResponses(t, watchResponses, watchProgressNotifyEnabled) longestHistory, remainingEvents := watchEventHistory(watchResponses) validateEventsMatch(t, longestHistory, remainingEvents) + operations = patchOperationBasedOnWatchEvents(operations, longestHistory) - checkOperationsAndPersistResults(t, lg, operations, clus) + path, err := testResultsDirectory(t) + if err != nil { + t.Error(err) + } + persistWatchResponses(t, lg, path, watchResponses) + persistWatchEvents(t, lg, path, append(remainingEvents, longestHistory)) + checkOperationsAndPersistResults(t, lg, operations, clus, path) }) } } @@ -400,17 +406,12 @@ func validateEventsMatch(t *testing.T, longestHistory []watchEvent, other [][]wa length := len(other[i]) // We compare prefix of watch events, as we are not guaranteed to collect all events from each node. if diff := cmp.Diff(longestHistory[:length], other[i][:length], cmpopts.IgnoreFields(watchEvent{}, "Time")); diff != "" { - t.Errorf("Events in watches do not match, %s", diff) + t.Errorf("Events in watches do not match") } } } -func checkOperationsAndPersistResults(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, clus *e2e.EtcdProcessCluster) { - path, err := testResultsDirectory(t) - if err != nil { - t.Error(err) - } - +func checkOperationsAndPersistResults(t *testing.T, lg *zap.Logger, operations []porcupine.Operation, clus *e2e.EtcdProcessCluster, testResultsPath string) { linearizable, info := porcupine.CheckOperationsVerbose(model.Etcd, operations, 5*time.Minute) if linearizable == porcupine.Illegal { t.Error("Model is not linearizable") @@ -419,13 +420,13 @@ func checkOperationsAndPersistResults(t *testing.T, lg *zap.Logger, operations [ t.Error("Linearization timed out") } if linearizable != porcupine.Ok { - persistOperationHistory(t, lg, path, operations) - persistMemberDataDir(t, lg, clus, path) + persistOperationHistory(t, lg, testResultsPath, operations) + persistMemberDataDir(t, lg, clus, testResultsPath) } - visualizationPath := filepath.Join(path, "history.html") + visualizationPath := filepath.Join(testResultsPath, "history.html") lg.Info("Saving visualization", zap.String("path", visualizationPath)) - err = porcupine.VisualizePath(model.Etcd, info, visualizationPath) + err := porcupine.VisualizePath(model.Etcd, info, visualizationPath) if err != nil { t.Errorf("Failed to visualize, err: %v", err) } diff --git a/tests/linearizability/watch.go b/tests/linearizability/watch.go index 4466a4c3ab98..2675b087a67c 100644 --- a/tests/linearizability/watch.go +++ b/tests/linearizability/watch.go @@ -16,6 +16,10 @@ package linearizability import ( "context" + "encoding/json" + "fmt" + "os" + "path/filepath" "sync" "testing" "time" @@ -153,3 +157,44 @@ type watchEvent struct { Revision int64 Time time.Time } + +func persistWatchResponses(t *testing.T, lg *zap.Logger, path string, responses [][]watchResponse) { + for i, resps := range responses { + watchFilePath := filepath.Join(path, fmt.Sprintf("watch-responses-%d.json", i)) + lg.Info("Saving watch responses", zap.String("path", watchFilePath)) + file, err := os.OpenFile(watchFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755) + if err != nil { + t.Errorf("Failed to save watch history: %v", err) + return + } + defer file.Close() + encoder := json.NewEncoder(file) + for _, resp := range resps { + err := encoder.Encode(resp) + if err != nil { + t.Errorf("Failed to encode response: %v", err) + } + } + } +} + +func persistWatchEvents(t *testing.T, lg *zap.Logger, path string, events [][]watchEvent) { + for i, evs := range events { + eventsFilePath := filepath.Join(path, fmt.Sprintf("watch-events-%d.json", i)) + lg.Info("Saving watch events", zap.String("path", eventsFilePath)) + file, err := os.OpenFile(eventsFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755) + if err != nil { + t.Errorf("Failed to save watch history: %v", err) + return + } + defer file.Close() + encoder := json.NewEncoder(file) + for _, event := range evs { + event.Time = time.Time{} + err := encoder.Encode(event) + if err != nil { + t.Errorf("Failed to encode response: %v", err) + } + } + } +}