Skip to content

Commit

Permalink
Release pending requests count of breaker if the request is timeout (#…
Browse files Browse the repository at this point in the history
…4776)

* defer releasing requests

* add test for overflows
  • Loading branch information
yanweiguo authored and knative-prow-robot committed Jul 17, 2019
1 parent 949e9b5 commit 04e467e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 10 deletions.
9 changes: 7 additions & 2 deletions pkg/queue/breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,23 @@ func (b *Breaker) Maybe(ctx context.Context, thunk func()) bool {
return false
case b.pendingRequests <- struct{}{}:
// Pending request has capacity.
// Defer releasing pending request queue.
defer func() {
<-b.pendingRequests
}()

// Wait for capacity in the active queue.
if !b.sem.acquire(ctx) {
return false
}
// Defer releasing capacity in the active and pending request queue.
// Defer releasing capacity in the active.
defer func() {
// It's safe to ignore the error returned by release since we
// make sure the semaphore is only manipulated here and acquire
// + release calls are equally paired.
b.sem.release()
<-b.pendingRequests
}()

// Do the thing.
thunk()
// Report success
Expand Down
19 changes: 11 additions & 8 deletions pkg/queue/breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,22 +131,25 @@ func TestBreakerCancel(t *testing.T) {
cancel1()
reqs.expectFailure(t)

// Let through a request with capacity then timeout following request
// Let through a request with capacity.
b.UpdateConcurrency(1)
reqs.request()

// Exceed capacity and assert failure. This makes sure the Breaker is consistently
// at capacity.
reqs.request()
reqs.expectFailure(t)

// This request cannot get capacity.
// This request fails due to canceled.
ctx2, cancel2 := context.WithCancel(context.Background())
reqs.requestWithContext(ctx2)
cancel2()
reqs.expectFailure(t)

// The request that was put in earlier should succeed.
// This request is cached
reqs.request()

// This request fails due to overflows.
reqs.request()
reqs.expectFailure(t)

// The requests that were put in earlier should succeed.
reqs.processSuccessfully(t)
reqs.processSuccessfully(t)
}

Expand Down

0 comments on commit 04e467e

Please sign in to comment.