Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Relay rate limits #906

Merged
merged 15 commits into from
Nov 19, 2024
33 changes: 18 additions & 15 deletions relay/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"github.com/Layr-Labs/eigenda/relay/limiter"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/aws"
Expand All @@ -12,21 +13,6 @@ import (
)

// Config is the configuration for the relay Server.
//
// Environment variables are mapped into this struct by taking the name of the field in this struct,
// converting to upper case, and prepending "RELAY_". For example, "BlobCacheSize" can be set using the
// environment variable "RELAY_BLOBCACHESIZE".
//
// For nested structs, add the name of the struct variable before the field name, separated by an underscore.
// For example, "Log.Format" can be set using the environment variable "RELAY_LOG_FORMAT".
//
// Slice values can be set using a comma-separated list. For example, "RelayIDs" can be set using the environment
// variable "RELAY_RELAYIDS='1,2,3,4'".
//
// It is also possible to set the configuration using a configuration file. The path to the configuration file should
// be passed as the first argument to the relay binary, e.g. "bin/relay config.yaml". The structure of the config
// file should mirror the structure of this struct, with keys in the config file matching the field names
// of this struct.
type Config struct {

// Log is the configuration for the logger. Default is common.DefaultLoggerConfig().
Expand Down Expand Up @@ -70,6 +56,23 @@ func NewConfig(ctx *cli.Context) (Config, error) {
BlobMaxConcurrency: ctx.Int(flags.BlobMaxConcurrencyFlag.Name),
ChunkCacheSize: ctx.Int(flags.ChunkCacheSizeFlag.Name),
ChunkMaxConcurrency: ctx.Int(flags.ChunkMaxConcurrencyFlag.Name),
RateLimits: limiter.Config{
MaxGetBlobOpsPerSecond: ctx.Float64(flags.MaxGetBlobOpsPerSecondFlag.Name),
GetBlobOpsBurstiness: ctx.Int(flags.GetBlobOpsBurstinessFlag.Name),
MaxGetBlobBytesPerSecond: ctx.Float64(flags.MaxGetBlobBytesPerSecondFlag.Name),
GetBlobBytesBurstiness: ctx.Int(flags.GetBlobBytesBurstinessFlag.Name),
MaxConcurrentGetBlobOps: ctx.Int(flags.MaxConcurrentGetBlobOpsFlag.Name),
MaxGetChunkOpsPerSecond: ctx.Float64(flags.MaxGetChunkOpsPerSecondFlag.Name),
GetChunkOpsBurstiness: ctx.Int(flags.GetChunkOpsBurstinessFlag.Name),
MaxGetChunkBytesPerSecond: ctx.Float64(flags.MaxGetChunkBytesPerSecondFlag.Name),
GetChunkBytesBurstiness: ctx.Int(flags.GetChunkBytesBurstinessFlag.Name),
MaxConcurrentGetChunkOps: ctx.Int(flags.MaxConcurrentGetChunkOpsFlag.Name),
MaxGetChunkOpsPerSecondClient: ctx.Float64(flags.MaxGetChunkOpsPerSecondClientFlag.Name),
GetChunkOpsBurstinessClient: ctx.Int(flags.GetChunkOpsBurstinessClientFlag.Name),
MaxGetChunkBytesPerSecondClient: ctx.Float64(flags.MaxGetChunkBytesPerSecondClientFlag.Name),
GetChunkBytesBurstinessClient: ctx.Int(flags.GetChunkBytesBurstinessClientFlag.Name),
MaxConcurrentGetChunkOpsClient: ctx.Int(flags.MaxConcurrentGetChunkOpsClientFlag.Name),
},
},
}
for i, id := range relayIDs {
Expand Down
119 changes: 119 additions & 0 deletions relay/cmd/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,110 @@ var (
EnvVar: common.PrefixEnvVar(envVarPrefix, "CHUNK_MAX_CONCURRENCY"),
Value: 32,
}
// MaxGetBlobOpsPerSecondFlag is the optional flag for the maximum number of GetBlob operations
// per second. Defaults to 1024.
MaxGetBlobOpsPerSecondFlag = cli.Float64Flag{
Name: common.PrefixFlag(FlagPrefix, "max-get-blob-ops-per-second"),
Usage: "Max number of GetBlob operations per second",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_GET_BLOB_OPS_PER_SECOND"),
Value: 1024,
}
// GetBlobOpsBurstinessFlag is the optional flag for the burst size of the GetBlob operation
// rate limiter. Defaults to 1024, the same as the default operations-per-second rate.
GetBlobOpsBurstinessFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "get-blob-ops-burstiness"),
Usage: "Burstiness of the GetBlob rate limiter",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "GET_BLOB_OPS_BURSTINESS"),
Value: 1024,
}
// MaxGetBlobBytesPerSecondFlag is the optional flag for the maximum GetBlob bandwidth, in bytes
// per second. Defaults to 20 MiB/s.
MaxGetBlobBytesPerSecondFlag = cli.Float64Flag{
Name: common.PrefixFlag(FlagPrefix, "max-get-blob-bytes-per-second"),
Usage: "Max bandwidth for GetBlob operations in bytes per second",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_GET_BLOB_BYTES_PER_SECOND"),
Value: 20 * 1024 * 1024,
}
// GetBlobBytesBurstinessFlag is the optional flag for the burst size, in bytes, of the GetBlob
// bandwidth rate limiter. Defaults to 20 MiB, the same as the default bytes-per-second rate.
GetBlobBytesBurstinessFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "get-blob-bytes-burstiness"),
Usage: "Burstiness of the GetBlob bandwidth rate limiter",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "GET_BLOB_BYTES_BURSTINESS"),
Value: 20 * 1024 * 1024,
}
// MaxConcurrentGetBlobOpsFlag is the optional flag for the maximum number of GetBlob operations
// that may be in flight at the same time. Defaults to 1024.
MaxConcurrentGetBlobOpsFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "max-concurrent-get-blob-ops"),
Usage: "Max number of concurrent GetBlob operations",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_CONCURRENT_GET_BLOB_OPS"),
Value: 1024,
}
// MaxGetChunkOpsPerSecondFlag is the optional flag for the maximum number of GetChunk operations
// per second. Defaults to 1024. A separate "-client" flag carries the per-client limit.
MaxGetChunkOpsPerSecondFlag = cli.Float64Flag{
Name: common.PrefixFlag(FlagPrefix, "max-get-chunk-ops-per-second"),
Usage: "Max number of GetChunk operations per second",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_GET_CHUNK_OPS_PER_SECOND"),
Value: 1024,
}
// GetChunkOpsBurstinessFlag is the optional flag for the burstiness of the GetChunk operation
// rate limiter. Defaults to 1024, the same as the default operations-per-second rate.
GetChunkOpsBurstinessFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "get-chunk-ops-burstiness"),
Usage: "Burstiness of the GetChunk rate limiter",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "GET_CHUNK_OPS_BURSTINESS"),
Value: 1024,
}
// MaxGetChunkBytesPerSecondFlag is the optional flag for the maximum GetChunk bandwidth, in bytes
// per second. Defaults to 20 MiB/s. A separate "-client" flag carries the per-client limit.
MaxGetChunkBytesPerSecondFlag = cli.Float64Flag{
Name: common.PrefixFlag(FlagPrefix, "max-get-chunk-bytes-per-second"),
Usage: "Max bandwidth for GetChunk operations in bytes per second",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_GET_CHUNK_BYTES_PER_SECOND"),
Value: 20 * 1024 * 1024,
}
// GetChunkBytesBurstinessFlag is the optional flag for the burstiness, in bytes, of the GetChunk
// bandwidth rate limiter. Defaults to 20 MiB, the same as the default bytes-per-second rate.
GetChunkBytesBurstinessFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "get-chunk-bytes-burstiness"),
Usage: "Burstiness of the GetChunk bandwidth rate limiter",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "GET_CHUNK_BYTES_BURSTINESS"),
Value: 20 * 1024 * 1024,
}
// MaxConcurrentGetChunkOpsFlag is the optional flag for the maximum number of concurrent GetChunk
// operations. Defaults to 1024. A separate "-client" flag carries the per-client limit.
MaxConcurrentGetChunkOpsFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "max-concurrent-get-chunk-ops"),
Usage: "Max number of concurrent GetChunk operations",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_CONCURRENT_GET_CHUNK_OPS"),
Value: 1024,
}
// MaxGetChunkOpsPerSecondClientFlag is the optional flag for the maximum number of GetChunk
// operations per second allowed for a single client. Defaults to 8.
MaxGetChunkOpsPerSecondClientFlag = cli.Float64Flag{
Name: common.PrefixFlag(FlagPrefix, "max-get-chunk-ops-per-second-client"),
Usage: "Max number of GetChunk operations per second per client",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_GET_CHUNK_OPS_PER_SECOND_CLIENT"),
Value: 8,
}
// GetChunkOpsBurstinessClientFlag is the optional flag for the burstiness of the per-client
// GetChunk operation rate limiter. Defaults to 8, the same as the default per-client rate.
GetChunkOpsBurstinessClientFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "get-chunk-ops-burstiness-client"),
Usage: "Burstiness of the GetChunk rate limiter per client",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "GET_CHUNK_OPS_BURSTINESS_CLIENT"),
Value: 8,
}
// MaxGetChunkBytesPerSecondClientFlag is the optional flag for the maximum GetChunk bandwidth, in
// bytes per second, allowed for a single client. Defaults to 2 MiB/s.
MaxGetChunkBytesPerSecondClientFlag = cli.Float64Flag{
Name: common.PrefixFlag(FlagPrefix, "max-get-chunk-bytes-per-second-client"),
Usage: "Max bandwidth for GetChunk operations in bytes per second per client",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_GET_CHUNK_BYTES_PER_SECOND_CLIENT"),
Value: 2 * 1024 * 1024,
}
// GetChunkBytesBurstinessClientFlag is the optional flag for the burstiness, in bytes, of the
// per-client GetChunk bandwidth rate limiter.
//
// Defaults to 2 MiB, matching the default of MaxGetChunkBytesPerSecondClientFlag, in the same way
// that every other burstiness flag in this group defaults to its paired rate. Without an explicit
// Value the default would be 0.
// NOTE(review): a burst of 0 would presumably make the per-client bandwidth limiter reject every
// request of non-zero size — confirm against the chunk rate limiter implementation.
GetChunkBytesBurstinessClientFlag = cli.IntFlag{
	Name:     common.PrefixFlag(FlagPrefix, "get-chunk-bytes-burstiness-client"),
	Usage:    "Burstiness of the GetChunk bandwidth rate limiter per client",
	Required: false,
	EnvVar:   common.PrefixEnvVar(envVarPrefix, "GET_CHUNK_BYTES_BURSTINESS_CLIENT"),
	Value:    2 * 1024 * 1024,
}
// MaxConcurrentGetChunkOpsClientFlag is the optional flag for the maximum number of concurrent
// GetChunk operations allowed for a single client. Defaults to 1.
MaxConcurrentGetChunkOpsClientFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "max-concurrent-get-chunk-ops-client"),
Usage: "Max number of concurrent GetChunk operations per client",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_CONCURRENT_GET_CHUNK_OPS_CLIENT"),
Value: 1,
}
)

