Skip to content

Commit

Permalink
Refactor client-go/util/flowcontrol/throttle.go RateLimiter
Browse files Browse the repository at this point in the history
- Introduce PassiveRateLimiter which implements all methods of previous RateLimiter except Accept() and Wait()
- Change RateLimiter interface to extend PassiveRateLimiter by additionally implementing Accept() and Wait()
- Make client-go/tools/record use PassiveRateLimiter

Refactor EventSourceObjectSpamFilter, EventAggregator, EventCorrelator

- EventSourceObjectSpamFilter, EventAggregator, EventCorrelator use clock.PassiveClock now.
	- This won't be a breaking change because even if a clock.Clock is passed, it still implements the clock.PassiveClock interface.
- Extend clock.PassiveClock through Clock.
- Replace pacakge local implementation of realClock with clock.RealClock
- In flowcontrol/throttle.go split tokenBucketRateLimiters to use Clock and clock.PassiveClock.
- Migrate client-go/tools/record tests from using IntervalClock to using SimpleIntervalClock (honest implementation of clock.PassiveClock)

Signed-off-by: Madhav Jivrajani <madhav.jiv@gmail.com>

Kubernetes-commit: ac5c55f0bd853fcf883d9b8e1f5ef728a2fb5309
  • Loading branch information
MadhavJivrajani authored and k8s-publishing-bot committed Aug 25, 2021
1 parent 191e5dc commit b9fa896
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 46 deletions.
4 changes: 2 additions & 2 deletions tools/record/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type CorrelatorOptions struct {
MaxIntervalInSeconds int
// The clock used by the EventAggregator to allow for testing
// If not specified (zero value), clock.RealClock{} will be used
Clock clock.Clock
Clock clock.PassiveClock
// The func used by EventFilterFunc, which returns a key for given event, based on which filtering will take place
// If not specified (zero value), getSpamKey will be used
SpamKeyFunc EventSpamKeyFunc
Expand Down Expand Up @@ -323,7 +323,7 @@ type recorderImpl struct {
scheme *runtime.Scheme
source v1.EventSource
*watch.Broadcaster
clock clock.Clock
clock clock.PassiveClock
}

func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, eventtype, reason, message string) {
Expand Down
5 changes: 3 additions & 2 deletions tools/record/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
ref "k8s.io/client-go/tools/reference"
testclocks "k8s.io/utils/clock/testing"
)

type testEventSink struct {
Expand Down Expand Up @@ -438,7 +439,7 @@ func TestWriteEventError(t *testing.T) {
},
}

clock := clock.IntervalClock{Time: time.Now(), Duration: time.Second}
clock := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: time.Second}
eventCorrelator := NewEventCorrelator(&clock)

