Skip to content

Commit

Permalink
fix!: gateway: fix rate limiting, general cleanup
Browse files Browse the repository at this point in the history
Minor API changes:

* gateway.NewRateLimiterHandler and gateway.NewConnectionRateLimiterHandler have
  been replaced with gateway.NewRateLimitHandler.
* The handlers returned by both gateway.NewRateLimitHandler and the primary
  gateway.Handler return an http.Handler augmented with a Shutdown(ctx) method
  to be used for graceful cleanup of resources.

Fix:

* --per-conn-rate-limit was previously applied as a global rate limiter,
  effectively making it have the same impact as --rate-limit. This change fixes
  the behaviour such that --per-conn-rate-limit is applied as a API call
  limiter within a single connection (i.e. a WebSocket connection). The rate
  is specified as tokens-per-second, where tokens are relative to the expense
  of the API call being made.
  • Loading branch information
rvagg authored and rjan90 committed Aug 12, 2024
1 parent ec3842e commit 6d6f29f
Show file tree
Hide file tree
Showing 7 changed files with 417 additions and 150 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@

- fix: add datacap balance to circ supply internal accounting as unCirc #12348

## Improvements

- fix!: gateway: fix rate limiting, general cleanup ([filecoin-project/lotus#12315](https://github.com/filecoin-project/lotus/pull/12315)).
- CLI usage documentation has been improved for `lotus-gateway`
- `--per-conn-rate-limit` now works as advertised.
- Some APIs have changed which may impact users consuming Lotus Gateway code as a library.

# v1.28.1 / 2024-07-24

This is the MANDATORY Lotus v1.28.1 release, which will deliver the Filecoin network version 23, codenamed Waffle 🧇. v1.28.1 is also the minimal version that supports nv23.
Expand Down
47 changes: 27 additions & 20 deletions cmd/lotus-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,23 +132,29 @@ var runCmd = &cli.Command{
Value: int64(gateway.DefaultStateWaitLookbackLimit),
},
&cli.Int64Flag{
Name: "rate-limit",
Usage: "rate-limit API calls. Use 0 to disable",
Name: "rate-limit",
Usage: fmt.Sprintf(
"Global API call throttling rate limit (per second), weighted by relative expense of the call, with the most expensive calls counting for %d. Use 0 to disable",
gateway.MaxRateLimitTokens,
),
Value: 0,
},
&cli.Int64Flag{
Name: "per-conn-rate-limit",
Usage: "rate-limit API calls per each connection. Use 0 to disable",
Name: "per-conn-rate-limit",
Usage: fmt.Sprintf(
"API call throttling rate limit (per second) per WebSocket connection, weighted by relative expense of the call, with the most expensive calls counting for %d. Use 0 to disable",
gateway.MaxRateLimitTokens,
),
Value: 0,
},
&cli.DurationFlag{
Name: "rate-limit-timeout",
Usage: "the maximum time to wait for the rate limiter before returning an error to clients",
Usage: "The maximum time to wait for the API call throttling rate limiter before returning an error to clients",
Value: gateway.DefaultRateLimitTimeout,
},
&cli.Int64Flag{
Name: "conn-per-minute",
Usage: "The number of incomming connections to accept from a single IP per minute. Use 0 to disable",
Usage: "A hard limit on the number of incomming connections (requests) to accept per remote host per minute. Use 0 to disable",
Value: 0,
},
},
Expand All @@ -171,13 +177,13 @@ var runCmd = &cli.Command{
defer closer()

var (
lookbackCap = cctx.Duration("api-max-lookback")
address = cctx.String("listen")
waitLookback = abi.ChainEpoch(cctx.Int64("api-wait-lookback-limit"))
rateLimit = cctx.Int64("rate-limit")
perConnRateLimit = cctx.Int64("per-conn-rate-limit")
rateLimitTimeout = cctx.Duration("rate-limit-timeout")
connPerMinute = cctx.Int64("conn-per-minute")
lookbackCap = cctx.Duration("api-max-lookback")
address = cctx.String("listen")
waitLookback = abi.ChainEpoch(cctx.Int64("api-wait-lookback-limit"))
globalRateLimit = cctx.Int("rate-limit")
perConnectionRateLimit = cctx.Int("per-conn-rate-limit")
rateLimitTimeout = cctx.Duration("rate-limit-timeout")
perHostConnectionsPerMinute = cctx.Int("conn-per-minute")
)

serverOptions := make([]jsonrpc.ServerOption, 0)
Expand All @@ -197,21 +203,22 @@ var runCmd = &cli.Command{
return xerrors.Errorf("failed to convert endpoint address to multiaddr: %w", err)
}

gwapi := gateway.NewNode(api, subHnd, lookbackCap, waitLookback, rateLimit, rateLimitTimeout)
h, err := gateway.Handler(gwapi, api, perConnRateLimit, connPerMinute, serverOptions...)
gwapi := gateway.NewNode(api, subHnd, lookbackCap, waitLookback, int64(globalRateLimit), rateLimitTimeout)
handler, err := gateway.Handler(gwapi, api, perConnectionRateLimit, perHostConnectionsPerMinute, serverOptions...)
if err != nil {
return xerrors.Errorf("failed to set up gateway HTTP handler")
}

stopFunc, err := node.ServeRPC(h, "lotus-gateway", maddr)
stopFunc, err := node.ServeRPC(handler, "lotus-gateway", maddr)
if err != nil {
return xerrors.Errorf("failed to serve rpc endpoint: %w", err)
}

<-node.MonitorShutdown(nil, node.ShutdownHandler{
Component: "rpc",
StopFunc: stopFunc,
})
<-node.MonitorShutdown(
nil,
node.ShutdownHandler{Component: "rpc", StopFunc: stopFunc},
node.ShutdownHandler{Component: "rpc-handler", StopFunc: handler.Shutdown},
)
return nil
},
}
226 changes: 151 additions & 75 deletions gateway/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,48 @@ import (
"github.com/filecoin-project/lotus/node"
)