var requiredFlags = []cli.Flag{
Expand All @@ -102,6 +206,21 @@ var optionalFlags = []cli.Flag{
BlobMaxConcurrencyFlag,
ChunkCacheSizeFlag,
ChunkMaxConcurrencyFlag,
MaxGetBlobOpsPerSecondFlag,
GetBlobOpsBurstinessFlag,
MaxGetBlobBytesPerSecondFlag,
GetBlobBytesBurstinessFlag,
MaxConcurrentGetBlobOpsFlag,
MaxGetChunkOpsPerSecondFlag,
GetChunkOpsBurstinessFlag,
MaxGetChunkBytesPerSecondFlag,
GetChunkBytesBurstinessFlag,
MaxConcurrentGetChunkOpsFlag,
MaxGetChunkOpsPerSecondClientFlag,
GetChunkOpsBurstinessClientFlag,
MaxGetChunkBytesPerSecondClientFlag,
GetChunkBytesBurstinessClientFlag,
MaxConcurrentGetChunkOpsClientFlag,
}

// Flags is the exported list of CLI flags for the relay.
// NOTE(review): presumably populated from requiredFlags and optionalFlags elsewhere in this
// file — confirm.
var Flags []cli.Flag
Expand Down
102 changes: 102 additions & 0 deletions relay/limiter/blob_rate_limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package limiter

import (
"fmt"
"golang.org/x/time/rate"
"sync"
"time"
)

// BlobRateLimiter enforces rate limits on GetBlob operations. It limits three things: the rate at
// which operations start, the bandwidth they consume, and the number of operations in flight at
// the same time.
//
// All methods accept a nil receiver, in which case no limits are enforced. The struct contains a
// sync.Mutex, so it must be used through a pointer and never copied.
type BlobRateLimiter struct {

// config is the rate limit configuration. MaxConcurrentGetBlobOps is read from it each time an
// operation begins.
config *Config

// opLimiter enforces rate limits on the maximum rate of GetBlob operations
opLimiter *rate.Limiter

// bandwidthLimiter enforces rate limits on the maximum bandwidth consumed by GetBlob operations. Only the size
// of the blob data is considered, not the size of the entire response.
bandwidthLimiter *rate.Limiter

// operationsInFlight is the number of GetBlob operations currently in flight. It is incremented
// by BeginGetBlobOperation and decremented by FinishGetBlobOperation, always while holding lock.
operationsInFlight int

// lock guards operationsInFlight and serializes the admission decision made when an operation begins.
lock sync.Mutex
}

// NewBlobRateLimiter builds a BlobRateLimiter from the given configuration.
//
// The operation limiter is created from MaxGetBlobOpsPerSecond and GetBlobOpsBurstiness, and the
// bandwidth limiter from MaxGetBlobBytesPerSecond and GetBlobBytesBurstiness. The config pointer
// is retained by the returned limiter. config must not be nil.
func NewBlobRateLimiter(config *Config) *BlobRateLimiter {
	opsPerSecond := rate.Limit(config.MaxGetBlobOpsPerSecond)
	bytesPerSecond := rate.Limit(config.MaxGetBlobBytesPerSecond)

	l := &BlobRateLimiter{config: config}
	l.opLimiter = rate.NewLimiter(opsPerSecond, config.GetBlobOpsBurstiness)
	l.bandwidthLimiter = rate.NewLimiter(bytesPerSecond, config.GetBlobBytesBurstiness)
	return l
}

// BeginGetBlobOperation should be called when a GetBlob operation is about to begin. If it returns an error,
// the operation should not be performed. If it does not return an error, FinishGetBlobOperation should be
// called when the operation completes.
//
// now is the time at which the operation rate limit is evaluated. A nil receiver disables rate
// limiting and always returns nil.
//
// Two limits are checked, in order: the maximum number of concurrent operations, then the
// operation rate. A rejected request neither counts as in flight nor consumes a rate token.
func (l *BlobRateLimiter) BeginGetBlobOperation(now time.Time) error {
	if l == nil {
		// If the rate limiter is nil, do not enforce rate limits.
		return nil
	}

	l.lock.Lock()
	defer l.lock.Unlock()

	if l.operationsInFlight >= l.config.MaxConcurrentGetBlobOps {
		return fmt.Errorf("global concurrent request limit exceeded for getBlob operations, try again later")
	}

	// AllowN both checks for and consumes the token, so its result is the single source of truth
	// for admission; there is no separate availability check and no discarded return value.
	if !l.opLimiter.AllowN(now, 1) {
		return fmt.Errorf("global rate limit exceeded for getBlob operations, try again later")
	}

	l.operationsInFlight++
	return nil
}

// FinishGetBlobOperation releases the concurrency slot taken by a successful call to
// BeginGetBlobOperation. Call it exactly once for every BeginGetBlobOperation call that
// returned nil. On a nil receiver (rate limiting disabled) it does nothing.
func (l *BlobRateLimiter) FinishGetBlobOperation() {
	if l != nil {
		l.lock.Lock()
		l.operationsInFlight--
		l.lock.Unlock()
	}
}

// RequestGetBlobBandwidth reserves bandwidth for a GetBlob operation that is about to download
// blob data from S3. now is the time at which the limit is evaluated and bytes is the size of
// the data to be downloaded.
//
// It returns nil if the operation may proceed, or an error if there is insufficient bandwidth
// available. On a nil receiver (rate limiting disabled) it always returns nil.
func (l *BlobRateLimiter) RequestGetBlobBandwidth(now time.Time, bytes uint32) error {
	// The lock is deliberately not taken: only bandwidthLimiter is touched, and it is safe for
	// concurrent use on its own.
	switch {
	case l == nil:
		return nil
	case l.bandwidthLimiter.AllowN(now, int(bytes)):
		return nil
	default:
		return fmt.Errorf("global rate limit exceeded for getBlob bandwidth, try again later")
	}
}
Loading
Loading