From 04e467e0f2d0d2c63778619cd8fbfcca7d707299 Mon Sep 17 00:00:00 2001 From: Yanwei Guo Date: Tue, 16 Jul 2019 23:15:27 -0700 Subject: [PATCH] Release pending requests count of breaker if the request is timeout (#4776) * defer releasing requests * add test for overflows --- pkg/queue/breaker.go | 9 +++++++-- pkg/queue/breaker_test.go | 19 +++++++++++-------- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/pkg/queue/breaker.go b/pkg/queue/breaker.go index cd3a024876a8..a169530bc161 100644 --- a/pkg/queue/breaker.go +++ b/pkg/queue/breaker.go @@ -76,18 +76,23 @@ func (b *Breaker) Maybe(ctx context.Context, thunk func()) bool { return false case b.pendingRequests <- struct{}{}: // Pending request has capacity. + // Defer releasing pending request queue. + defer func() { + <-b.pendingRequests + }() + // Wait for capacity in the active queue. if !b.sem.acquire(ctx) { return false } - // Defer releasing capacity in the active and pending request queue. + // Defer releasing capacity in the active. defer func() { // It's safe to ignore the error returned by release since we // make sure the semaphore is only manipulated here and acquire // + release calls are equally paired. b.sem.release() - <-b.pendingRequests }() + // Do the thing. thunk() // Report success diff --git a/pkg/queue/breaker_test.go b/pkg/queue/breaker_test.go index 1fa923e12e8b..0ab3bd53b172 100644 --- a/pkg/queue/breaker_test.go +++ b/pkg/queue/breaker_test.go @@ -131,22 +131,25 @@ func TestBreakerCancel(t *testing.T) { cancel1() reqs.expectFailure(t) - // Let through a request with capacity then timeout following request + // Let through a request with capacity. b.UpdateConcurrency(1) reqs.request() - // Exceed capacity and assert failure. This makes sure the Breaker is consistently - // at capacity. - reqs.request() - reqs.expectFailure(t) - - // This request cannot get capacity. + // This request fails due to canceled. ctx2, cancel2 := context.WithCancel(context.Background()) reqs.requestWithContext(ctx2) cancel2() reqs.expectFailure(t) - // The request that was put in earlier should succeed. + // This request is cached + reqs.request() + + // This request fails due to overflows. + reqs.request() + reqs.expectFailure(t) + + // The requests that were put in earlier should succeed. + reqs.processSuccessfully(t) reqs.processSuccessfully(t) }