for caseName, ent := range table {
Expand All @@ -461,7 +462,7 @@ func TestWriteEventError(t *testing.T) {
}

func TestUpdateExpiredEvent(t *testing.T) {
clock := clock.IntervalClock{Time: time.Now(), Duration: time.Second}
clock := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: time.Second}
eventCorrelator := NewEventCorrelator(&clock)

var createdEvent *v1.Event
Expand Down
18 changes: 9 additions & 9 deletions tools/record/events_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,14 @@ type EventSourceObjectSpamFilter struct {
qps float32

// clock is used to allow for testing over a time interval
clock clock.Clock
clock clock.PassiveClock

// spamKeyFunc is a func used to create a key based on an event, which is later used to filter spam events.
spamKeyFunc EventSpamKeyFunc
}

// NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill.
func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock, spamKeyFunc EventSpamKeyFunc) *EventSourceObjectSpamFilter {
func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.PassiveClock, spamKeyFunc EventSpamKeyFunc) *EventSourceObjectSpamFilter {
return &EventSourceObjectSpamFilter{
cache: lru.New(lruCacheSize),
burst: burst,
Expand All @@ -122,7 +122,7 @@ func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock
// spamRecord holds data used to perform spam filtering decisions.
type spamRecord struct {
// rateLimiter controls the rate of events about this object
rateLimiter flowcontrol.RateLimiter
rateLimiter flowcontrol.PassiveRateLimiter
}

// Filter controls that a given source+object are not exceeding the allowed rate.
Expand All @@ -142,7 +142,7 @@ func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {

// verify we have a rate limiter for this record
if record.rateLimiter == nil {
record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
record.rateLimiter = flowcontrol.NewTokenBucketPassiveRateLimiterWithClock(f.qps, f.burst, f.clock)
}

// ensure we have available rate
Expand Down Expand Up @@ -207,12 +207,12 @@ type EventAggregator struct {
maxIntervalInSeconds uint

// clock is used to allow for testing over a time interval
clock clock.Clock
clock clock.PassiveClock
}

// NewEventAggregator returns a new instance of an EventAggregator
func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc,
maxEvents int, maxIntervalInSeconds int, clock clock.Clock) *EventAggregator {
maxEvents int, maxIntervalInSeconds int, clock clock.PassiveClock) *EventAggregator {
return &EventAggregator{
cache: lru.New(lruCacheSize),
keyFunc: keyFunc,
Expand Down Expand Up @@ -315,11 +315,11 @@ type eventLog struct {
type eventLogger struct {
sync.RWMutex
cache *lru.Cache
clock clock.Clock
clock clock.PassiveClock
}

// newEventLogger observes events and counts their frequencies
func newEventLogger(lruCacheEntries int, clock clock.Clock) *eventLogger {
func newEventLogger(lruCacheEntries int, clock clock.PassiveClock) *eventLogger {
return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock}
}

Expand Down Expand Up @@ -436,7 +436,7 @@ type EventCorrelateResult struct {
// times.
// * A source may burst 25 events about an object, but has a refill rate budget
// per object of 1 event every 5 minutes to control long-tail of spam.
func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
func NewEventCorrelator(clock clock.PassiveClock) *EventCorrelator {
cacheSize := maxLruCacheEntries
spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock, getSpamKey)
return &EventCorrelator{
Expand Down
8 changes: 4 additions & 4 deletions tools/record/events_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/diff"
testclocks "k8s.io/utils/clock/testing"
)

func makeObjectReference(kind, name, namespace string) v1.ObjectReference {
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestEventCorrelator(t *testing.T) {

for testScenario, testInput := range scenario {
eventInterval := time.Duration(testInput.intervalSeconds) * time.Second
clock := clock.IntervalClock{Time: time.Now(), Duration: eventInterval}
clock := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: eventInterval}
correlator := NewEventCorrelator(&clock)
for i := range testInput.previousEvents {
event := testInput.previousEvents[i]
Expand Down Expand Up @@ -320,9 +320,9 @@ func TestEventSpamFilter(t *testing.T) {
spamKeyFunc: spamKeyFuncBasedOnObjectsAndReason,
},
}
for testDescription, testInput := range testCases {

c := clock.IntervalClock{Time: time.Now(), Duration: eventInterval}
for testDescription, testInput := range testCases {
c := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: eventInterval}
correlator := NewEventCorrelatorWithOptions(CorrelatorOptions{
Clock: &c,
SpamKeyFunc: testInput.spamKeyFunc,
Expand Down
91 changes: 62 additions & 29 deletions util/flowcontrol/throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,36 @@ import (
"time"

"golang.org/x/time/rate"
"k8s.io/utils/clock"
)

type RateLimiter interface {
type PassiveRateLimiter interface {
// TryAccept returns true if a token is taken immediately. Otherwise,
// it returns false.
TryAccept() bool
// Accept returns once a token becomes available.
Accept()
// Stop stops the rate limiter, subsequent calls to CanAccept will return false
Stop()
// QPS returns QPS of this rate limiter
QPS() float32
}

type RateLimiter interface {
PassiveRateLimiter
// Accept returns once a token becomes available.
Accept()
// Wait returns nil if a token is taken before the Context is done.
Wait(ctx context.Context) error
}

type tokenBucketRateLimiter struct {
type tokenBucketPassiveRateLimiter struct {
limiter *rate.Limiter
clock Clock
qps float32
clock clock.PassiveClock
}

type tokenBucketRateLimiter struct {
tokenBucketPassiveRateLimiter
clock Clock
}

// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
Expand All @@ -52,58 +62,73 @@ type tokenBucketRateLimiter struct {
// The maximum number of tokens in the bucket is capped at 'burst'.
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
limiter := rate.NewLimiter(rate.Limit(qps), burst)
return newTokenBucketRateLimiter(limiter, realClock{}, qps)
return newTokenBucketRateLimiterWithClock(limiter, clock.RealClock{}, qps)
}

// NewTokenBucketPassiveRateLimiter is similar to NewTokenBucketRateLimiter except that it returns
// a PassiveRateLimiter which does not have Accept() and Wait() methods.
func NewTokenBucketPassiveRateLimiter(qps float32, burst int) PassiveRateLimiter {
limiter := rate.NewLimiter(rate.Limit(qps), burst)
return newTokenBucketRateLimiterWithPassiveClock(limiter, clock.RealClock{}, qps)
}

// An injectable, mockable clock interface.
type Clock interface {
Now() time.Time
clock.PassiveClock
Sleep(time.Duration)
}

type realClock struct{}

func (realClock) Now() time.Time {
return time.Now()
}
func (realClock) Sleep(d time.Duration) {
time.Sleep(d)
}
var _ Clock = (*clock.RealClock)(nil)

// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter
// but allows an injectable clock, for testing.
func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter {
limiter := rate.NewLimiter(rate.Limit(qps), burst)
return newTokenBucketRateLimiter(limiter, c, qps)
return newTokenBucketRateLimiterWithClock(limiter, c, qps)
}

// NewTokenBucketPassiveRateLimiterWithClock is similar to NewTokenBucketRateLimiterWithClock
// except that it returns a PassiveRateLimiter which does not have Accept() and Wait() methods
// and uses a PassiveClock.
func NewTokenBucketPassiveRateLimiterWithClock(qps float32, burst int, c clock.PassiveClock) PassiveRateLimiter {
limiter := rate.NewLimiter(rate.Limit(qps), burst)
return newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps)
}

func newTokenBucketRateLimiter(limiter *rate.Limiter, c Clock, qps float32) RateLimiter {
func newTokenBucketRateLimiterWithClock(limiter *rate.Limiter, c Clock, qps float32) *tokenBucketRateLimiter {
return &tokenBucketRateLimiter{
tokenBucketPassiveRateLimiter: *newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps),
clock: c,
}
}

func newTokenBucketRateLimiterWithPassiveClock(limiter *rate.Limiter, c clock.PassiveClock, qps float32) *tokenBucketPassiveRateLimiter {
return &tokenBucketPassiveRateLimiter{
limiter: limiter,
clock: c,
qps: qps,
clock: c,
}
}

func (t *tokenBucketRateLimiter) TryAccept() bool {
return t.limiter.AllowN(t.clock.Now(), 1)
func (tbprl *tokenBucketPassiveRateLimiter) Stop() {
}

// Accept will block until a token becomes available
func (t *tokenBucketRateLimiter) Accept() {
now := t.clock.Now()
t.clock.Sleep(t.limiter.ReserveN(now, 1).DelayFrom(now))
func (tbprl *tokenBucketPassiveRateLimiter) QPS() float32 {
return tbprl.qps
}

func (t *tokenBucketRateLimiter) Stop() {
func (tbprl *tokenBucketPassiveRateLimiter) TryAccept() bool {
return tbprl.limiter.AllowN(tbprl.clock.Now(), 1)
}

func (t *tokenBucketRateLimiter) QPS() float32 {
return t.qps
// Accept will block until a token becomes available
func (tbrl *tokenBucketRateLimiter) Accept() {
now := tbrl.clock.Now()
tbrl.clock.Sleep(tbrl.limiter.ReserveN(now, 1).DelayFrom(now))
}

func (t *tokenBucketRateLimiter) Wait(ctx context.Context) error {
return t.limiter.Wait(ctx)
func (tbrl *tokenBucketRateLimiter) Wait(ctx context.Context) error {
return tbrl.limiter.Wait(ctx)
}

type fakeAlwaysRateLimiter struct{}
Expand Down Expand Up @@ -157,3 +182,11 @@ func (t *fakeNeverRateLimiter) QPS() float32 {
func (t *fakeNeverRateLimiter) Wait(ctx context.Context) error {
return errors.New("can not be accept")
}

var (
_ RateLimiter = (*tokenBucketRateLimiter)(nil)
_ RateLimiter = (*fakeAlwaysRateLimiter)(nil)
_ RateLimiter = (*fakeNeverRateLimiter)(nil)
)

var _ PassiveRateLimiter = (*tokenBucketPassiveRateLimiter)(nil)
4 changes: 4 additions & 0 deletions util/flowcontrol/throttle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ func (fc *fakeClock) Sleep(d time.Duration) {
fc.now = fc.now.Add(d)
}

func (fc *fakeClock) Since(ts time.Time) time.Duration {
return time.Since(ts)
}

func TestRatePrecisionBug(t *testing.T) {
// golang.org/x/time/rate used to have bugs around precision and this
// proves that they don't recur (at least in the form we know about). This
Expand Down

0 comments on commit b9fa896

Please sign in to comment.