diff --git a/server/storage/mvcc/watchable_store.go b/server/storage/mvcc/watchable_store.go
index adf07f7755b..f0d056f286d 100644
--- a/server/storage/mvcc/watchable_store.go
+++ b/server/storage/mvcc/watchable_store.go
@@ -448,12 +448,15 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
 			pendingEventsGauge.Add(float64(len(eb.evs)))
 		} else {
 			// move slow watcher to victims
-			w.minRev = rev + 1
 			w.victim = true
 			victim[w] = eb
 			s.synced.delete(w)
 			slowWatcherGauge.Inc()
 		}
+		// always update minRev
+		// in case 'send' returns true and watcher stays synced, this is needed for Restore when all watchers become unsynced
+		// in case 'send' returns false, this is needed for syncWatchers
+		w.minRev = rev + 1
 	}
 	s.addVictim(victim)
 }
diff --git a/server/storage/mvcc/watchable_store_test.go b/server/storage/mvcc/watchable_store_test.go
index a98106bcae8..86e35697f32 100644
--- a/server/storage/mvcc/watchable_store_test.go
+++ b/server/storage/mvcc/watchable_store_test.go
@@ -296,34 +296,22 @@ func TestWatchRestore(t *testing.T) {
 
 			testKey := []byte("foo")
 			testValue := []byte("bar")
-			rev := s.Put(testKey, testValue, lease.NoLease)
-
-			newBackend, _ := betesting.NewDefaultTmpBackend(t)
-			newStore := newWatchableStore(zaptest.NewLogger(t), newBackend, &lease.FakeLessor{}, StoreConfig{})
-			defer cleanup(newStore, newBackend)
-
-			w := newStore.NewWatchStream()
+			w := s.NewWatchStream()
 			defer w.Close()
-
-			w.Watch(0, testKey, nil, rev-1)
+			w.Watch(0, testKey, nil, 1)
 
 			time.Sleep(delay)
+			wantRev := s.Put(testKey, testValue, lease.NoLease)
 
-			newStore.Restore(b)
-			select {
-			case resp := <-w.Chan():
-				if resp.Revision != rev {
-					t.Fatalf("rev = %d, want %d", resp.Revision, rev)
-				}
-				if len(resp.Events) != 1 {
-					t.Fatalf("failed to get events from the response")
-				}
-				if resp.Events[0].Kv.ModRevision != rev {
-					t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
-				}
-			case <-time.After(time.Second):
-				t.Fatal("failed to receive event in 1 second.")
+			s.Restore(b)
+			events := readEventsForSecond(w.Chan())
+			if len(events) != 1 {
+				t.Errorf("Expected only one event, got %d", len(events))
+			}
+			if events[0].Kv.ModRevision != wantRev {
+				t.Errorf("Expected revision to match, got %d, want %d", events[0].Kv.ModRevision, wantRev)
 			}
+
 		}
 	}
 
@@ -331,6 +319,17 @@ func TestWatchRestore(t *testing.T) {
 	t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
 }
 
+func readEventsForSecond(ws <-chan WatchResponse) (events []mvccpb.Event) {
+	for {
+		select {
+		case resp := <-ws:
+			events = append(events, resp.Events...)
+		case <-time.After(time.Second):
+			return events
+		}
+	}
+}
+
 // TestWatchRestoreSyncedWatcher tests such a case that:
 // 1. watcher is created with a future revision "math.MaxInt64 - 2"
 // 2. watcher with a future revision is added to "synced" watcher group
diff --git a/tests/robustness/failpoints.go b/tests/robustness/failpoints.go
index 255837ba804..1f72ad5248e 100644
--- a/tests/robustness/failpoints.go
+++ b/tests/robustness/failpoints.go
@@ -29,7 +29,7 @@ import (
 )
 
 const (
-	triggerTimeout = 5 * time.Second
+	triggerTimeout = 10 * time.Second
 )
 
 var (
@@ -53,7 +53,8 @@ var (
 	CompactBeforeCommitBatchPanic Failpoint = goPanicFailpoint{"compactBeforeCommitBatch", triggerCompact, AnyMember}
 	CompactAfterCommitBatchPanic  Failpoint = goPanicFailpoint{"compactAfterCommitBatch", triggerCompact, AnyMember}
 	RaftBeforeLeaderSendPanic     Failpoint = goPanicFailpoint{"raftBeforeLeaderSend", nil, Leader}
-	BlackholePeerNetwork          Failpoint = blackholePeerNetworkFailpoint{}
+	BlackholePeerNetwork          Failpoint = blackholePeerNetworkFailpoint{waitTillSnapshot: false}
+	BlackholeUntilSnapshot        Failpoint = blackholePeerNetworkFailpoint{waitTillSnapshot: true}
 	DelayPeerNetwork              Failpoint = delayPeerNetworkFailpoint{duration: time.Second, baseLatency: 75 * time.Millisecond, randomizedLatency: 50 * time.Millisecond}
 	oneNodeClusterFailpoints      = []Failpoint{
 		KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic,
@@ -78,6 +79,7 @@ var (
 	RaftAfterSaveSnapPanic  Failpoint = goPanicFailpoint{"raftAfterSaveSnap", triggerBlackholeUntilSnapshot, Follower}
 	RandomSnapshotFailpoint Failpoint = randomFailpoint{[]Failpoint{
 		RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic, RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic,
+		BlackholeUntilSnapshot,
 	}}
 )
 
@@ -308,11 +310,13 @@ func (f randomFailpoint) Available(e2e.EtcdProcess) bool {
 	return true
 }
 
-type blackholePeerNetworkFailpoint struct{}
+type blackholePeerNetworkFailpoint struct {
+	waitTillSnapshot bool
+}
 
 func (f blackholePeerNetworkFailpoint) Trigger(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster) error {
 	member := clus.Procs[rand.Int()%len(clus.Procs)]
-	return triggerBlackhole(t, ctx, member, clus, false)
+	return triggerBlackhole(t, ctx, member, clus, f.waitTillSnapshot)
 }
 
 func triggerBlackhole(t *testing.T, ctx context.Context, member e2e.EtcdProcess, clus *e2e.EtcdProcessCluster, shouldWaitTillSnapshot bool) error {