Fix how chunk size is computed for throttling. (#1158)
Signed-off-by: Cody Littley <cody@eigenlabs.org>
cody-littley authored Jan 28, 2025
1 parent f2df601 commit c27d274
Showing 9 changed files with 335 additions and 102 deletions.
18 changes: 15 additions & 3 deletions relay/chunk_provider.go
@@ -63,7 +63,7 @@ func newChunkProvider(
}

cacheAccessor, err := cache.NewCacheAccessor[blobKeyWithMetadata, []*encoding.Frame](
-		cache.NewFIFOCache[blobKeyWithMetadata, []*encoding.Frame](cacheSize, computeFramesCacheWeight),
+		cache.NewFIFOCache[blobKeyWithMetadata, []*encoding.Frame](cacheSize, server.computeFramesCacheWeight),
maxIOConcurrency,
server.fetchFrames,
metrics)
@@ -80,8 +80,20 @@ type frameMap map[v2.BlobKey][]*encoding.Frame

// computeFramesCacheWeight computes the 'weight' of the frames for the cache. The weight of a list of frames
// is equal to the size required to store the data, in bytes.
-func computeFramesCacheWeight(key blobKeyWithMetadata, frames []*encoding.Frame) uint64 {
-	return uint64(len(frames)) * uint64(key.metadata.chunkSizeBytes)
+func (s *chunkProvider) computeFramesCacheWeight(key blobKeyWithMetadata, frames []*encoding.Frame) uint64 {
+
+	// This returns the size of the frames when serialized.
+	// The in-memory footprint is much larger, so this isn't a good proxy for cache size.
+	// return uint64(len(frames)) * uint64(key.metadata.chunkSizeBytes)
+
+	size, err := computeInMemoryFrameSize(frames)
+
+	if err != nil {
+		s.logger.Errorf("Failed to compute frame size for blob %v: %s", key.blobKey.Hex(), err)
+		return 0
+	}
+
+	return size
}

// GetFrames retrieves the frames for a blob.
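The weight change above hinges on the difference between a frame's serialized length (what chunkSizeBytes measures) and its in-memory footprint. A standalone sketch of how the two diverge, using a hypothetical frame-like struct rather than the real encoding.Frame: the serialized form counts only payload bytes, while the resident form also pays for the slice header and struct layout.

package main

import (
	"fmt"
	"reflect"
)

// toyFrame is a hypothetical stand-in for a chunk frame: a fixed-size proof
// plus a slice of 32-byte coefficients. It is not the real encoding.Frame.
type toyFrame struct {
	Proof  [64]byte
	Coeffs [][32]byte
}

// serializedSize is what the old weight effectively measured: payload bytes only.
func serializedSize(f *toyFrame) uint64 {
	return 64 + uint64(len(f.Coeffs))*32
}

// inMemorySize approximates the resident footprint: the struct itself
// (including the 24-byte slice header) plus the slice's backing array.
func inMemorySize(f *toyFrame) uint64 {
	structBytes := uint64(reflect.TypeOf(*f).Size())
	backingBytes := uint64(len(f.Coeffs)) * 32
	return structBytes + backingBytes
}

func main() {
	f := &toyFrame{Coeffs: make([][32]byte, 8)}
	fmt.Println("serialized:", serializedSize(f)) // 64 + 8*32 = 320
	fmt.Println("in-memory: ", inMemorySize(f))   // 88 + 8*32 = 344 on 64-bit platforms
}

For real frames the gap is larger still, since each cached entry also carries pointer indirection and allocator overhead, which is why the cache now weighs entries with computeInMemoryFrameSize.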
167 changes: 167 additions & 0 deletions relay/chunk_size.go
@@ -0,0 +1,167 @@
package relay

import (
	"fmt"
	"reflect"

	"github.com/Layr-Labs/eigenda/encoding"
)

// computeInMemoryFrameSize computes the size of a blob's chunks in memory.
func computeInMemoryFrameSize(frames []*encoding.Frame) (uint64, error) {

if len(frames) == 0 {
return 0, fmt.Errorf("no frames provided")
}

firstFrame := frames[0]
firstFrameSize, err := SizeOf(firstFrame)
if err != nil {
return 0, fmt.Errorf("error calculating size of first frame: %w", err)
}

// all frames for a particular blob are the same size
size := firstFrameSize * uint64(len(frames))

return size, nil
}

// SizeOf calculates the size of an object in memory using reflection. Includes the memory
// referenced by the object. This function assumes that there are no circular references
// in the object graph. If there are, then this function will enter an infinite loop
// (likely ending with a stack overflow).
//
// This has non-trivial performance implications and should be used carefully.
func SizeOf(object any) (uint64, error) {
return recursiveSizeOf(object, true)
}

// recursiveSizeOf recursively walks through a data structure and calculates the memory it uses.
//
// If the indirect flag is true, then the provided object was referenced by its
// parent (e.g. it was a pointer or in a map). If the indirect flag is false,
// then the provided object was directly embedded in its parent.
func recursiveSizeOf(object any, indirect bool) (uint64, error) {

size := uint64(0)
if object == nil {
return size, nil
}

///////////////////////////////////////////////////////////////////////////////////
// Determine the size of this object. //
///////////////////////////////////////////////////////////////////////////////////

// If indirect is false, then this object's size will have been counted by its parent.
// If indirect is true, then we need to count this object's size.
	if indirect {
		// Note: unsafe.Sizeof(object) would report the size of the interface header
		// here, not the size of the underlying value, because the static type of
		// object is any. Reflection yields the size of the dynamic type instead.
		size = uint64(reflect.TypeOf(object).Size())
	}

///////////////////////////////////////////////////////////////////////////////////
// Determine the size of the memory referenced by this object. //
///////////////////////////////////////////////////////////////////////////////////

val := reflect.ValueOf(object)
objectType := val.Type().Kind()

switch objectType {
	case reflect.Pointer:
		// Although the bytes for the pointer itself will have been counted,
		// the thing being pointed to will not have been counted. Nil pointers
		// reference nothing, so skip them to avoid dereferencing a zero Value.
		if !val.IsNil() {
			referencedObject := val.Elem().Interface()
			referencedSize, err := recursiveSizeOf(referencedObject, true)
			if err != nil {
				return 0, fmt.Errorf("error calculating size of referenced object: %w", err)
			}
			size += referencedSize
		}

case reflect.Struct:
// iterate over the fields in the struct

		fieldCount := val.NumField()
		for index := 0; index < fieldCount; index++ {
			field := val.Field(index)
			if !field.CanInterface() {
				// Unexported fields cannot be read via Interface(). Their directly
				// embedded bytes are already included in the parent's size; only
				// memory they reference goes uncounted.
				continue
			}
			fieldSize, err := recursiveSizeOf(field.Interface(), false)
			if err != nil {
				return 0, fmt.Errorf("error calculating size of field: %w", err)
			}

			size += fieldSize
		}
	case reflect.Array, reflect.Slice:
		// An array's elements are embedded in the object itself, so their direct
		// bytes were already counted with it; a slice is a header pointing at a
		// backing array whose memory has not yet been counted.
		elementsAreIndirect := objectType == reflect.Slice
		length := val.Len()
		for i := 0; i < length; i++ {
			elementSize, err := recursiveSizeOf(val.Index(i).Interface(), elementsAreIndirect)
			if err != nil {
				return 0, fmt.Errorf("error calculating size of element: %w", err)
			}
			size += elementSize
		}
case reflect.String:
// The string header will have been counted, but not the data contained in the string.
size += uint64(len(val.String()))
case reflect.Map:
// The map header will have been counted, but not the map's keys and values.
keys := val.MapKeys()
for _, key := range keys {
keySize, err := recursiveSizeOf(key.Interface(), true)
if err != nil {
return 0, fmt.Errorf("error calculating size of map key: %w", err)
}
size += keySize

valueSize, err := recursiveSizeOf(val.MapIndex(key).Interface(), true)
if err != nil {
return 0, fmt.Errorf("error calculating size of map value: %w", err)
}
size += valueSize
}
	case reflect.Bool,
		reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
		reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64,
		reflect.Float32, reflect.Float64,
		reflect.Complex64, reflect.Complex128:
		// There is no memory referenced by these types.
	default:
		// This utility was created to calculate the size of simple object types, not as a
		// general purpose memory calculator. Unsupported kinds (e.g. functions, channels,
		// and unsafe pointers) are rejected rather than being silently miscounted.
		return 0, fmt.Errorf("unsupported object type: %v", objectType)
}

return size, nil
}
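As a usage sketch (hypothetical, not part of this commit; sizes assume a 64-bit platform), SizeOf counts the object itself, the memory behind its pointers, and the bytes referenced by string and slice headers:

// Illustrative types for exercising SizeOf; they live in package relay only
// for the sake of this example.
type Inner struct {
	Data []byte // 24-byte header counted with Inner, plus len(Data) referenced bytes
}

type Outer struct {
	Name string // 16-byte header counted with Outer, plus len(Name) referenced bytes
	In   *Inner // 8-byte pointer counted with Outer, plus the pointed-to Inner
}

func exampleSizeOf() (uint64, error) {
	o := &Outer{
		Name: "blob",
		In:   &Inner{Data: make([]byte, 100)},
	}
	// Counts the pointer, the Outer struct, the 4 string bytes, the Inner
	// struct, and the 100 bytes backing the slice.
	return SizeOf(o)
}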
2 changes: 1 addition & 1 deletion relay/cmd/config.go
@@ -62,7 +62,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
MetadataMaxConcurrency: ctx.Int(flags.MetadataMaxConcurrencyFlag.Name),
BlobCacheBytes: ctx.Uint64(flags.BlobCacheBytes.Name),
BlobMaxConcurrency: ctx.Int(flags.BlobMaxConcurrencyFlag.Name),
-		ChunkCacheSize: ctx.Uint64(flags.ChunkCacheSizeFlag.Name),
+		ChunkCacheBytes: ctx.Uint64(flags.ChunkCacheBytesFlag.Name),
ChunkMaxConcurrency: ctx.Int(flags.ChunkMaxConcurrencyFlag.Name),
MaxKeysPerGetChunksRequest: ctx.Int(flags.MaxKeysPerGetChunksRequestFlag.Name),
RateLimits: limiter.Config{
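For reference, the renamed flag is consumed by name, as in NewConfig above. A minimal, self-contained sketch of the same pattern, assuming urfave/cli v1 as the EnvVar field suggests (flag and env var names shortened for illustration):

package main

import (
	"fmt"
	"os"

	"github.com/urfave/cli"
)

func main() {
	app := cli.NewApp()
	app.Flags = []cli.Flag{
		cli.Int64Flag{
			Name:   "chunk-cache-bytes",
			Usage:  "Size of the chunk cache, in bytes.",
			EnvVar: "RELAY_CHUNK_CACHE_BYTES", // illustrative; the real name comes from PrefixEnvVar
			Value:  1024 * 1024 * 1024,
		},
	}
	app.Action = func(ctx *cli.Context) error {
		// NewConfig reads the value through ctx.Uint64; Int64 works equally
		// well here since the size is never negative.
		fmt.Println("chunk cache bytes:", ctx.Int64("chunk-cache-bytes"))
		return nil
	}
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}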
18 changes: 9 additions & 9 deletions relay/cmd/flags/flags.go
@@ -75,12 +75,12 @@
EnvVar: common.PrefixEnvVar(envVarPrefix, "BLOB_MAX_CONCURRENCY"),
Value: 32,
}
-	ChunkCacheSizeFlag = cli.Int64Flag{
-		Name: common.PrefixFlag(FlagPrefix, "chunk-cache-size"),
+	ChunkCacheBytesFlag = cli.Int64Flag{
+		Name: common.PrefixFlag(FlagPrefix, "chunk-cache-bytes"),
		Usage: "Size of the chunk cache, in bytes.",
		Required: false,
-		EnvVar: common.PrefixEnvVar(envVarPrefix, "CHUNK_CACHE_SIZE"),
-		Value: 4 * 1024 * 1024 * 1024,
+		EnvVar: common.PrefixEnvVar(envVarPrefix, "CHUNK_CACHE_BYTES"),
+		Value: 1024 * 1024 * 1024,
}
ChunkMaxConcurrencyFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "chunk-max-concurrency"),
@@ -150,14 +150,14 @@ var (
Usage: "Max bandwidth for GetChunk operations in bytes per second",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_GET_CHUNK_BYTES_PER_SECOND"),
-		Value: 20 * 1024 * 1024,
+		Value: 80 * 1024 * 1024,
}
GetChunkBytesBurstinessFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "get-chunk-bytes-burstiness"),
Usage: "Burstiness of the GetChunk bandwidth rate limiter",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "GET_CHUNK_BYTES_BURSTINESS"),
-		Value: 20 * 1024 * 1024,
+		Value: 800 * 1024 * 1024,
}
MaxConcurrentGetChunkOpsFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "max-concurrent-get-chunk-ops"),
@@ -185,14 +185,14 @@ var (
Usage: "Max bandwidth for GetChunk operations in bytes per second per client",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_GET_CHUNK_BYTES_PER_SECOND_CLIENT"),
-		Value: 2 * 1024 * 1024,
+		Value: 40 * 1024 * 1024,
}
GetChunkBytesBurstinessClientFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "get-chunk-bytes-burstiness-client"),
Usage: "Burstiness of the GetChunk bandwidth rate limiter per client",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "GET_CHUNK_BYTES_BURSTINESS_CLIENT"),
-		Value: 2 * 1024 * 1024,
+		Value: 400 * 1024 * 1024,
}
MaxConcurrentGetChunkOpsClientFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "max-concurrent-get-chunk-ops-client"),
@@ -306,7 +306,7 @@ var optionalFlags = []cli.Flag{
MetadataMaxConcurrencyFlag,
BlobCacheBytes,
BlobMaxConcurrencyFlag,
-	ChunkCacheSizeFlag,
+	ChunkCacheBytesFlag,
ChunkMaxConcurrencyFlag,
MaxKeysPerGetChunksRequestFlag,
MaxGetBlobOpsPerSecondFlag,
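The raised defaults above configure a token-bucket rate limiter: the per-second value is the sustained rate and the burstiness value is the bucket depth. A sketch of how the two interact, using golang.org/x/time/rate as a stand-in for the relay's own limiter package (which is not shown in this diff); the constants mirror the new defaults but are illustrative:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	const bytesPerSecond = 80 * 1024 * 1024 // sustained rate
	const burstBytes = 800 * 1024 * 1024    // bucket depth

	limiter := rate.NewLimiter(rate.Limit(bytesPerSecond), burstBytes)

	// A client may consume up to burstBytes immediately...
	fmt.Println("burst allowed:", limiter.AllowN(time.Now(), burstBytes)) // true

	// ...but sustained traffic is paced back down to bytesPerSecond: with the
	// bucket drained, acquiring one second's worth of bytes takes about a second.
	start := time.Now()
	_ = limiter.WaitN(context.Background(), bytesPerSecond)
	fmt.Println("waited:", time.Since(start).Round(100*time.Millisecond))
}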
67 changes: 67 additions & 0 deletions relay/config.go
@@ -0,0 +1,67 @@
package relay

import (
v2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/relay/limiter"
"time"
)

// Config is the configuration for the relay Server.
type Config struct {

// RelayKeys contains the keys of the relays that this server is willing to serve data for. If empty, the server will
// serve data for any shard it can.
RelayKeys []v2.RelayKey

// GRPCPort is the port that the relay server listens on.
GRPCPort int

// MaxGRPCMessageSize is the maximum size of a gRPC message that the server will accept.
MaxGRPCMessageSize int

// MetadataCacheSize is the maximum number of items in the metadata cache.
MetadataCacheSize int

// MetadataMaxConcurrency puts a limit on the maximum number of concurrent metadata fetches actively running on
// goroutines.
MetadataMaxConcurrency int

// BlobCacheBytes is the maximum size of the blob cache, in bytes.
BlobCacheBytes uint64

// BlobMaxConcurrency puts a limit on the maximum number of concurrent blob fetches actively running on goroutines.
BlobMaxConcurrency int

// ChunkCacheBytes is the maximum size of the chunk cache, in bytes.
ChunkCacheBytes uint64

// ChunkMaxConcurrency is the size of the work pool for fetching chunks. Note that this does not
// impact concurrency utilized by the s3 client to upload/download fragmented files.
ChunkMaxConcurrency int

// MaxKeysPerGetChunksRequest is the maximum number of keys that can be requested in a single GetChunks request.
MaxKeysPerGetChunksRequest int

// RateLimits contains configuration for rate limiting.
RateLimits limiter.Config

// AuthenticationKeyCacheSize is the maximum number of operator public keys that can be cached.
AuthenticationKeyCacheSize int

// AuthenticationTimeout is the duration for which an authentication is "cached". A request from the same client
// within this duration will not trigger a new authentication in order to save resources. If zero, then each request
// will be authenticated independently, regardless of timing.
AuthenticationTimeout time.Duration

// AuthenticationDisabled will disable authentication if set to true.
AuthenticationDisabled bool

// Timeouts contains configuration for relay timeouts.
Timeouts TimeoutConfig

// OnchainStateRefreshInterval is the interval at which the onchain state is refreshed.
OnchainStateRefreshInterval time.Duration

// MetricsPort is the port that the relay metrics server listens on.
MetricsPort int
}
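A minimal sketch of populating this Config directly, showing the renamed ChunkCacheBytes field. All values are illustrative; omitted fields keep their zero values, and the CLI flag defaults remain the authoritative production settings.

package main

import (
	"time"

	"github.com/Layr-Labs/eigenda/relay"
)

func main() {
	cfg := relay.Config{
		GRPCPort:                    32011, // illustrative port
		MaxGRPCMessageSize:          4 * 1024 * 1024,
		MetadataCacheSize:           1024 * 1024,
		MetadataMaxConcurrency:      32,
		BlobCacheBytes:              1024 * 1024 * 1024,
		BlobMaxConcurrency:          32,
		ChunkCacheBytes:             1024 * 1024 * 1024, // renamed from ChunkCacheSize
		ChunkMaxConcurrency:         32,
		MaxKeysPerGetChunksRequest:  1024,
		AuthenticationTimeout:       5 * time.Minute,
		OnchainStateRefreshInterval: time.Hour,
	}
	_ = cfg
}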
26 changes: 20 additions & 6 deletions relay/metadata_provider.go
@@ -160,6 +160,24 @@ func (m *metadataProvider) UpdateBlobVersionParameters(blobParamsMap *v2.BlobVer
m.blobParamsMap.Store(blobParamsMap)
}

+func (m *metadataProvider) computeChunkSize(header *v2.BlobHeader, totalChunkSizeBytes uint32) (uint32, error) {
+	blobParamsMap := m.blobParamsMap.Load()
+	if blobParamsMap == nil {
+		return 0, fmt.Errorf("blob version parameters is nil")
+	}
+
+	blobParams, ok := blobParamsMap.Get(header.BlobVersion)
+	if !ok {
+		return 0, fmt.Errorf("blob version %d not found in blob params map", header.BlobVersion)
+	}
+
+	if blobParams.NumChunks == 0 {
+		return 0, fmt.Errorf("numChunks is 0, this should never happen")
+	}
+
+	return totalChunkSizeBytes / blobParams.NumChunks, nil
+}

// fetchMetadata retrieves metadata about a blob. Fetches from the cache if available, otherwise from the store.
func (m *metadataProvider) fetchMetadata(key v2.BlobKey) (*blobMetadata, error) {
ctx, cancel := context.WithTimeout(m.ctx, m.fetchTimeout)
@@ -192,12 +210,8 @@ func (m *metadataProvider) fetchMetadata(key v2.BlobKey) (*blobMetadata, error)

// TODO(cody-littley): blob size is not correct https://github.com/Layr-Labs/eigenda/pull/906#discussion_r1847396530
blobSize := uint32(cert.BlobHeader.BlobCommitments.Length) * encoding.BYTES_PER_SYMBOL
-	blobParams, ok := blobParamsMap.Get(cert.BlobHeader.BlobVersion)
-	if !ok {
-		return nil, fmt.Errorf("blob version %d not found in blob params map", cert.BlobHeader.BlobVersion)
-	}
-	chunkSize, err := v2.GetChunkLength(blobSize, blobParams)
-	chunkSize *= encoding.BYTES_PER_SYMBOL

+	chunkSize, err := m.computeChunkSize(cert.BlobHeader, fragmentInfo.TotalChunkSizeBytes)
if err != nil {
return nil, fmt.Errorf("error getting chunk length: %w", err)
}
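computeChunkSize divides the total serialized chunk footprint reported by the fragment info evenly across the chunk count fixed by the blob version's parameters. A worked sketch of the arithmetic, with hypothetical parameter values:

package main

import "fmt"

func main() {
	// Hypothetical values: a blob whose chunks occupy 8 MiB in total, encoded
	// under a blob version that fixes 8192 chunks per blob.
	var totalChunkSizeBytes uint32 = 8 * 1024 * 1024
	var numChunks uint32 = 8192

	// Integer division, as in computeChunkSize; all chunks of a blob are the
	// same size, so this recovers the per-chunk byte count.
	chunkSizeBytes := totalChunkSizeBytes / numChunks
	fmt.Println(chunkSizeBytes) // 1024
}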
(3 of the 9 changed files are not shown.)
