From c3081b48759db1f05a446f2acca7e05c4511ce2e Mon Sep 17 00:00:00 2001
From: Madhav Jivrajani
Date: Thu, 10 Feb 2022 13:44:34 +0530
Subject: [PATCH] cacher: Minor cleanup and refactor of code and tests

* Remove linter warnings.
* Cancel contexts to avoid leaks.
* Rename a few XXXThreadUnsafe to XXXLocked to maintain consistency.
* A few are still called XXXThreadUnsafe, mainly because they are safe
  only in the sense that a single goroutine ever accesses them - they
  are not actually called under a lock.

Signed-off-by: Madhav Jivrajani
---
 .../apiserver/pkg/storage/cacher/cacher.go     | 24 ++++----
 .../storage/cacher/cacher_whitebox_test.go     | 61 ++++++++++---------
 .../pkg/storage/cacher/watch_cache.go          | 27 ++++----
 .../storage/cacher/watch_cache_interval.go     |  4 +-
 .../pkg/storage/cacher/watch_cache_test.go     |  2 +-
 5 files changed, 62 insertions(+), 56 deletions(-)

diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
index 930939cb7638d..001a96a02ba2d 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -509,7 +509,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 	// underlying watchCache is calling processEvent under its lock.
 	c.watchCache.RLock()
 	defer c.watchCache.RUnlock()
-	cacheInterval, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
+	cacheInterval, err := c.watchCache.getAllEventsSinceLocked(watchRV)
 	if err != nil {
 		// To match the uncached watch implementation, once we have passed authn/authz/admission,
 		// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
@@ -656,7 +656,7 @@ func (c *Cacher) list(ctx context.Context, key string, opts storage.ListOptions,
 		return c.delegateList(ctx, key, opts, listObj, recursive)
 	}
 
-	trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
+	trace := utiltrace.New("cacher list", utiltrace.Field{Key: "type", Value: c.objectType.String()})
 	defer trace.LogIfLong(500 * time.Millisecond)
 
 	c.ready.wait()
@@ -680,7 +680,7 @@ func (c *Cacher) list(ctx context.Context, key string, opts storage.ListOptions,
 	if err != nil {
 		return err
 	}
-	trace.Step("Listed items from cache", utiltrace.Field{"count", len(objs)})
+	trace.Step("Listed items from cache", utiltrace.Field{Key: "count", Value: len(objs)})
 	if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
 		// Resize the slice appropriately, since we already know that none
 		// of the elements will be filtered out.
@@ -696,7 +696,7 @@ func (c *Cacher) list(ctx context.Context, key string, opts storage.ListOptions,
 			listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
 		}
 	}
-	trace.Step("Filtered items", utiltrace.Field{"count", listVal.Len()})
+	trace.Step("Filtered items", utiltrace.Field{Key: "count", Value: listVal.Len()})
 	if c.versioner != nil {
 		if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
 			return err
@@ -992,7 +992,7 @@ func (c *Cacher) finishDispatching() {
 	defer c.Unlock()
 	c.dispatching = false
 	for _, watcher := range c.watchersToStop {
-		watcher.stopThreadUnsafe()
+		watcher.stopLocked()
 	}
 	c.watchersToStop = c.watchersToStop[:0]
 }
@@ -1000,14 +1000,14 @@ func (c *Cacher) finishDispatching() {
 func (c *Cacher) terminateAllWatchers() {
 	c.Lock()
 	defer c.Unlock()
-	c.watchers.terminateAll(c.objectType, c.stopWatcherThreadUnsafe)
+	c.watchers.terminateAll(c.objectType, c.stopWatcherLocked)
 }
 
-func (c *Cacher) stopWatcherThreadUnsafe(watcher *cacheWatcher) {
+func (c *Cacher) stopWatcherLocked(watcher *cacheWatcher) {
 	if c.dispatching {
 		c.watchersToStop = append(c.watchersToStop, watcher)
 	} else {
-		watcher.stopThreadUnsafe()
+		watcher.stopLocked()
 	}
 }
 
@@ -1037,9 +1037,9 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b
 		defer c.Unlock()
 
 		// It's possible that the watcher is already not in the structure (e.g. in case of
-		// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopThreadUnsafe()
+		// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked()
 		// on a watcher multiple times.
-		c.watchers.deleteWatcher(index, triggerValue, triggerSupported, c.stopWatcherThreadUnsafe)
+		c.watchers.deleteWatcher(index, triggerValue, triggerSupported, c.stopWatcherLocked)
 	}
 }
 
@@ -1196,8 +1196,8 @@ func (c *cacheWatcher) Stop() {
 	c.forget()
 }
 
-// we rely on the fact that stopThredUnsafe is actually protected by Cacher.Lock()
-func (c *cacheWatcher) stopThreadUnsafe() {
+// we rely on the fact that stopLocked is actually protected by Cacher.Lock()
+func (c *cacheWatcher) stopLocked() {
 	if !c.stopped {
 		c.stopped = true
 		close(c.done)
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
index 924272f91b8c4..30fbe2388e0bc 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
@@ -65,7 +65,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
 		// forget() has to stop the watcher, as only stopping the watcher
 		// triggers stopping the process() goroutine which we are in the
 		// end waiting for in this test.
-		w.stopThreadUnsafe()
+		w.stopLocked()
 	}
 	initEvents := []*watchCacheEvent{
 		{Object: &v1.Pod{}},
@@ -210,7 +210,7 @@ TestCase:
 			break TestCase
 		default:
 		}
-		w.stopThreadUnsafe()
+		w.stopLocked()
 	}
 }
 
@@ -518,7 +518,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
 	done := make(chan struct{})
 	filter := func(string, labels.Set, fields.Set) bool { return true }
 	forget := func() {
-		w.stopThreadUnsafe()
+		w.stopLocked()
 		done <- struct{}{}
 	}
 
@@ -541,14 +541,15 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
 	for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
 		w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType, "")
 		w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
-		ctx, _ := context.WithDeadline(context.Background(), deadline)
+		ctx, cancel := context.WithDeadline(context.Background(), deadline)
+		defer cancel()
 		go w.processInterval(ctx, intervalFromEvents(nil), 0)
 		select {
 		case <-w.ResultChan():
 		case <-time.After(time.Second):
 			t.Fatal("expected received a event on ResultChan")
 		}
-		w.stopThreadUnsafe()
+		w.stopLocked()
 	}
 }
 
@@ -661,7 +662,8 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
 			case <-stopCh:
 				return
 			default:
-				ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
+				ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+				defer cancel()
 				w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred})
 				if err != nil {
 					watchErr = fmt.Errorf("Failed to create watch: %v", err)
@@ -715,7 +717,8 @@ func TestWatchInitializationSignal(t *testing.T) {
 	}
 	defer cacher.Stop()
 
-	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
 	initSignal := utilflowcontrol.NewInitializationSignal()
 	ctx = utilflowcontrol.WithInitializationSignal(ctx, initSignal)
 
@@ -740,7 +743,8 @@ func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBoo
 	pred := storage.Everything
 	pred.AllowWatchBookmarks = allowWatchBookmarks
 
-	ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	defer cancel()
 	w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred})
 	if err != nil {
 		t.Fatalf("Failed to create watch: %v", err)
@@ -852,7 +856,8 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
 		t.Fatalf("failed to add a pod: %v", err)
 	}
 
-	ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
 	w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "100", Predicate: pred})
 	if err != nil {
 		t.Fatalf("Failed to create watch: %v", err)
@@ -919,7 +924,8 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
 	for i := 0; i < 1000; i++ {
 		pred := storage.Everything
 		pred.AllowWatchBookmarks = true
-		ctx, _ := context.WithTimeout(context.Background(), time.Second)
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+		defer cancel()
 		w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "999", Predicate: pred})
 		if err != nil {
 			t.Fatalf("Failed to create watch: %v", err)
@@ -1100,26 +1106,23 @@ func TestStartingResourceVersion(t *testing.T) {
 		}
 	}
 
-	select {
-	case e, ok := <-watcher.ResultChan():
-		if !ok {
-			t.Errorf("unexpectedly closed watch")
watch") - break - } - object := e.Object - if co, ok := object.(runtime.CacheableObject); ok { - object = co.GetObject() - } - pod := object.(*examplev1.Pod) - podRV, err := cacher.versioner.ParseResourceVersion(pod.ResourceVersion) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } + e, ok := <-watcher.ResultChan() + if !ok { + t.Errorf("unexpectedly closed watch") + } + object := e.Object + if co, ok := object.(runtime.CacheableObject); ok { + object = co.GetObject() + } + pod := object.(*examplev1.Pod) + podRV, err := cacher.versioner.ParseResourceVersion(pod.ResourceVersion) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } - // event should have at least rv + 1, since we're starting the watch at rv - if podRV <= startVersion { - t.Errorf("expected event with resourceVersion of at least %d, got %d", startVersion+1, podRV) - } + // event should have at least rv + 1, since we're starting the watch at rv + if podRV <= startVersion { + t.Errorf("expected event with resourceVersion of at least %d, got %d", startVersion+1, podRV) } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index b9dba258d3dfc..3c9bd926893a3 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -573,7 +573,21 @@ func (w *watchCache) SetOnReplace(onReplace func()) { w.onReplace = onReplace } -func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) (*watchCacheInterval, error) { +func (w *watchCache) Resync() error { + // Nothing to do + return nil +} + +// isIndexValidLocked checks if a given index is still valid. +// This assumes that the lock is held. +func (w *watchCache) isIndexValidLocked(index int) bool { + return index >= w.startIndex +} + +// getAllEventsSinceLocked returns a watchCacheInterval that can be used to +// retrieve events since a certain resourceVersion. This function assumes to +// be called under the watchCache lock. +func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCacheInterval, error) { size := w.endIndex - w.startIndex var oldest uint64 switch { @@ -620,14 +634,3 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) (*wat ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, &w.RWMutex) return ci, nil } - -func (w *watchCache) Resync() error { - // Nothing to do - return nil -} - -// isIndexValidLocked checks if a given index is still valid. -// This assumes that the lock is held. -func (w *watchCache) isIndexValidLocked(index int) bool { - return index >= w.startIndex -} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go index d25fe1f49f5bd..833d10e153a10 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go @@ -35,12 +35,12 @@ import ( // for starting a watch and reduce the maximum possible time // interval for which the lock would be held while events are // copied over. - +// // The source of events for the interval is typically either // the watchCache circular buffer, if events being retrieved // need to be for resource versions > 0 or the underlying // implementation of Store, if resource version = 0. 
-
+//
 // Furthermore, an interval can be either valid or invalid at
 // any given point of time. The notion of validity makes sense
 // only in cases where the window of events in the underlying
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
index 6a4a62dcc8f94..29e99f6891c68 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go
@@ -96,7 +96,7 @@ func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64) (*wat
 	w.RLock()
 	defer w.RUnlock()
 
-	return w.getAllEventsSinceLocked(resourceVersion)
+	return w.getAllEventsSinceLocked(resourceVersion)
 }
 
 // newTestWatchCache just adds a fake clock.
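
A note on the "Remove linter warnings" item: the utiltrace.Field literals in cacher.go switch from positional to keyed fields. The sketch below illustrates why with a stand-in Field type; it is not the actual utiltrace.Field, only a minimal, self-contained example of the keyed-literal convention the patch adopts.

package main

import "fmt"

// Field is a stand-in for a trace field; it exists only for illustration and
// is not the real utiltrace.Field type.
type Field struct {
	Key   string
	Value interface{}
}

func main() {
	// Positional (unkeyed) literals compile, but go vet's composites check
	// flags them for struct types defined in other packages, and they
	// silently change meaning if the struct's fields are ever reordered.
	unkeyed := Field{"type", "pods"}

	// Keyed literals, as used in the patch, stay correct regardless of
	// field order and read unambiguously at the call site.
	keyed := Field{Key: "type", Value: "pods"}

	fmt.Println(unkeyed, keyed)
}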
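
The test changes replace "ctx, _ := context.WithTimeout(...)" with a named cancel func that is deferred. The following is a minimal, standalone sketch of that pattern using only the standard library; watchOnce and the results channel are illustrative names, not code from the Kubernetes tests.

package main

import (
	"context"
	"fmt"
	"time"
)

// watchOnce simulates a watch-style call that stops either when it receives
// a result or when the context is done.
func watchOnce(ctx context.Context, results <-chan string) {
	select {
	case r := <-results:
		fmt.Println("got event:", r)
	case <-ctx.Done():
		fmt.Println("watch aborted:", ctx.Err())
	}
}

func main() {
	results := make(chan string, 1)
	results <- "pod added"

	// Discarding the CancelFunc (ctx, _ := ...) means the context's timer and
	// associated resources are only released when the full timeout elapses,
	// and go vet's lostcancel check reports it. Keeping cancel and deferring
	// it releases everything as soon as this function returns.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	watchOnce(ctx, results)
}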
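
The XXXThreadUnsafe to XXXLocked renames follow the common Go convention that a Locked suffix marks a helper that expects the caller to already hold the relevant mutex (here, Cacher.Lock() or the watchCache's RWMutex). Below is a minimal sketch of that convention with illustrative names (counter, incLocked); it is not code from the patch.

package main

import (
	"fmt"
	"sync"
)

// counter is an illustrative type; the Locked-suffix convention mirrors the
// cacher's stopLocked and getAllEventsSinceLocked helpers.
type counter struct {
	mu sync.Mutex
	n  int
}

// Inc is the exported entry point; it acquires the lock itself.
func (c *counter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.incLocked()
}

// incLocked assumes c.mu is already held by the caller; it must never take
// the lock itself, or it would deadlock on the non-reentrant mutex.
func (c *counter) incLocked() {
	c.n++
}

func main() {
	c := &counter{}
	c.Inc()
	fmt.Println(c.n) // 1
}

Helpers that remain named XXXThreadUnsafe are the ones the last commit-message bullet describes: safe only because a single goroutine ever touches them, not because a lock is held when they run.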