diff --git a/tools/record/event.go b/tools/record/event.go index 85690d6fad..55d86ee1da 100644 --- a/tools/record/event.go +++ b/tools/record/event.go @@ -81,7 +81,7 @@ type CorrelatorOptions struct { MaxIntervalInSeconds int // The clock used by the EventAggregator to allow for testing // If not specified (zero value), clock.RealClock{} will be used - Clock clock.Clock + Clock clock.PassiveClock // The func used by EventFilterFunc, which returns a key for given event, based on which filtering will take place // If not specified (zero value), getSpamKey will be used SpamKeyFunc EventSpamKeyFunc @@ -323,7 +323,7 @@ type recorderImpl struct { scheme *runtime.Scheme source v1.EventSource *watch.Broadcaster - clock clock.Clock + clock clock.PassiveClock } func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, eventtype, reason, message string) { diff --git a/tools/record/event_test.go b/tools/record/event_test.go index 60182bb3bc..78dd3d39b2 100644 --- a/tools/record/event_test.go +++ b/tools/record/event_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" restclient "k8s.io/client-go/rest" ref "k8s.io/client-go/tools/reference" + testclocks "k8s.io/utils/clock/testing" ) type testEventSink struct { @@ -438,7 +439,7 @@ func TestWriteEventError(t *testing.T) { }, } - clock := clock.IntervalClock{Time: time.Now(), Duration: time.Second} + clock := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: time.Second} eventCorrelator := NewEventCorrelator(&clock) for caseName, ent := range table { @@ -461,7 +462,7 @@ func TestWriteEventError(t *testing.T) { } func TestUpdateExpiredEvent(t *testing.T) { - clock := clock.IntervalClock{Time: time.Now(), Duration: time.Second} + clock := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: time.Second} eventCorrelator := NewEventCorrelator(&clock) var createdEvent *v1.Event diff --git a/tools/record/events_cache.go b/tools/record/events_cache.go index 50bb9e061f..d818f923f5 100644 --- a/tools/record/events_cache.go +++ b/tools/record/events_cache.go @@ -102,14 +102,14 @@ type EventSourceObjectSpamFilter struct { qps float32 // clock is used to allow for testing over a time interval - clock clock.Clock + clock clock.PassiveClock // spamKeyFunc is a func used to create a key based on an event, which is later used to filter spam events. spamKeyFunc EventSpamKeyFunc } // NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill. -func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock, spamKeyFunc EventSpamKeyFunc) *EventSourceObjectSpamFilter { +func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.PassiveClock, spamKeyFunc EventSpamKeyFunc) *EventSourceObjectSpamFilter { return &EventSourceObjectSpamFilter{ cache: lru.New(lruCacheSize), burst: burst, @@ -122,7 +122,7 @@ func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock // spamRecord holds data used to perform spam filtering decisions. type spamRecord struct { // rateLimiter controls the rate of events about this object - rateLimiter flowcontrol.RateLimiter + rateLimiter flowcontrol.PassiveRateLimiter } // Filter controls that a given source+object are not exceeding the allowed rate. @@ -142,7 +142,7 @@ func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool { // verify we have a rate limiter for this record if record.rateLimiter == nil { - record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock) + record.rateLimiter = flowcontrol.NewTokenBucketPassiveRateLimiterWithClock(f.qps, f.burst, f.clock) } // ensure we have available rate @@ -207,12 +207,12 @@ type EventAggregator struct { maxIntervalInSeconds uint // clock is used to allow for testing over a time interval - clock clock.Clock + clock clock.PassiveClock } // NewEventAggregator returns a new instance of an EventAggregator func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc, - maxEvents int, maxIntervalInSeconds int, clock clock.Clock) *EventAggregator { + maxEvents int, maxIntervalInSeconds int, clock clock.PassiveClock) *EventAggregator { return &EventAggregator{ cache: lru.New(lruCacheSize), keyFunc: keyFunc, @@ -315,11 +315,11 @@ type eventLog struct { type eventLogger struct { sync.RWMutex cache *lru.Cache - clock clock.Clock + clock clock.PassiveClock } // newEventLogger observes events and counts their frequencies -func newEventLogger(lruCacheEntries int, clock clock.Clock) *eventLogger { +func newEventLogger(lruCacheEntries int, clock clock.PassiveClock) *eventLogger { return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock} } @@ -436,7 +436,7 @@ type EventCorrelateResult struct { // times. // * A source may burst 25 events about an object, but has a refill rate budget // per object of 1 event every 5 minutes to control long-tail of spam. -func NewEventCorrelator(clock clock.Clock) *EventCorrelator { +func NewEventCorrelator(clock clock.PassiveClock) *EventCorrelator { cacheSize := maxLruCacheEntries spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock, getSpamKey) return &EventCorrelator{ diff --git a/tools/record/events_cache_test.go b/tools/record/events_cache_test.go index dcb2a7f3c1..2ede35508c 100644 --- a/tools/record/events_cache_test.go +++ b/tools/record/events_cache_test.go @@ -25,8 +25,8 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/diff" + testclocks "k8s.io/utils/clock/testing" ) func makeObjectReference(kind, name, namespace string) v1.ObjectReference { @@ -234,7 +234,7 @@ func TestEventCorrelator(t *testing.T) { for testScenario, testInput := range scenario { eventInterval := time.Duration(testInput.intervalSeconds) * time.Second - clock := clock.IntervalClock{Time: time.Now(), Duration: eventInterval} + clock := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: eventInterval} correlator := NewEventCorrelator(&clock) for i := range testInput.previousEvents { event := testInput.previousEvents[i] @@ -320,9 +320,9 @@ func TestEventSpamFilter(t *testing.T) { spamKeyFunc: spamKeyFuncBasedOnObjectsAndReason, }, } - for testDescription, testInput := range testCases { - c := clock.IntervalClock{Time: time.Now(), Duration: eventInterval} + for testDescription, testInput := range testCases { + c := testclocks.SimpleIntervalClock{Time: time.Now(), Duration: eventInterval} correlator := NewEventCorrelatorWithOptions(CorrelatorOptions{ Clock: &c, SpamKeyFunc: testInput.spamKeyFunc, diff --git a/util/flowcontrol/throttle.go b/util/flowcontrol/throttle.go index ffd912c560..af7abd898f 100644 --- a/util/flowcontrol/throttle.go +++ b/util/flowcontrol/throttle.go @@ -23,26 +23,36 @@ import ( "time" "golang.org/x/time/rate" + "k8s.io/utils/clock" ) -type RateLimiter interface { +type PassiveRateLimiter interface { // TryAccept returns true if a token is taken immediately. Otherwise, // it returns false. TryAccept() bool - // Accept returns once a token becomes available. - Accept() // Stop stops the rate limiter, subsequent calls to CanAccept will return false Stop() // QPS returns QPS of this rate limiter QPS() float32 +} + +type RateLimiter interface { + PassiveRateLimiter + // Accept returns once a token becomes available. + Accept() // Wait returns nil if a token is taken before the Context is done. Wait(ctx context.Context) error } -type tokenBucketRateLimiter struct { +type tokenBucketPassiveRateLimiter struct { limiter *rate.Limiter - clock Clock qps float32 + clock clock.PassiveClock +} + +type tokenBucketRateLimiter struct { + tokenBucketPassiveRateLimiter + clock Clock } // NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach. @@ -52,58 +62,73 @@ type tokenBucketRateLimiter struct { // The maximum number of tokens in the bucket is capped at 'burst'. func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter { limiter := rate.NewLimiter(rate.Limit(qps), burst) - return newTokenBucketRateLimiter(limiter, realClock{}, qps) + return newTokenBucketRateLimiterWithClock(limiter, clock.RealClock{}, qps) +} + +// NewTokenBucketPassiveRateLimiter is similar to NewTokenBucketRateLimiter except that it returns +// a PassiveRateLimiter which does not have Accept() and Wait() methods. +func NewTokenBucketPassiveRateLimiter(qps float32, burst int) PassiveRateLimiter { + limiter := rate.NewLimiter(rate.Limit(qps), burst) + return newTokenBucketRateLimiterWithPassiveClock(limiter, clock.RealClock{}, qps) } // An injectable, mockable clock interface. type Clock interface { - Now() time.Time + clock.PassiveClock Sleep(time.Duration) } -type realClock struct{} - -func (realClock) Now() time.Time { - return time.Now() -} -func (realClock) Sleep(d time.Duration) { - time.Sleep(d) -} +var _ Clock = (*clock.RealClock)(nil) // NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter // but allows an injectable clock, for testing. func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter { limiter := rate.NewLimiter(rate.Limit(qps), burst) - return newTokenBucketRateLimiter(limiter, c, qps) + return newTokenBucketRateLimiterWithClock(limiter, c, qps) +} + +// NewTokenBucketPassiveRateLimiterWithClock is similar to NewTokenBucketRateLimiterWithClock +// except that it returns a PassiveRateLimiter which does not have Accept() and Wait() methods +// and uses a PassiveClock. +func NewTokenBucketPassiveRateLimiterWithClock(qps float32, burst int, c clock.PassiveClock) PassiveRateLimiter { + limiter := rate.NewLimiter(rate.Limit(qps), burst) + return newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps) } -func newTokenBucketRateLimiter(limiter *rate.Limiter, c Clock, qps float32) RateLimiter { +func newTokenBucketRateLimiterWithClock(limiter *rate.Limiter, c Clock, qps float32) *tokenBucketRateLimiter { return &tokenBucketRateLimiter{ + tokenBucketPassiveRateLimiter: *newTokenBucketRateLimiterWithPassiveClock(limiter, c, qps), + clock: c, + } +} + +func newTokenBucketRateLimiterWithPassiveClock(limiter *rate.Limiter, c clock.PassiveClock, qps float32) *tokenBucketPassiveRateLimiter { + return &tokenBucketPassiveRateLimiter{ limiter: limiter, - clock: c, qps: qps, + clock: c, } } -func (t *tokenBucketRateLimiter) TryAccept() bool { - return t.limiter.AllowN(t.clock.Now(), 1) +func (tbprl *tokenBucketPassiveRateLimiter) Stop() { } -// Accept will block until a token becomes available -func (t *tokenBucketRateLimiter) Accept() { - now := t.clock.Now() - t.clock.Sleep(t.limiter.ReserveN(now, 1).DelayFrom(now)) +func (tbprl *tokenBucketPassiveRateLimiter) QPS() float32 { + return tbprl.qps } -func (t *tokenBucketRateLimiter) Stop() { +func (tbprl *tokenBucketPassiveRateLimiter) TryAccept() bool { + return tbprl.limiter.AllowN(tbprl.clock.Now(), 1) } -func (t *tokenBucketRateLimiter) QPS() float32 { - return t.qps +// Accept will block until a token becomes available +func (tbrl *tokenBucketRateLimiter) Accept() { + now := tbrl.clock.Now() + tbrl.clock.Sleep(tbrl.limiter.ReserveN(now, 1).DelayFrom(now)) } -func (t *tokenBucketRateLimiter) Wait(ctx context.Context) error { - return t.limiter.Wait(ctx) +func (tbrl *tokenBucketRateLimiter) Wait(ctx context.Context) error { + return tbrl.limiter.Wait(ctx) } type fakeAlwaysRateLimiter struct{} @@ -157,3 +182,11 @@ func (t *fakeNeverRateLimiter) QPS() float32 { func (t *fakeNeverRateLimiter) Wait(ctx context.Context) error { return errors.New("can not be accept") } + +var ( + _ RateLimiter = (*tokenBucketRateLimiter)(nil) + _ RateLimiter = (*fakeAlwaysRateLimiter)(nil) + _ RateLimiter = (*fakeNeverRateLimiter)(nil) +) + +var _ PassiveRateLimiter = (*tokenBucketPassiveRateLimiter)(nil) diff --git a/util/flowcontrol/throttle_test.go b/util/flowcontrol/throttle_test.go index 81fea2b499..b567eee1ec 100644 --- a/util/flowcontrol/throttle_test.go +++ b/util/flowcontrol/throttle_test.go @@ -190,6 +190,10 @@ func (fc *fakeClock) Sleep(d time.Duration) { fc.now = fc.now.Add(d) } +func (fc *fakeClock) Since(ts time.Time) time.Duration { + return time.Since(ts) +} + func TestRatePrecisionBug(t *testing.T) { // golang.org/x/time/rate used to have bugs around precision and this // proves that they don't recur (at least in the form we know about). This