Skip to content

Commit

Permalink
add rate limit to the work queue
Browse files Browse the repository at this point in the history
Rate limit is a mandatory argument on `NewQueue()`, if greater than 0 will configure a rate limiting control to call sync.

Every single call to `Add()` is rate limited before calling `sync()`. Use `Notify()` instead if a single/global rate limit should be used instead one per added item.
  • Loading branch information
jcmoraisjr committed Nov 3, 2019
1 parent 1b30c79 commit 38377fe
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 13 deletions.
29 changes: 23 additions & 6 deletions pkg/utils/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,42 +17,59 @@ limitations under the License.
package utils

import (
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue"
)

// Queue ...
type Queue interface {
Add(item interface{})
Notify()
Run()
ShuttingDown() bool
ShutDown()
}

type queue struct {
workqueue *workqueue.Type
running chan struct{}
sync func(item interface{})
workqueue *workqueue.Type
rateLimiter flowcontrol.RateLimiter
running chan struct{}
sync func(item interface{})
}

// NewQueue ...
func NewQueue(sync func(item interface{})) Queue {
func NewQueue(rate float32, sync func(item interface{})) Queue {
var rateLimiter flowcontrol.RateLimiter
if rate > 0 {
rateLimiter = flowcontrol.NewTokenBucketRateLimiter(rate, 1)
}
return &queue{
workqueue: workqueue.New(),
sync: sync,
workqueue: workqueue.New(),
rateLimiter: rateLimiter,
sync: sync,
}
}

func (q *queue) Add(item interface{}) {
q.workqueue.Add(item)
}

func (q *queue) Notify() {
// When using with rateLimiter, `nil` will be deduplicated
// and `queue.Get()` will release call to `sync()` just once
q.workqueue.Add(nil)
}

func (q *queue) Run() {
if q.running != nil {
// queue already running
return
}
q.running = make(chan struct{})
for {
if q.rateLimiter != nil {
q.rateLimiter.Accept()
}
item, shutdown := q.workqueue.Get()
if shutdown {
close(q.running)
Expand Down
57 changes: 50 additions & 7 deletions pkg/utils/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package utils

import (
"fmt"
"reflect"
"testing"
"time"
Expand All @@ -27,20 +28,20 @@ type task struct {
}

func TestQueueNotRunning(t *testing.T) {
q := NewQueue(nil)
q := NewQueue(0, nil)
q.ShutDown()
}

func TestQueueAlreadyRunning(t *testing.T) {
q := NewQueue(nil)
q := NewQueue(0, nil)
go q.Run()
time.Sleep(100 * time.Millisecond)
q.Run() // test fail if this call blocks, the test will timeout
q.ShutDown()
}

func TestQueueShutdown(t *testing.T) {
q := NewQueue(func(item interface{}) { time.Sleep(200 * time.Millisecond) })
q := NewQueue(0, func(item interface{}) { time.Sleep(200 * time.Millisecond) })
stopped := false
go func() {
q.Run()
Expand All @@ -55,8 +56,8 @@ func TestQueueShutdown(t *testing.T) {
}

func TestQueueRun(t *testing.T) {
items := []string{}
q := NewQueue(func(item interface{}) {
var items []string
q := NewQueue(0, func(item interface{}) {
items = append(items, item.(string)+"-1")
time.Sleep(250 * time.Millisecond)
items = append(items, item.(string)+"-2")
Expand All @@ -77,8 +78,10 @@ func TestQueueRun(t *testing.T) {
}

func TestDeduplicate(t *testing.T) {
items := []interface{}{}
q := NewQueue(func(item interface{}) { items = append(items, item) })
var items []interface{}
q := NewQueue(0, func(item interface{}) {
items = append(items, item)
})
go q.Run()
q.Add(nil)
q.Add(nil)
Expand All @@ -99,3 +102,43 @@ func TestDeduplicate(t *testing.T) {
t.Errorf("items differ, expected: %+v; actual: %+v", expected, items)
}
}

func TestRate(t *testing.T) {
var items []string
q := NewQueue(2, func(item interface{}) {
items = append(items, fmt.Sprintf("%d=%s", item, time.Now().Format("15:04:05.000")))
})
go q.Run()
start := time.Now()
for i := 0; i < 4; i++ {
q.Add(i + 1)
}
time.Sleep(200 * time.Millisecond)
q.ShutDown()
duration := time.Now().Sub(start)
if len(items) != 4 {
t.Errorf("expected 4 items but sync was called %d time(s)", len(items))
}
if duration.Seconds() < 1 {
t.Errorf("expected time higher than 1s but was %s - timestamps: %v", duration.String(), items)
}
}

func TestNotify(t *testing.T) {
var items []interface{}
q := NewQueue(10, func(item interface{}) {
items = append(items, item)
})
go q.Run()
for i := 0; i < 10; i++ {
q.Notify()
}
time.Sleep(200 * time.Millisecond)
for i := 0; i < 10; i++ {
q.Notify()
}
q.ShutDown()
if len(items) != 2 {
t.Errorf("expected 2 items but sync was called %d time(s)", len(items))
}
}

0 comments on commit 38377fe

Please sign in to comment.