Skip to content

Commit

Permalink
Fix behavior of rate limit option in priorityqueue.AddWithOpts
Browse files Browse the repository at this point in the history
Signed-off-by: zach593 <zach_li@outlook.com>
  • Loading branch information
zach593 authored and k8s-infra-cherrypick-robot committed Feb 8, 2025
1 parent fc48583 commit cad72fa
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 5 deletions.
12 changes: 7 additions & 5 deletions pkg/controller/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"

"sigs.k8s.io/controller-runtime/pkg/internal/metrics"
)

Expand Down Expand Up @@ -132,16 +133,17 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
defer w.lock.Unlock()

for _, key := range items {
after := o.After
if o.RateLimited {
after := w.rateLimiter.When(key)
if o.After == 0 || after < o.After {
o.After = after
rlAfter := w.rateLimiter.When(key)
if after == 0 || rlAfter < after {
after = rlAfter
}
}

var readyAt *time.Time
if o.After > 0 {
readyAt = ptr.To(w.now().Add(o.After))
if after > 0 {
readyAt = ptr.To(w.now().Add(after))
w.metrics.retry()
}
if _, ok := w.items[key]; !ok {
Expand Down
67 changes: 67 additions & 0 deletions pkg/controller/priorityqueue/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
)

var _ = Describe("Controllerworkqueue", func() {
Expand Down Expand Up @@ -438,6 +439,72 @@ var _ = Describe("Controllerworkqueue", func() {
Expect(metrics.depth["test"]).To(Equal(0))
metrics.mu.Unlock()
})

It("When adding items with rateLimit, previous items' rateLimit should not affect subsequent items", func() {
q, metrics := newQueue()
defer q.ShutDown()

now := time.Now().Round(time.Second)
nowLock := sync.Mutex{}
tick := make(chan time.Time)

cwq := q.(*priorityqueue[string])
cwq.rateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second)
cwq.now = func() time.Time {
nowLock.Lock()
defer nowLock.Unlock()
return now
}
cwq.tick = func(d time.Duration) <-chan time.Time {
done := make(chan struct{})
go func() {
defer GinkgoRecover()
defer close(done)

Expect(d).To(Or(Equal(5*time.Millisecond), Equal(635*time.Millisecond)))
}()
<-done
return tick
}

retrievedItem := make(chan struct{})
retrievedSecondItem := make(chan struct{})

go func() {
defer GinkgoRecover()
first, _, _ := q.GetWithPriority()
Expect(first).To(Equal("foo"))
close(retrievedItem)

second, _, _ := q.GetWithPriority()
Expect(second).To(Equal("bar"))
close(retrievedSecondItem)
}()

// after 7 calls, the next When("bar") call will return 640ms.
for range 7 {
cwq.rateLimiter.When("bar")
}
q.AddWithOpts(AddOpts{RateLimited: true}, "foo", "bar")

Consistently(retrievedItem).ShouldNot(BeClosed())
nowLock.Lock()
now = now.Add(5 * time.Millisecond)
nowLock.Unlock()
tick <- now
Eventually(retrievedItem).Should(BeClosed())

Consistently(retrievedSecondItem).ShouldNot(BeClosed())
nowLock.Lock()
now = now.Add(635 * time.Millisecond)
nowLock.Unlock()
tick <- now
Eventually(retrievedSecondItem).Should(BeClosed())

Expect(metrics.depth["test"]).To(Equal(0))
Expect(metrics.adds["test"]).To(Equal(2))
Expect(metrics.retries["test"]).To(Equal(2))
})
})

func BenchmarkAddGetDone(b *testing.B) {
Expand Down

0 comments on commit cad72fa

Please sign in to comment.