From 51e44103ba0e0dc006aa4ad9cf0051f5137f8e7f Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Fri, 1 Jun 2018 10:19:44 -0700 Subject: [PATCH 1/7] mvcc: add watch multiplexing benchmarks $ go test -v -run XXX -bench BenchmarkWatchableStoreMultiplexWatchPut BenchmarkWatchableStoreMultiplexWatchPutSynced-4 100000 12602 ns/op 50038 B/op 11 allocs/op BenchmarkWatchableStoreMultiplexWatchPutUnsynced-4 50000 25189 ns/op 50171 B/op 11 allocs/op Signed-off-by: Gyuho Lee --- mvcc/watchable_store_bench_test.go | 47 ++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/mvcc/watchable_store_bench_test.go b/mvcc/watchable_store_bench_test.go index 08010d3abaf..38646279563 100644 --- a/mvcc/watchable_store_bench_test.go +++ b/mvcc/watchable_store_bench_test.go @@ -113,6 +113,53 @@ func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) { } } +func BenchmarkWatchableStoreMultiplexWatchPutSynced(b *testing.B) { + benchmarkWatchableStoreMultiplexWatchPut(b, true) +} + +func BenchmarkWatchableStoreMultiplexWatchPutUnsynced(b *testing.B) { + benchmarkWatchableStoreMultiplexWatchPut(b, false) +} + +// benchmarkWatchableStoreMultiplexWatchPut tests watch event creation +// and notification when multiple watchers are multiplexed over one single stream. +func benchmarkWatchableStoreMultiplexWatchPut(b *testing.B, synced bool) { + be, tmpPath := backend.NewDefaultTmpBackend() + s := newWatchableStore(zap.NewExample(), be, &lease.FakeLessor{}, nil) + defer cleanup(s, be, tmpPath) + + k := []byte("testkey") + v := []byte("testval") + + rev := int64(0) + if !synced { + // non-0 value to keep watchers in unsynced + rev = 1 + } + + streams := make([]WatchStream, b.N) + defer func() { + for _, w := range streams { + w.Close() + } + }() + + b.ResetTimer() + b.ReportAllocs() + + // each called when a client requests watcher + for i := range streams { + streams[i] = s.NewWatchStream() + streams[i].Watch(0, k, nil, rev) + } + + // trigger watchers + s.Put(k, v, lease.NoLease) + for _, w := range streams { + <-w.Chan() + } +} + // Benchmarks on cancel function performance for unsynced watchers // in a WatchableStore. It creates k*N watchers to populate unsynced // with a reasonably large number of watchers. And measures the time it From 5736e5d8a9fb1defc03ce90b4c56dad00757c192 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 30 May 2018 11:39:28 -0700 Subject: [PATCH 2/7] mvcc: allocate global watch IDs Currently, multiplexed watchers always have WatchID "0" within its watcher pool, and never getting reclaimed on cancellation. This is preparatory work for better resource utilization. Test results show that this makes synced watcher cancellation faster. And no regression on watch stream multiplexing performance. 
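For illustration only, a minimal self-contained sketch of the store-level ID allocation this change introduces: one nextID counter shared by all watch streams plus a global watcher map, consulted under the store lock. Type and helper names here (idAllocator, alloc, free) are invented for the sketch; the real logic lives in watchableStore.watch and cancelByID in the diff below.

// Sketch of global WatchID allocation: IDs come from one counter shared by
// every watch stream on the store, and a candidate ID is skipped while a live
// watcher still holds it, so IDs stay unique across multiplexed streams.
package main

import (
	"fmt"
	"sync"
)

type idAllocator struct {
	mu       sync.Mutex
	nextID   int64              // next candidate WatchID, shared across streams
	watchers map[int64]struct{} // IDs currently held by live watchers
}

// alloc returns the first unused ID at or above nextID.
func (a *idAllocator) alloc() int64 {
	a.mu.Lock()
	defer a.mu.Unlock()
	for {
		if _, taken := a.watchers[a.nextID]; !taken {
			break
		}
		a.nextID++
	}
	id := a.nextID
	a.nextID++
	a.watchers[id] = struct{}{}
	return id
}

// free drops the ID when its watcher is cancelled so the slot can eventually
// be reused instead of leaking for the lifetime of the store.
func (a *idAllocator) free(id int64) {
	a.mu.Lock()
	defer a.mu.Unlock()
	delete(a.watchers, id)
}

func main() {
	a := &idAllocator{watchers: make(map[int64]struct{})}
	fmt.Println(a.alloc(), a.alloc(), a.alloc()) // 0 1 2
	a.free(1)
}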
Test result 1:

benchmark                                             old ns/op    new ns/op    delta
BenchmarkIndexCompact1-4                              213          214          +0.47%
BenchmarkIndexCompact100-4                            13544        13496        -0.35%
BenchmarkIndexCompact10000-4                          732862       738492       +0.77%
BenchmarkIndexCompact100000-4                         23537303     25035956     +6.37%
BenchmarkIndexCompact1000000-4                        292751386    297354406    +1.57%
BenchmarkStorePut-4                                   5353         5043         -5.79%
BenchmarkStoreRangeKey1-4                             857          863          +0.70%
BenchmarkStoreRangeKey100-4                           73652        73469        -0.25%
BenchmarkConsistentIndex-4                            3.11         3.10         -0.32%
BenchmarkStorePutUpdate-4                             3828         3839         +0.29%
BenchmarkStoreTxnPut-4                                5425         4968         -8.42%
BenchmarkStoreRestoreRevs1-4                          4015         4075         +1.49%
BenchmarkStoreRestoreRevs10-4                         7731         7753         +0.28%
BenchmarkStoreRestoreRevs20-4                         10940        10092        -7.75%
BenchmarkWatchableStorePut-4                          5385         5011         -6.95%
BenchmarkWatchableStoreTxnPut-4                       6695         5627         -15.95%
BenchmarkWatchableStoreWatchPutSync-4                 2928         2844         -2.87%
BenchmarkWatchableStoreWatchPutUnsync-4               7058         9042         +28.11%
BenchmarkWatchableStoreMultiplexWatchPutSynced-4      14390        15299        +6.32%
BenchmarkWatchableStoreMultiplexWatchPutUnsynced-4    24573        21944        -10.70%
BenchmarkWatchableStoreUnsyncedCancel-4               954          1001         +4.93%
BenchmarkWatchableStoreSyncedCancel-4                 0.84         0.47         -44.05%
BenchmarkKVWatcherMemoryUsage-4                       1548         1752         +13.18%

benchmark                                             old allocs   new allocs   delta
BenchmarkStoreRangeKey1-4                             12           12           +0.00%
BenchmarkStoreRangeKey100-4                           713          713          +0.00%
BenchmarkConsistentIndex-4                            0            0            +0.00%
BenchmarkStoreTxnPut-4                                15           15           +0.00%
BenchmarkStoreRestoreRevs1-4                          6            6            +0.00%
BenchmarkStoreRestoreRevs10-4                         37           37           +0.00%
BenchmarkStoreRestoreRevs20-4                         68           68           +0.00%
BenchmarkWatchableStorePut-4                          15           15           +0.00%
BenchmarkWatchableStoreTxnPut-4                       17           17           +0.00%
BenchmarkWatchableStoreWatchPutSync-4                 2            2            +0.00%
BenchmarkWatchableStoreWatchPutUnsync-4               2            2            +0.00%
BenchmarkWatchableStoreMultiplexWatchPutSynced-4      11           12           +9.09%
BenchmarkWatchableStoreMultiplexWatchPutUnsynced-4    11           12           +9.09%
BenchmarkWatchableStoreUnsyncedCancel-4               0            0            +0.00%
BenchmarkWatchableStoreSyncedCancel-4                 0            0            +0.00%
BenchmarkKVWatcherMemoryUsage-4                       8            8            +0.00%

benchmark                                             old bytes    new bytes    delta
BenchmarkStoreRangeKey1-4                             544          544          +0.00%
BenchmarkStoreRangeKey100-4                           40029        40029        +0.00%
BenchmarkConsistentIndex-4                            0            0            +0.00%
BenchmarkStoreTxnPut-4                                1483         1479         -0.27%
BenchmarkStoreRestoreRevs1-4                          586          586          +0.00%
BenchmarkStoreRestoreRevs10-4                         4731         4730         -0.02%
BenchmarkStoreRestoreRevs20-4                         9311         9311         +0.00%
BenchmarkWatchableStorePut-4                          1528         1539         +0.72%
BenchmarkWatchableStoreTxnPut-4                       1536         1533         -0.20%
BenchmarkWatchableStoreWatchPutSync-4                 252          251          -0.40%
BenchmarkWatchableStoreWatchPutUnsync-4               272          315          +15.81%
BenchmarkWatchableStoreMultiplexWatchPutSynced-4      50038        49990        -0.10%
BenchmarkWatchableStoreMultiplexWatchPutUnsynced-4    50172        50133        -0.08%
BenchmarkWatchableStoreUnsyncedCancel-4               0            0            +0.00%
BenchmarkWatchableStoreSyncedCancel-4                 0            0            +0.00%
BenchmarkKVWatcherMemoryUsage-4                       670          720          +7.46%

Test result 2:

benchmark                                             old ns/op    new ns/op    delta
BenchmarkIndexCompact1-4                              208          207          -0.48%
BenchmarkIndexCompact100-4                            13776        13474        -2.19%
BenchmarkIndexCompact10000-4                          787606       737522       -6.36%
BenchmarkIndexCompact100000-4                         24437827     22737532     -6.96%
BenchmarkIndexCompact1000000-4                        295421494    298144196    +0.92%
BenchmarkStorePut-4                                   5027         5357         +6.56%
BenchmarkStoreRangeKey1-4                             864          854          -1.16%
BenchmarkStoreRangeKey100-4                           73460        73495        +0.05%
BenchmarkConsistentIndex-4                            3.11         3.41         +9.65%
BenchmarkStorePutUpdate-4                             3874         3799         -1.94%
BenchmarkStoreTxnPut-4                                5065         5362         +5.86%
BenchmarkStoreRestoreRevs1-4                          4012         4019         +0.17%
BenchmarkStoreRestoreRevs10-4                         7915         7845         -0.88%
BenchmarkStoreRestoreRevs20-4                         10944        10627        -2.90%
BenchmarkWatchableStorePut-4                          5326         5366         +0.75%
BenchmarkWatchableStoreTxnPut-4 5419 5551 +2.44% BenchmarkWatchableStoreWatchPutSync-4 2439 2755 +12.96% BenchmarkWatchableStoreWatchPutUnsync-4 9839 5919 -39.84% BenchmarkWatchableStoreMultiplexWatchPutSynced-4 13722 14325 +4.39% BenchmarkWatchableStoreMultiplexWatchPutUnsynced-4 21755 21383 -1.71% BenchmarkWatchableStoreUnsyncedCancel-4 924 1066 +15.37% BenchmarkWatchableStoreSyncedCancel-4 0.83 0.47 -43.37% BenchmarkKVWatcherMemoryUsage-4 1547 1714 +10.80% benchmark old allocs new allocs delta BenchmarkStoreRangeKey1-4 12 12 +0.00% BenchmarkStoreRangeKey100-4 713 713 +0.00% BenchmarkConsistentIndex-4 0 0 +0.00% BenchmarkStoreTxnPut-4 15 15 +0.00% BenchmarkStoreRestoreRevs1-4 6 6 +0.00% BenchmarkStoreRestoreRevs10-4 37 37 +0.00% BenchmarkStoreRestoreRevs20-4 68 68 +0.00% BenchmarkWatchableStorePut-4 15 15 +0.00% BenchmarkWatchableStoreTxnPut-4 17 17 +0.00% BenchmarkWatchableStoreWatchPutSync-4 2 2 +0.00% BenchmarkWatchableStoreWatchPutUnsync-4 2 2 +0.00% BenchmarkWatchableStoreMultiplexWatchPutSynced-4 11 12 +9.09% BenchmarkWatchableStoreMultiplexWatchPutUnsynced-4 11 12 +9.09% BenchmarkWatchableStoreUnsyncedCancel-4 0 0 +0.00% BenchmarkWatchableStoreSyncedCancel-4 0 0 +0.00% BenchmarkKVWatcherMemoryUsage-4 8 8 +0.00% benchmark old bytes new bytes delta BenchmarkStoreRangeKey1-4 544 544 +0.00% BenchmarkStoreRangeKey100-4 40029 40029 +0.00% BenchmarkConsistentIndex-4 0 0 +0.00% BenchmarkStoreTxnPut-4 1485 1476 -0.61% BenchmarkStoreRestoreRevs1-4 585 586 +0.17% BenchmarkStoreRestoreRevs10-4 4730 4731 +0.02% BenchmarkStoreRestoreRevs20-4 9311 9311 +0.00% BenchmarkWatchableStorePut-4 1528 1528 +0.00% BenchmarkWatchableStoreTxnPut-4 1557 1538 -1.22% BenchmarkWatchableStoreWatchPutSync-4 251 251 +0.00% BenchmarkWatchableStoreWatchPutUnsync-4 317 276 -12.93% BenchmarkWatchableStoreMultiplexWatchPutSynced-4 50038 49990 -0.10% BenchmarkWatchableStoreMultiplexWatchPutUnsynced-4 50183 50129 -0.11% BenchmarkWatchableStoreUnsyncedCancel-4 0 0 +0.00% BenchmarkWatchableStoreSyncedCancel-4 0 0 +0.00% BenchmarkKVWatcherMemoryUsage-4 670 720 +7.46% Signed-off-by: Gyuho Lee --- mvcc/watchable_store.go | 85 +++++++++++++++++++++++++++--- mvcc/watchable_store_bench_test.go | 4 ++ mvcc/watchable_store_test.go | 7 +++ mvcc/watcher.go | 60 ++++++--------------- mvcc/watcher_test.go | 21 ++++++-- 5 files changed, 121 insertions(+), 56 deletions(-) diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index f04fe994320..f7d974c7bbd 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -21,6 +21,7 @@ import ( "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/mvcc/mvccpb" + "go.uber.org/zap" ) @@ -37,9 +38,10 @@ var ( ) type watchable interface { - watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) - progress(w *watcher) + watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (WatchID, error) + progress(id WatchID) rev() int64 + cancelByID(id WatchID) (bool, error) } type watchableStore struct { @@ -60,6 +62,12 @@ type watchableStore struct { // The key of the map is the key that the watcher watches on. 
synced watcherGroup + // nextID is the ID pre-allocated for next new watcher + // should be unique globally + nextID WatchID + watchers map[WatchID]*watcher + cancels map[WatchID]cancelFunc + stopc chan struct{} wg sync.WaitGroup } @@ -78,6 +86,9 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig Co victimc: make(chan struct{}, 1), unsynced: newWatcherGroup(), synced: newWatcherGroup(), + nextID: 0, + watchers: make(map[WatchID]*watcher), + cancels: make(map[WatchID]cancelFunc), stopc: make(chan struct{}), } s.store.ReadView = &readView{s} @@ -103,12 +114,11 @@ func (s *watchableStore) NewWatchStream() WatchStream { return &watchStream{ watchable: s, ch: make(chan WatchResponse, chanBufLen), - cancels: make(map[WatchID]cancelFunc), - watchers: make(map[WatchID]*watcher), + watchers: make(map[WatchID]struct{}), } } -func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) { +func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (WatchID, error) { wa := &watcher{ key: key, end: end, @@ -120,6 +130,26 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c s.mu.Lock() s.revMu.RLock() + + if id == AutoWatchID { + advanced := false + for !advanced { + if _, ok := s.watchers[s.nextID]; !ok { + // good to use "nextID" + advanced = true + } else { + s.nextID++ + } + } + id = s.nextID + s.nextID++ + } else if _, ok := s.watchers[id]; ok { + s.revMu.RUnlock() + s.mu.Unlock() + return -1, ErrWatcherDuplicateID + } + wa.id = id + synced := startRev > s.store.currentRev || startRev == 0 if synced { wa.minRev = s.store.currentRev + 1 @@ -133,12 +163,18 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c slowWatcherGauge.Inc() s.unsynced.add(wa) } + + s.watchers[id] = wa + s.cancels[id] = func() { + s.cancelWatcher(wa) + } + s.revMu.RUnlock() s.mu.Unlock() watcherGauge.Inc() - return wa, func() { s.cancelWatcher(wa) } + return wa.id, nil } // cancelWatcher removes references of the watcher from the watchableStore @@ -184,6 +220,37 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { s.mu.Unlock() } +// cancelByID cancels a watcher by its ID. +// It returns 'true' if watcher had existed and been deleted. +func (s *watchableStore) cancelByID(id WatchID) (bool, error) { + s.mu.RLock() + w, wok := s.watchers[id] + if !wok { + s.mu.RUnlock() + return false, ErrWatcherNotExist + } + cancel, cok := s.cancels[id] + if !cok { + s.mu.RUnlock() + return false, ErrWatcherNotExist + } + s.mu.RUnlock() + + cancel() + + s.mu.Lock() + // The watch isn't removed until cancel so that if Close() is called, + // it will wait for the cancel. Otherwise, Close() could close the + // watch channel while the store is still posting events. 
+ if ww := s.watchers[id]; ww == w { + delete(s.watchers, id) + delete(s.cancels, id) + } + s.mu.Unlock() + + return true, nil +} + func (s *watchableStore) Restore(b backend.Backend) error { s.mu.Lock() defer s.mu.Unlock() @@ -477,10 +544,14 @@ func (s *watchableStore) addVictim(victim watcherBatch) { func (s *watchableStore) rev() int64 { return s.store.Rev() } -func (s *watchableStore) progress(w *watcher) { +func (s *watchableStore) progress(id WatchID) { s.mu.RLock() defer s.mu.RUnlock() + w, ok := s.watchers[id] + if !ok { + return + } if _, ok := s.synced.watchers[w]; ok { w.send(WatchResponse{WatchID: w.id, Revision: s.rev()}) // If the ch is full, this watcher is receiving events. diff --git a/mvcc/watchable_store_bench_test.go b/mvcc/watchable_store_bench_test.go index 38646279563..cb2f7df85e7 100644 --- a/mvcc/watchable_store_bench_test.go +++ b/mvcc/watchable_store_bench_test.go @@ -182,6 +182,10 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { // to make the test not crash from assigning to nil map. // 'synced' doesn't get populated in this test. synced: newWatcherGroup(), + + nextID: 0, + watchers: make(map[WatchID]*watcher), + cancels: make(map[WatchID]cancelFunc), } defer func() { diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go index a929e995c25..5654066fd74 100644 --- a/mvcc/watchable_store_test.go +++ b/mvcc/watchable_store_test.go @@ -91,6 +91,10 @@ func TestCancelUnsynced(t *testing.T) { // to make the test not crash from assigning to nil map. // 'synced' doesn't get populated in this test. synced: newWatcherGroup(), + + nextID: 0, + watchers: make(map[WatchID]*watcher), + cancels: make(map[WatchID]cancelFunc), } defer func() { @@ -143,6 +147,9 @@ func TestSyncWatchers(t *testing.T) { store: NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil), unsynced: newWatcherGroup(), synced: newWatcherGroup(), + nextID: 0, + watchers: make(map[WatchID]*watcher), + cancels: make(map[WatchID]cancelFunc), } defer func() { diff --git a/mvcc/watcher.go b/mvcc/watcher.go index 886b87d5a47..68a72fc587d 100644 --- a/mvcc/watcher.go +++ b/mvcc/watcher.go @@ -96,12 +96,9 @@ type watchStream struct { watchable watchable ch chan WatchResponse - mu sync.Mutex // guards fields below it - // nextID is the ID pre-allocated for next new watcher in this stream - nextID WatchID + mu sync.Mutex // guards fields below it closed bool - cancels map[WatchID]cancelFunc - watchers map[WatchID]*watcher + watchers map[WatchID]struct{} } // Watch creates a new watcher in the stream and returns its WatchID. @@ -118,20 +115,11 @@ func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs .. return -1, ErrEmptyWatcherRange } - if id == AutoWatchID { - for ws.watchers[ws.nextID] != nil { - ws.nextID++ - } - id = ws.nextID - ws.nextID++ - } else if _, ok := ws.watchers[id]; ok { - return -1, ErrWatcherDuplicateID + id, werr := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...) + if werr != nil { + return -1, werr } - - w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...) 
- - ws.cancels[id] = c - ws.watchers[id] = w + ws.watchers[id] = struct{}{} return id, nil } @@ -141,26 +129,13 @@ func (ws *watchStream) Chan() <-chan WatchResponse { func (ws *watchStream) Cancel(id WatchID) error { ws.mu.Lock() - cancel, ok := ws.cancels[id] - w := ws.watchers[id] - ok = ok && !ws.closed + dok, err := ws.watchable.cancelByID(id) + dok = dok && !ws.closed + delete(ws.watchers, id) ws.mu.Unlock() - - if !ok { - return ErrWatcherNotExist - } - cancel() - - ws.mu.Lock() - // The watch isn't removed until cancel so that if Close() is called, - // it will wait for the cancel. Otherwise, Close() could close the - // watch channel while the store is still posting events. - if ww := ws.watchers[id]; ww == w { - delete(ws.cancels, id) - delete(ws.watchers, id) + if !dok { + return err } - ws.mu.Unlock() - return nil } @@ -168,9 +143,10 @@ func (ws *watchStream) Close() { ws.mu.Lock() defer ws.mu.Unlock() - for _, cancel := range ws.cancels { - cancel() + for id := range ws.watchers { + ws.watchable.cancelByID(id) } + ws.watchers = make(map[WatchID]struct{}) ws.closed = true close(ws.ch) watchStreamGauge.Dec() @@ -183,11 +159,5 @@ func (ws *watchStream) Rev() int64 { } func (ws *watchStream) RequestProgress(id WatchID) { - ws.mu.Lock() - w, ok := ws.watchers[id] - ws.mu.Unlock() - if !ok { - return - } - ws.watchable.progress(w) + ws.watchable.progress(id) } diff --git a/mvcc/watcher_test.go b/mvcc/watcher_test.go index 948158424ee..7ed4bee46e8 100644 --- a/mvcc/watcher_test.go +++ b/mvcc/watcher_test.go @@ -25,6 +25,7 @@ import ( "github.com/coreos/etcd/lease" "github.com/coreos/etcd/mvcc/backend" "github.com/coreos/etcd/mvcc/mvccpb" + "go.uber.org/zap" ) @@ -32,7 +33,8 @@ import ( // and the watched event attaches the correct watchID. func TestWatcherWatchID(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)) + st := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) + s := WatchableKV(st) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -42,6 +44,12 @@ func TestWatcherWatchID(t *testing.T) { for i := 0; i < 10; i++ { id, _ := w.Watch(0, []byte("foo"), nil, 0) + + // expect 1 watch ID + if _, ok := st.watchers[id]; !ok || len(st.watchers) != 1 { + t.Errorf("#%d: expected watch ID %d creation, but cannot find", i, id) + } + if _, ok := idm[id]; ok { t.Errorf("#%d: id %d exists", i, id) } @@ -64,6 +72,7 @@ func TestWatcherWatchID(t *testing.T) { // unsynced watchers for i := 10; i < 20; i++ { id, _ := w.Watch(0, []byte("foo2"), nil, 1) + if _, ok := idm[id]; ok { t.Errorf("#%d: id %d exists", i, id) } @@ -99,7 +108,7 @@ func TestWatcherRequestsCustomID(t *testing.T) { }{ {1, 1, nil}, {1, 0, ErrWatcherDuplicateID}, - {0, 0, nil}, + {0, 0, nil}, // let server generate watch ID {0, 2, nil}, } @@ -252,7 +261,8 @@ func TestWatchDeleteRange(t *testing.T) { // with given id inside watchStream. 
func TestWatchStreamCancelWatcherByID(t *testing.T) { b, tmpPath := backend.NewDefaultTmpBackend() - s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil)) + st := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, nil) + s := WatchableKV(st) defer cleanup(s, b, tmpPath) w := s.NewWatchStream() @@ -280,7 +290,7 @@ func TestWatchStreamCancelWatcherByID(t *testing.T) { } } - if l := len(w.(*watchStream).cancels); l != 0 { + if l := len(st.cancels); l != 0 { t.Errorf("cancels = %d, want 0", l) } } @@ -298,6 +308,9 @@ func TestWatcherRequestProgress(t *testing.T) { store: NewStore(zap.NewExample(), b, &lease.FakeLessor{}, nil), unsynced: newWatcherGroup(), synced: newWatcherGroup(), + nextID: 0, + watchers: make(map[WatchID]*watcher), + cancels: make(map[WatchID]cancelFunc), } defer func() { From 4c77e01bcd836afaaf24436c5fc94537d4590e16 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 30 May 2018 13:32:05 -0700 Subject: [PATCH 3/7] clientv3: provide watch ID on watch response Signed-off-by: Gyuho Lee --- clientv3/integration/watch_test.go | 22 ++++++++++++++++++++++ clientv3/watch.go | 10 ++++++++-- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index f9ac47b4ae4..e7ef99190f3 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -853,6 +853,28 @@ func TestWatchWithCreatedNotificationDropConn(t *testing.T) { } } +// TestWatchIDs tests subsequent watchers have unique watch IDs. +func TestWatchIDs(t *testing.T) { + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + + cli := cluster.RandClient() + + numWatches := int64(10) + for id := int64(0); id < numWatches; id++ { + ctx, cancel := context.WithCancel(clientv3.WithRequireLeader(context.TODO())) + ww := cli.Watch(ctx, "a", clientv3.WithCreatedNotify()) + wresp := <-ww + cancel() + if wresp.Err() != nil { + t.Fatal(wresp.Err()) + } + if id != wresp.ID { + t.Fatalf("expected watch ID %d, got %d", id, wresp.ID) + } + } +} + // TestWatchCancelOnServer ensures client watcher cancels propagate back to the server. func TestWatchCancelOnServer(t *testing.T) { cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) diff --git a/clientv3/watch.go b/clientv3/watch.go index 39d149c530e..c8744929818 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -70,6 +70,10 @@ type Watcher interface { type WatchResponse struct { Header pb.ResponseHeader + + // ID is the registered watch ID. + ID int64 + Events []*Event // CompactRevision is the minimum revision the watcher may receive. @@ -403,8 +407,10 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) { } // close subscriber's channel if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil { - go w.sendCloseSubstream(ws, &WatchResponse{closeErr: w.closeErr}) + go w.sendCloseSubstream(ws, &WatchResponse{ID: ws.id, closeErr: w.closeErr}) } else if ws.outc != nil { + // TODO: propagate context errors to client? + // ws.outc <- WatchResponse{ID: ws.id, Canceled: true, closeErr: ws.initReq.ctx.Err()} close(ws.outc) } if ws.id != -1 { @@ -599,9 +605,9 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool { for i, ev := range pbresp.Events { events[i] = (*Event)(ev) } - // TODO: return watch ID? 
wr := &WatchResponse{ Header: *pbresp.Header, + ID: pbresp.WatchId, Events: events, CompactRevision: pbresp.CompactRevision, Created: pbresp.Created, From eefcc1d59f9bb1912fd570b82f9c55bb827b338e Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Fri, 1 Jun 2018 11:19:02 -0700 Subject: [PATCH 4/7] clientv3: garbage collect cancelled watchers Signed-off-by: Gyuho Lee --- clientv3/integration/watch_test.go | 54 ++++++++++++++++++++++++++++++ clientv3/watch.go | 5 +++ integration/cluster.go | 10 ++++++ 3 files changed, 69 insertions(+) diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index e7ef99190f3..f6a9e32ac3c 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -875,6 +875,60 @@ func TestWatchIDs(t *testing.T) { } } +// TestWatchCancelOnClient tests watch cancel operation from client-side. +func TestWatchCancelOnClient(t *testing.T) { + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + + client := cluster.RandClient() + numWatches := 10 + + // The grpc proxy starts watches to detect leadership after the proxy server + // returns as started; to avoid racing on the proxy's internal watches, wait + // until require leader watches get create responses to ensure the leadership + // watches have started. + for { + ctx, cancel := context.WithCancel(clientv3.WithRequireLeader(context.TODO())) + ww := client.Watch(ctx, "a", clientv3.WithCreatedNotify()) + wresp := <-ww + cancel() + if wresp.Err() == nil { + break + } + } + + // gRPC proxy creates a watcher with "__lostleader" suffix + w1, werr1 := cluster.Members[0].GetWatcherTotal() + if werr1 != nil { + t.Fatal(werr1) + } + for i := 0; i < numWatches; i++ { + ctx, cancel := context.WithCancel(context.Background()) + w := client.Watch(ctx, fmt.Sprintf("%d", i)) + + // cancel watch operation from client-side + cancel() + + wresp, ok := <-w + if ok { + t.Fatalf("#%d: expected closed channel, got watch channel open %v", i, ok) + } + + // TODO: propagate context error before closing channel? + if wresp.Err() != nil { + t.Fatalf("#%d: expected nil error on watch cancellation, got %v", i, wresp.Err()) + } + } + w2, werr2 := cluster.Members[0].GetWatcherTotal() + if werr2 != nil { + t.Fatal(werr2) + } + + if w1 < w2 { + t.Fatalf("expected watchers to be canceled, got %d != %d", w1, w2) + } +} + // TestWatchCancelOnServer ensures client watcher cancels propagate back to the server. func TestWatchCancelOnServer(t *testing.T) { cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) diff --git a/clientv3/watch.go b/clientv3/watch.go index c8744929818..a021930097e 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -577,6 +577,11 @@ func (w *watchGrpcStream) run() { return case ws := <-w.closingc: + // request to garbage collect cancelled watcher in watch storage backend + wc.Send(&pb.WatchRequest{ + RequestUnion: &pb.WatchRequest_CancelRequest{ + CancelRequest: &pb.WatchCancelRequest{WatchId: ws.id}, + }}) w.closeSubstream(ws) delete(closing, ws) // no more watchers on this stream, shutdown diff --git a/integration/cluster.go b/integration/cluster.go index a1840f73409..bb07f70eb7a 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -27,6 +27,7 @@ import ( "os" "reflect" "sort" + "strconv" "strings" "sync" "sync/atomic" @@ -1125,6 +1126,15 @@ func (m *member) Metric(metricName string) (string, error) { return "", nil } +// GetWatcherTotal gets the number of watchers. 
+func (m *member) GetWatcherTotal() (int64, error) { + ws, err := m.Metric("etcd_debugging_mvcc_watcher_total") + if err != nil { + return -1, err + } + return strconv.ParseInt(ws, 10, 64) +} + // InjectPartition drops connections from m to others, vice versa. func (m *member) InjectPartition(t *testing.T, others ...*member) { for _, other := range others { From 66ba69dcc313bc276713b1150ce6562aea6eedd6 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Thu, 7 Jun 2018 12:34:41 -0700 Subject: [PATCH 5/7] integration: "testV3WatchMultipleStreams" with stream coalescing Now that watch response returns the original watch IDs, gRPC proxy test should expect coalesced watch ID, while regular test expects unique watch IDs. Signed-off-by: Gyuho Lee --- integration/v3_watch.go | 134 ++++++++++++++++++ .../v3_watch_multiplex_noproxy_test.go | 33 +++++ integration/v3_watch_multiplex_proxy_test.go | 33 +++++ integration/v3_watch_test.go | 106 -------------- 4 files changed, 200 insertions(+), 106 deletions(-) create mode 100644 integration/v3_watch.go create mode 100644 integration/v3_watch_multiplex_noproxy_test.go create mode 100644 integration/v3_watch_multiplex_proxy_test.go diff --git a/integration/v3_watch.go b/integration/v3_watch.go new file mode 100644 index 00000000000..808eade4a99 --- /dev/null +++ b/integration/v3_watch.go @@ -0,0 +1,134 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "context" + "reflect" + "sync" + "testing" + "time" + + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/mvcc/mvccpb" +) + +// testV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams. +// if streams are coalesced by gRPC proxy, it expects one shared watch ID in watch responses. 
+func testV3WatchMultipleStreams(t *testing.T, startRev int64, coalesced bool) { + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + wAPI := toGRPC(clus.RandClient()).Watch + kvc := toGRPC(clus.RandClient()).KV + + streams := make([]pb.Watch_WatchClient, 5) + for i := range streams { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + wStream, errW := wAPI.Watch(ctx) + if errW != nil { + t.Fatalf("wAPI.Watch error: %v", errW) + } + wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{ + Key: []byte("foo"), StartRevision: startRev}}} + if err := wStream.Send(wreq); err != nil { + t.Fatalf("wStream.Send error: %v", err) + } + streams[i] = wStream + } + + for _, wStream := range streams { + wresp, err := wStream.Recv() + if err != nil { + t.Fatalf("wStream.Recv error: %v", err) + } + if !wresp.Created { + t.Fatalf("wresp.Created got = %v, want = true", wresp.Created) + } + } + + if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil { + t.Fatalf("couldn't put key (%v)", err) + } + + var wg sync.WaitGroup + wg.Add(len(streams)) + + var mu sync.Mutex + ids := make(map[int64]struct{}) + + wevents := []*mvccpb.Event{ + { + Type: mvccpb.PUT, + Kv: &mvccpb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}, + }, + } + for i := range streams { + go func(i int) { + defer wg.Done() + wStream := streams[i] + wresp, err := wStream.Recv() + if err != nil { + t.Fatalf("wStream.Recv error: %v", err) + } + mu.Lock() + ids[wresp.WatchId] = struct{}{} + mu.Unlock() + if !reflect.DeepEqual(wresp.Events, wevents) { + t.Errorf("wresp.Events got = %+v, want = %+v", wresp.Events, wevents) + } + // now Recv should block because there is no more events coming + rok, nr := waitResponse(wStream, 1*time.Second) + if !rok { + t.Errorf("unexpected pb.WatchResponse is received %+v", nr) + } + }(i) + } + wg.Wait() + + if !coalesced && len(ids) != 5 { + t.Fatalf("expect unique watch IDs, got %v", ids) + } + if coalesced && len(ids) == 5 { + t.Fatalf("expect coalesced watchers, got %v", ids) + } +} + +// waitResponse waits on the given stream for given duration. +// If there is no more events, true and a nil response will be +// returned closing the WatchClient stream. Or the response will +// be returned. +func waitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.WatchResponse) { + rCh := make(chan *pb.WatchResponse, 1) + donec := make(chan struct{}) + defer close(donec) + go func() { + resp, _ := wc.Recv() + select { + case rCh <- resp: + case <-donec: + } + }() + select { + case nr := <-rCh: + return false, nr + case <-time.After(timeout): + } + // didn't get response + wc.CloseSend() + return true, nil +} diff --git a/integration/v3_watch_multiplex_noproxy_test.go b/integration/v3_watch_multiplex_noproxy_test.go new file mode 100644 index 00000000000..ea354b1f84a --- /dev/null +++ b/integration/v3_watch_multiplex_noproxy_test.go @@ -0,0 +1,33 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !cluster_proxy + +package integration + +import ( + "testing" + + "github.com/coreos/etcd/pkg/testutil" +) + +func TestV3WatchMultipleStreamsSynced(t *testing.T) { + defer testutil.AfterTest(t) + testV3WatchMultipleStreams(t, 0, false) +} + +func TestV3WatchMultipleStreamsUnsynced(t *testing.T) { + defer testutil.AfterTest(t) + testV3WatchMultipleStreams(t, 1, false) +} diff --git a/integration/v3_watch_multiplex_proxy_test.go b/integration/v3_watch_multiplex_proxy_test.go new file mode 100644 index 00000000000..5c8e6a7c560 --- /dev/null +++ b/integration/v3_watch_multiplex_proxy_test.go @@ -0,0 +1,33 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build cluster_proxy + +package integration + +import ( + "testing" + + "github.com/coreos/etcd/pkg/testutil" +) + +func TestV3WatchMultipleStreamsSynced(t *testing.T) { + defer testutil.AfterTest(t) + testV3WatchMultipleStreams(t, 0, true) +} + +func TestV3WatchMultipleStreamsUnsynced(t *testing.T) { + defer testutil.AfterTest(t) + testV3WatchMultipleStreams(t, 1, true) +} diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index c91f4df6503..3ca94071d8f 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -864,112 +864,6 @@ func TestV3WatchMultipleEventsPutUnsynced(t *testing.T) { } } -func TestV3WatchMultipleStreamsSynced(t *testing.T) { - defer testutil.AfterTest(t) - testV3WatchMultipleStreams(t, 0) -} - -func TestV3WatchMultipleStreamsUnsynced(t *testing.T) { - defer testutil.AfterTest(t) - testV3WatchMultipleStreams(t, 1) -} - -// testV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams. 
-func testV3WatchMultipleStreams(t *testing.T, startRev int64) { - clus := NewClusterV3(t, &ClusterConfig{Size: 3}) - defer clus.Terminate(t) - - wAPI := toGRPC(clus.RandClient()).Watch - kvc := toGRPC(clus.RandClient()).KV - - streams := make([]pb.Watch_WatchClient, 5) - for i := range streams { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - wStream, errW := wAPI.Watch(ctx) - if errW != nil { - t.Fatalf("wAPI.Watch error: %v", errW) - } - wreq := &pb.WatchRequest{RequestUnion: &pb.WatchRequest_CreateRequest{ - CreateRequest: &pb.WatchCreateRequest{ - Key: []byte("foo"), StartRevision: startRev}}} - if err := wStream.Send(wreq); err != nil { - t.Fatalf("wStream.Send error: %v", err) - } - streams[i] = wStream - } - - for _, wStream := range streams { - wresp, err := wStream.Recv() - if err != nil { - t.Fatalf("wStream.Recv error: %v", err) - } - if !wresp.Created { - t.Fatalf("wresp.Created got = %v, want = true", wresp.Created) - } - } - - if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil { - t.Fatalf("couldn't put key (%v)", err) - } - - var wg sync.WaitGroup - wg.Add(len(streams)) - wevents := []*mvccpb.Event{ - { - Type: mvccpb.PUT, - Kv: &mvccpb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}, - }, - } - for i := range streams { - go func(i int) { - defer wg.Done() - wStream := streams[i] - wresp, err := wStream.Recv() - if err != nil { - t.Fatalf("wStream.Recv error: %v", err) - } - if wresp.WatchId != 0 { - t.Errorf("watchId got = %d, want = 0", wresp.WatchId) - } - if !reflect.DeepEqual(wresp.Events, wevents) { - t.Errorf("wresp.Events got = %+v, want = %+v", wresp.Events, wevents) - } - // now Recv should block because there is no more events coming - rok, nr := waitResponse(wStream, 1*time.Second) - if !rok { - t.Errorf("unexpected pb.WatchResponse is received %+v", nr) - } - }(i) - } - wg.Wait() -} - -// waitResponse waits on the given stream for given duration. -// If there is no more events, true and a nil response will be -// returned closing the WatchClient stream. Or the response will -// be returned. 
-func waitResponse(wc pb.Watch_WatchClient, timeout time.Duration) (bool, *pb.WatchResponse) { - rCh := make(chan *pb.WatchResponse, 1) - donec := make(chan struct{}) - defer close(donec) - go func() { - resp, _ := wc.Recv() - select { - case rCh <- resp: - case <-donec: - } - }() - select { - case nr := <-rCh: - return false, nr - case <-time.After(timeout): - } - // didn't get response - wc.CloseSend() - return true, nil -} - func TestWatchWithProgressNotify(t *testing.T) { // accelerate report interval so test terminates quickly oldpi := v3rpc.GetProgressReportInterval() From f4fabc2cbc6659bea6e380ad10733bc5bbfbf3f6 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Thu, 7 Jun 2018 13:52:42 -0700 Subject: [PATCH 6/7] proxy/grpcproxy: broadcast original watch ID Signed-off-by: Gyuho Lee --- proxy/grpcproxy/watcher.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/proxy/grpcproxy/watcher.go b/proxy/grpcproxy/watcher.go index 1a497462f99..9376d932ac7 100644 --- a/proxy/grpcproxy/watcher.go +++ b/proxy/grpcproxy/watcher.go @@ -112,8 +112,10 @@ func (w *watcher) send(wr clientv3.WatchResponse) { Created: wr.Created, CompactRevision: wr.CompactRevision, Canceled: wr.Canceled, - WatchId: w.id, Events: events, + + // broadcast original watch ID to coalesced watch streams + WatchId: wr.ID, }) } From 5258b1688edc30e8ceff19bdc8e4b2f9aa0c9be1 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Thu, 14 Jun 2018 16:21:15 -0700 Subject: [PATCH 7/7] integration: debugging "TestDoubleBarrier" Signed-off-by: Gyuho Lee --- integration/v3_double_barrier_test.go | 47 ++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/integration/v3_double_barrier_test.go b/integration/v3_double_barrier_test.go index da520967d38..fbe1d3c9d1e 100644 --- a/integration/v3_double_barrier_test.go +++ b/integration/v3_double_barrier_test.go @@ -15,6 +15,8 @@ package integration import ( + "fmt" + "os" "testing" "time" @@ -24,71 +26,114 @@ import ( func TestDoubleBarrier(t *testing.T) { clus := NewClusterV3(t, &ClusterConfig{Size: 3}) - defer clus.Terminate(t) + + os.Setenv("CLUSTER_DEBUG", "1") + defer func() { + os.Unsetenv("CLUSTER_DEBUG") + clus.Terminate(t) + }() waiters := 10 + fmt.Println("concurrency.NewSession 1") session, err := concurrency.NewSession(clus.RandClient()) + fmt.Println("concurrency.NewSession 2", err) if err != nil { t.Error(err) } defer session.Orphan() + fmt.Println("NewDoubleBarrier 1") b := recipe.NewDoubleBarrier(session, "test-barrier", waiters) + fmt.Println("NewDoubleBarrier 2") donec := make(chan struct{}) for i := 0; i < waiters-1; i++ { go func() { + fmt.Println("concurrency.NewSession 3") session, err := concurrency.NewSession(clus.RandClient()) + fmt.Println("concurrency.NewSession 4") if err != nil { t.Error(err) } defer session.Orphan() + fmt.Println("NewDoubleBarrier 3") bb := recipe.NewDoubleBarrier(session, "test-barrier", waiters) + fmt.Println("NewDoubleBarrier 4") + + fmt.Println("Enter 1") if err := bb.Enter(); err != nil { + fmt.Println("Enter 2", err) t.Fatalf("could not enter on barrier (%v)", err) } + fmt.Println("Enter 3") donec <- struct{}{} + fmt.Println("Enter 4") + fmt.Println("Lease 1") if err := bb.Leave(); err != nil { + fmt.Println("Lease 2", err) t.Fatalf("could not leave on barrier (%v)", err) } + fmt.Println("Lease 3") donec <- struct{}{} + fmt.Println("Lease 4") }() } + fmt.Println("<-donec 1") time.Sleep(10 * time.Millisecond) select { case <-donec: + fmt.Println("<-donec 2") t.Fatalf("barrier did not enter-wait") 
default: + fmt.Println("<-donec 3") } + fmt.Println("<-donec 4") + fmt.Println("Enter 10") if err := b.Enter(); err != nil { + fmt.Println("Enter 11", err) t.Fatalf("could not enter last barrier (%v)", err) } + fmt.Println("Enter 12") timerC := time.After(time.Duration(waiters*100) * time.Millisecond) for i := 0; i < waiters-1; i++ { + fmt.Println("waiters 1", i) select { case <-timerC: + fmt.Println("waiters 2", i) t.Fatalf("barrier enter timed out") case <-donec: + fmt.Println("waiters 3", i) } } + fmt.Println("donec 10-1") time.Sleep(10 * time.Millisecond) select { case <-donec: + fmt.Println("donec 10-2") t.Fatalf("barrier did not leave-wait") default: + fmt.Println("donec 10-3") } + fmt.Println("donec 10-4") + fmt.Println("Leave 1") b.Leave() + fmt.Println("Leave 2") + + fmt.Println("waiter 100-1") timerC = time.After(time.Duration(waiters*100) * time.Millisecond) for i := 0; i < waiters-1; i++ { + fmt.Println("waiter 100-2", i) select { case <-timerC: + fmt.Println("waiter 100-3", i) t.Fatalf("barrier leave timed out") case <-donec: + fmt.Println("waiter 100-4", i) } } }
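As a closing illustration of the client-facing changes in PATCH 3/7 and PATCH 4/7, here is a minimal clientv3 usage sketch. It is not part of the patches; it assumes an etcd endpoint on localhost:2379 and a client built from this series. It shows how a caller reads the watch ID now carried on clientv3.WatchResponse, and how cancelling the watch context lets the server reclaim the watcher.

// Minimal clientv3 usage sketch: observe the WatchResponse.ID field added in
// PATCH 3, then cancel the context so a WatchCancelRequest is sent (PATCH 4)
// and the watcher is released on the server side.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"}, // assumed local endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithCancel(context.Background())

	// WithCreatedNotify delivers an immediate "created" response that now
	// carries the server-assigned, globally unique watch ID.
	wch := cli.Watch(ctx, "foo", clientv3.WithCreatedNotify())
	if wresp := <-wch; wresp.Err() == nil {
		fmt.Println("watch ID:", wresp.ID)
	}

	// Cancelling the context closes the substream; with PATCH 4 the client
	// also asks the server to drop the watcher instead of leaving it behind.
	cancel()
	for range wch {
		// drain until the client closes the channel
	}
}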