Implement BACKEND_TYPE=memcache as an alternative k/v store to redis #172

Merged: 5 commits, merged on Jan 2, 2021
2 changes: 1 addition & 1 deletion Dockerfile.integration
@@ -1,7 +1,7 @@
# Running this docker image runs the integration tests.
FROM golang:1.14

-RUN apt-get update -y && apt-get install sudo stunnel4 redis -y && rm -rf /var/lib/apt/lists/*
+RUN apt-get update -y && apt-get install sudo stunnel4 redis memcached -y && rm -rf /var/lib/apt/lists/*

WORKDIR /workdir

1 change: 1 addition & 0 deletions Makefile
@@ -79,6 +79,7 @@ tests_with_redis: bootstrap_redis_tls tests_unit
	redis-server --port 6382 --requirepass password123 &
	redis-server --port 6384 --requirepass password123 &
	redis-server --port 6385 --requirepass password123 &
	memcached -u root --port 6386 -m 64 &
	go test -race -tags=integration $(MODULE)/...

.PHONY: docker_tests
10 changes: 10 additions & 0 deletions docker-compose.yml
@@ -10,6 +10,15 @@ services:
    networks:
      - ratelimit-network

  memcached:
    image: memcached:alpine
    expose:
      - 11211
    ports:
      - 11211:11211
    networks:
      - ratelimit-network

  # minimal container that builds the ratelimit service binary and exits.
  ratelimit-build:
    image: golang:1.14-alpine
@@ -50,6 +59,7 @@ services:
      - REDIS_URL=redis:6379
      - RUNTIME_ROOT=/data
      - RUNTIME_SUBDIRECTORY=ratelimit
      - MEMCACHE_HOST_PORT=memcached:11211

networks:
  ratelimit-network:
1 change: 1 addition & 0 deletions go.mod
@@ -4,6 +4,7 @@ go 1.14

require (
	github.com/alicebob/miniredis/v2 v2.11.4
	github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
	github.com/cespare/xxhash v1.1.0 // indirect
	github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354 // indirect
	github.com/coocood/freecache v1.1.0
2 changes: 2 additions & 0 deletions go.sum
@@ -6,6 +6,8 @@ github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 h1:45bxf7AZMw
github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.11.4 h1:GsuyeunTx7EllZBU3/6Ji3dhMQZDpC9rLf1luJ+6M5M=
github.com/alicebob/miniredis/v2 v2.11.4/go.mod h1:VL3UDEfAH59bSa7MuHMuFToxkqyHh69s/WUbYlOAuyg=
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b h1:L/QXpzIa3pOvUGt1D1lA5KjYhPBAN/3iWdP7xeFS9F0=
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
300 changes: 300 additions & 0 deletions src/memcached/cache_impl.go
@@ -0,0 +1,300 @@
// The memcached limiter uses GetMulti() to check keys in parallel and then does
// increments asynchronously in the backend, since the memcache interface doesn't
// support multi-increment and it seems worthwhile to minimize the number of
// concurrent or sequential RPCs in the critical path.
//
// Another difference from redis is that memcache doesn't create a key implicitly by
// incrementing a missing entry. Instead, when increment fails an explicit "add" needs
// to be called. The process of increment becomes a bit of a dance since we try to
// limit the number of RPCs. First we call increment, then add if the increment
// failed, then increment again if the add failed (which could happen if there was
// a race to call "add").

package memcached

import (
	"context"
	"math"
	"math/rand"
	"strconv"
	"sync"

	"github.com/coocood/freecache"
	stats "github.com/lyft/gostats"

	"github.com/bradfitz/gomemcache/memcache"

	logger "github.com/sirupsen/logrus"

	pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3"

	"github.com/envoyproxy/ratelimit/src/assert"
	"github.com/envoyproxy/ratelimit/src/config"
	"github.com/envoyproxy/ratelimit/src/limiter"
	"github.com/envoyproxy/ratelimit/src/settings"
)

type rateLimitCache interface {
	// Same as in limiter.RateLimitCache
	DoLimit(
		ctx context.Context,
		request *pb.RateLimitRequest,
		limits []*config.RateLimit) []*pb.RateLimitResponse_DescriptorStatus

	// Waits for any lingering goroutines that are incrementing memcache values.
	// This is used for unit tests, since the memcache increments happen
	// asynchronously in the background.
	Flush()
}

type rateLimitMemcacheImpl struct {
	client                     Client
	timeSource                 limiter.TimeSource
	jitterRand                 *rand.Rand
	expirationJitterMaxSeconds int64
	cacheKeyGenerator          limiter.CacheKeyGenerator
	localCache                 *freecache.Cache
	wg                         sync.WaitGroup
}

[Review comment (Member)]: nit: could expand field name to waitGroup

var _ limiter.RateLimitCache = (*rateLimitMemcacheImpl)(nil)

func max(a uint32, b uint32) uint32 {
	if a > b {
		return a
	}
	return b
}

func (this *rateLimitMemcacheImpl) DoLimit(
	ctx context.Context,
	request *pb.RateLimitRequest,
	limits []*config.RateLimit) []*pb.RateLimitResponse_DescriptorStatus {

	logger.Debugf("starting cache lookup")

	// request.HitsAddend could be 0 (default value) if not specified by the caller in the ratelimit request.
	hitsAddend := max(1, request.HitsAddend)
[Review comment (Member)]: I see that a bunch of code is duplicated across the redis and memcache implementations, particularly in this method. Would it be possible to factor that code out into some common method?

[Review comment (Member)]: Note this comment is the same as this one: #193 (comment). It would be great to figure some of this out, as I think the two PRs are going to run into very similar issues around duplication, so I would take a look at my other comments in that one as well.

[Review comment (Member)]: ok, maybe we need a separate PR which pulls the duplicated code up into a single place
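For context, the duplicated region the reviewers mention is the per-descriptor status and stat computation further down in this method, which mirrors the redis implementation almost line for line. A rough sketch of the kind of common helper it could be pulled into (hypothetical, not part of this PR; it assumes the imports and the max helper from this file):

```go
// Hypothetical shared helper, not in this PR: both backends could derive the
// descriptor status and near/over-limit stats from the counter value alone.
func buildDescriptorStatus(limit *config.RateLimit, limitBeforeIncrease uint32, hitsAddend uint32) *pb.RateLimitResponse_DescriptorStatus {
	limitAfterIncrease := limitBeforeIncrease + hitsAddend
	overLimitThreshold := limit.Limit.RequestsPerUnit
	nearLimitThreshold := uint32(math.Floor(float64(float32(overLimitThreshold) * config.NearLimitRatio)))

	if limitAfterIncrease > overLimitThreshold {
		// Attribute hits to the over/near-limit stats exactly as the code below does.
		if limitBeforeIncrease >= overLimitThreshold {
			limit.Stats.OverLimit.Add(uint64(hitsAddend))
		} else {
			limit.Stats.OverLimit.Add(uint64(limitAfterIncrease - overLimitThreshold))
			limit.Stats.NearLimit.Add(uint64(overLimitThreshold - max(nearLimitThreshold, limitBeforeIncrease)))
		}
		return &pb.RateLimitResponse_DescriptorStatus{
			Code:           pb.RateLimitResponse_OVER_LIMIT,
			CurrentLimit:   limit.Limit,
			LimitRemaining: 0,
		}
	}

	if limitAfterIncrease > nearLimitThreshold {
		if limitBeforeIncrease >= nearLimitThreshold {
			limit.Stats.NearLimit.Add(uint64(hitsAddend))
		} else {
			limit.Stats.NearLimit.Add(uint64(limitAfterIncrease - nearLimitThreshold))
		}
	}
	return &pb.RateLimitResponse_DescriptorStatus{
		Code:           pb.RateLimitResponse_OK,
		CurrentLimit:   limit.Limit,
		LimitRemaining: overLimitThreshold - limitAfterIncrease,
	}
}
```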


	// First build a list of all cache keys that we are actually going to hit. generateCacheKey()
	// returns an empty string in the key if there is no limit, so that we can keep the arrays
	// all the same size.
	assert.Assert(len(request.Descriptors) == len(limits))
	cacheKeys := make([]limiter.CacheKey, len(request.Descriptors))
	now := this.timeSource.UnixNow()
	for i := 0; i < len(request.Descriptors); i++ {
		cacheKeys[i] = this.cacheKeyGenerator.GenerateCacheKey(request.Domain, request.Descriptors[i], limits[i], now)

		// Increase statistics for limits hit by their respective requests.
		if limits[i] != nil {
			limits[i].Stats.TotalHits.Add(uint64(hitsAddend))
		}
	}

	isOverLimitWithLocalCache := make([]bool, len(request.Descriptors))

	keysToGet := make([]string, 0, len(request.Descriptors))

	for i, cacheKey := range cacheKeys {
		if cacheKey.Key == "" {
			continue
		}

		if this.localCache != nil {
			// Get returns the value or a not-found error.
			_, err := this.localCache.Get([]byte(cacheKey.Key))
			if err == nil {
				isOverLimitWithLocalCache[i] = true
				logger.Debugf("cache key is over the limit: %s", cacheKey.Key)
				continue
			}
		}

		logger.Debugf("looking up cache key: %s", cacheKey.Key)
		keysToGet = append(keysToGet, cacheKey.Key)
	}

	// Now fetch the pipeline.
	responseDescriptorStatuses := make([]*pb.RateLimitResponse_DescriptorStatus,
		len(request.Descriptors))

[Review comment (Member)]: Is the "pipeline" term still applicable? (It is heavily used for redis; not aware how much for memcache.)
[Reply (Contributor Author)]: Changed

	var memcacheValues map[string]*memcache.Item
	var err error

	if len(keysToGet) > 0 {
		memcacheValues, err = this.client.GetMulti(keysToGet)
		if err != nil {
			logger.Errorf("Error multi-getting memcache keys (%s): %s", keysToGet, err)
		}
	}

[Review comment (Contributor Author)]: Added comments to cache_impl.go and README

	for i, cacheKey := range cacheKeys {
		if cacheKey.Key == "" {
			responseDescriptorStatuses[i] =
				&pb.RateLimitResponse_DescriptorStatus{
					Code:           pb.RateLimitResponse_OK,
					CurrentLimit:   nil,
					LimitRemaining: 0,
				}
			continue
		}

		if isOverLimitWithLocalCache[i] {
			responseDescriptorStatuses[i] =
				&pb.RateLimitResponse_DescriptorStatus{
					Code:           pb.RateLimitResponse_OVER_LIMIT,
					CurrentLimit:   limits[i].Limit,
					LimitRemaining: 0,
				}
			limits[i].Stats.OverLimit.Add(uint64(hitsAddend))
			limits[i].Stats.OverLimitWithLocalCache.Add(uint64(hitsAddend))
			continue
		}

		rawMemcacheValue, ok := memcacheValues[cacheKey.Key]
		var limitBeforeIncrease uint32
		if ok {
			decoded, err := strconv.ParseInt(string(rawMemcacheValue.Value), 10, 32)
			if err != nil {
				logger.Errorf("Unexpected non-numeric value in memcached: %v", rawMemcacheValue)
			} else {
				limitBeforeIncrease = uint32(decoded)
			}
		}

[Review comment (nezdolik, Member, Dec 29, 2020)]: should there be a continue keyword here? Does it make sense to go further and calculate limits with a default-initialised uint32?
[Reply (dweitzman, Contributor Author, Dec 29, 2020)]: Seems like this case would still benefit from the calculation. An extreme example would be if hitsAddend was high and the limit was lower. Or someone might also set a quota of 0 if they wanted to stop traffic entirely, and we'd want to respect that even if memcache didn't return anything.
[Review comment (Member)]: got it

		limitAfterIncrease := limitBeforeIncrease + hitsAddend
		overLimitThreshold := limits[i].Limit.RequestsPerUnit
		// The nearLimitThreshold is the number of requests that can be made before hitting the NearLimitRatio.
		// We need to know it in both the OK and OVER_LIMIT scenarios.
		nearLimitThreshold := uint32(math.Floor(float64(float32(overLimitThreshold) * config.NearLimitRatio)))

		logger.Debugf("cache key: %s current: %d", cacheKey.Key, limitAfterIncrease)
		if limitAfterIncrease > overLimitThreshold {
			responseDescriptorStatuses[i] =
				&pb.RateLimitResponse_DescriptorStatus{
					Code:           pb.RateLimitResponse_OVER_LIMIT,
					CurrentLimit:   limits[i].Limit,
					LimitRemaining: 0,
				}

			// Increase over limit statistics. Because we support += behavior for increasing the limit, we need to
			// assess if the entire hitsAddend was over the limit. That is, if the limit's value before adding the
			// N hits was over the limit, then all N hits were over limit.
			// Otherwise, only the difference between the current limit value and the over limit threshold
			// counts as over limit hits.
			if limitBeforeIncrease >= overLimitThreshold {
				limits[i].Stats.OverLimit.Add(uint64(hitsAddend))
			} else {
				limits[i].Stats.OverLimit.Add(uint64(limitAfterIncrease - overLimitThreshold))

				// If the limit before the increase was below the over limit value, then some of the hits were
				// in the near limit range.
				limits[i].Stats.NearLimit.Add(uint64(overLimitThreshold - max(nearLimitThreshold, limitBeforeIncrease)))
			}
			if this.localCache != nil {
				// Set the TTL of the local_cache entry to the entire duration.
				// Since the cache_key changes once time crosses into the next time slot, the over-the-limit
				// cache keys in local_cache lose effectiveness.
				// For example, if we have an hour limit on all mongo connections, the cache key would be
				// similar to mongo_1h, mongo_2h, etc. In hour 1 (0h0m - 0h59m) the cache key is mongo_1h; if we start
				// to get ratelimited in the 50th minute, the TTL of the local_cache entry is set to 1 hour (0h50m - 1h49m).
				// At 1h1m, since the cache key becomes different (mongo_2h), it won't get ratelimited.
				err := this.localCache.Set([]byte(cacheKey.Key), []byte{}, int(limiter.UnitToDivider(limits[i].Limit.Unit)))
				if err != nil {
					logger.Errorf("Failed to set local cache key: %s", cacheKey.Key)
				}
			}
		} else {
			responseDescriptorStatuses[i] =
				&pb.RateLimitResponse_DescriptorStatus{
					Code:           pb.RateLimitResponse_OK,
					CurrentLimit:   limits[i].Limit,
					LimitRemaining: overLimitThreshold - limitAfterIncrease,
				}

			// The limit is OK but we additionally want to know if we are near the limit.
			if limitAfterIncrease > nearLimitThreshold {
				// Here we also need to assess which portion of the hitsAddend was in the near limit range.
				// If all the hits were over the nearLimitThreshold, then all hits are near limit. Otherwise,
				// only the difference between the current limit value and the near limit threshold counts as
				// near limit hits.
				if limitBeforeIncrease >= nearLimitThreshold {
					limits[i].Stats.NearLimit.Add(uint64(hitsAddend))
				} else {
					limits[i].Stats.NearLimit.Add(uint64(limitAfterIncrease - nearLimitThreshold))
				}
			}
		}
	}

	this.wg.Add(1)
	go this.increaseAsync(cacheKeys, isOverLimitWithLocalCache, limits, uint64(hitsAddend))

	return responseDescriptorStatuses
}
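To make the near/over-limit accounting above concrete, here is a small standalone example with assumed numbers (RequestsPerUnit = 10, NearLimitRatio = 0.8, a counter at 6 before a request adding 6 hits); the values are illustrative only:

```go
package main

import "fmt"

func max(a, b uint32) uint32 {
	if a > b {
		return a
	}
	return b
}

func main() {
	// Assumed numbers, chosen only for illustration.
	overLimitThreshold := uint32(10) // RequestsPerUnit
	nearLimitThreshold := uint32(8)  // floor(10 * 0.8)
	limitBeforeIncrease := uint32(6)
	hitsAddend := uint32(6)

	limitAfterIncrease := limitBeforeIncrease + hitsAddend // 12 -> OVER_LIMIT

	// Hits 11 and 12 landed past the limit...
	overLimitHits := limitAfterIncrease - overLimitThreshold
	// ...hits 9 and 10 landed in the near-limit band; hits 7 and 8 were plain OK hits.
	nearLimitHits := overLimitThreshold - max(nearLimitThreshold, limitBeforeIncrease)

	fmt.Println(limitAfterIncrease, overLimitHits, nearLimitHits) // 12 2 2
}
```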

func (this *rateLimitMemcacheImpl) increaseAsync(cacheKeys []limiter.CacheKey, isOverLimitWithLocalCache []bool, limits []*config.RateLimit, hitsAddend uint64) {
	defer this.wg.Done()
	for i, cacheKey := range cacheKeys {
		if cacheKey.Key == "" || isOverLimitWithLocalCache[i] {
			continue
		}

		_, err := this.client.Increment(cacheKey.Key, hitsAddend)
		if err == memcache.ErrCacheMiss {
			expirationSeconds := limiter.UnitToDivider(limits[i].Limit.Unit)
			if this.expirationJitterMaxSeconds > 0 {
				expirationSeconds += this.jitterRand.Int63n(this.expirationJitterMaxSeconds)
			}

			// Need to add instead of increment.
			err = this.client.Add(&memcache.Item{
				Key:        cacheKey.Key,
				Value:      []byte(strconv.FormatUint(hitsAddend, 10)),
				Expiration: int32(expirationSeconds),
			})
			if err == memcache.ErrNotStored {
				// There was a race condition doing this add. We should be able to increment
				// now instead.
				_, err := this.client.Increment(cacheKey.Key, hitsAddend)
				if err != nil {
					logger.Errorf("Failed to increment key %s after failing to add: %s", cacheKey.Key, err)
					continue
				}
			} else if err != nil {
				logger.Errorf("Failed to add key %s: %s", cacheKey.Key, err)
				continue
			}
		} else if err != nil {
			logger.Errorf("Failed to increment key %s: %s", cacheKey.Key, err)
			continue
		}
	}
}

func (this *rateLimitMemcacheImpl) Flush() {
	this.wg.Wait()
}

func NewRateLimitCacheImpl(client Client, timeSource limiter.TimeSource, jitterRand *rand.Rand, expirationJitterMaxSeconds int64, localCache *freecache.Cache, scope stats.Scope) rateLimitCache {
	return &rateLimitMemcacheImpl{
		client:                     client,
		timeSource:                 timeSource,
		cacheKeyGenerator:          limiter.NewCacheKeyGenerator(),
		jitterRand:                 jitterRand,
		expirationJitterMaxSeconds: expirationJitterMaxSeconds,
		localCache:                 localCache,
	}
}

func NewRateLimitCacheImplFromSettings(s settings.Settings, timeSource limiter.TimeSource, jitterRand *rand.Rand, localCache *freecache.Cache, scope stats.Scope) rateLimitCache {
	return NewRateLimitCacheImpl(
		memcache.New(s.MemcacheHostPort),
		timeSource,
		jitterRand,
		s.ExpirationJitterMaxSeconds,
		localCache,
		scope,
	)
}
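As a sanity check, the increment/add/increment dance described at the top of this file can be exercised directly against a local memcached (address assumed, e.g. the docker-compose service above or the instance the Makefile starts for integration tests); a minimal sketch using gomemcache:

```go
package main

import (
	"fmt"
	"strconv"

	"github.com/bradfitz/gomemcache/memcache"
)

func main() {
	// Address assumed: any locally reachable memcached.
	client := memcache.New("127.0.0.1:11211")
	key := "demo_counter"
	delta := uint64(1)

	newValue, err := client.Increment(key, delta)
	if err == memcache.ErrCacheMiss {
		// memcached doesn't create keys implicitly on increment, so add explicitly.
		err = client.Add(&memcache.Item{
			Key:        key,
			Value:      []byte(strconv.FormatUint(delta, 10)),
			Expiration: 60, // seconds
		})
		if err == nil {
			newValue = delta
		} else if err == memcache.ErrNotStored {
			// Lost a race with a concurrent Add; increment succeeds now.
			newValue, err = client.Increment(key, delta)
		}
	}
	if err != nil {
		panic(err)
	}
	fmt.Println("counter is now", newValue)
}
```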
14 changes: 14 additions & 0 deletions src/memcached/client.go
@@ -0,0 +1,14 @@
package memcached

import (
	"github.com/bradfitz/gomemcache/memcache"
)

var _ Client = (*memcache.Client)(nil)

// Interface for memcached, used for mocking.
type Client interface {
	GetMulti(keys []string) (map[string]*memcache.Item, error)
	Increment(key string, delta uint64) (newValue uint64, err error)
	Add(item *memcache.Item) error
}
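Since this interface exists for mocking, a minimal in-memory fake is enough for unit tests. A sketch (all names below are hypothetical, not part of this PR) that mirrors the real client's no-implicit-create Increment and ErrNotStored Add semantics; a test could inject it via NewRateLimitCacheImpl and call Flush() to wait for the background increments before asserting:

```go
package memcached_test

import (
	"strconv"
	"sync"

	"github.com/bradfitz/gomemcache/memcache"
)

// fakeClient is a hypothetical in-memory stand-in for the Client interface above.
type fakeClient struct {
	mu    sync.Mutex
	items map[string]*memcache.Item
}

func newFakeClient() *fakeClient {
	return &fakeClient{items: map[string]*memcache.Item{}}
}

func (f *fakeClient) GetMulti(keys []string) (map[string]*memcache.Item, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	out := map[string]*memcache.Item{}
	for _, k := range keys {
		if item, ok := f.items[k]; ok {
			out[k] = item
		}
	}
	return out, nil
}

func (f *fakeClient) Increment(key string, delta uint64) (uint64, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	item, ok := f.items[key]
	if !ok {
		// Like real memcached: incrementing a missing key does not create it.
		return 0, memcache.ErrCacheMiss
	}
	n, err := strconv.ParseUint(string(item.Value), 10, 64)
	if err != nil {
		return 0, err
	}
	n += delta
	item.Value = []byte(strconv.FormatUint(n, 10))
	return n, nil
}

func (f *fakeClient) Add(item *memcache.Item) error {
	f.mu.Lock()
	defer f.mu.Unlock()
	if _, ok := f.items[item.Key]; ok {
		// Like real memcached: Add fails if the key already exists.
		return memcache.ErrNotStored
	}
	f.items[item.Key] = item
	return nil
}
```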