From 38377fec290b67716f40123cea59056d0d7b8efb Mon Sep 17 00:00:00 2001 From: Joao Morais Date: Sun, 3 Nov 2019 10:45:13 -0300 Subject: [PATCH] add rate limit to the work queue Rate limit is a mandatory argument on `NewQueue()`, if greater than 0 will configure a rate limiting control to call sync. Every single call to `Add()` is rate limited before calling `sync()`. Use `Notify()` instead if a single/global rate limit should be used instead one per added item. --- pkg/utils/queue.go | 29 ++++++++++++++++----- pkg/utils/queue_test.go | 57 ++++++++++++++++++++++++++++++++++++----- 2 files changed, 73 insertions(+), 13 deletions(-) diff --git a/pkg/utils/queue.go b/pkg/utils/queue.go index 223a20dd9..c4c6f42aa 100644 --- a/pkg/utils/queue.go +++ b/pkg/utils/queue.go @@ -17,28 +17,36 @@ limitations under the License. package utils import ( + "k8s.io/client-go/util/flowcontrol" "k8s.io/client-go/util/workqueue" ) // Queue ... type Queue interface { Add(item interface{}) + Notify() Run() ShuttingDown() bool ShutDown() } type queue struct { - workqueue *workqueue.Type - running chan struct{} - sync func(item interface{}) + workqueue *workqueue.Type + rateLimiter flowcontrol.RateLimiter + running chan struct{} + sync func(item interface{}) } // NewQueue ... -func NewQueue(sync func(item interface{})) Queue { +func NewQueue(rate float32, sync func(item interface{})) Queue { + var rateLimiter flowcontrol.RateLimiter + if rate > 0 { + rateLimiter = flowcontrol.NewTokenBucketRateLimiter(rate, 1) + } return &queue{ - workqueue: workqueue.New(), - sync: sync, + workqueue: workqueue.New(), + rateLimiter: rateLimiter, + sync: sync, } } @@ -46,6 +54,12 @@ func (q *queue) Add(item interface{}) { q.workqueue.Add(item) } +func (q *queue) Notify() { + // When using with rateLimiter, `nil` will be deduplicated + // and `queue.Get()` will release call to `sync()` just once + q.workqueue.Add(nil) +} + func (q *queue) Run() { if q.running != nil { // queue already running @@ -53,6 +67,9 @@ func (q *queue) Run() { } q.running = make(chan struct{}) for { + if q.rateLimiter != nil { + q.rateLimiter.Accept() + } item, shutdown := q.workqueue.Get() if shutdown { close(q.running) diff --git a/pkg/utils/queue_test.go b/pkg/utils/queue_test.go index 0e0e80538..37eaff826 100644 --- a/pkg/utils/queue_test.go +++ b/pkg/utils/queue_test.go @@ -17,6 +17,7 @@ limitations under the License. package utils import ( + "fmt" "reflect" "testing" "time" @@ -27,12 +28,12 @@ type task struct { } func TestQueueNotRunning(t *testing.T) { - q := NewQueue(nil) + q := NewQueue(0, nil) q.ShutDown() } func TestQueueAlreadyRunning(t *testing.T) { - q := NewQueue(nil) + q := NewQueue(0, nil) go q.Run() time.Sleep(100 * time.Millisecond) q.Run() // test fail if this call blocks, the test will timeout @@ -40,7 +41,7 @@ func TestQueueAlreadyRunning(t *testing.T) { } func TestQueueShutdown(t *testing.T) { - q := NewQueue(func(item interface{}) { time.Sleep(200 * time.Millisecond) }) + q := NewQueue(0, func(item interface{}) { time.Sleep(200 * time.Millisecond) }) stopped := false go func() { q.Run() @@ -55,8 +56,8 @@ func TestQueueShutdown(t *testing.T) { } func TestQueueRun(t *testing.T) { - items := []string{} - q := NewQueue(func(item interface{}) { + var items []string + q := NewQueue(0, func(item interface{}) { items = append(items, item.(string)+"-1") time.Sleep(250 * time.Millisecond) items = append(items, item.(string)+"-2") @@ -77,8 +78,10 @@ func TestQueueRun(t *testing.T) { } func TestDeduplicate(t *testing.T) { - items := []interface{}{} - q := NewQueue(func(item interface{}) { items = append(items, item) }) + var items []interface{} + q := NewQueue(0, func(item interface{}) { + items = append(items, item) + }) go q.Run() q.Add(nil) q.Add(nil) @@ -99,3 +102,43 @@ func TestDeduplicate(t *testing.T) { t.Errorf("items differ, expected: %+v; actual: %+v", expected, items) } } + +func TestRate(t *testing.T) { + var items []string + q := NewQueue(2, func(item interface{}) { + items = append(items, fmt.Sprintf("%d=%s", item, time.Now().Format("15:04:05.000"))) + }) + go q.Run() + start := time.Now() + for i := 0; i < 4; i++ { + q.Add(i + 1) + } + time.Sleep(200 * time.Millisecond) + q.ShutDown() + duration := time.Now().Sub(start) + if len(items) != 4 { + t.Errorf("expected 4 items but sync was called %d time(s)", len(items)) + } + if duration.Seconds() < 1 { + t.Errorf("expected time higher than 1s but was %s - timestamps: %v", duration.String(), items) + } +} + +func TestNotify(t *testing.T) { + var items []interface{} + q := NewQueue(10, func(item interface{}) { + items = append(items, item) + }) + go q.Run() + for i := 0; i < 10; i++ { + q.Notify() + } + time.Sleep(200 * time.Millisecond) + for i := 0; i < 10; i++ { + q.Notify() + } + q.ShutDown() + if len(items) != 2 { + t.Errorf("expected 2 items but sync was called %d time(s)", len(items)) + } +}