Skip to content

Commit

Permalink
*: Add progress notify request watch request
Browse files Browse the repository at this point in the history
  • Loading branch information
jpbetz committed Jun 22, 2018
1 parent acf75dd commit 1186f46
Show file tree
Hide file tree
Showing 12 changed files with 803 additions and 415 deletions.
11 changes: 10 additions & 1 deletion Documentation/dev-guide/api_reference_v3.md
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ Empty field.
| ----- | ----------- | ---- |
| cluster_id | cluster_id is the ID of the cluster which sent the response. | uint64 |
| member_id | member_id is the ID of the member which sent the response. | uint64 |
| revision | revision is the key-value store revision when the request was applied. | int64 |
| revision | revision is the key-value store revision when the request was applied. For watch progress responses, the header.revision indicates progress. All future events recieved in this stream are guaranteed to have a higher revision number than the header.revision number. | int64 |
| raft_term | raft_term is the raft term when the request was applied. | uint64 |


Expand Down Expand Up @@ -840,13 +840,22 @@ From google paxosdb paper: Our implementation hinges around a powerful primitive



##### message `WatchProgressRequest` (etcdserver/etcdserverpb/rpc.proto)

Requests the a watch stream progress status be sent in the watch response stream as soon as possible.

Empty field.



##### message `WatchRequest` (etcdserver/etcdserverpb/rpc.proto)

| Field | Description | Type |
| ----- | ----------- | ---- |
| request_union | request_union is a request to either create a new watcher or cancel an existing watcher. | oneof |
| create_request | | WatchCreateRequest |
| cancel_request | | WatchCancelRequest |
| progress_request | | WatchProgressRequest |



