Merge pull request kubernetes#108042 from MadhavJivrajani/cacher-cleanup
cacher: Minor cleanup and refactor of code and tests
k8s-ci-robot authored Feb 10, 2022
2 parents 3b4a9cd + c3081b4 commit 56273a6
Showing 5 changed files with 62 additions and 56 deletions.
24 changes: 12 additions & 12 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -509,7 +509,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// underlying watchCache is calling processEvent under its lock.
c.watchCache.RLock()
defer c.watchCache.RUnlock()
cacheInterval, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
cacheInterval, err := c.watchCache.getAllEventsSinceLocked(watchRV)
if err != nil {
// To match the uncached watch implementation, once we have passed authn/authz/admission,
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
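The hunk above switches the caller to the renamed getAllEventsSinceLocked, invoked while holding watchCache.RLock. The new name follows the common Go convention of marking methods that require the caller to already hold a lock with a "Locked" suffix instead of "ThreadUnsafe". A minimal standalone sketch of that convention, using a hypothetical store type rather than the real watchCache:

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical cache guarded by an RWMutex; it only illustrates
// the naming convention behind the rename in this commit.
type store struct {
	sync.RWMutex
	events []string
}

// eventsSinceLocked assumes the caller already holds the lock (read or write);
// the suffix documents that requirement.
func (s *store) eventsSinceLocked(i int) []string {
	return s.events[i:]
}

// EventsSince is the public entry point; it takes the read lock itself,
// mirroring how the caller above holds watchCache.RLock around the call.
func (s *store) EventsSince(i int) []string {
	s.RLock()
	defer s.RUnlock()
	return s.eventsSinceLocked(i)
}

func main() {
	s := &store{events: []string{"add", "update", "delete"}}
	fmt.Println(s.EventsSince(1)) // [update delete]
}
```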
@@ -656,7 +656,7 @@ func (c *Cacher) list(ctx context.Context, key string, opts storage.ListOptions,
return c.delegateList(ctx, key, opts, listObj, recursive)
}

trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
trace := utiltrace.New("cacher list", utiltrace.Field{Key: "type", Value: c.objectType.String()})
defer trace.LogIfLong(500 * time.Millisecond)

c.ready.wait()
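The trace field above changes from a positional struct literal to a keyed one. A short sketch of the difference, using a local Field type that only mirrors the shape of utiltrace.Field (the real type lives in k8s.io/utils/trace):

```go
package main

import "fmt"

// Field mirrors the shape of utiltrace.Field for illustration only.
type Field struct {
	Key   string
	Value interface{}
}

func main() {
	// Positional literal: the form being removed. It compiles, but it silently
	// depends on field order, and composite-literal lint checks commonly flag
	// unkeyed literals of structs defined in other packages.
	positional := Field{"type", "*example.Pod"}

	// Keyed literal: the form the hunk switches to. It stays correct even if
	// the struct's fields are reordered or new fields are added.
	keyed := Field{Key: "type", Value: "*example.Pod"}

	fmt.Println(positional, keyed)
}
```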
@@ -680,7 +680,7 @@ func (c *Cacher) list(ctx context.Context, key string, opts storage.ListOptions,
if err != nil {
return err
}
trace.Step("Listed items from cache", utiltrace.Field{"count", len(objs)})
trace.Step("Listed items from cache", utiltrace.Field{Key: "count", Value: len(objs)})
if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
// Resize the slice appropriately, since we already know that none
// of the elements will be filtered out.
@@ -696,7 +696,7 @@ func (c *Cacher) list(ctx context.Context, key string, opts storage.ListOptions,
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
}
}
trace.Step("Filtered items", utiltrace.Field{"count", listVal.Len()})
trace.Step("Filtered items", utiltrace.Field{Key: "count", Value: listVal.Len()})
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
return err
@@ -992,22 +992,22 @@ func (c *Cacher) finishDispatching() {
defer c.Unlock()
c.dispatching = false
for _, watcher := range c.watchersToStop {
watcher.stopThreadUnsafe()
watcher.stopLocked()
}
c.watchersToStop = c.watchersToStop[:0]
}

func (c *Cacher) terminateAllWatchers() {
c.Lock()
defer c.Unlock()
c.watchers.terminateAll(c.objectType, c.stopWatcherThreadUnsafe)
c.watchers.terminateAll(c.objectType, c.stopWatcherLocked)
}

func (c *Cacher) stopWatcherThreadUnsafe(watcher *cacheWatcher) {
func (c *Cacher) stopWatcherLocked(watcher *cacheWatcher) {
if c.dispatching {
c.watchersToStop = append(c.watchersToStop, watcher)
} else {
watcher.stopThreadUnsafe()
watcher.stopLocked()
}
}
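stopWatcherLocked (renamed from stopWatcherThreadUnsafe) defers stopping a watcher while an event is being dispatched, queuing it in watchersToStop so that finishDispatching drains the queue afterwards. A simplified, hypothetical sketch of that defer-while-dispatching pattern; the names and types below are illustrative, not the real cacher types:

```go
package main

import (
	"fmt"
	"sync"
)

// dispatcher sketches the pattern: while an event is being dispatched,
// stop requests are queued instead of executed immediately.
type dispatcher struct {
	sync.Mutex
	dispatching bool
	toStop      []string
}

// stopLocked assumes the caller holds the mutex, like stopWatcherLocked.
func (d *dispatcher) stopLocked(name string) {
	if d.dispatching {
		d.toStop = append(d.toStop, name) // defer the stop
		return
	}
	fmt.Println("stopped immediately:", name)
}

// finishDispatching drains the deferred stops, mirroring the hunk above.
func (d *dispatcher) finishDispatching() {
	d.Lock()
	defer d.Unlock()
	d.dispatching = false
	for _, name := range d.toStop {
		fmt.Println("stopped after dispatch:", name)
	}
	d.toStop = d.toStop[:0]
}

func main() {
	d := &dispatcher{}
	d.Lock()
	d.dispatching = true
	d.stopLocked("watcher-1") // queued, not stopped yet
	d.Unlock()
	d.finishDispatching() // drained here
}
```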

@@ -1037,9 +1037,9 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b
defer c.Unlock()

// It's possible that the watcher is already not in the structure (e.g. in case of
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopThreadUnsafe()
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked()
// on a watcher multiple times.
c.watchers.deleteWatcher(index, triggerValue, triggerSupported, c.stopWatcherThreadUnsafe)
c.watchers.deleteWatcher(index, triggerValue, triggerSupported, c.stopWatcherLocked)
}
}

@@ -1196,8 +1196,8 @@ func (c *cacheWatcher) Stop() {
c.forget()
}

// we rely on the fact that stopThredUnsafe is actually protected by Cacher.Lock()
func (c *cacheWatcher) stopThreadUnsafe() {
// we rely on the fact that stopLocked is actually protected by Cacher.Lock()
func (c *cacheWatcher) stopLocked() {
if !c.stopped {
c.stopped = true
close(c.done)
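The renamed stopLocked above only acts while c.stopped is still false, which is what lets the forgetWatcher comment promise that calling it multiple times is safe: only the first call closes the done channel. A small self-contained sketch of that idempotent-stop shape, using a hypothetical watcher type rather than the real cacheWatcher:

```go
package main

import "fmt"

// watcher sketches why a stopLocked-style method can be called repeatedly:
// the stopped flag guards the close, so the done channel is closed only once.
type watcher struct {
	stopped bool
	done    chan struct{}
}

func (w *watcher) stopLocked() {
	if !w.stopped {
		w.stopped = true
		close(w.done) // closing an already-closed channel would panic
	}
}

func main() {
	w := &watcher{done: make(chan struct{})}
	w.stopLocked()
	w.stopLocked() // second call is a no-op, no panic
	fmt.Println("stopped twice safely")
}
```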
@@ -65,7 +65,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
// forget() has to stop the watcher, as only stopping the watcher
// triggers stopping the process() goroutine which we are in the
// end waiting for in this test.
w.stopThreadUnsafe()
w.stopLocked()
}
initEvents := []*watchCacheEvent{
{Object: &v1.Pod{}},
@@ -210,7 +210,7 @@ TestCase:
break TestCase
default:
}
w.stopThreadUnsafe()
w.stopLocked()
}
}

@@ -518,7 +518,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
done := make(chan struct{})
filter := func(string, labels.Set, fields.Set) bool { return true }
forget := func() {
w.stopThreadUnsafe()
w.stopLocked()
done <- struct{}{}
}

@@ -541,14 +541,15 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType, "")
w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
ctx, _ := context.WithDeadline(context.Background(), deadline)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
go w.processInterval(ctx, intervalFromEvents(nil), 0)
select {
case <-w.ResultChan():
case <-time.After(time.Second):
t.Fatal("expected received a event on ResultChan")
}
w.stopThreadUnsafe()
w.stopLocked()
}
}
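The test hunks in this file replace ctx, _ := context.WithDeadline(...) and context.WithTimeout(...) with a captured cancel that is deferred. Discarding the CancelFunc is what go vet's lostcancel check reports, because the derived context and its timer are not released until the deadline fires. A standalone sketch of the corrected pattern, not taken from the test file:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// Discarding the CancelFunc (ctx, _ := ...) keeps the derived context and
	// its timer alive until the deadline fires; go vet's lostcancel check
	// flags that. The tests switch to keeping cancel and deferring it.
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel() // releases resources as soon as the function returns

	select {
	case <-ctx.Done():
		fmt.Println("deadline hit:", ctx.Err())
	case <-time.After(10 * time.Millisecond):
		fmt.Println("work finished before the deadline")
	}
}
```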

@@ -661,7 +662,8 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
case <-stopCh:
return
default:
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred})
if err != nil {
watchErr = fmt.Errorf("Failed to create watch: %v", err)
@@ -715,7 +717,8 @@ func TestWatchInitializationSignal(t *testing.T) {
}
defer cacher.Stop()

ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
initSignal := utilflowcontrol.NewInitializationSignal()
ctx = utilflowcontrol.WithInitializationSignal(ctx, initSignal)

@@ -740,7 +743,8 @@ func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBoo
pred := storage.Everything
pred.AllowWatchBookmarks = allowWatchBookmarks

ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
@@ -852,7 +856,8 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
t.Fatalf("failed to add a pod: %v", err)
}

ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "100", Predicate: pred})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
@@ -919,7 +924,8 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
for i := 0; i < 1000; i++ {
pred := storage.Everything
pred.AllowWatchBookmarks = true
ctx, _ := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "999", Predicate: pred})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
@@ -1100,26 +1106,23 @@ func TestStartingResourceVersion(t *testing.T) {
}
}

select {
case e, ok := <-watcher.ResultChan():
if !ok {
t.Errorf("unexpectedly closed watch")
break
}
object := e.Object
if co, ok := object.(runtime.CacheableObject); ok {
object = co.GetObject()
}
pod := object.(*examplev1.Pod)
podRV, err := cacher.versioner.ParseResourceVersion(pod.ResourceVersion)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
e, ok := <-watcher.ResultChan()
if !ok {
t.Errorf("unexpectedly closed watch")
}
object := e.Object
if co, ok := object.(runtime.CacheableObject); ok {
object = co.GetObject()
}
pod := object.(*examplev1.Pod)
podRV, err := cacher.versioner.ParseResourceVersion(pod.ResourceVersion)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

// event should have at least rv + 1, since we're starting the watch at rv
if podRV <= startVersion {
t.Errorf("expected event with resourceVersion of at least %d, got %d", startVersion+1, podRV)
}
// event should have at least rv + 1, since we're starting the watch at rv
if podRV <= startVersion {
t.Errorf("expected event with resourceVersion of at least %d, got %d", startVersion+1, podRV)
}
}
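The hunk above flattens a select that wrapped a single receive case, with no default, into a plain channel receive. The two forms block identically, so the flatter version reads more directly; a small illustration:

```go
package main

import "fmt"

func main() {
	ch := make(chan string, 1)
	ch <- "event"

	// A select with a single receive case and no default blocks exactly like
	// a plain receive, so the extra nesting buys nothing.
	select {
	case e := <-ch:
		fmt.Println("via select:", e)
	}

	ch <- "event"

	// The equivalent plain receive the test moves to; ok reports whether the
	// channel is still open.
	e, ok := <-ch
	fmt.Println("plain receive:", e, ok)
}
```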

27 changes: 15 additions & 12 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
@@ -573,7 +573,21 @@ func (w *watchCache) SetOnReplace(onReplace func()) {
w.onReplace = onReplace
}

func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) (*watchCacheInterval, error) {
func (w *watchCache) Resync() error {
// Nothing to do
return nil
}

// isIndexValidLocked checks if a given index is still valid.
// This assumes that the lock is held.
func (w *watchCache) isIndexValidLocked(index int) bool {
return index >= w.startIndex
}

// getAllEventsSinceLocked returns a watchCacheInterval that can be used to
// retrieve events since a certain resourceVersion. This function assumes to
// be called under the watchCache lock.
func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCacheInterval, error) {
size := w.endIndex - w.startIndex
var oldest uint64
switch {
@@ -620,14 +634,3 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) (*wat
ci := newCacheInterval(w.startIndex+first, w.endIndex, indexerFunc, w.indexValidator, &w.RWMutex)
return ci, nil
}

func (w *watchCache) Resync() error {
// Nothing to do
return nil
}

// isIndexValidLocked checks if a given index is still valid.
// This assumes that the lock is held.
func (w *watchCache) isIndexValidLocked(index int) bool {
return index >= w.startIndex
}
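isIndexValidLocked treats a saved index as stale once startIndex has advanced past it, i.e. once the circular buffer mentioned in the watchCacheInterval comment has overwritten that slot. A rough, hypothetical sketch of that idea, using a toy ring buffer rather than the real watchCache layout:

```go
package main

import "fmt"

// ring is a toy circular buffer: startIndex advances as old entries are
// overwritten, so an index captured earlier stays valid only while it is
// still >= startIndex.
type ring struct {
	buf        []string
	startIndex int
	endIndex   int
}

func (r *ring) add(e string) {
	r.buf[r.endIndex%len(r.buf)] = e
	r.endIndex++
	if r.endIndex-r.startIndex > len(r.buf) {
		r.startIndex++ // the oldest entry was just overwritten
	}
}

func (r *ring) isIndexValid(index int) bool {
	return index >= r.startIndex
}

func main() {
	r := &ring{buf: make([]string, 3)}
	saved := 0 // pretend an interval captured index 0
	for i := 1; i <= 5; i++ {
		r.add(fmt.Sprintf("event-%d", i))
	}
	fmt.Println("index 0 still valid:", r.isIndexValid(saved)) // false: overwritten
}
```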
@@ -35,12 +35,12 @@ import (
// for starting a watch and reduce the maximum possible time
// interval for which the lock would be held while events are
// copied over.

//
// The source of events for the interval is typically either
// the watchCache circular buffer, if events being retrieved
// need to be for resource versions > 0 or the underlying
// implementation of Store, if resource version = 0.

//
// Furthermore, an interval can be either valid or invalid at
// any given point of time. The notion of validity makes sense
// only in cases where the window of events in the underlying
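The hunk above replaces blank lines inside the block comment with bare // lines. In Go, a fully blank line splits a comment into separate groups, and only the group immediately above a declaration is picked up as its doc comment, so keeping // continuation lines likely preserves the whole description as one doc comment. A tiny illustration with a hypothetical function, not from this package:

```go
package main

import "fmt"

// The first paragraph of Greet's doc comment.
//
// A second paragraph: the bare "//" line above keeps the whole block one
// comment group, which is what the hunk restores. A completely empty line
// there would split the comment, and only the text below the split would be
// attached to Greet by go doc.
func Greet() {
	fmt.Println("hello from a fully attached doc comment")
}

func main() {
	Greet()
}
```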
@@ -96,7 +96,7 @@ func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64) (*wat
w.RLock()
defer w.RUnlock()

return w.GetAllEventsSinceThreadUnsafe(resourceVersion)
return w.getAllEventsSinceLocked(resourceVersion)
}

// newTestWatchCache just adds a fake clock.