type perConnLimiterKeyType string
type perConnectionAPIRateLimiterKeyType string
type filterTrackerKeyType string

const perConnLimiterKey perConnLimiterKeyType = "limiter"
const (
perConnectionAPIRateLimiterKey perConnectionAPIRateLimiterKeyType = "limiter"
statefulCallTrackerKey filterTrackerKeyType = "statefulCallTracker"
connectionLimiterCleanupInterval = 30 * time.Second
)

type filterTrackerKeyType string
// ShutdownHandler is an http.Handler that can be gracefully shutdown.
type ShutdownHandler interface {
http.Handler

Shutdown(ctx context.Context) error
}

const statefulCallTrackerKey filterTrackerKeyType = "statefulCallTracker"
var _ ShutdownHandler = &statefulCallHandler{}
var _ ShutdownHandler = &RateLimitHandler{}

// Handler returns a gateway http.Handler, to be mounted as-is on the server. The handler is
// returned as a ShutdownHandler which allows for graceful shutdown of the handler via its
// Shutdown method.
//
// The handler will limit the number of API calls per minute within a single WebSocket connection
// (where API calls are weighted by their relative expense), and the number of connections per
// minute from a single host.
//
// Connection limiting is a hard limit that will reject requests with a 429 status code if the limit
// is exceeded. API call limiting is a soft limit that will delay requests if the limit is exceeded.
func Handler(
gwapi lapi.Gateway,
api lapi.FullNode,
perConnectionAPIRateLimit int,
perHostConnectionsPerMinute int,
opts ...jsonrpc.ServerOption,
) (ShutdownHandler, error) {

// Handler returns a gateway http.Handler, to be mounted as-is on the server.
func Handler(gwapi lapi.Gateway, api lapi.FullNode, rateLimit int64, connPerMinute int64, opts ...jsonrpc.ServerOption) (http.Handler, error) {
m := mux.NewRouter()

opts = append(opts, jsonrpc.WithReverseClient[lapi.EthSubscriberMethods]("Filecoin"), jsonrpc.WithServerErrors(lapi.RPCErrors))
serveRpc := func(path string, hnd interface{}) {
rpcServer := jsonrpc.NewServer(append(opts, jsonrpc.WithReverseClient[lapi.EthSubscriberMethods]("Filecoin"), jsonrpc.WithServerErrors(lapi.RPCErrors))...)
rpcServer := jsonrpc.NewServer(opts...)
rpcServer.Register("Filecoin", hnd)
rpcServer.AliasMethod("rpc.discover", "Filecoin.Discover")

Expand All @@ -61,104 +89,152 @@ func Handler(gwapi lapi.Gateway, api lapi.FullNode, rateLimit int64, connPerMinu
m.Handle("/health/readyz", node.NewReadyHandler(api))
m.PathPrefix("/").Handler(http.DefaultServeMux)

/*ah := &auth.Handler{
Verify: nodeApi.AuthVerify,
Next: mux.ServeHTTP,
}*/

rlh := NewRateLimiterHandler(m, rateLimit)
clh := NewConnectionRateLimiterHandler(rlh, connPerMinute)
return clh, nil
handler := &statefulCallHandler{m}
if perConnectionAPIRateLimit > 0 && perHostConnectionsPerMinute > 0 {
return NewRateLimitHandler(
handler,
perConnectionAPIRateLimit,
perHostConnectionsPerMinute,
connectionLimiterCleanupInterval,
), nil
}
return handler, nil
}

func NewRateLimiterHandler(handler http.Handler, rateLimit int64) *RateLimiterHandler {
limiter := limiterFromRateLimit(rateLimit)

return &RateLimiterHandler{
handler: handler,
limiter: limiter,
}
type statefulCallHandler struct {
next http.Handler
}

// RateLimiterHandler adds a rate limiter to the request context for per-connection rate limiting
type RateLimiterHandler struct {
handler http.Handler
limiter *rate.Limiter
func (h statefulCallHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(context.WithValue(r.Context(), statefulCallTrackerKey, newStatefulCallTracker()))
h.next.ServeHTTP(w, r)
}

func (h RateLimiterHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(context.WithValue(r.Context(), perConnLimiterKey, h.limiter))
func (h statefulCallHandler) Shutdown(ctx context.Context) error {
return shutdown(ctx, h.next)
}

// also add a filter tracker to the context
r = r.WithContext(context.WithValue(r.Context(), statefulCallTrackerKey, newStatefulCallTracker()))
type hostLimiter struct {
limiter *rate.Limiter
lastAccess time.Time
}

h.handler.ServeHTTP(w, r)
type RateLimitHandler struct {
cancelFunc context.CancelFunc
mu sync.Mutex
limiters map[string]*hostLimiter
perConnectionAPILimit rate.Limit
perHostConnectionsPerMinute int
next http.Handler
cleanupInterval time.Duration
expiryDuration time.Duration
}

// NewConnectionRateLimiterHandler blocks new connections if there have already been too many.
func NewConnectionRateLimiterHandler(handler http.Handler, connPerMinute int64) *ConnectionRateLimiterHandler {
ipmap := make(map[string]int64)
return &ConnectionRateLimiterHandler{
ipmap: ipmap,
connPerMinute: connPerMinute,
handler: handler,
// NewRateLimitHandler creates a new RateLimitHandler that wraps the
// provided handler and limits the number of API calls per minute within a single WebSocket
// connection (where API calls are weighted by their relative expense), and the number of
// connections per minute from a single host.
// The cleanupInterval determines how often the handler will check for unused limiters to clean up.
func NewRateLimitHandler(
next http.Handler,
perConnectionAPIRateLimit int,
perHostConnectionsPerMinute int,
cleanupInterval time.Duration,
) *RateLimitHandler {

ctx, cancel := context.WithCancel(context.Background())
h := &RateLimitHandler{
cancelFunc: cancel,
limiters: make(map[string]*hostLimiter),
perConnectionAPILimit: rate.Inf,
perHostConnectionsPerMinute: perHostConnectionsPerMinute,
next: next,
cleanupInterval: cleanupInterval,
expiryDuration: 5 * cleanupInterval,
}
if perConnectionAPIRateLimit > 0 {
h.perConnectionAPILimit = rate.Every(time.Second / time.Duration(perConnectionAPIRateLimit))
}
go h.cleanupExpiredLimiters(ctx)
return h
}

type ConnectionRateLimiterHandler struct {
mu sync.Mutex
ipmap map[string]int64
connPerMinute int64
handler http.Handler
func (h *RateLimitHandler) getLimits(host string) *hostLimiter {
h.mu.Lock()
defer h.mu.Unlock()

entry, exists := h.limiters[host]
if !exists {
var limiter *rate.Limiter
if h.perHostConnectionsPerMinute > 0 {
requestLimit := rate.Every(time.Minute / time.Duration(h.perHostConnectionsPerMinute))
limiter = rate.NewLimiter(requestLimit, h.perHostConnectionsPerMinute)
}
entry = &hostLimiter{
limiter: limiter,
lastAccess: time.Now(),
}
h.limiters[host] = entry
} else {
entry.lastAccess = time.Now()
}

return entry
}

func (h *ConnectionRateLimiterHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if h.connPerMinute == 0 {
h.handler.ServeHTTP(w, r)
return
}
func (h *RateLimitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
host, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

h.mu.Lock()
seen, ok := h.ipmap[host]
if !ok {
h.ipmap[host] = 1
h.mu.Unlock()
h.handler.ServeHTTP(w, r)
limits := h.getLimits(host)
if limits.limiter != nil && !limits.limiter.Allow() {
w.WriteHeader(http.StatusTooManyRequests)
return
}
// rate limited
if seen > h.connPerMinute {
h.mu.Unlock()
w.WriteHeader(http.StatusTooManyRequests)

if h.perConnectionAPILimit != rate.Inf {
// new rate limiter for each connection, to throttle a single WebSockets connection;
// allow for a burst of MaxRateLimitTokens
apiLimiter := rate.NewLimiter(h.perConnectionAPILimit, MaxRateLimitTokens)
r = r.WithContext(context.WithValue(r.Context(), perConnectionAPIRateLimiterKey, apiLimiter))
}

h.next.ServeHTTP(w, r)
}

func (h *RateLimitHandler) cleanupExpiredLimiters(ctx context.Context) {
if h.cleanupInterval == 0 {
return
}
h.ipmap[host] = seen + 1
h.mu.Unlock()
go func() {

for {
select {
case <-time.After(time.Minute):
case <-ctx.Done():
return
case <-time.After(h.cleanupInterval):
h.mu.Lock()
defer h.mu.Unlock()
h.ipmap[host] = h.ipmap[host] - 1
if h.ipmap[host] <= 0 {
delete(h.ipmap, host)
now := time.Now()
for host, entry := range h.limiters {
if now.Sub(entry.lastAccess) > h.expiryDuration {
delete(h.limiters, host)
}
}
h.mu.Unlock()
}
}()
h.handler.ServeHTTP(w, r)
}
}

func limiterFromRateLimit(rateLimit int64) *rate.Limiter {
var limit rate.Limit
if rateLimit == 0 {
limit = rate.Inf
} else {
limit = rate.Every(time.Second / time.Duration(rateLimit))
func (h *RateLimitHandler) Shutdown(ctx context.Context) error {
h.cancelFunc()
return shutdown(ctx, h.next)
}

func shutdown(ctx context.Context, handler http.Handler) error {
if sh, ok := handler.(ShutdownHandler); ok {
return sh.Shutdown(ctx)
}
return rate.NewLimiter(limit, stateRateLimitTokens)
return nil
}
Loading

0 comments on commit 6d6f29f

Please sign in to comment.