Skip to content

Commit

Permalink
Add better error reporting for relay rate limits
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <cody@eigenlabs.org>
  • Loading branch information
cody-littley committed Jan 24, 2025
1 parent 180bc96 commit 7956f78
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 7 deletions.
9 changes: 7 additions & 2 deletions relay/limiter/blob_rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,13 @@ func (l *BlobRateLimiter) RequestGetBlobBandwidth(now time.Time, bytes uint32) e
if l.relayMetrics != nil {
l.relayMetrics.ReportBlobRateLimited("global bandwidth")
}
return fmt.Errorf("global rate limit %dMib/s exceeded for getBlob bandwidth, try again later",
int(l.config.MaxGetBlobBytesPerSecond/1024/1024))

rateLimit := l.config.MaxGetBlobBytesPerSecond / 1024 / 1024
burstiness := l.config.GetBlobBytesBurstiness / 1024 / 1024

return fmt.Errorf(
"global rate limit %0.1fMiB/s (burstiness %dMiB) exceeded for getBlob bandwidth, try again later",
rateLimit, burstiness)
}
return nil
}
18 changes: 14 additions & 4 deletions relay/limiter/chunk_rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,13 @@ func (l *ChunkRateLimiter) RequestGetChunkBandwidth(now time.Time, requesterID s
if l.relayMetrics != nil {
l.relayMetrics.ReportChunkRateLimited("global bandwidth")
}
return fmt.Errorf("global rate limit %dMiB exceeded for GetChunk bandwidth, try again later",
int(l.config.MaxGetChunkBytesPerSecond/1024/1024))

rateLimit := l.config.MaxGetChunkBytesPerSecond / 1024 / 1024
burstiness := l.config.GetChunkBytesBurstiness / 1024 / 1024

return fmt.Errorf(
"global rate limit %0.1fMiB (burstiness %dMIB) exceeded for GetChunk bandwidth, try again later",
rateLimit, burstiness)
}

limiter, ok := l.perClientBandwidthLimiter[requesterID]
Expand All @@ -176,8 +181,13 @@ func (l *ChunkRateLimiter) RequestGetChunkBandwidth(now time.Time, requesterID s
if l.relayMetrics != nil {
l.relayMetrics.ReportChunkRateLimited("client bandwidth")
}
return fmt.Errorf("client rate limit %dMiB exceeded for GetChunk bandwidth, try again later",
int(l.config.MaxGetChunkBytesPerSecondClient/1024/1024))

rateLimit := l.config.MaxGetChunkBytesPerSecondClient / 1024 / 1024
burstiness := l.config.GetChunkBytesBurstinessClient / 1024 / 1024

return fmt.Errorf(
"client rate limit %0.1fMiB (bustiness %dMiB) exceeded for GetChunk bandwidth, try again later",
rateLimit, burstiness)
}

return nil
Expand Down
33 changes: 32 additions & 1 deletion relay/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func (s *Server) GetChunks(ctx context.Context, request *pb.GetChunksRequest) (*
if strings.Contains(err.Error(), "internal error") {
return nil, api.NewErrorInternal(err.Error())
}
return nil, api.NewErrorResourceExhausted(fmt.Sprintf("bandwidth limit exceeded: %v", err))
return nil, buildInsufficientGetChunksBandwidthError(request, mMap, err)
}
s.metrics.ReportChunkDataSize(requiredBandwidth)

Expand Down Expand Up @@ -469,6 +469,37 @@ func computeChunkRequestRequiredBandwidth(request *pb.GetChunksRequest, mMap met
return requiredBandwidth, nil
}

// buildInsufficientGetChunksBandwidthError builds an informative error message for when there is
// insufficient bandwidth to serve a GetChunks() request.
//
// The message lists one entry per chunk request, formatted as "<prefix>(<count>*<size>bytes)", where
// the prefix is "i" for a by-index request and "r" for any other request (by-range), count is the
// number of chunks requested, and size is the chunk size recorded in mMap for the request's blob.
// A size of 0 is reported when the blob key is malformed or has no entry in mMap, so that building
// the error message can never itself panic.
//
// The original rate limiter error is appended to the message, and the result is returned as a
// resource exhausted API error.
func buildInsufficientGetChunksBandwidthError(
	request *pb.GetChunksRequest,
	mMap metadataMap,
	originalError error) error {

	var sb strings.Builder
	sb.WriteString("unable to serve chunks ")

	chunkRequests := request.GetChunkRequests()
	for i, chunkRequest := range chunkRequests {
		var prefix string
		var chunkCount int
		var rawKey []byte

		// Read the blob key from whichever request variant is actually populated. Generated
		// protobuf getters are nil-safe, whereas direct field access on an unset variant is not.
		if byIndex := chunkRequest.GetByIndex(); byIndex != nil {
			prefix = "i"
			chunkCount = len(byIndex.GetChunkIndices())
			rawKey = byIndex.GetBlobKey()
		} else {
			byRange := chunkRequest.GetByRange()
			prefix = "r"
			chunkCount = int(byRange.GetEndIndex() - byRange.GetStartIndex())
			rawKey = byRange.GetBlobKey()
		}

		// Converting a slice of the wrong length into an array type panics, so only look up the
		// metadata when the key has the expected size.
		// NOTE(review): assumes v2.BlobKey is a fixed-size byte array — confirm.
		chunkSize := 0
		if len(rawKey) == len(v2.BlobKey{}) {
			if metadata, ok := mMap[v2.BlobKey(rawKey)]; ok {
				chunkSize = int(metadata.chunkSizeBytes)
			}
		}

		fmt.Fprintf(&sb, "%s(%d*%dbytes)", prefix, chunkCount, chunkSize)
		if i < len(chunkRequests)-1 {
			sb.WriteString(", ")
		}
	}

	return api.NewErrorResourceExhausted(fmt.Sprintf("%s: %v", sb.String(), originalError))
}

// Start starts the server listening for requests. This method will block until the server is stopped.
func (s *Server) Start(ctx context.Context) error {
s.metrics.Start()
Expand Down

0 comments on commit 7956f78

Please sign in to comment.