*: optimize watch stream multiplexing garbage collection #9797

Closed · wants to merge 7 commits
76 changes: 76 additions & 0 deletions clientv3/integration/watch_test.go
@@ -853,6 +853,82 @@ func TestWatchWithCreatedNotificationDropConn(t *testing.T) {
}
}

// TestWatchIDs tests that consecutive watchers are assigned sequential watch IDs.
func TestWatchIDs(t *testing.T) {
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)

cli := cluster.RandClient()

numWatches := int64(10)
for id := int64(0); id < numWatches; id++ {
ctx, cancel := context.WithCancel(clientv3.WithRequireLeader(context.TODO()))
ww := cli.Watch(ctx, "a", clientv3.WithCreatedNotify())
wresp := <-ww
cancel()
if wresp.Err() != nil {
t.Fatal(wresp.Err())
}
if id != wresp.ID {
t.Fatalf("expected watch ID %d, got %d", id, wresp.ID)
}
}
}
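
Note on the new ID field: assuming the `WatchResponse.ID` plumbing in the clientv3/watch.go hunk below, a client can tell which server-side watcher produced a response when fanning in several watches over the one multiplexed stream. A minimal sketch, not part of this PR; the key names and the fan-in helper are illustrative:

```go
// Sketch: demultiplex watch responses by the new WatchResponse.ID field.
package main

import (
	"context"
	"fmt"

	"github.com/coreos/etcd/clientv3"
)

func fanIn(cli *clientv3.Client, keys ...string) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	merged := make(chan clientv3.WatchResponse)
	for _, k := range keys {
		ch := cli.Watch(ctx, k, clientv3.WithCreatedNotify())
		go func() {
			for wr := range ch {
				merged <- wr // all watches share one multiplexed gRPC stream
			}
		}()
	}
	// Runs until interrupted; production code would close merged
	// (e.g. via a sync.WaitGroup) once ctx is canceled.
	for wr := range merged {
		// wr.ID is the server-assigned watch ID of the producing watcher.
		fmt.Printf("watch id=%d: %d event(s)\n", wr.ID, len(wr.Events))
	}
}
```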

// TestWatchCancelOnClient tests canceling watches from the client side.
func TestWatchCancelOnClient(t *testing.T) {
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)

client := cluster.RandClient()
numWatches := 10

// The gRPC proxy starts watches to detect leadership after the proxy server
// reports as started; to avoid racing against the proxy's internal watches,
// wait until require-leader watches receive create responses, which ensures
// the leadership watches have started.
for {
ctx, cancel := context.WithCancel(clientv3.WithRequireLeader(context.TODO()))
ww := client.Watch(ctx, "a", clientv3.WithCreatedNotify())
wresp := <-ww
cancel()
if wresp.Err() == nil {
break
}
}

// The gRPC proxy creates an extra watcher with a "__lostleader" suffix, so take the watcher count baseline here.
w1, werr1 := cluster.Members[0].GetWatcherTotal()
if werr1 != nil {
t.Fatal(werr1)
}
for i := 0; i < numWatches; i++ {
ctx, cancel := context.WithCancel(context.Background())
w := client.Watch(ctx, fmt.Sprintf("%d", i))

// cancel watch operation from client-side
cancel()

wresp, ok := <-w
if ok {
t.Fatalf("#%d: expected closed channel, got watch channel open %v", i, ok)
}

// TODO: propagate context error before closing channel?
if wresp.Err() != nil {
t.Fatalf("#%d: expected nil error on watch cancellation, got %v", i, wresp.Err())
}
}
w2, werr2 := cluster.Members[0].GetWatcherTotal()
if werr2 != nil {
t.Fatal(werr2)
}

if w1 < w2 {
t.Fatalf("expected watchers to be canceled, got %d != %d", w1, w2)
}
}
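
The TODO above asks whether ctx.Err() should be propagated before the channel closes; as of this change, a client-side cancel simply yields a closed channel with no error response. A consumer that wants to distinguish cancellation from server-side errors can do so itself. A sketch under that assumption (the helper name is made up; context, fmt, and clientv3 imports assumed):

```go
// Sketch: drain a watch channel and classify why it ended.
func consume(ctx context.Context, cli *clientv3.Client, key string) error {
	for wr := range cli.Watch(ctx, key) {
		if err := wr.Err(); err != nil {
			return err // compaction, lost leader, or stream error
		}
		for _, ev := range wr.Events {
			fmt.Printf("%s %q -> %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
	// Channel closed without an error response: client-side cancellation.
	return ctx.Err()
}
```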

// TestWatchCancelOnServer ensures client watcher cancels propagate back to the server.
func TestWatchCancelOnServer(t *testing.T) {
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
15 changes: 13 additions & 2 deletions clientv3/watch.go
@@ -70,6 +70,10 @@ type Watcher interface {

type WatchResponse struct {
Header pb.ResponseHeader

// ID is the registered watch ID.
ID int64

Events []*Event

// CompactRevision is the minimum revision the watcher may receive.
@@ -403,8 +407,10 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
}
// close subscriber's channel
if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr})
go w.sendCloseSubstream(ws, &WatchResponse{ID: ws.id, closeErr: w.closeErr})
} else if ws.outc != nil {
// TODO: propagate context errors to client?
// ws.outc <- WatchResponse{ID: ws.id, Canceled: true, closeErr: ws.initReq.ctx.Err()}
close(ws.outc)
}
if ws.id != -1 {
@@ -571,6 +577,11 @@ func (w *watchGrpcStream) run() {
return

case ws := <-w.closingc:
// request the server to garbage collect the canceled watcher from the watch storage backend
wc.Send(&pb.WatchRequest{
RequestUnion: &pb.WatchRequest_CancelRequest{
CancelRequest: &pb.WatchCancelRequest{WatchId: ws.id},
}})
w.closeSubstream(ws)
delete(closing, ws)
// no more watchers on this stream, shutdown
@@ -599,9 +610,9 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
for i, ev := range pbresp.Events {
events[i] = (*Event)(ev)
}
// TODO: return watch ID?
wr := &WatchResponse{
Header: *pbresp.Header,
ID: pbresp.WatchId,
Events: events,
CompactRevision: pbresp.CompactRevision,
Created: pbresp.Created,
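
The WatchCancelRequest sent on closingc is the heart of the fix: previously, canceling a watch on the client closed the substream locally, but the server kept its watcher (and its mvcc resources) until the whole multiplexed gRPC stream ended. Stripped of the surrounding run() loop, the wire message looks like this (pb is etcdserverpb; ws.id is the server-assigned watch ID being released):

```go
// Sketch: the explicit server-side cancel sent for a closing substream.
req := &pb.WatchRequest{
	RequestUnion: &pb.WatchRequest_CancelRequest{
		CancelRequest: &pb.WatchCancelRequest{WatchId: ws.id},
	},
}
// Best-effort: if the stream is already broken, stream teardown
// garbage collects the watcher on the server anyway.
_ = wc.Send(req)
```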
10 changes: 10 additions & 0 deletions integration/cluster.go
@@ -27,6 +27,7 @@ import (
"os"
"reflect"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -1125,6 +1126,15 @@ func (m *member) Metric(metricName string) (string, error) {
return "", nil
}

// GetWatcherTotal returns the member's total watcher count, as reported by the etcd_debugging_mvcc_watcher_total metric.
func (m *member) GetWatcherTotal() (int64, error) {
ws, err := m.Metric("etcd_debugging_mvcc_watcher_total")
if err != nil {
return -1, err
}
return strconv.ParseInt(ws, 10, 64)
}
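
GetWatcherTotal reads the etcd_debugging_mvcc_watcher_total gauge maintained by the mvcc watchable store. Since the new cancel request races with the test's metric read, assertions can be made more robust by polling until the count settles. A sketch only (the helper name, timeout, and poll interval are illustrative; fmt and time imports assumed):

```go
// Sketch: poll until the member's watcher count drops back to a baseline,
// i.e. canceled watchers have been garbage collected server-side.
func waitWatcherGC(m *member, baseline int64, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		n, err := m.GetWatcherTotal()
		if err != nil {
			return err
		}
		if n <= baseline {
			return nil
		}
		time.Sleep(10 * time.Millisecond)
	}
	return fmt.Errorf("watchers not garbage collected within %v", timeout)
}
```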

// InjectPartition drops connections from m to others, vice versa.
func (m *member) InjectPartition(t *testing.T, others ...*member) {
for _, other := range others {
47 changes: 46 additions & 1 deletion integration/v3_double_barrier_test.go
@@ -15,6 +15,8 @@
package integration

import (
"fmt"
"os"
"testing"
"time"

@@ -24,71 +26,114 @@ import (

func TestDoubleBarrier(t *testing.T) {
clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)

os.Setenv("CLUSTER_DEBUG", "1")
defer func() {
os.Unsetenv("CLUSTER_DEBUG")
clus.Terminate(t)
}()

waiters := 10
fmt.Println("concurrency.NewSession 1")
session, err := concurrency.NewSession(clus.RandClient())
fmt.Println("concurrency.NewSession 2", err)
if err != nil {
t.Error(err)
}
defer session.Orphan()

fmt.Println("NewDoubleBarrier 1")
b := recipe.NewDoubleBarrier(session, "test-barrier", waiters)
fmt.Println("NewDoubleBarrier 2")
donec := make(chan struct{})
for i := 0; i < waiters-1; i++ {
go func() {
fmt.Println("concurrency.NewSession 3")
session, err := concurrency.NewSession(clus.RandClient())
fmt.Println("concurrency.NewSession 4")
if err != nil {
t.Error(err)
}
defer session.Orphan()

fmt.Println("NewDoubleBarrier 3")
bb := recipe.NewDoubleBarrier(session, "test-barrier", waiters)
fmt.Println("NewDoubleBarrier 4")

fmt.Println("Enter 1")
if err := bb.Enter(); err != nil {
fmt.Println("Enter 2", err)
t.Fatalf("could not enter on barrier (%v)", err)
}
fmt.Println("Enter 3")
donec <- struct{}{}
fmt.Println("Enter 4")
fmt.Println("Lease 1")
if err := bb.Leave(); err != nil {
fmt.Println("Lease 2", err)
t.Fatalf("could not leave on barrier (%v)", err)
}
fmt.Println("Lease 3")
donec <- struct{}{}
fmt.Println("Lease 4")
}()
}

fmt.Println("<-donec 1")
time.Sleep(10 * time.Millisecond)
select {
case <-donec:
fmt.Println("<-donec 2")
t.Fatalf("barrier did not enter-wait")
default:
fmt.Println("<-donec 3")
}
fmt.Println("<-donec 4")

fmt.Println("Enter 10")
if err := b.Enter(); err != nil {
fmt.Println("Enter 11", err)
t.Fatalf("could not enter last barrier (%v)", err)
}
fmt.Println("Enter 12")

timerC := time.After(time.Duration(waiters*100) * time.Millisecond)
for i := 0; i < waiters-1; i++ {
fmt.Println("waiters 1", i)
select {
case <-timerC:
fmt.Println("waiters 2", i)
t.Fatalf("barrier enter timed out")
case <-donec:
fmt.Println("waiters 3", i)
}
}

fmt.Println("donec 10-1")
time.Sleep(10 * time.Millisecond)
select {
case <-donec:
fmt.Println("donec 10-2")
t.Fatalf("barrier did not leave-wait")
default:
fmt.Println("donec 10-3")
}
fmt.Println("donec 10-4")

fmt.Println("Leave 1")
b.Leave()
fmt.Println("Leave 2")

fmt.Println("waiter 100-1")
timerC = time.After(time.Duration(waiters*100) * time.Millisecond)
for i := 0; i < waiters-1; i++ {
fmt.Println("waiter 100-2", i)
select {
case <-timerC:
fmt.Println("waiter 100-3", i)
t.Fatalf("barrier leave timed out")
case <-donec:
fmt.Println("waiter 100-4", i)
}
}
}