diff --git a/clientv3/watch.go b/clientv3/watch.go index 0aaefdc2f6b1..8f4d8f23af8c 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -881,7 +881,7 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{ } } else { // current progress of watch; <= store revision - nextRev = wr.Header.Revision + nextRev = wr.Header.Revision + 1 } if len(wr.Events) > 0 { diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index 4ad7b2fcb82c..4d8935d710f2 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/etcdserver/api/v3rpc" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" @@ -877,6 +879,103 @@ func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) { } } +// TestV3WatchProgressOnMemberRestart verifies the client side doesn't +// receive duplicated events. +// Refer to https://github.com/etcd-io/etcd/pull/15248#issuecomment-1423225742. +func TestV3WatchProgressOnMemberRestart(t *testing.T) { + defer testutil.AfterTest(t) + + clus := NewClusterV3(t, &ClusterConfig{ + Size: 3, + WatchProgressNotifyInterval: time.Second, + }) + defer clus.Terminate(t) + + client := clus.RandClient() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + errC := make(chan error, 1) + doneC := make(chan struct{}, 1) + progressNotifyC := make(chan struct{}, 1) + go func() { + defer close(doneC) + + var ( + lastWatchedModRevision int64 + gotProgressNotification bool + ) + + wch := client.Watch(ctx, "foo", clientv3.WithProgressNotify()) + for wr := range wch { + if wr.Err() != nil { + errC <- fmt.Errorf("watch error: %w", wr.Err()) + return + } + + if wr.IsProgressNotify() { + // We need to make sure at least one progress notification + // is received after receiving the normal watch response + // and before restarting the member. + if lastWatchedModRevision > 0 { + gotProgressNotification = true + progressNotifyC <- struct{}{} + } + continue + } + + if len(wr.Events) == 0 { + continue + } + + for _, event := range wr.Events { + if event.Kv.ModRevision <= lastWatchedModRevision { + errC <- fmt.Errorf("got an unexpected revision: %d, lastWatchedModRevision: %d", + event.Kv.ModRevision, + lastWatchedModRevision) + return + } + lastWatchedModRevision = event.Kv.ModRevision + } + + if gotProgressNotification { + return + } + } + }() + + // write the key before the member restarts + t.Log("Writing key 'foo'") + _, err := client.Put(ctx, "foo", "bar1") + require.NoError(t, err) + + // make sure at least one progress notification is received + // before restarting the member + t.Log("Waiting for the progress notification") + <-progressNotifyC + + // restart the member + t.Log("Restarting the member") + clus.Members[0].Stop(t) + clus.Members[0].Restart(t) + clus.WaitLeader(t) + + // write the same key again after the member restarted + t.Log("Writing the same key 'foo' again after restarting the member") + _, err = client.Put(ctx, "foo", "bar2") + require.NoError(t, err) + + t.Log("Waiting for result") + select { + case err := <-errC: + t.Fatal(err) + case <-doneC: + t.Log("Done") + case <-time.After(20 * time.Second): + t.Fatal("Timed out waiting for the response") + } +} + func TestV3WatchMultipleStreamsSynced(t *testing.T) { defer testutil.AfterTest(t) testV3WatchMultipleStreams(t, 0)