From d5e235c8311f43cd027c4d76a34775ce73ed60bd Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Tue, 13 Dec 2016 14:22:29 +0100 Subject: [PATCH] Reduce timeout for waiting for resource version --- pkg/api/errors/errors.go | 6 ++++++ pkg/storage/cacher.go | 6 +++--- pkg/storage/cacher_test.go | 23 +++++++++++++++++++++++ pkg/storage/watch_cache.go | 18 +++++++++++------- pkg/storage/watch_cache_test.go | 2 +- 5 files changed, 44 insertions(+), 11 deletions(-) diff --git a/pkg/api/errors/errors.go b/pkg/api/errors/errors.go index b3cb1a2b36a96..23b21408e9119 100644 --- a/pkg/api/errors/errors.go +++ b/pkg/api/errors/errors.go @@ -404,6 +404,12 @@ func IsForbidden(err error) bool { return reasonForError(err) == metav1.StatusReasonForbidden } +// IsTimeout determines if err is an error which indicates that request times out due to long +// processing. +func IsTimeout(err error) bool { + return reasonForError(err) == metav1.StatusReasonTimeout +} + // IsServerTimeout determines if err is an error which indicates that the request needs to be retried // by the client. func IsServerTimeout(err error) bool { diff --git a/pkg/storage/cacher.go b/pkg/storage/cacher.go index d82252c8c6aae..f709abb1e19f2 100644 --- a/pkg/storage/cacher.go +++ b/pkg/storage/cacher.go @@ -376,7 +376,7 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(getRV, key, nil) if err != nil { - return fmt.Errorf("failed to wait for fresh get: %v", err) + return err } if exists { @@ -429,7 +429,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace) if err != nil { - return fmt.Errorf("failed to wait for fresh get: %v", err) + return err } trace.Step("Got from cache") @@ -485,7 +485,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace) if err != nil { - return fmt.Errorf("failed to wait for fresh list: %v", err) + return err } trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs))) if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() { diff --git a/pkg/storage/cacher_test.go b/pkg/storage/cacher_test.go index 9fae55ecd7ab7..34938ca352479 100644 --- a/pkg/storage/cacher_test.go +++ b/pkg/storage/cacher_test.go @@ -208,6 +208,29 @@ func TestList(t *testing.T) { } } +func TestInfiniteList(t *testing.T) { + server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix()) + defer server.Terminate(t) + cacher := newTestCacher(etcdStorage, 10) + defer cacher.Stop() + + podFoo := makeTestPod("foo") + fooCreated := updatePod(t, etcdStorage, podFoo, nil) + + // Set up List at fooCreated.ResourceVersion + 10 + rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + listRV := strconv.Itoa(int(rv + 10)) + + result := &api.PodList{} + err = cacher.List(context.TODO(), "pods/ns", listRV, storage.Everything, result) + if !errors.IsTimeout(err) { + t.Errorf("Unexpected error: %v", err) + } +} + func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) { _, _, line, _ := goruntime.Caller(1) select { diff --git a/pkg/storage/watch_cache.go b/pkg/storage/watch_cache.go index 27b7f97827feb..1584735212317 100644 --- a/pkg/storage/watch_cache.go +++ b/pkg/storage/watch_cache.go @@ -35,9 +35,11 @@ import ( ) const ( - // MaximumListWait determines how long we're willing to wait for a - // list if a client specified a resource version in the future. - MaximumListWait = 60 * time.Second + // blockTimeout determines how long we're willing to block the request + // to wait for a given resource version to be propagated to cache, + // before terminating request and returning Timeout error with retry + // after suggestion. + blockTimeout = 3 * time.Second ) // watchCacheEvent is a single "watch event" that is send to users of @@ -206,7 +208,8 @@ func parseResourceVersion(resourceVersion string) (uint64, error) { if resourceVersion == "" { return 0, nil } - return strconv.ParseUint(resourceVersion, 10, 64) + // Use bitsize being the size of int on the machine. + return strconv.ParseUint(resourceVersion, 10, 0) } func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error { @@ -288,7 +291,7 @@ func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *util. // it will wake up the loop below sometime after the broadcast, // we don't need to worry about waking it up before the time // has expired accidentally. - <-w.clock.After(MaximumListWait) + <-w.clock.After(blockTimeout) w.cond.Broadcast() }() @@ -297,8 +300,9 @@ func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *util. trace.Step("watchCache locked acquired") } for w.resourceVersion < resourceVersion { - if w.clock.Since(startTime) >= MaximumListWait { - return fmt.Errorf("time limit exceeded while waiting for resource version %v (current value: %v)", resourceVersion, w.resourceVersion) + if w.clock.Since(startTime) >= blockTimeout { + // Timeout with retry after 1 second. + return errors.NewTimeoutError(fmt.Sprintf("Too large resource version: %v, current: %v", resourceVersion, w.resourceVersion), 1) } w.cond.Wait() } diff --git a/pkg/storage/watch_cache_test.go b/pkg/storage/watch_cache_test.go index 581f8749a96af..87f03be389801 100644 --- a/pkg/storage/watch_cache_test.go +++ b/pkg/storage/watch_cache_test.go @@ -302,7 +302,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) { for !fc.HasWaiters() { time.Sleep(time.Millisecond) } - fc.Step(MaximumListWait) + fc.Step(blockTimeout) // Add an object to make sure the test would // eventually fail instead of just waiting