diff --git a/README.md b/README.md
index adba6164..97fd586b 100644
--- a/README.md
+++ b/README.md
@@ -625,6 +625,14 @@ descriptors will fail. Descriptors sent to Memcache should not contain whitespac
 When using multiple memcache nodes in `MEMCACHE_HOST_PORT=`, one should provide the identical list of memcache nodes
 to all ratelimiter instances to ensure that a particular cache key is always hashed to the same memcache node.
 
+# Custom headers
+The ratelimit service can be configured to return custom headers with the rate limit information.
+
+Set _all_ of the following environment variables to the header names to use (the headers are only returned when all three are set):
+1. `LIMIT_LIMIT_HEADER` - The value will be the limit of the descriptor closest to being triggered; optional quota policies are currently not included
+1. `LIMIT_REMAINING_HEADER` - The value will be the remaining quota
+1. `LIMIT_RESET_HEADER` - The value will be the number of seconds until the limit resets
+
 # Contact
 
 * [envoy-announce](https://groups.google.com/forum/#!forum/envoy-announce): Low frequency mailing
diff --git a/src/service/ratelimit.go b/src/service/ratelimit.go
index b8d1c0bd..1ff5a13c 100644
--- a/src/service/ratelimit.go
+++ b/src/service/ratelimit.go
@@ -2,11 +2,16 @@ package ratelimit
 
 import (
 	"fmt"
-	"github.com/envoyproxy/ratelimit/src/stats"
 	"math"
+	"strconv"
 	"strings"
 	"sync"
+	"time"
+
+	"github.com/envoyproxy/ratelimit/src/settings"
+	"github.com/envoyproxy/ratelimit/src/stats"
+
 	core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
 	pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3"
 	"github.com/envoyproxy/ratelimit/src/assert"
 	"github.com/envoyproxy/ratelimit/src/config"
@@ -22,15 +27,29 @@ type RateLimitServiceServer interface {
 	GetCurrentConfig() config.RateLimitConfig
 }
 
+type Clock interface {
+	Now() time.Time
+}
+
+// StdClock is a Clock that returns the system time.
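+// Tests can inject a fixed Clock so the reset-header value is deterministic
+// (see MockClock in the service tests).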
+type StdClock struct{}
+
+func (c StdClock) Now() time.Time { return time.Now() }
+
 type service struct {
-	runtime            loader.IFace
-	configLock         sync.RWMutex
-	configLoader       config.RateLimitConfigLoader
-	config             config.RateLimitConfig
-	runtimeUpdateEvent chan int
-	cache              limiter.RateLimitCache
-	stats              stats.ServiceStats
-	runtimeWatchRoot   bool
+	runtime                     loader.IFace
+	configLock                  sync.RWMutex
+	configLoader                config.RateLimitConfigLoader
+	config                      config.RateLimitConfig
+	runtimeUpdateEvent          chan int
+	cache                       limiter.RateLimitCache
+	stats                       stats.ServiceStats
+	runtimeWatchRoot            bool
+	customHeadersEnabled        bool
+	customHeaderLimitHeader     string
+	customHeaderRemainingHeader string
+	customHeaderResetHeader     string
+	customHeaderClock           Clock
 }
 
 func (this *service) reloadConfig(statsManager stats.Manager) {
@@ -61,6 +80,20 @@ func (this *service) reloadConfig(statsManager stats.Manager) {
 	this.configLock.Lock()
 	this.config = newConfig
 	this.configLock.Unlock()
+
+	rlSettings := settings.NewSettings()
+
+	if len(rlSettings.HeaderRatelimitLimit) > 0 &&
+		len(rlSettings.HeaderRatelimitReset) > 0 &&
+		len(rlSettings.HeaderRatelimitRemaining) > 0 {
+		this.customHeadersEnabled = true
+
+		this.customHeaderLimitHeader = rlSettings.HeaderRatelimitLimit
+
+		this.customHeaderRemainingHeader = rlSettings.HeaderRatelimitRemaining
+
+		this.customHeaderResetHeader = rlSettings.HeaderRatelimitReset
+	}
 }
 
 type serviceError string
@@ -118,6 +151,8 @@ func (this *service) constructLimitsToCheck(request *pb.RateLimitRequest, ctx co
 	return limitsToCheck, isUnlimited
 }
 
+const MaxUint32 = uint32(1<<32 - 1)
+
 func (this *service) shouldRateLimitWorker(
 	ctx context.Context, request *pb.RateLimitRequest) *pb.RateLimitResponse {
 
@@ -132,7 +167,19 @@ func (this *service) shouldRateLimitWorker(
 	response := &pb.RateLimitResponse{}
 	response.Statuses = make([]*pb.RateLimitResponse_DescriptorStatus, len(request.Descriptors))
 	finalCode := pb.RateLimitResponse_OK
+
+	// Keep track of the descriptor closest to hitting the rate limit
+	minLimitRemaining := MaxUint32
+	var minimumDescriptor *pb.RateLimitResponse_DescriptorStatus
+
 	for i, descriptorStatus := range responseDescriptorStatuses {
+		// Track the descriptor with the least remaining quota, provided it reports a CurrentLimit
+		if descriptorStatus.CurrentLimit != nil &&
+			descriptorStatus.LimitRemaining < minLimitRemaining {
+			minimumDescriptor = descriptorStatus
+			minLimitRemaining = descriptorStatus.LimitRemaining
+		}
+
 		if isUnlimited[i] {
 			response.Statuses[i] = &pb.RateLimitResponse_DescriptorStatus{
 				Code: pb.RateLimitResponse_OK,
@@ -142,14 +189,71 @@ func (this *service) shouldRateLimitWorker(
 			response.Statuses[i] = descriptorStatus
 			if descriptorStatus.Code == pb.RateLimitResponse_OVER_LIMIT {
 				finalCode = descriptorStatus.Code
 
+				// An over-limit descriptor always takes precedence, with zero quota remaining.
+				minimumDescriptor = descriptorStatus
+				minLimitRemaining = 0
 			}
 		}
 	}
 
+	// Add headers if requested
+	if this.customHeadersEnabled && minimumDescriptor != nil {
+		response.ResponseHeadersToAdd = []*core.HeaderValue{
+			this.rateLimitLimitHeader(minimumDescriptor),
+			this.rateLimitRemainingHeader(minimumDescriptor),
+			this.rateLimitResetHeader(minimumDescriptor, this.customHeaderClock.Now().Unix()),
+		}
+	}
+
 	response.OverallCode = finalCode
 	return response
 }
 
+// rateLimitLimitHeader reports the requests-per-unit limit of the descriptor
+// closest to being triggered; optional quota policies are not included.
+func (this *service) rateLimitLimitHeader(descriptor *pb.RateLimitResponse_DescriptorStatus) *core.HeaderValue {
+
+	return &core.HeaderValue{
+		Key:   this.customHeaderLimitHeader,
+		Value: strconv.FormatUint(uint64(descriptor.CurrentLimit.RequestsPerUnit), 10),
+	}
+}
+
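+// rateLimitRemainingHeader reports how much quota is left on the limit
+// closest to being exhausted.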
+func (this *service) rateLimitRemainingHeader(descriptor *pb.RateLimitResponse_DescriptorStatus) *core.HeaderValue {
+
+	return &core.HeaderValue{
+		Key:   this.customHeaderRemainingHeader,
+		Value: strconv.FormatUint(uint64(descriptor.LimitRemaining), 10),
+	}
+}
+
+func (this *service) rateLimitResetHeader(
+	descriptor *pb.RateLimitResponse_DescriptorStatus, now int64) *core.HeaderValue {
+
+	return &core.HeaderValue{
+		Key:   this.customHeaderResetHeader,
+		Value: strconv.FormatInt(calculateReset(descriptor, now), 10),
+	}
+}
+
+// calculateReset assumes limit windows are aligned to the Unix epoch:
+// the current window ends at the next whole multiple of the unit's length.
+func calculateReset(descriptor *pb.RateLimitResponse_DescriptorStatus, now int64) int64 {
+	sec := unitInSeconds(descriptor.CurrentLimit.Unit)
+	return sec - now%sec
+}
+
+func unitInSeconds(unit pb.RateLimitResponse_RateLimit_Unit) int64 {
+	switch unit {
+	case pb.RateLimitResponse_RateLimit_SECOND:
+		return 1
+	case pb.RateLimitResponse_RateLimit_MINUTE:
+		return 60
+	case pb.RateLimitResponse_RateLimit_HOUR:
+		return 60 * 60
+	case pb.RateLimitResponse_RateLimit_DAY:
+		return 60 * 60 * 24
+	default:
+		panic("unknown rate limit unit")
+	}
+}
+
 func (this *service) ShouldRateLimit(
 	ctx context.Context, request *pb.RateLimitRequest) (finalResponse *pb.RateLimitResponse, finalError error) {
 
@@ -190,7 +294,7 @@ func (this *service) GetCurrentConfig() config.RateLimitConfig {
 }
 
 func NewService(runtime loader.IFace, cache limiter.RateLimitCache,
-	configLoader config.RateLimitConfigLoader, statsManager stats.Manager, runtimeWatchRoot bool) RateLimitServiceServer {
+	configLoader config.RateLimitConfigLoader, statsManager stats.Manager, runtimeWatchRoot bool, clock Clock) RateLimitServiceServer {
 
 	newService := &service{
 		runtime:            runtime,
@@ -201,6 +305,7 @@ func NewService(runtime loader.IFace, cache limiter.RateLimitCache,
 		cache:              cache,
 		stats:              statsManager.NewServiceStats(),
 		runtimeWatchRoot:   runtimeWatchRoot,
+		customHeaderClock:  clock,
 	}
 
 	runtime.AddUpdateCallback(newService.runtimeUpdateEvent)
diff --git a/src/service_cmd/runner/runner.go b/src/service_cmd/runner/runner.go
index 31865045..ae75bf17 100644
--- a/src/service_cmd/runner/runner.go
+++ b/src/service_cmd/runner/runner.go
@@ -1,8 +1,6 @@
 package runner
 
 import (
-	"github.com/envoyproxy/ratelimit/src/metrics"
-	"github.com/envoyproxy/ratelimit/src/stats"
 	"io"
 	"math/rand"
 	"net/http"
@@ -10,6 +8,9 @@ import (
 	"sync"
 	"time"
 
+	"github.com/envoyproxy/ratelimit/src/metrics"
+	"github.com/envoyproxy/ratelimit/src/stats"
+
 	gostats "github.com/lyft/gostats"
 
 	"github.com/coocood/freecache"
@@ -107,6 +108,7 @@ func (runner *Runner) Run() {
 		config.NewRateLimitConfigLoaderImpl(),
 		runner.statsManager,
 		s.RuntimeWatchRoot,
+		ratelimit.StdClock{},
 	)
 
 	srv.AddDebugHttpEndpoint(
diff --git a/src/settings/settings.go b/src/settings/settings.go
index 7cd232c9..f576c005 100644
--- a/src/settings/settings.go
+++ b/src/settings/settings.go
@@ -49,6 +49,14 @@ type Settings struct {
 	CacheKeyPrefix string `envconfig:"CACHE_KEY_PREFIX" default:""`
 	BackendType    string `envconfig:"BACKEND_TYPE" default:"redis"`
 
+	// Settings for optionally returning custom headers
+	// value: the current limit
+	HeaderRatelimitLimit string `envconfig:"LIMIT_LIMIT_HEADER" default:""`
+	// value: remaining count
+	HeaderRatelimitRemaining string `envconfig:"LIMIT_REMAINING_HEADER" default:""`
+	// value: remaining seconds
+	HeaderRatelimitReset string `envconfig:"LIMIT_RESET_HEADER" default:""`
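+	// Note: the service only emits these headers when all three names are
+	// configured (see reloadConfig).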
+
 	// Redis settings
 	RedisSocketType string `envconfig:"REDIS_SOCKET_TYPE" default:"unix"`
 	RedisType       string `envconfig:"REDIS_TYPE" default:"SINGLE"`
diff --git a/test/service/ratelimit_test.go b/test/service/ratelimit_test.go
index 64c44ba1..e958ce18 100644
--- a/test/service/ratelimit_test.go
+++ b/test/service/ratelimit_test.go
@@ -1,11 +1,15 @@
 package ratelimit_test
 
 import (
-	"github.com/envoyproxy/ratelimit/src/stats"
 	"math"
+	"os"
 	"sync"
 	"testing"
+	"time"
+
+	"github.com/envoyproxy/ratelimit/src/stats"
+
 	core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
 	pb "github.com/envoyproxy/go-control-plane/envoy/service/ratelimit/v3"
 	"github.com/envoyproxy/ratelimit/src/config"
 	"github.com/envoyproxy/ratelimit/src/redis"
@@ -60,8 +64,15 @@ type rateLimitServiceTestSuite struct {
 	runtimeUpdateCallback chan<- int
 	statsManager          stats.Manager
 	statStore             gostats.Store
+	mockClock             ratelimit.Clock
+}
+
+type MockClock struct {
+	now int64
 }
 
+func (c MockClock) Now() time.Time { return time.Unix(c.now, 0) }
+
 func commonSetup(t *testing.T) rateLimitServiceTestSuite {
 	ret := rateLimitServiceTestSuite{}
 	ret.assert = assert.New(t)
@@ -87,7 +98,7 @@ func (this *rateLimitServiceTestSuite) setupBasicService() ratelimit.RateLimitSe
 	this.configLoader.EXPECT().Load(
 		[]config.RateLimitConfigToLoad{{"config.basic_config", "fake_yaml"}}, gomock.Any()).Return(this.config)
 
-	return ratelimit.NewService(this.runtime, this.cache, this.configLoader, this.statsManager, true)
+	return ratelimit.NewService(this.runtime, this.cache, this.configLoader, this.statsManager, true, MockClock{now: int64(2222)})
 }
 
 func TestService(test *testing.T) {
@@ -176,6 +187,99 @@ func TestService(test *testing.T) {
 	t.assert.EqualValues(1, t.statStore.NewCounter("config_load_error").Value())
 }
 
+func TestServiceWithCustomHeaders(test *testing.T) {
+	os.Setenv("LIMIT_LIMIT_HEADER", "A-Ratelimit-Limit")
+	os.Setenv("LIMIT_REMAINING_HEADER", "A-Ratelimit-Remaining")
+	os.Setenv("LIMIT_RESET_HEADER", "A-Ratelimit-Reset")
+	defer func() {
+		os.Unsetenv("LIMIT_LIMIT_HEADER")
+		os.Unsetenv("LIMIT_REMAINING_HEADER")
+		os.Unsetenv("LIMIT_RESET_HEADER")
+	}()
+
+	t := commonSetup(test)
+	defer t.controller.Finish()
+	service := t.setupBasicService()
+
+	// Config reload.
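+	// Trigger a config reload so reloadConfig picks up the header names from
+	// the environment variables set above.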
+	barrier := newBarrier()
+	t.configLoader.EXPECT().Load(
+		[]config.RateLimitConfigToLoad{{"config.basic_config", "fake_yaml"}}, gomock.Any()).Do(
+		func([]config.RateLimitConfigToLoad, stats.Manager) { barrier.signal() }).Return(t.config)
+	t.runtimeUpdateCallback <- 1
+	barrier.wait()
+
+	// Make a request.
+	request := common.NewRateLimitRequest(
+		"different-domain", [][][2]string{{{"foo", "bar"}}, {{"hello", "world"}}}, 1)
+	limits := []*config.RateLimit{
+		config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_MINUTE, t.statsManager.NewStats("key"), false),
+		nil}
+	t.config.EXPECT().GetLimit(nil, "different-domain", request.Descriptors[0]).Return(limits[0])
+	t.config.EXPECT().GetLimit(nil, "different-domain", request.Descriptors[1]).Return(limits[1])
+	t.cache.EXPECT().DoLimit(nil, request, limits).Return(
+		[]*pb.RateLimitResponse_DescriptorStatus{{Code: pb.RateLimitResponse_OVER_LIMIT, CurrentLimit: limits[0].Limit, LimitRemaining: 0},
+			{Code: pb.RateLimitResponse_OK, CurrentLimit: nil, LimitRemaining: 0}})
+
+	response, err := service.ShouldRateLimit(nil, request)
+	common.AssertProtoEqual(
+		t.assert,
+		&pb.RateLimitResponse{
+			OverallCode: pb.RateLimitResponse_OVER_LIMIT,
+			Statuses: []*pb.RateLimitResponse_DescriptorStatus{
+				{Code: pb.RateLimitResponse_OVER_LIMIT, CurrentLimit: limits[0].Limit, LimitRemaining: 0},
+				{Code: pb.RateLimitResponse_OK, CurrentLimit: nil, LimitRemaining: 0},
+			},
+			// MockClock is fixed at t=2222, so a MINUTE limit resets in 60 - 2222%60 = 58 seconds.
+			ResponseHeadersToAdd: []*core.HeaderValue{
+				{Key: "A-Ratelimit-Limit", Value: "10"},
+				{Key: "A-Ratelimit-Remaining", Value: "0"},
+				{Key: "A-Ratelimit-Reset", Value: "58"},
+			},
+		},
+		response)
+	t.assert.Nil(err)
+
+	// Config load failure.
+	t.configLoader.EXPECT().Load(
+		[]config.RateLimitConfigToLoad{{"config.basic_config", "fake_yaml"}}, gomock.Any()).Do(
+		func([]config.RateLimitConfigToLoad, stats.Manager) {
+			defer barrier.signal()
+			panic(config.RateLimitConfigError("load error"))
+		})
+	t.runtimeUpdateCallback <- 1
+	barrier.wait()
+
+	// Config should still be valid. Also make sure order does not affect results.
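+	// The OVER_LIMIT descriptor comes second this time; the same custom
+	// headers should still be returned.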
+	limits = []*config.RateLimit{
+		nil,
+		config.NewRateLimit(10, pb.RateLimitResponse_RateLimit_MINUTE, t.statsManager.NewStats("key"), false)}
+	t.config.EXPECT().GetLimit(nil, "different-domain", request.Descriptors[0]).Return(limits[0])
+	t.config.EXPECT().GetLimit(nil, "different-domain", request.Descriptors[1]).Return(limits[1])
+	t.cache.EXPECT().DoLimit(nil, request, limits).Return(
+		[]*pb.RateLimitResponse_DescriptorStatus{{Code: pb.RateLimitResponse_OK, CurrentLimit: nil, LimitRemaining: 0},
+			{Code: pb.RateLimitResponse_OVER_LIMIT, CurrentLimit: limits[1].Limit, LimitRemaining: 0}})
+	response, err = service.ShouldRateLimit(nil, request)
+	common.AssertProtoEqual(
+		t.assert,
+		&pb.RateLimitResponse{
+			OverallCode: pb.RateLimitResponse_OVER_LIMIT,
+			Statuses: []*pb.RateLimitResponse_DescriptorStatus{
+				{Code: pb.RateLimitResponse_OK, CurrentLimit: nil, LimitRemaining: 0},
+				{Code: pb.RateLimitResponse_OVER_LIMIT, CurrentLimit: limits[1].Limit, LimitRemaining: 0},
+			},
+			ResponseHeadersToAdd: []*core.HeaderValue{
+				{Key: "A-Ratelimit-Limit", Value: "10"},
+				{Key: "A-Ratelimit-Remaining", Value: "0"},
+				{Key: "A-Ratelimit-Reset", Value: "58"},
+			},
+		},
+		response)
+	t.assert.Nil(err)
+
+	t.assert.EqualValues(2, t.statStore.NewCounter("config_load_success").Value())
+	t.assert.EqualValues(1, t.statStore.NewCounter("config_load_error").Value())
+}
+
 func TestEmptyDomain(test *testing.T) {
 	t := commonSetup(test)
 	defer t.controller.Finish()
@@ -233,7 +337,7 @@ func TestInitialLoadError(test *testing.T) {
 		func([]config.RateLimitConfigToLoad, stats.Manager) {
 			panic(config.RateLimitConfigError("load error"))
 		})
-	service := ratelimit.NewService(t.runtime, t.cache, t.configLoader, t.statsManager, true)
+	service := ratelimit.NewService(t.runtime, t.cache, t.configLoader, t.statsManager, true, t.mockClock)
 
 	request := common.NewRateLimitRequest("test-domain", [][][2]string{{{"hello", "world"}}}, 1)
 	response, err := service.ShouldRateLimit(nil, request)