diff --git a/processor/logratelimitprocessor/README.md b/processor/logratelimitprocessor/README.md index c4c8fcb30333..b3c41f1670f5 100644 --- a/processor/logratelimitprocessor/README.md +++ b/processor/logratelimitprocessor/README.md @@ -19,8 +19,8 @@ are one or many services which are logging too many logs such that it increases the log noise/surge from any service/namespace/pod etc. you can use the rate-limit processor, you can give the fields on which you want to limit logs and add config for allowed rate and interval.
The processor caches the count of logs in the given interval for each combination of given rate_limit_fields and once logs count starts to exceed the count -the processor will start dropping the logs till the interval finish in the best effort way. There are no mutex/locks involved, only one atomic counter is used -to keep rate-limiter lightweight / easy on resources. +the processor will start dropping the logs till the interval finish in the best effort way. There are no mutex/locks involved, sync.Map and atomic counter is +used to keep rate-limiter lightweight / easy on resources. ## Configuration | Field | Type | Default | Description | diff --git a/processor/logratelimitprocessor/rate_limiter.go b/processor/logratelimitprocessor/rate_limiter.go index fc13e79ea3af..83d0f112f60d 100644 --- a/processor/logratelimitprocessor/rate_limiter.go +++ b/processor/logratelimitprocessor/rate_limiter.go @@ -5,6 +5,7 @@ package logratelimitprocessor // import "github.com/open-telemetry/opentelemetry import ( "go.uber.org/zap" + "sync" "sync/atomic" "time" ) @@ -12,20 +13,18 @@ import ( // RateLimiter is a fixedWindow algorithm based rate-limiter type RateLimiter struct { threshold uint64 - windowStartTimeMap map[uint64]time.Time + windowStartTimeMap sync.Map windowSize time.Duration - counterMap map[uint64]*atomic.Uint64 + counterMap sync.Map logger *zap.Logger } // NewRateLimiter create a new rate-limiter initialization func NewRateLimiter(threshold uint64, windowSize time.Duration, lggr *zap.Logger) *RateLimiter { return &RateLimiter{ - threshold: threshold, - windowStartTimeMap: make(map[uint64]time.Time), - windowSize: windowSize, - counterMap: make(map[uint64]*atomic.Uint64), - logger: lggr, + threshold: threshold, + windowSize: windowSize, + logger: lggr, } } @@ -35,19 +34,18 @@ func NewRateLimiter(threshold uint64, windowSize time.Duration, lggr *zap.Logger // is configured in AllowedRate func (fw *RateLimiter) IsRequestAllowed(key uint64) bool { now := time.Now() - cntr, ok := fw.counterMap[key] - if !ok { - fw.counterMap[key] = new(atomic.Uint64) - cntr = fw.counterMap[key] - fw.windowStartTimeMap[key] = now - } + cntrI, _ := fw.counterMap.LoadOrStore(key, new(atomic.Uint64)) + cntr := cntrI.(*atomic.Uint64) + currWinI, _ := fw.windowStartTimeMap.LoadOrStore(key, now) + currWin := currWinI.(time.Time) + // cases where now.Sub(fw.windowStartTime) is negative or positive should be handled - if now.Sub(fw.windowStartTimeMap[key]) > fw.windowSize { + if now.Sub(currWin) > fw.windowSize { cntr.Store(0) // updating this without lock considering an assumption that all goroutines which will try to update this // fw.windowStartTime in race-condition(within this if block) will have a very small time-difference (in nanoseconds), // we can tolerate that time difference and accept any of the update from concurrent requests. - fw.windowStartTimeMap[key] = now + fw.windowStartTimeMap.Store(key, now) } return cntr.Add(1) <= fw.threshold }