Expand Down
9 changes: 8 additions & 1 deletion Documentation/dev-guide/apispec/swagger/rpc.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -2195,7 +2195,7 @@
"format": "uint64"
},
"revision": {
"description": "revision is the key-value store revision when the request was applied.",
"description": "revision is the key-value store revision when the request was applied.\nFor watch progress responses, the header.revision indicates progress. All future events\nrecieved in this stream are guaranteed to have a higher revision number than the\nheader.revision number.",
"type": "string",
"format": "int64"
}
Expand Down Expand Up @@ -2396,6 +2396,10 @@
}
}
},
"etcdserverpbWatchProgressRequest": {
"description": "Requests the a watch stream progress status be sent in the watch response stream as soon as\npossible.",
"type": "object"
},
"etcdserverpbWatchRequest": {
"type": "object",
"properties": {
Expand All @@ -2404,6 +2408,9 @@
},
"create_request": {
"$ref": "#/definitions/etcdserverpbWatchCreateRequest"
},
"progress_request": {
"$ref": "#/definitions/etcdserverpbWatchProgressRequest"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@
"revision": {
"type": "string",
"format": "int64",
"description": "revision is the key-value store revision when the request was applied."
"description": "revision is the key-value store revision when the request was applied.\nFor watch progress responses, the header.revision indicates progress. All future events\nrecieved in this stream are guaranteed to have a higher revision number than the\nheader.revision number."
},
"raft_term": {
"type": "string",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
"revision": {
"type": "string",
"format": "int64",
"description": "revision is the key-value store revision when the request was applied."
"description": "revision is the key-value store revision when the request was applied.\nFor watch progress responses, the header.revision indicates progress. All future events\nrecieved in this stream are guaranteed to have a higher revision number than the\nheader.revision number."
},
"raft_term": {
"type": "string",
Expand Down
20 changes: 20 additions & 0 deletions Documentation/dev-guide/interacting_v3.md
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,26 @@ foo # key
bar_latest # value of foo key after modification
```

## Watch progress

Applications may want to check the progress of a watch to determine how up-to-date the watch stream is. For example, if a watch is used to update a cache, it can be useful to know if the cache is stale compared to the revision from a quorum read.

Progress requests can be issued using the "progress" command in interactive watch session to ask the etcd server to send a progress notify update in the watch stream:

```bash
$ etcdctl watch -i
$ watch a
$ progress
progress notify: 1
# in another terminal: etcdctl put x 0
# in another terminal: etcdctl put y 1
$ progress
progress notify: 3
```

Note: The revision number in the progress notify response is the revision from the local etcd server node that the watch stream is connected to. If this node is partitioned and not part of quorum, this progress notify revision might be lower than
than the revision returned by a quorum read against a non-partitioned etcd server node.

## Compacted revisions

As we mentioned, etcd keeps revisions so that applications can read past versions of keys. However, to avoid accumulating an unbounded amount of history, it is important to compact past revisions. After compacting, etcd removes historical revisions, releasing resources for future use. All superseded data with revisions before the compacted revision will be unavailable.
Expand Down
72 changes: 72 additions & 0 deletions clientv3/integration/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,78 @@ func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) {
}
}

func TestWatchRequestProgress(t *testing.T) {
testCases := []struct {
name string
watchers []string
}{
{"0-watcher", []string{}},
{"1-watcher", []string{"/"}},
{"2-watcher", []string{"/", "/"}},
}

for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
defer testutil.AfterTest(t)

watchTimeout := 3 * time.Second

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

wc := clus.RandClient()

var watchChans []clientv3.WatchChan

for _, prefix := range c.watchers {
watchChans = append(watchChans, wc.Watch(context.Background(), prefix, clientv3.WithPrefix()))
}

_, err := wc.Put(context.Background(), "/a", "1")
if err != nil {
t.Fatal(err)
}

for _, rch := range watchChans {
select {
case resp := <-rch: // wait for notification
if len(resp.Events) != 1 {
t.Fatalf("resp.Events expected 1, got %d", len(resp.Events))
}
case <-time.After(watchTimeout):
t.Fatalf("watch response expected in %v, but timed out", watchTimeout)
}
}

// put a value not being watched to increment revision
_, err = wc.Put(context.Background(), "x", "1")
if err != nil {
t.Fatal(err)
}

err = wc.RequestProgress(context.Background())
if err != nil {
t.Fatal(err)
}

// verify all watch channels receive a progress notify
for _, rch := range watchChans {
select {
case resp := <-rch:
if !resp.IsProgressNotify() {
t.Fatalf("expected resp.IsProgressNotify() == true")
}
if resp.Header.Revision != 3 {
t.Fatalf("resp.Header.Revision expected 3, got %d", resp.Header.Revision)
}
case <-time.After(watchTimeout):
t.Fatalf("progress response expected in %v, but timed out", watchTimeout)
}
}
})
}
}

func TestWatchEventType(t *testing.T) {
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
Expand Down
123 changes: 102 additions & 21 deletions clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ type Watcher interface {
// (see https://github.com/coreos/etcd/issues/8980)
Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

// RequestProgress requests a progress notify response be sent in all watch channels.
RequestProgress(ctx context.Context) error

// Close closes the watcher and cancels all watch requests.
Close() error
}
Expand Down Expand Up @@ -156,7 +159,7 @@ type watchGrpcStream struct {
resuming []*watcherStream

// reqc sends a watch request from Watch() to the main goroutine
reqc chan *watchRequest
reqc chan watchStreamRequest
// respc receives data from the watch client
respc chan *pb.WatchResponse
// donec closes to broadcast shutdown
Expand All @@ -174,6 +177,11 @@ type watchGrpcStream struct {
closeErr error
}

// watchStreamRequest is a union of the supported watch request operation types
type watchStreamRequest interface {
toPB() *pb.WatchRequest
}

// watchRequest is issued by the subscriber to start a new watcher
type watchRequest struct {
ctx context.Context
Expand All @@ -198,6 +206,10 @@ type watchRequest struct {
retc chan chan WatchResponse
}

// progressRequest is issued by the subscriber to request watch progress
type progressRequest struct {
}

// watcherStream represents a registered watcher
type watcherStream struct {
// initReq is the request that initiated this request
Expand Down Expand Up @@ -255,7 +267,7 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
cancel: cancel,
substreams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
reqc: make(chan watchStreamRequest),
donec: make(chan struct{}),
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
Expand Down Expand Up @@ -361,6 +373,42 @@ func (w *watcher) Close() (err error) {
return err
}

// RequestProgress requests a progress notify response be sent in all watch channels.
func (w *watcher) RequestProgress(ctx context.Context) (err error) {
ctxKey := streamKeyFromCtx(ctx)

w.mu.Lock()
if w.streams == nil {
return fmt.Errorf("no stream found for context")
}
wgs := w.streams[ctxKey]
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
}
donec := wgs.donec
reqc := wgs.reqc
w.mu.Unlock()

pr := &progressRequest{}

select {
case reqc <- pr:
return nil
case <-ctx.Done():
if err == nil {
return ctx.Err()
}
return err
case <-donec:
if wgs.closeErr != nil {
return wgs.closeErr
}
// retry; may have dropped stream from no ctxs
return w.RequestProgress(ctx)
}
}

func (w *watchGrpcStream) close() (err error) {
w.cancel()
<-w.donec
Expand Down Expand Up @@ -468,26 +516,31 @@ func (w *watchGrpcStream) run() {
for {
select {
// Watch() requested
case wreq := <-w.reqc:
outc := make(chan WatchResponse, 1)
// TODO: pass custom watch ID?
ws := &watcherStream{
initReq: *wreq,
id: -1,
outc: outc,
// unbuffered so resumes won't cause repeat events
recvc: make(chan *WatchResponse),
}
case req := <-w.reqc:
switch wreq := req.(type) {
case *watchRequest:
outc := make(chan WatchResponse, 1)
// TODO: pass custom watch ID?
ws := &watcherStream{
initReq: *wreq,
id: -1,
outc: outc,
// unbuffered so resumes won't cause repeat events
recvc: make(chan *WatchResponse),
}

ws.donec = make(chan struct{})
w.wg.Add(1)
go w.serveSubstream(ws, w.resumec)
ws.donec = make(chan struct{})
w.wg.Add(1)
go w.serveSubstream(ws, w.resumec)

// queue up for watcher creation/resume
w.resuming = append(w.resuming, ws)
if len(w.resuming) == 1 {
// head of resume queue, can register a new watcher
wc.Send(ws.initReq.toPB())
// queue up for watcher creation/resume
w.resuming = append(w.resuming, ws)
if len(w.resuming) == 1 {
// head of resume queue, can register a new watcher
wc.Send(ws.initReq.toPB())
}
case *progressRequest:
wc.Send(wreq.toPB())
}

// new events from the watch client
Expand Down Expand Up @@ -614,7 +667,28 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
Canceled: pbresp.Canceled,
cancelReason: pbresp.CancelReason,
}
ws, ok := w.substreams[pbresp.WatchId]

if wr.IsProgressNotify() {
return w.broadcastResponse(wr)
}
return w.unicastResponse(wr, pbresp.WatchId)

}

// broadcastResponse send a watch response to all watch substreams.
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
for _, ws := range w.substreams {
select {
case ws.recvc <- wr:
case <-ws.donec:
}
}
return true
}

// unicastResponse sends a watch response to a specific watch substream.
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
ws, ok := w.substreams[watchId]
if !ok {
return false
}
Expand Down Expand Up @@ -888,6 +962,13 @@ func (wr *watchRequest) toPB() *pb.WatchRequest {
return &pb.WatchRequest{RequestUnion: cr}
}

// toPB converts an internal progress request structure to its protobuf WatchRequest structure.
func (pr *progressRequest) toPB() *pb.WatchRequest {
req := &pb.WatchProgressRequest{}
cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
return &pb.WatchRequest{RequestUnion: cr}
}

func streamKeyFromCtx(ctx context.Context) string {
if md, ok := metadata.FromOutgoingContext(ctx); ok {
return fmt.Sprintf("%+v", md)
Expand Down
Loading

0 comments on commit 1186f46

Please sign in to comment.