Skip to content

Commit

Permalink
Merge pull request kubernetes#38705 from wojtek-t/fix_watch_cache
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue

Reduce timeout for waiting for resource version

Ref kubernetes#37473
  • Loading branch information
Kubernetes Submit Queue authored Dec 20, 2016
2 parents 149bb30 + d5e235c commit d058240
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 11 deletions.
6 changes: 6 additions & 0 deletions pkg/api/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,12 @@ func IsForbidden(err error) bool {
return reasonForError(err) == metav1.StatusReasonForbidden
}

// IsTimeout determines if err is an error which indicates that request times out due to long
// processing.
func IsTimeout(err error) bool {
return reasonForError(err) == metav1.StatusReasonTimeout
}

// IsServerTimeout determines if err is an error which indicates that the request needs to be retried
// by the client.
func IsServerTimeout(err error) bool {
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob

obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(getRV, key, nil)
if err != nil {
return fmt.Errorf("failed to wait for fresh get: %v", err)
return err
}

if exists {
Expand Down Expand Up @@ -429,7 +429,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri

obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
if err != nil {
return fmt.Errorf("failed to wait for fresh get: %v", err)
return err
}
trace.Step("Got from cache")

Expand Down Expand Up @@ -485,7 +485,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p

objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
if err != nil {
return fmt.Errorf("failed to wait for fresh list: %v", err)
return err
}
trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs)))
if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
Expand Down
23 changes: 23 additions & 0 deletions pkg/storage/cacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,29 @@ func TestList(t *testing.T) {
}
}

func TestInfiniteList(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage, 10)
defer cacher.Stop()

podFoo := makeTestPod("foo")
fooCreated := updatePod(t, etcdStorage, podFoo, nil)

// Set up List at fooCreated.ResourceVersion + 10
rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
listRV := strconv.Itoa(int(rv + 10))

result := &api.PodList{}
err = cacher.List(context.TODO(), "pods/ns", listRV, storage.Everything, result)
if !errors.IsTimeout(err) {
t.Errorf("Unexpected error: %v", err)
}
}

func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {
_, _, line, _ := goruntime.Caller(1)
select {
Expand Down
18 changes: 11 additions & 7 deletions pkg/storage/watch_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ import (
)

const (
// MaximumListWait determines how long we're willing to wait for a
// list if a client specified a resource version in the future.
MaximumListWait = 60 * time.Second
// blockTimeout determines how long we're willing to block the request
// to wait for a given resource version to be propagated to cache,
// before terminating request and returning Timeout error with retry
// after suggestion.
blockTimeout = 3 * time.Second
)

// watchCacheEvent is a single "watch event" that is send to users of
Expand Down Expand Up @@ -206,7 +208,8 @@ func parseResourceVersion(resourceVersion string) (uint64, error) {
if resourceVersion == "" {
return 0, nil
}
return strconv.ParseUint(resourceVersion, 10, 64)
// Use bitsize being the size of int on the machine.
return strconv.ParseUint(resourceVersion, 10, 0)
}

func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
Expand Down Expand Up @@ -288,7 +291,7 @@ func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *util.
// it will wake up the loop below sometime after the broadcast,
// we don't need to worry about waking it up before the time
// has expired accidentally.
<-w.clock.After(MaximumListWait)
<-w.clock.After(blockTimeout)
w.cond.Broadcast()
}()

Expand All @@ -297,8 +300,9 @@ func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *util.
trace.Step("watchCache locked acquired")
}
for w.resourceVersion < resourceVersion {
if w.clock.Since(startTime) >= MaximumListWait {
return fmt.Errorf("time limit exceeded while waiting for resource version %v (current value: %v)", resourceVersion, w.resourceVersion)
if w.clock.Since(startTime) >= blockTimeout {
// Timeout with retry after 1 second.
return errors.NewTimeoutError(fmt.Sprintf("Too large resource version: %v, current: %v", resourceVersion, w.resourceVersion), 1)
}
w.cond.Wait()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/watch_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) {
for !fc.HasWaiters() {
time.Sleep(time.Millisecond)
}
fc.Step(MaximumListWait)
fc.Step(blockTimeout)

// Add an object to make sure the test would
// eventually fail instead of just waiting
Expand Down

0 comments on commit d058240

Please sign in to comment.