Skip to content

Commit

Permalink
Address some review comments for memcached support:
Browse files Browse the repository at this point in the history
- Add memcache error unit test
- Rename wg -> waitGroup
- Use an explicit "redis" backend type in some tests instead of always assuming it's the default
  • Loading branch information
dweitzman committed Dec 24, 2020
1 parent 7e68912 commit 975b5f3
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 7 deletions.
10 changes: 5 additions & 5 deletions src/memcached/cache_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type rateLimitMemcacheImpl struct {
expirationJitterMaxSeconds int64
cacheKeyGenerator limiter.CacheKeyGenerator
localCache *freecache.Cache
wg sync.WaitGroup
waitGroup sync.WaitGroup
nearLimitRatio float32
}

Expand Down Expand Up @@ -106,7 +106,7 @@ func (this *rateLimitMemcacheImpl) DoLimit(
keysToGet = append(keysToGet, cacheKey.Key)
}

// Now fetch the pipeline.
// Now fetch from memcache.
responseDescriptorStatuses := make([]*pb.RateLimitResponse_DescriptorStatus,
len(request.Descriptors))

Expand Down Expand Up @@ -223,14 +223,14 @@ func (this *rateLimitMemcacheImpl) DoLimit(
}
}

this.wg.Add(1)
this.waitGroup.Add(1)
go this.increaseAsync(cacheKeys, isOverLimitWithLocalCache, limits, uint64(hitsAddend))

return responseDescriptorStatuses
}

func (this *rateLimitMemcacheImpl) increaseAsync(cacheKeys []limiter.CacheKey, isOverLimitWithLocalCache []bool, limits []*config.RateLimit, hitsAddend uint64) {
defer this.wg.Done()
defer this.waitGroup.Done()
for i, cacheKey := range cacheKeys {
if cacheKey.Key == "" || isOverLimitWithLocalCache[i] {
continue
Expand Down Expand Up @@ -269,7 +269,7 @@ func (this *rateLimitMemcacheImpl) increaseAsync(cacheKeys []limiter.CacheKey, i
}

// Flush blocks until every outstanding increaseAsync goroutine spawned by
// DoLimit has finished. Tests call this before asserting on mock/counter
// state so async increments are not racing the assertions.
func (this *rateLimitMemcacheImpl) Flush() {
	this.waitGroup.Wait()
}

func NewRateLimitCacheImpl(client Client, timeSource utils.TimeSource, jitterRand *rand.Rand, expirationJitterMaxSeconds int64, localCache *freecache.Cache, scope stats.Scope, nearLimitRatio float32) limiter.RateLimitCache {
Expand Down
4 changes: 2 additions & 2 deletions test/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ func newDescriptorStatusLegacy(
// stop the server at the end of each test, thus we can reuse the grpc port among these integration tests.
// TestBasicConfig runs the basic-config integration suite four ways: with and
// without a per-second backend, and with and without the local cache. The
// per-second variants pass an explicit "redis" backend type instead of
// relying on it being the default.
func TestBasicConfig(t *testing.T) {
	t.Run("WithoutPerSecondRedis", testBasicConfig("8083", "false", "0", ""))
	t.Run("WithPerSecondRedis", testBasicConfig("8085", "true", "0", "redis"))
	t.Run("WithoutPerSecondRedisWithLocalCache", testBasicConfig("18083", "false", "1000", ""))
	t.Run("WithPerSecondRedisWithLocalCache", testBasicConfig("18085", "true", "1000", "redis"))
}

func TestBasicTLSConfig(t *testing.T) {
Expand Down
46 changes: 46 additions & 0 deletions test/memcached/cache_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,52 @@ func TestMemcached(t *testing.T) {
cache.Flush()
}

// TestMemcachedGetError verifies DoLimit's behavior when the memcache read
// side fails: both a hard GetMulti error and a successful GetMulti that
// simply lacks the key are treated as zero prior hits (the request is
// allowed, LimitRemaining is limit minus the hit) while the counter is still
// incremented asynchronously.
func TestMemcachedGetError(t *testing.T) {
	assert := assert.New(t)
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	mockTime := mock_utils.NewMockTimeSource(ctrl)
	mockClient := mock_memcached.NewMockClient(ctrl)
	store := stats.NewStore(stats.NewNullSink(), false)
	cache := memcached.NewRateLimitCacheImpl(mockClient, mockTime, nil, 0, nil, store, 0.8)

	// Case 1: GetMulti fails outright with a client error.
	mockTime.EXPECT().UnixNow().Return(int64(1234)).MaxTimes(3)
	mockClient.EXPECT().GetMulti([]string{"domain_key_value_1234"}).Return(
		nil, memcache.ErrNoServers,
	)
	mockClient.EXPECT().Increment("domain_key_value_1234", uint64(1)).Return(uint64(5), nil)

	req := common.NewRateLimitRequest("domain", [][][2]string{{{"key", "value"}}}, 1)
	limits := []*config.RateLimit{config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_SECOND, "key_value", store)}

	assert.Equal(
		[]*pb.RateLimitResponse_DescriptorStatus{{Code: pb.RateLimitResponse_OK, CurrentLimit: limits[0].Limit, LimitRemaining: 9, DurationUntilReset: utils.CalculateReset(limits[0].Limit, mockTime)}},
		cache.DoLimit(nil, req, limits))
	assert.Equal(uint64(1), limits[0].Stats.TotalHits.Value())
	assert.Equal(uint64(0), limits[0].Stats.OverLimit.Value())
	assert.Equal(uint64(0), limits[0].Stats.NearLimit.Value())

	// Case 2: GetMulti succeeds but returns no entry for the key.
	mockTime.EXPECT().UnixNow().Return(int64(1234)).MaxTimes(3)
	mockClient.EXPECT().GetMulti([]string{"domain_key_value1_1234"}).Return(
		nil, nil,
	)
	mockClient.EXPECT().Increment("domain_key_value1_1234", uint64(1)).Return(uint64(5), nil)

	req = common.NewRateLimitRequest("domain", [][][2]string{{{"key", "value1"}}}, 1)
	limits = []*config.RateLimit{config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_SECOND, "key_value1", store)}

	assert.Equal(
		[]*pb.RateLimitResponse_DescriptorStatus{{Code: pb.RateLimitResponse_OK, CurrentLimit: limits[0].Limit, LimitRemaining: 9, DurationUntilReset: utils.CalculateReset(limits[0].Limit, mockTime)}},
		cache.DoLimit(nil, req, limits))
	assert.Equal(uint64(1), limits[0].Stats.TotalHits.Value())
	assert.Equal(uint64(0), limits[0].Stats.OverLimit.Value())
	assert.Equal(uint64(0), limits[0].Stats.NearLimit.Value())

	// Drain the async increment goroutines before ctrl.Finish checks the mocks.
	cache.Flush()
}

func testLocalCacheStats(localCacheStats stats.StatGenerator, statsStore stats.Store, sink *common.TestStatSink,
expectedHitCount int, expectedMissCount int, expectedLookUpCount int, expectedExpiredCount int,
expectedEntryCount int) func(*testing.T) {
Expand Down

0 comments on commit 975b5f3

Please sign in to comment.