From cad72fa419af1376359af268dcc94a0a23a99fe8 Mon Sep 17 00:00:00 2001
From: zach593
Date: Fri, 7 Feb 2025 00:22:52 +0800
Subject: [PATCH] Fix behavior of rate limit option in
 priorityqueue.AddWithOpts

Signed-off-by: zach593
---
 pkg/controller/priorityqueue/priorityqueue.go | 12 ++--
 .../priorityqueue/priorityqueue_test.go       | 67 +++++++++++++++++++
 2 files changed, 74 insertions(+), 5 deletions(-)

diff --git a/pkg/controller/priorityqueue/priorityqueue.go b/pkg/controller/priorityqueue/priorityqueue.go
index 996369f47a..ff5dea9021 100644
--- a/pkg/controller/priorityqueue/priorityqueue.go
+++ b/pkg/controller/priorityqueue/priorityqueue.go
@@ -11,6 +11,7 @@ import (
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/utils/clock"
 	"k8s.io/utils/ptr"
+
 	"sigs.k8s.io/controller-runtime/pkg/internal/metrics"
 )
 
@@ -132,16 +133,17 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
 	defer w.lock.Unlock()
 
 	for _, key := range items {
+		after := o.After
 		if o.RateLimited {
-			after := w.rateLimiter.When(key)
-			if o.After == 0 || after < o.After {
-				o.After = after
+			rlAfter := w.rateLimiter.When(key)
+			if after == 0 || rlAfter < after {
+				after = rlAfter
 			}
 		}
 
 		var readyAt *time.Time
-		if o.After > 0 {
-			readyAt = ptr.To(w.now().Add(o.After))
+		if after > 0 {
+			readyAt = ptr.To(w.now().Add(after))
 			w.metrics.retry()
 		}
 		if _, ok := w.items[key]; !ok {
diff --git a/pkg/controller/priorityqueue/priorityqueue_test.go b/pkg/controller/priorityqueue/priorityqueue_test.go
index 18de95aac2..f54d3cc11c 100644
--- a/pkg/controller/priorityqueue/priorityqueue_test.go
+++ b/pkg/controller/priorityqueue/priorityqueue_test.go
@@ -11,6 +11,7 @@ import (
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/client-go/util/workqueue"
 )
 
 var _ = Describe("Controllerworkqueue", func() {
@@ -438,6 +439,72 @@ var _ = Describe("Controllerworkqueue", func() {
 		Expect(metrics.depth["test"]).To(Equal(0))
 		metrics.mu.Unlock()
 	})
+
+	It("When adding items with rateLimit, previous items' rateLimit should not affect subsequent items", func() {
+		q, metrics := newQueue()
+		defer q.ShutDown()
+
+		now := time.Now().Round(time.Second)
+		nowLock := sync.Mutex{}
+		tick := make(chan time.Time)
+
+		cwq := q.(*priorityqueue[string])
+		cwq.rateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second)
+		cwq.now = func() time.Time {
+			nowLock.Lock()
+			defer nowLock.Unlock()
+			return now
+		}
+		cwq.tick = func(d time.Duration) <-chan time.Time {
+			done := make(chan struct{})
+			go func() {
+				defer GinkgoRecover()
+				defer close(done)
+
+				Expect(d).To(Or(Equal(5*time.Millisecond), Equal(635*time.Millisecond)))
+			}()
+			<-done
+			return tick
+		}
+
+		retrievedItem := make(chan struct{})
+		retrievedSecondItem := make(chan struct{})
+
+		go func() {
+			defer GinkgoRecover()
+			first, _, _ := q.GetWithPriority()
+			Expect(first).To(Equal("foo"))
+			close(retrievedItem)
+
+			second, _, _ := q.GetWithPriority()
+			Expect(second).To(Equal("bar"))
+			close(retrievedSecondItem)
+		}()
+
+		// after 7 calls, the next When("bar") call will return 640ms.
+		for range 7 {
+			cwq.rateLimiter.When("bar")
+		}
+		q.AddWithOpts(AddOpts{RateLimited: true}, "foo", "bar")
+
+		Consistently(retrievedItem).ShouldNot(BeClosed())
+		nowLock.Lock()
+		now = now.Add(5 * time.Millisecond)
+		nowLock.Unlock()
+		tick <- now
+		Eventually(retrievedItem).Should(BeClosed())
+
+		Consistently(retrievedSecondItem).ShouldNot(BeClosed())
+		nowLock.Lock()
+		now = now.Add(635 * time.Millisecond)
+		nowLock.Unlock()
+		tick <- now
+		Eventually(retrievedSecondItem).Should(BeClosed())
+
+		Expect(metrics.depth["test"]).To(Equal(0))
+		Expect(metrics.adds["test"]).To(Equal(2))
+		Expect(metrics.retries["test"]).To(Equal(2))
+	})
 })
 
 func BenchmarkAddGetDone(b *testing.B) {
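
Note on the bug this patch fixes: AddWithOpts receives its options by value, but the old loop wrote the rate-limiter delay back into the shared o.After, so the delay computed for one item became the comparison baseline for every later item in the same call. In the order the test uses ("foo" then "bar"), "foo"'s small backoff shadowed "bar"'s much larger backoff, and "bar" became ready far too early. A minimal, self-contained sketch of the pattern (hypothetical names and canned delays, not the controller-runtime code):

package main

import (
	"fmt"
	"time"
)

// addOpts mirrors the shape of the queue's AddOpts, for illustration only.
type addOpts struct {
	After       time.Duration
	RateLimited bool
}

// when stands in for rateLimiter.When: pretend "bar" has already failed
// seven times while "foo" has never failed.
func when(key string) time.Duration {
	if key == "bar" {
		return 640 * time.Millisecond
	}
	return 5 * time.Millisecond
}

// buggy mutates the shared o.After, as the code did before this patch:
// the smallest delay seen so far leaks into every subsequent item.
func buggy(o addOpts, items ...string) {
	for _, key := range items {
		if o.RateLimited {
			if d := when(key); o.After == 0 || d < o.After {
				o.After = d
			}
		}
		fmt.Printf("buggy: %s ready in %v\n", key, o.After)
	}
}

// fixed hoists the delay into a per-item local, as the patch does.
func fixed(o addOpts, items ...string) {
	for _, key := range items {
		after := o.After
		if o.RateLimited {
			if d := when(key); after == 0 || d < after {
				after = d
			}
		}
		fmt.Printf("fixed: %s ready in %v\n", key, after)
	}
}

func main() {
	buggy(addOpts{RateLimited: true}, "foo", "bar") // bar wrongly ready in 5ms
	fixed(addOpts{RateLimited: true}, "foo", "bar") // bar correctly waits 640ms
}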
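Where the test's constants come from: the typed exponential failure limiter returns baseDelay * 2^failures per item, so after seven priming When("bar") calls the eighth call, made inside AddWithOpts, returns 5ms * 2^7 = 640ms, while the first When("foo") returns the 5ms base. Once the fake clock advances 5ms to release "foo", the queue's remaining wait for "bar" is 640ms - 5ms = 635ms, which is exactly the second duration the stubbed tick asserts. A quick standalone check against the real client-go API:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	rl := workqueue.NewTypedItemExponentialFailureRateLimiter[string](
		5*time.Millisecond, 1000*time.Second)

	// Prime "bar" exactly as the test does, consuming 5, 10, ..., 320ms.
	for range 7 {
		rl.When("bar")
	}

	fmt.Println(rl.When("foo")) // 5ms: first failure of "foo"
	fmt.Println(rl.When("bar")) // 640ms: 5ms * 2^7 on the eighth failure
}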