Skip to content

Commit

Permalink
use of sync.Map instead of normal golang maps to avoid any concurrent…
Browse files Browse the repository at this point in the history
… modification exception at scale
  • Loading branch information
anuraj381 committed Dec 4, 2024
1 parent a86db14 commit 3856a59
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 17 deletions.
4 changes: 2 additions & 2 deletions processor/logratelimitprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ are one or many services which are logging too many logs such that it increases
the log noise/surge from any service/namespace/pod etc. you can use the rate-limit processor, you can give the fields on which you want to limit logs
and add config for allowed rate and interval.<br>
The processor caches the count of logs in the given interval for each combination of given rate_limit_fields and once logs count starts to exceed the count
the processor will start dropping the logs till the interval finish in the best effort way. There are no mutex/locks involved, only one atomic counter is used
to keep rate-limiter lightweight / easy on resources.
the processor will start dropping the logs till the interval finish in the best effort way. There are no mutex/locks involved, sync.Map and atomic counter is
used to keep rate-limiter lightweight / easy on resources.

## Configuration
| Field | Type | Default | Description |
Expand Down
28 changes: 13 additions & 15 deletions processor/logratelimitprocessor/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,26 @@ package logratelimitprocessor // import "github.com/open-telemetry/opentelemetry

import (
"go.uber.org/zap"
"sync"
"sync/atomic"
"time"
)

// RateLimiter is a fixedWindow algorithm based rate-limiter
type RateLimiter struct {
threshold uint64
windowStartTimeMap map[uint64]time.Time
windowStartTimeMap sync.Map
windowSize time.Duration
counterMap map[uint64]*atomic.Uint64
counterMap sync.Map
logger *zap.Logger
}

// NewRateLimiter create a new rate-limiter initialization
func NewRateLimiter(threshold uint64, windowSize time.Duration, lggr *zap.Logger) *RateLimiter {
return &RateLimiter{
threshold: threshold,
windowStartTimeMap: make(map[uint64]time.Time),
windowSize: windowSize,
counterMap: make(map[uint64]*atomic.Uint64),
logger: lggr,
threshold: threshold,
windowSize: windowSize,
logger: lggr,
}
}

Expand All @@ -35,19 +34,18 @@ func NewRateLimiter(threshold uint64, windowSize time.Duration, lggr *zap.Logger
// is configured in AllowedRate
func (fw *RateLimiter) IsRequestAllowed(key uint64) bool {
now := time.Now()
cntr, ok := fw.counterMap[key]
if !ok {
fw.counterMap[key] = new(atomic.Uint64)
cntr = fw.counterMap[key]
fw.windowStartTimeMap[key] = now
}
cntrI, _ := fw.counterMap.LoadOrStore(key, new(atomic.Uint64))
cntr := cntrI.(*atomic.Uint64)
currWinI, _ := fw.windowStartTimeMap.LoadOrStore(key, now)
currWin := currWinI.(time.Time)

// cases where now.Sub(fw.windowStartTime) is negative or positive should be handled
if now.Sub(fw.windowStartTimeMap[key]) > fw.windowSize {
if now.Sub(currWin) > fw.windowSize {
cntr.Store(0)
// updating this without lock considering an assumption that all goroutines which will try to update this
// fw.windowStartTime in race-condition(within this if block) will have a very small time-difference (in nanoseconds),
// we can tolerate that time difference and accept any of the update from concurrent requests.
fw.windowStartTimeMap[key] = now
fw.windowStartTimeMap.Store(key, now)
}
return cntr.Add(1) <= fw.threshold
}

0 comments on commit 3856a59

Please sign in to comment.