Skip to content

Commit

Permalink
clientv3: correct the nextRev on receving progress notification response
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Wang <wachao@vmware.com>
  • Loading branch information
ahrtr committed Feb 10, 2023
1 parent d32dceb commit 5232955
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 1 deletion.
2 changes: 1 addition & 1 deletion clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
}
} else {
// current progress of watch; <= store revision
nextRev = wr.Header.Revision
nextRev = wr.Header.Revision + 1
}

if len(wr.Events) > 0 {
Expand Down
99 changes: 99 additions & 0 deletions integration/v3_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/api/v3rpc"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
Expand Down Expand Up @@ -877,6 +879,103 @@ func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) {
}
}

// TestV3WatchProgressOnMemberRestart verifies the client side doesn't
// receive duplicated events.
// Refer to https://github.com/etcd-io/etcd/pull/15248#issuecomment-1423225742.
func TestV3WatchProgressOnMemberRestart(t *testing.T) {
defer testutil.AfterTest(t)

clus := NewClusterV3(t, &ClusterConfig{
Size: 3,
WatchProgressNotifyInterval: time.Second,
})
defer clus.Terminate(t)

client := clus.RandClient()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

errC := make(chan error, 1)
doneC := make(chan struct{}, 1)
progressNotifyC := make(chan struct{}, 1)
go func() {
defer close(doneC)

var (
lastWatchedModRevision int64
gotProgressNotification bool
)

wch := client.Watch(ctx, "foo", clientv3.WithProgressNotify())
for wr := range wch {
if wr.Err() != nil {
errC <- fmt.Errorf("watch error: %w", wr.Err())
return
}

if wr.IsProgressNotify() {
// We need to make sure at least one progress notification
// is received after receiving the normal watch response
// and before restarting the member.
if lastWatchedModRevision > 0 {
gotProgressNotification = true
progressNotifyC <- struct{}{}
}
continue
}

if len(wr.Events) == 0 {
continue
}

for _, event := range wr.Events {
if event.Kv.ModRevision <= lastWatchedModRevision {
errC <- fmt.Errorf("got an unexpected revision: %d, lastWatchedModRevision: %d",
event.Kv.ModRevision,
lastWatchedModRevision)
return
}
lastWatchedModRevision = event.Kv.ModRevision
}

if gotProgressNotification {
return
}
}
}()

// write the key before the member restarts
t.Log("Writing key 'foo'")
_, err := client.Put(ctx, "foo", "bar1")
require.NoError(t, err)

// make sure at least one progress notification is received
// before restarting the member
t.Log("Waiting for the progress notification")
<-progressNotifyC

// restart the member
t.Log("Restarting the member")
clus.Members[0].Stop(t)
clus.Members[0].Restart(t)
clus.WaitLeader(t)

// write the same key again after the member restarted
t.Log("Writing the same key 'foo' again after restarting the member")
_, err = client.Put(ctx, "foo", "bar2")
require.NoError(t, err)

t.Log("Waiting for result")
select {
case err := <-errC:
t.Fatal(err)
case <-doneC:
t.Log("Done")
case <-time.After(20 * time.Second):
t.Fatal("Timed out waiting for the response")
}
}

func TestV3WatchMultipleStreamsSynced(t *testing.T) {
defer testutil.AfterTest(t)
testV3WatchMultipleStreams(t, 0)
Expand Down

0 comments on commit 5232955

Please sign in to comment.