From 9f68f747e3e76155826e8335cf198d04da291467 Mon Sep 17 00:00:00 2001 From: Cody Littley <56973212+cody-littley@users.noreply.github.com> Date: Fri, 6 Dec 2024 10:40:02 -0600 Subject: [PATCH] Relay metrics (#938) Signed-off-by: Cody Littley --- common/metrics/config.go | 10 - common/metrics/metrics.go | 8 +- common/metrics/metrics_server.go | 26 +- common/metrics/mock_metrics.go | 5 + common/metrics/test/main.go | 8 +- metrics.md | 3 +- .../churner/{ => mdoc}/churner-metrics.md | 0 operators/churner/metrics.go | 7 +- relay/blob_provider.go | 10 +- relay/blob_provider_test.go | 6 +- relay/cache/cache_accessor.go | 48 +- relay/cache/cache_accessor_metrics.go | 85 ++++ relay/cache/cache_accessor_test.go | 47 +- relay/cache/fifo-cache.go | 11 +- relay/cache/fifo_cache_test.go | 4 +- relay/chunk_provider.go | 10 +- relay/chunk_provider_test.go | 6 +- relay/cmd/config.go | 1 + relay/cmd/flags/flags.go | 8 + relay/limiter/blob_rate_limiter.go | 16 +- relay/limiter/blob_rate_limiter_test.go | 6 +- relay/limiter/chunk_rate_limiter.go | 27 +- relay/limiter/chunk_rate_limiter_test.go | 12 +- relay/mdoc/main.go | 24 + relay/mdoc/relay-metrics.md | 426 ++++++++++++++++++ relay/metadata_provider.go | 13 +- relay/metadata_provider_test.go | 15 +- relay/metrics/metrics.go | 228 ++++++++++ relay/server.go | 71 ++- relay/server_test.go | 36 +- 30 files changed, 1045 insertions(+), 132 deletions(-) delete mode 100644 common/metrics/config.go rename operators/churner/{ => mdoc}/churner-metrics.md (100%) create mode 100644 relay/cache/cache_accessor_metrics.go create mode 100644 relay/mdoc/main.go create mode 100644 relay/mdoc/relay-metrics.md create mode 100644 relay/metrics/metrics.go diff --git a/common/metrics/config.go b/common/metrics/config.go deleted file mode 100644 index 1bc8c0e4d9..0000000000 --- a/common/metrics/config.go +++ /dev/null @@ -1,10 +0,0 @@ -package metrics - -// Config provides configuration for a Metrics instance. -type Config struct { - // Namespace is the namespace for the metrics. - Namespace string - - // HTTPPort is the port to serve metrics on. - HTTPPort int -} diff --git a/common/metrics/metrics.go b/common/metrics/metrics.go index ed63a1c6a5..4fbf5d0547 100644 --- a/common/metrics/metrics.go +++ b/common/metrics/metrics.go @@ -1,6 +1,9 @@ package metrics -import "time" +import ( + "github.com/prometheus/client_golang/prometheus" + "time" +) // Metrics provides a convenient interface for reporting metrics. type Metrics interface { @@ -62,6 +65,9 @@ type Metrics interface { pollPeriod time.Duration, source func() float64, label ...any) error + + // RegisterExternalMetrics registers prometheus collectors created outside the metrics framework. + RegisterExternalMetrics(collectors ...prometheus.Collector) } // Metric represents a metric that can be reported. diff --git a/common/metrics/metrics_server.go b/common/metrics/metrics_server.go index f18fd02acd..7a5868dd0d 100644 --- a/common/metrics/metrics_server.go +++ b/common/metrics/metrics_server.go @@ -24,8 +24,8 @@ type metrics struct { // logger is the logger used to log messages. logger logging.Logger - // config is the configuration for the metrics. - config *Config + // namespace is prepended to all metric names. + namespace string // registry is the prometheus registry used to report metrics. registry *prometheus.Registry @@ -55,13 +55,13 @@ type metrics struct { } // NewMetrics creates a new Metrics instance. 
-func NewMetrics(logger logging.Logger, config *Config) Metrics { +func NewMetrics(logger logging.Logger, namespace string, port int) Metrics { reg := prometheus.NewRegistry() reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) reg.MustRegister(collectors.NewGoCollector()) - logger.Infof("Starting metrics server at port %d", config.HTTPPort) - addr := fmt.Sprintf(":%d", config.HTTPPort) + logger.Infof("Starting metrics server at port %d", port) + addr := fmt.Sprintf(":%d", port) mux := http.NewServeMux() mux.Handle("/metrics", promhttp.HandlerFor( reg, @@ -74,7 +74,7 @@ func NewMetrics(logger logging.Logger, config *Config) Metrics { m := &metrics{ logger: logger, - config: config, + namespace: namespace, registry: reg, metricMap: make(map[metricID]Metric), isAlive: atomic.Bool{}, @@ -198,7 +198,7 @@ func (m *metrics) NewLatencyMetric( metric, err := newLatencyMetric( m.logger, m.registry, - m.config.Namespace, + m.namespace, name, description, objectives, @@ -238,7 +238,7 @@ func (m *metrics) NewCountMetric( metric, err := newCountMetric( m.logger, m.registry, - m.config.Namespace, + m.namespace, name, description, labelTemplate) @@ -287,7 +287,7 @@ func (m *metrics) newGaugeMetricUnsafe( metric, err := newGaugeMetric( m.logger, m.registry, - m.config.Namespace, + m.namespace, name, unit, description, @@ -368,7 +368,7 @@ func (m *metrics) GenerateMetricsDocumentation() string { } slices.SortFunc(metricIDs, sortFunc) - sb.Write([]byte(fmt.Sprintf("# Metrics Documentation for namespace '%s'\n\n", m.config.Namespace))) + sb.Write([]byte(fmt.Sprintf("# Metrics Documentation for namespace '%s'\n\n", m.namespace))) sb.Write([]byte(fmt.Sprintf("This documentation was automatically generated at time `%s`\n\n", time.Now().Format(time.RFC3339)))) @@ -402,7 +402,7 @@ func (m *metrics) GenerateMetricsDocumentation() string { sb.Write([]byte(fmt.Sprintf("| **Quantiles** | %s |\n", m.quantilesMap[*id]))) } sb.Write([]byte(fmt.Sprintf("| **Fully Qualified Name** | `%s_%s_%s` |\n", - m.config.Namespace, id.name, id.unit))) + m.namespace, id.name, id.unit))) } return sb.String() @@ -428,3 +428,7 @@ func (m *metrics) WriteMetricsDocumentation(fileName string) error { return nil } + +func (m *metrics) RegisterExternalMetrics(collectors ...prometheus.Collector) { + m.registry.MustRegister(collectors...) 
+} diff --git a/common/metrics/mock_metrics.go b/common/metrics/mock_metrics.go index 695a2662ef..244f268c90 100644 --- a/common/metrics/mock_metrics.go +++ b/common/metrics/mock_metrics.go @@ -1,6 +1,7 @@ package metrics import ( + "github.com/prometheus/client_golang/prometheus" "time" ) @@ -62,6 +63,10 @@ func (m *mockMetrics) NewAutoGauge( return nil } +func (m *mockMetrics) RegisterExternalMetrics(collectors ...prometheus.Collector) { + +} + var _ CountMetric = &mockCountMetric{} type mockCountMetric struct { diff --git a/common/metrics/test/main.go b/common/metrics/test/main.go index bdd1050b2b..430bfc75ba 100644 --- a/common/metrics/test/main.go +++ b/common/metrics/test/main.go @@ -24,18 +24,12 @@ type LabelType2 struct { } func main() { - - metricsConfig := &metrics.Config{ - Namespace: "test", - HTTPPort: 9101, - } - logger, err := common.NewLogger(common.DefaultLoggerConfig()) if err != nil { panic(err) } - metricsServer := metrics.NewMetrics(logger, metricsConfig) + metricsServer := metrics.NewMetrics(logger, "test", 9101) l1, err := metricsServer.NewLatencyMetric( "l1", diff --git a/metrics.md b/metrics.md index 72c61a314b..7138d98cae 100644 --- a/metrics.md +++ b/metrics.md @@ -1,4 +1,5 @@ # EigenDA Metrics Documentation -- [churner](operators/churner/churner-metrics.md) +- [churner](operators/churner/mdoc/churner-metrics.md) +- [relay](relay/mdoc/relay-metrics.md) diff --git a/operators/churner/churner-metrics.md b/operators/churner/mdoc/churner-metrics.md similarity index 100% rename from operators/churner/churner-metrics.md rename to operators/churner/mdoc/churner-metrics.md diff --git a/operators/churner/metrics.go b/operators/churner/metrics.go index 1f586e7ef7..6ca30a0edb 100644 --- a/operators/churner/metrics.go +++ b/operators/churner/metrics.go @@ -65,10 +65,7 @@ func NewMetrics(httpPort int, logger logging.Logger) (*Metrics, error) { reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) reg.MustRegister(collectors.NewGoCollector()) - metricsServer := metrics.NewMetrics(logger, &metrics.Config{ - Namespace: "eigenda_churner", - HTTPPort: httpPort, - }) + metricsServer := metrics.NewMetrics(logger, "eigenda_churner", httpPort) numRequests, err := metricsServer.NewCountMetric( "request", @@ -100,7 +97,7 @@ func NewMetrics(httpPort int, logger logging.Logger) (*Metrics, error) { // WriteMetricsDocumentation writes the metrics for the churner to a markdown file. 
func (g *Metrics) WriteMetricsDocumentation() error { - return g.metricsServer.WriteMetricsDocumentation("operators/churner/churner-metrics.md") + return g.metricsServer.WriteMetricsDocumentation("operators/churner/mdoc/churner-metrics.md") } // ObserveLatency observes the latency of a stage diff --git a/relay/blob_provider.go b/relay/blob_provider.go index 70cc310665..500156cbd4 100644 --- a/relay/blob_provider.go +++ b/relay/blob_provider.go @@ -33,7 +33,8 @@ func newBlobProvider( blobStore *blobstore.BlobStore, blobCacheSize uint64, maxIOConcurrency int, - fetchTimeout time.Duration) (*blobProvider, error) { + fetchTimeout time.Duration, + metrics *cache.CacheAccessorMetrics) (*blobProvider, error) { server := &blobProvider{ ctx: ctx, @@ -42,9 +43,12 @@ func newBlobProvider( fetchTimeout: fetchTimeout, } - c := cache.NewFIFOCache[v2.BlobKey, []byte](blobCacheSize, computeBlobCacheWeight) + cacheAccessor, err := cache.NewCacheAccessor[v2.BlobKey, []byte]( + cache.NewFIFOCache[v2.BlobKey, []byte](blobCacheSize, computeBlobCacheWeight), + maxIOConcurrency, + server.fetchBlob, + metrics) - cacheAccessor, err := cache.NewCacheAccessor[v2.BlobKey, []byte](c, maxIOConcurrency, server.fetchBlob) if err != nil { return nil, fmt.Errorf("error creating blob cache: %w", err) } diff --git a/relay/blob_provider_test.go b/relay/blob_provider_test.go index 22368a5d5b..8ac2e6657d 100644 --- a/relay/blob_provider_test.go +++ b/relay/blob_provider_test.go @@ -41,7 +41,8 @@ func TestReadWrite(t *testing.T) { blobStore, 1024*1024*32, 32, - 10*time.Second) + 10*time.Second, + nil) require.NoError(t, err) // Read the blobs back. @@ -78,7 +79,8 @@ func TestNonExistentBlob(t *testing.T) { blobStore, 1024*1024*32, 32, - 10*time.Second) + 10*time.Second, + nil) require.NoError(t, err) for i := 0; i < 10; i++ { diff --git a/relay/cache/cache_accessor.go b/relay/cache/cache_accessor.go index a6389538b4..83c1f74484 100644 --- a/relay/cache/cache_accessor.go +++ b/relay/cache/cache_accessor.go @@ -4,6 +4,7 @@ import ( "context" "golang.org/x/sync/semaphore" "sync" + "time" ) // CacheAccessor is an interface for accessing a resource that is cached. It assumes that cache misses @@ -55,17 +56,24 @@ type cacheAccessor[K comparable, V any] struct { // accessor is the function used to fetch values that are not in the cache. accessor Accessor[K, V] + + // metrics is used to record metrics about the cache accessor's performance. + metrics *CacheAccessorMetrics } -// NewCacheAccessor creates a new CacheAccessor. The cacheSize parameter specifies the maximum number of items -// that can be stored in the cache. The concurrencyLimit parameter specifies the maximum number of concurrent -// lookups that can be in progress at any given time. If a greater number of lookups are requested, the excess -// lookups will block until a lookup completes. If concurrencyLimit is zero, then no limits are imposed. The accessor -// parameter is the function used to fetch values that are not in the cache. +// NewCacheAccessor creates a new CacheAccessor. +// +// The concurrencyLimit parameter specifies the maximum number of concurrent lookups that can be in progress at any +// given time. If a greater number of lookups are requested, the excess lookups will block until a lookup completes. +// If concurrencyLimit is zero, then no limits are imposed. The accessor parameter is the function used to fetch values that are not in the cache. +// +// If metrics is not nil, it will be used to record metrics about the cache accessor's performance. 
+// If nil, no metrics will be recorded. func NewCacheAccessor[K comparable, V any]( cache Cache[K, V], concurrencyLimit int, - accessor Accessor[K, V]) (CacheAccessor[K, V], error) { + accessor Accessor[K, V], + metrics *CacheAccessorMetrics) (CacheAccessor[K, V], error) { lookupsInProgress := make(map[K]*accessResult[V]) @@ -79,6 +87,7 @@ func NewCacheAccessor[K comparable, V any]( concurrencyLimiter: concurrencyLimiter, accessor: accessor, lookupsInProgress: lookupsInProgress, + metrics: metrics, }, nil } @@ -97,6 +106,10 @@ func (c *cacheAccessor[K, V]) Get(ctx context.Context, key K) (V, error) { v, ok := c.cache.Get(key) if ok { c.cacheLock.Unlock() + + if c.metrics != nil { + c.metrics.cacheHits.Increment() + } return v, nil } @@ -109,6 +122,10 @@ func (c *cacheAccessor[K, V]) Get(ctx context.Context, key K) (V, error) { c.cacheLock.Unlock() + if c.metrics != nil { + c.metrics.cacheMisses.Increment() + } + if alreadyLoading { // The result is being fetched on another goroutine. Wait for it to finish. return c.waitForResult(ctx, result) @@ -150,11 +167,30 @@ func (c *cacheAccessor[K, V]) fetchResult(ctx context.Context, key K, result *ac <-c.concurrencyLimiter } + if c.metrics != nil { + start := time.Now() + defer func() { + c.metrics.cacheMissLatency.ReportLatency(time.Since(start)) + }() + } + c.cacheLock.Lock() // Update the cache if the fetch was successful. if err == nil { c.cache.Put(key, value) + + if c.metrics != nil { + size := c.cache.Size() + weight := c.cache.Weight() + c.metrics.size.Set(float64(size)) + c.metrics.weight.Set(float64(weight)) + var averageWeight float64 + if size > 0 { + averageWeight = float64(weight) / float64(size) + } + c.metrics.averageWeight.Set(averageWeight) + } } // Provide the result to all other goroutines that may be waiting for it. diff --git a/relay/cache/cache_accessor_metrics.go b/relay/cache/cache_accessor_metrics.go new file mode 100644 index 0000000000..b28c54261d --- /dev/null +++ b/relay/cache/cache_accessor_metrics.go @@ -0,0 +1,85 @@ +package cache + +import ( + "fmt" + "github.com/Layr-Labs/eigenda/common/metrics" +) + +// CacheAccessorMetrics provides metrics for a CacheAccessor. +type CacheAccessorMetrics struct { + cacheHits metrics.CountMetric + cacheMisses metrics.CountMetric + size metrics.GaugeMetric + weight metrics.GaugeMetric + averageWeight metrics.GaugeMetric + cacheMissLatency metrics.LatencyMetric +} + +// NewCacheAccessorMetrics creates a new CacheAccessorMetrics. 
+func NewCacheAccessorMetrics( + server metrics.Metrics, + cacheName string) (*CacheAccessorMetrics, error) { + + cacheHits, err := server.NewCountMetric( + fmt.Sprintf("%s_cache_hit", cacheName), + fmt.Sprintf("Number of cache hits in the %s cache", cacheName), + nil) + if err != nil { + return nil, err + } + + cacheMisses, err := server.NewCountMetric( + fmt.Sprintf("%s_cache_miss", cacheName), + fmt.Sprintf("Number of cache misses in the %s cache", cacheName), + nil) + if err != nil { + return nil, err + } + + size, err := server.NewGaugeMetric( + fmt.Sprintf("%s_cache", cacheName), + "size", + fmt.Sprintf("Number of items in the %s cache", cacheName), + nil) + if err != nil { + return nil, err + } + + weight, err := server.NewGaugeMetric( + fmt.Sprintf("%s_cache", cacheName), + "weight", + fmt.Sprintf("Total weight of items in the %s cache", cacheName), + nil) + if err != nil { + return nil, err + } + + averageWeight, err := server.NewGaugeMetric( + fmt.Sprintf("%s_cache_item", cacheName), + "weight", + fmt.Sprintf("Weight of each item currently in the %s cache", cacheName), + nil) + if err != nil { + return nil, err + } + + cacheMissLatency, err := server.NewLatencyMetric( + fmt.Sprintf("%s_cache_miss_latency", cacheName), + fmt.Sprintf("Latency of cache misses in the %s cache", cacheName), + nil, + &metrics.Quantile{Quantile: 0.5, Error: 0.05}, + &metrics.Quantile{Quantile: 0.9, Error: 0.05}, + &metrics.Quantile{Quantile: 0.99, Error: 0.05}) + if err != nil { + return nil, err + } + + return &CacheAccessorMetrics{ + cacheHits: cacheHits, + cacheMisses: cacheMisses, + size: size, + weight: weight, + averageWeight: averageWeight, + cacheMissLatency: cacheMissLatency, + }, nil +} diff --git a/relay/cache/cache_accessor_test.go b/relay/cache/cache_accessor_test.go index 0f2ac501d4..68e895dafc 100644 --- a/relay/cache/cache_accessor_test.go +++ b/relay/cache/cache_accessor_test.go @@ -32,11 +32,9 @@ func TestRandomOperationsSingleThread(t *testing.T) { return &str, nil } cacheSize := rand.Intn(dataSize) + 1 - c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { - return 1 - }) - ca, err := NewCacheAccessor[int, *string](c, 0, accessor) + cache := NewFIFOCache[int, *string](uint64(cacheSize), nil) + ca, err := NewCacheAccessor[int, *string](cache, 0, accessor, nil) require.NoError(t, err) for i := 0; i < dataSize; i++ { @@ -83,11 +81,8 @@ func TestCacheMisses(t *testing.T) { return &str, nil } - c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { - return 1 - }) - - ca, err := NewCacheAccessor[int, *string](c, 0, accessor) + cache := NewFIFOCache[int, *string](uint64(cacheSize), nil) + ca, err := NewCacheAccessor[int, *string](cache, 0, accessor, nil) require.NoError(t, err) // Get the first cacheSize keys. This should fill the cache. @@ -150,11 +145,8 @@ func ParallelAccessTest(t *testing.T, sleepEnabled bool) { } cacheSize := rand.Intn(dataSize) + 1 - c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { - return 1 - }) - - ca, err := NewCacheAccessor[int, *string](c, 0, accessor) + cache := NewFIFOCache[int, *string](uint64(cacheSize), nil) + ca, err := NewCacheAccessor[int, *string](cache, 0, accessor, nil) require.NoError(t, err) // Lock the accessor. This will cause all cache misses to block. 
@@ -223,11 +215,8 @@ func TestParallelAccessWithError(t *testing.T) { } cacheSize := 100 - c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { - return 1 - }) - - ca, err := NewCacheAccessor[int, *string](c, 0, accessor) + cache := NewFIFOCache[int, *string](uint64(cacheSize), nil) + ca, err := NewCacheAccessor[int, *string](cache, 0, accessor, nil) require.NoError(t, err) // Lock the accessor. This will cause all cache misses to block. @@ -299,11 +288,9 @@ func TestConcurrencyLimiter(t *testing.T) { } cacheSize := 100 - c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { - return 1 - }) - ca, err := NewCacheAccessor[int, *string](c, maxConcurrency, accessor) + cache := NewFIFOCache[int, *string](uint64(cacheSize), nil) + ca, err := NewCacheAccessor[int, *string](cache, maxConcurrency, accessor, nil) require.NoError(t, err) wg := sync.WaitGroup{} @@ -357,11 +344,8 @@ func TestOriginalRequesterTimesOut(t *testing.T) { } cacheSize := rand.Intn(dataSize) + 1 - c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { - return 1 - }) - - ca, err := NewCacheAccessor[int, *string](c, 0, accessor) + cache := NewFIFOCache[int, *string](uint64(cacheSize), nil) + ca, err := NewCacheAccessor[int, *string](cache, 0, accessor, nil) require.NoError(t, err) // Lock the accessor. This will cause all cache misses to block. @@ -449,11 +433,8 @@ func TestSecondaryRequesterTimesOut(t *testing.T) { } cacheSize := rand.Intn(dataSize) + 1 - c := NewFIFOCache[int, *string](uint64(cacheSize), func(key int, value *string) uint64 { - return 1 - }) - - ca, err := NewCacheAccessor[int, *string](c, 0, accessor) + cache := NewFIFOCache[int, *string](uint64(cacheSize), nil) + ca, err := NewCacheAccessor[int, *string](cache, 0, accessor, nil) require.NoError(t, err) // Lock the accessor. This will cause all cache misses to block. diff --git a/relay/cache/fifo-cache.go b/relay/cache/fifo-cache.go index 1c2e7c6abd..253326a40a 100644 --- a/relay/cache/fifo-cache.go +++ b/relay/cache/fifo-cache.go @@ -18,8 +18,15 @@ type FIFOCache[K comparable, V any] struct { expirationQueue queues.Queue } -// NewFIFOCache creates a new FIFOCache. -func NewFIFOCache[K comparable, V any](maxWeight uint64, calculator WeightCalculator[K, V]) *FIFOCache[K, V] { +// NewFIFOCache creates a new FIFOCache. If the calculator is nil, the weight of each key-value pair will be 1. 
+func NewFIFOCache[K comparable, V any]( + maxWeight uint64, + calculator WeightCalculator[K, V]) Cache[K, V] { + + if calculator == nil { + calculator = func(K, V) uint64 { return 1 } + } + return &FIFOCache[K, V]{ maxWeight: maxWeight, data: make(map[K]V), diff --git a/relay/cache/fifo_cache_test.go b/relay/cache/fifo_cache_test.go index da4de5ad1f..b2dae451d7 100644 --- a/relay/cache/fifo_cache_test.go +++ b/relay/cache/fifo_cache_test.go @@ -11,9 +11,7 @@ func TestExpirationOrder(t *testing.T) { tu.InitializeRandom() maxWeight := uint64(10 + rand.Intn(10)) - c := NewFIFOCache[int, int](maxWeight, func(key int, value int) uint64 { - return 1 - }) + c := NewFIFOCache[int, int](maxWeight, nil) require.Equal(t, uint64(0), c.Weight()) require.Equal(t, 0, c.Size()) diff --git a/relay/chunk_provider.go b/relay/chunk_provider.go index 5bc2926732..2f9a33ead7 100644 --- a/relay/chunk_provider.go +++ b/relay/chunk_provider.go @@ -50,7 +50,8 @@ func newChunkProvider( cacheSize uint64, maxIOConcurrency int, proofFetchTimeout time.Duration, - coefficientFetchTimeout time.Duration) (*chunkProvider, error) { + coefficientFetchTimeout time.Duration, + metrics *cache.CacheAccessorMetrics) (*chunkProvider, error) { server := &chunkProvider{ ctx: ctx, @@ -60,12 +61,11 @@ func newChunkProvider( coefficientFetchTimeout: coefficientFetchTimeout, } - c := cache.NewFIFOCache[blobKeyWithMetadata, []*encoding.Frame](cacheSize, computeFramesCacheWeight) - cacheAccessor, err := cache.NewCacheAccessor[blobKeyWithMetadata, []*encoding.Frame]( - c, + cache.NewFIFOCache[blobKeyWithMetadata, []*encoding.Frame](cacheSize, computeFramesCacheWeight), maxIOConcurrency, - server.fetchFrames) + server.fetchFrames, + metrics) if err != nil { return nil, err } diff --git a/relay/chunk_provider_test.go b/relay/chunk_provider_test.go index 06ec215b80..99a85345d4 100644 --- a/relay/chunk_provider_test.go +++ b/relay/chunk_provider_test.go @@ -52,7 +52,8 @@ func TestFetchingIndividualBlobs(t *testing.T) { 1024*1024*32, 32, 10*time.Second, - 10*time.Second) + 10*time.Second, + nil) require.NoError(t, err) // Read it back. @@ -139,7 +140,8 @@ func TestFetchingBatchedBlobs(t *testing.T) { 1024*1024*32, 32, 10*time.Second, - 10*time.Second) + 10*time.Second, + nil) require.NoError(t, err) // Read it back. 
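The pattern above, a `FIFOCache` wrapped by a `CacheAccessor` that optionally records into a `*CacheAccessorMetrics`, is what the blob, chunk, and metadata providers in this patch all follow. Below is a minimal, hypothetical wiring sketch that is not part of the patch: the cache name `"example"`, the port, and the fetch function are illustrative, and the accessor signature `func(key K) (V, error)` is assumed from the tests shown above.

```go
// Hypothetical wiring sketch only; mirrors how newBlobProvider/newChunkProvider
// construct their caches in this patch. Names marked illustrative are not from the diff.
package main

import (
	"context"
	"fmt"

	"github.com/Layr-Labs/eigenda/common"
	"github.com/Layr-Labs/eigenda/common/metrics"
	"github.com/Layr-Labs/eigenda/relay/cache"
)

func main() {
	logger, err := common.NewLogger(common.DefaultLoggerConfig())
	if err != nil {
		panic(err)
	}

	// A metrics server; the "relay" namespace and port are illustrative
	// (9101 is the default of the new metrics-port flag).
	server := metrics.NewMetrics(logger, "relay", 9101)

	// Metrics are optional; the updated tests pass nil here to disable recording.
	cacheMetrics, err := cache.NewCacheAccessorMetrics(server, "example")
	if err != nil {
		panic(err)
	}

	// A nil weight calculator means every entry weighs 1.
	fifo := cache.NewFIFOCache[string, []byte](1024, nil)

	// The accessor is only invoked on cache misses; its body here is a placeholder.
	fetch := func(key string) ([]byte, error) {
		return []byte("value for " + key), nil
	}

	// A concurrency limit of 32 bounds simultaneous misses; 0 would mean unlimited.
	accessor, err := cache.NewCacheAccessor[string, []byte](fifo, 32, fetch, cacheMetrics)
	if err != nil {
		panic(err)
	}

	value, err := accessor.Get(context.Background(), "some-key")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(value))
}
```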
diff --git a/relay/cmd/config.go b/relay/cmd/config.go index e1358f6430..3337858569 100644 --- a/relay/cmd/config.go +++ b/relay/cmd/config.go @@ -93,6 +93,7 @@ func NewConfig(ctx *cli.Context) (Config, error) { InternalGetProofsTimeout: ctx.Duration(flags.InternalGetProofsTimeoutFlag.Name), InternalGetCoefficientsTimeout: ctx.Duration(flags.InternalGetCoefficientsTimeoutFlag.Name), }, + MetricsPort: ctx.Int(flags.MetricsPortFlag.Name), }, EthClientConfig: geth.ReadEthClientConfigRPCOnly(ctx), BLSOperatorStateRetrieverAddr: ctx.String(flags.BlsOperatorStateRetrieverAddrFlag.Name), diff --git a/relay/cmd/flags/flags.go b/relay/cmd/flags/flags.go index 837c6cf9e1..6211509b35 100644 --- a/relay/cmd/flags/flags.go +++ b/relay/cmd/flags/flags.go @@ -274,6 +274,13 @@ var ( EnvVar: common.PrefixEnvVar(envVarPrefix, "ONCHAIN_STATE_REFRESH_INTERVAL"), Value: 1 * time.Hour, } + MetricsPortFlag = cli.IntFlag{ + Name: common.PrefixFlag(FlagPrefix, "metrics-port"), + Usage: "Port to listen on for metrics", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "METRICS_PORT"), + Value: 9101, + } ) var requiredFlags = []cli.Flag{ @@ -318,6 +325,7 @@ var optionalFlags = []cli.Flag{ InternalGetProofsTimeoutFlag, InternalGetCoefficientsTimeoutFlag, OnchainStateRefreshIntervalFlag, + MetricsPortFlag, } var Flags []cli.Flag diff --git a/relay/limiter/blob_rate_limiter.go b/relay/limiter/blob_rate_limiter.go index 1131af863b..8766d9d870 100644 --- a/relay/limiter/blob_rate_limiter.go +++ b/relay/limiter/blob_rate_limiter.go @@ -2,6 +2,7 @@ package limiter import ( "fmt" + "github.com/Layr-Labs/eigenda/relay/metrics" "golang.org/x/time/rate" "sync" "time" @@ -23,12 +24,15 @@ type BlobRateLimiter struct { // operationsInFlight is the number of GetBlob operations currently in flight. operationsInFlight int + // Encapsulates relay metrics. + relayMetrics *metrics.RelayMetrics + // this lock is used to provide thread safety lock sync.Mutex } // NewBlobRateLimiter creates a new BlobRateLimiter. 
-func NewBlobRateLimiter(config *Config) *BlobRateLimiter { +func NewBlobRateLimiter(config *Config, relayMetrics *metrics.RelayMetrics) *BlobRateLimiter { globalGetBlobOpLimiter := rate.NewLimiter( rate.Limit(config.MaxGetBlobOpsPerSecond), config.GetBlobOpsBurstiness) @@ -41,6 +45,7 @@ func NewBlobRateLimiter(config *Config) *BlobRateLimiter { config: config, opLimiter: globalGetBlobOpLimiter, bandwidthLimiter: globalGetBlobBandwidthLimiter, + relayMetrics: relayMetrics, } } @@ -57,10 +62,16 @@ func (l *BlobRateLimiter) BeginGetBlobOperation(now time.Time) error { defer l.lock.Unlock() if l.operationsInFlight >= l.config.MaxConcurrentGetBlobOps { + if l.relayMetrics != nil { + l.relayMetrics.GetBlobRateLimited.Increment(metrics.RateLimitLabel{Reason: "global concurrency"}) + } return fmt.Errorf("global concurrent request limit %d exceeded for getBlob operations, try again later", l.config.MaxConcurrentGetBlobOps) } if l.opLimiter.TokensAt(now) < 1 { + if l.relayMetrics != nil { + l.relayMetrics.GetBlobRateLimited.Increment(metrics.RateLimitLabel{Reason: "global rate"}) + } return fmt.Errorf("global rate limit %0.1fhz exceeded for getBlob operations, try again later", l.config.MaxGetBlobOpsPerSecond) } @@ -98,6 +109,9 @@ func (l *BlobRateLimiter) RequestGetBlobBandwidth(now time.Time, bytes uint32) e allowed := l.bandwidthLimiter.AllowN(now, int(bytes)) if !allowed { + if l.relayMetrics != nil { + l.relayMetrics.GetBlobRateLimited.Increment(metrics.RateLimitLabel{Reason: "global bandwidth"}) + } return fmt.Errorf("global rate limit %dMib/s exceeded for getBlob bandwidth, try again later", int(l.config.MaxGetBlobBytesPerSecond/1024/1024)) } diff --git a/relay/limiter/blob_rate_limiter_test.go b/relay/limiter/blob_rate_limiter_test.go index 2966b6bea0..fc482adb44 100644 --- a/relay/limiter/blob_rate_limiter_test.go +++ b/relay/limiter/blob_rate_limiter_test.go @@ -38,7 +38,7 @@ func TestConcurrentBlobOperations(t *testing.T) { // Make the burstiness limit high enough that we won't be rate limited config.GetBlobOpsBurstiness = concurrencyLimit * 100 - limiter := NewBlobRateLimiter(config) + limiter := NewBlobRateLimiter(config, nil) // time starts at current time, but advances manually afterward now := time.Now() @@ -69,7 +69,7 @@ func TestGetBlobOpRateLimit(t *testing.T) { config.GetBlobOpsBurstiness = int(config.MaxGetBlobOpsPerSecond) + rand.Intn(10) config.MaxConcurrentGetBlobOps = 1 - limiter := NewBlobRateLimiter(config) + limiter := NewBlobRateLimiter(config, nil) // time starts at current time, but advances manually afterward now := time.Now() @@ -129,7 +129,7 @@ func TestGetBlobBandwidthLimit(t *testing.T) { config.MaxGetBlobBytesPerSecond = float64(1024 + rand.Intn(1024*1024)) config.GetBlobBytesBurstiness = int(config.MaxGetBlobBytesPerSecond) + rand.Intn(1024*1024) - limiter := NewBlobRateLimiter(config) + limiter := NewBlobRateLimiter(config, nil) // time starts at current time, but advances manually afterward now := time.Now() diff --git a/relay/limiter/chunk_rate_limiter.go b/relay/limiter/chunk_rate_limiter.go index af71b317b9..d5de3e650b 100644 --- a/relay/limiter/chunk_rate_limiter.go +++ b/relay/limiter/chunk_rate_limiter.go @@ -2,6 +2,7 @@ package limiter import ( "fmt" + "github.com/Layr-Labs/eigenda/relay/metrics" "golang.org/x/time/rate" "sync" "time" @@ -36,12 +37,17 @@ type ChunkRateLimiter struct { // perClientOperationsInFlight is the number of GetChunk operations currently in flight for each client. 
perClientOperationsInFlight map[string]int + // Encapsulates relay metrics. + relayMetrics *metrics.RelayMetrics + // this lock is used to provide thread safety lock sync.Mutex } // NewChunkRateLimiter creates a new ChunkRateLimiter. -func NewChunkRateLimiter(config *Config) *ChunkRateLimiter { +func NewChunkRateLimiter( + config *Config, + relayMetrics *metrics.RelayMetrics) *ChunkRateLimiter { globalOpLimiter := rate.NewLimiter(rate.Limit( config.MaxGetChunkOpsPerSecond), @@ -58,6 +64,7 @@ func NewChunkRateLimiter(config *Config) *ChunkRateLimiter { perClientOpLimiter: make(map[string]*rate.Limiter), perClientBandwidthLimiter: make(map[string]*rate.Limiter), perClientOperationsInFlight: make(map[string]int), + relayMetrics: relayMetrics, } } @@ -90,19 +97,31 @@ func (l *ChunkRateLimiter) BeginGetChunkOperation( } if l.globalOperationsInFlight >= l.config.MaxConcurrentGetChunkOps { + if l.relayMetrics != nil { + l.relayMetrics.GetChunksRateLimited.Increment(metrics.RateLimitLabel{Reason: "global concurrency"}) + } return fmt.Errorf( "global concurrent request limit %d exceeded for GetChunks operations, try again later", l.config.MaxConcurrentGetChunkOps) } if l.globalOpLimiter.TokensAt(now) < 1 { + if l.relayMetrics != nil { + l.relayMetrics.GetChunksRateLimited.Increment(metrics.RateLimitLabel{Reason: "global rate"}) + } return fmt.Errorf("global rate limit %0.1fhz exceeded for GetChunks operations, try again later", l.config.MaxGetChunkOpsPerSecond) } if l.perClientOperationsInFlight[requesterID] >= l.config.MaxConcurrentGetChunkOpsClient { + if l.relayMetrics != nil { + l.relayMetrics.GetChunksRateLimited.Increment(metrics.RateLimitLabel{Reason: "client concurrency"}) + } return fmt.Errorf("client concurrent request limit %d exceeded for GetChunks", l.config.MaxConcurrentGetChunkOpsClient) } if l.perClientOpLimiter[requesterID].TokensAt(now) < 1 { + if l.relayMetrics != nil { + l.relayMetrics.GetChunksRateLimited.Increment(metrics.RateLimitLabel{Reason: "client rate"}) + } return fmt.Errorf("client rate limit %0.1fhz exceeded for GetChunks, try again later", l.config.MaxGetChunkOpsPerSecondClient) } @@ -139,6 +158,9 @@ func (l *ChunkRateLimiter) RequestGetChunkBandwidth(now time.Time, requesterID s allowed := l.globalBandwidthLimiter.AllowN(now, bytes) if !allowed { + if l.relayMetrics != nil { + l.relayMetrics.GetChunksRateLimited.Increment(metrics.RateLimitLabel{Reason: "global bandwidth"}) + } return fmt.Errorf("global rate limit %dMiB exceeded for GetChunk bandwidth, try again later", int(l.config.MaxGetChunkBytesPerSecond/1024/1024)) } @@ -150,6 +172,9 @@ func (l *ChunkRateLimiter) RequestGetChunkBandwidth(now time.Time, requesterID s allowed = limiter.AllowN(now, bytes) if !allowed { l.globalBandwidthLimiter.AllowN(now, -bytes) + if l.relayMetrics != nil { + l.relayMetrics.GetChunksRateLimited.Increment(metrics.RateLimitLabel{Reason: "client bandwidth"}) + } return fmt.Errorf("client rate limit %dMiB exceeded for GetChunk bandwidth, try again later", int(l.config.MaxGetChunkBytesPerSecondClient/1024/1024)) } diff --git a/relay/limiter/chunk_rate_limiter_test.go b/relay/limiter/chunk_rate_limiter_test.go index 59399ca17f..98116e07c6 100644 --- a/relay/limiter/chunk_rate_limiter_test.go +++ b/relay/limiter/chunk_rate_limiter_test.go @@ -22,7 +22,7 @@ func TestConcurrentGetChunksOperations(t *testing.T) { userID := tu.RandomString(64) - limiter := NewChunkRateLimiter(config) + limiter := NewChunkRateLimiter(config, nil) // time starts at current time, but advances manually afterward 
now := time.Now() @@ -56,7 +56,7 @@ func TestGetChunksRateLimit(t *testing.T) { userID := tu.RandomString(64) - limiter := NewChunkRateLimiter(config) + limiter := NewChunkRateLimiter(config, nil) // time starts at current time, but advances manually afterward now := time.Now() @@ -120,7 +120,7 @@ func TestGetChunksBandwidthLimit(t *testing.T) { userID := tu.RandomString(64) - limiter := NewChunkRateLimiter(config) + limiter := NewChunkRateLimiter(config, nil) // time starts at current time, but advances manually afterward now := time.Now() @@ -170,7 +170,7 @@ func TestPerClientConcurrencyLimit(t *testing.T) { userID1 := tu.RandomString(64) userID2 := tu.RandomString(64) - limiter := NewChunkRateLimiter(config) + limiter := NewChunkRateLimiter(config, nil) // time starts at current time, but advances manually afterward now := time.Now() @@ -218,7 +218,7 @@ func TestOpLimitPerClient(t *testing.T) { userID1 := tu.RandomString(64) userID2 := tu.RandomString(64) - limiter := NewChunkRateLimiter(config) + limiter := NewChunkRateLimiter(config, nil) // time starts at current time, but advances manually afterward now := time.Now() @@ -276,7 +276,7 @@ func TestBandwidthLimitPerClient(t *testing.T) { userID1 := tu.RandomString(64) userID2 := tu.RandomString(64) - limiter := NewChunkRateLimiter(config) + limiter := NewChunkRateLimiter(config, nil) // time starts at current time, but advances manually afterward now := time.Now() diff --git a/relay/mdoc/main.go b/relay/mdoc/main.go new file mode 100644 index 0000000000..56c68999fd --- /dev/null +++ b/relay/mdoc/main.go @@ -0,0 +1,24 @@ +package main + +import ( + "github.com/Layr-Labs/eigenda/common" + "github.com/Layr-Labs/eigenda/relay/metrics" +) + +// main generates documentation for relay metrics. +func main() { + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + if err != nil { + panic(err) + } + + metrics, err := metrics.NewRelayMetrics(logger, 0) + if err != nil { + panic(err) + } + + err = metrics.WriteMetricsDocumentation() + if err != nil { + panic(err) + } +} diff --git a/relay/mdoc/relay-metrics.md b/relay/mdoc/relay-metrics.md new file mode 100644 index 0000000000..959c3a5493 --- /dev/null +++ b/relay/mdoc/relay-metrics.md @@ -0,0 +1,426 @@ +# Metrics Documentation for namespace 'relay' + +This documentation was automatically generated at time `2024-12-03T10:26:19-06:00` + +There are a total of `34` registered metrics. + +--- + +## blob_cache_size + +Number of items in the blob cache + +| | | +|---|---| +| **Name** | `blob_cache` | +| **Unit** | `size` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_blob_cache_size` | +--- + +## blob_cache_weight + +Total weight of items in the blob cache + +| | | +|---|---| +| **Name** | `blob_cache` | +| **Unit** | `weight` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_blob_cache_weight` | +--- + +## blob_cache_average_weight + +Average weight of items currently in the blob cache + +| | | +|---|---| +| **Name** | `blob_cache_average` | +| **Unit** | `weight` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_blob_cache_average_weight` | +--- + +## blob_cache_hit_count + +Number of cache hits in the blob cache + +| | | +|---|---| +| **Name** | `blob_cache_hit` | +| **Unit** | `count` | +| **Type** | `counter` | +| **Fully Qualified Name** | `relay_blob_cache_hit_count` | +--- + +## blob_cache_lifespan_ms + +Time an item remains in the blob cache before being evicted. 
+ +| | | +|---|---| +| **Name** | `blob_cache_lifespan` | +| **Unit** | `ms` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_blob_cache_lifespan_ms` | +--- + +## blob_cache_miss_count + +Number of cache misses in the blob cache + +| | | +|---|---| +| **Name** | `blob_cache_miss` | +| **Unit** | `count` | +| **Type** | `counter` | +| **Fully Qualified Name** | `relay_blob_cache_miss_count` | +--- + +## blob_cache_miss_latency_ms + +Latency of cache misses in the blob cache + +| | | +|---|---| +| **Name** | `blob_cache_miss_latency` | +| **Unit** | `ms` | +| **Type** | `latency` | +| **Quantiles** | `0.500`, `0.900`, `0.990` | +| **Fully Qualified Name** | `relay_blob_cache_miss_latency_ms` | +--- + +## chunk_cache_size + +Number of items in the chunk cache + +| | | +|---|---| +| **Name** | `chunk_cache` | +| **Unit** | `size` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_chunk_cache_size` | +--- + +## chunk_cache_weight + +Total weight of items in the chunk cache + +| | | +|---|---| +| **Name** | `chunk_cache` | +| **Unit** | `weight` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_chunk_cache_weight` | +--- + +## chunk_cache_average_weight + +Average weight of items currently in the chunk cache + +| | | +|---|---| +| **Name** | `chunk_cache_average` | +| **Unit** | `weight` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_chunk_cache_average_weight` | +--- + +## chunk_cache_hit_count + +Number of cache hits in the chunk cache + +| | | +|---|---| +| **Name** | `chunk_cache_hit` | +| **Unit** | `count` | +| **Type** | `counter` | +| **Fully Qualified Name** | `relay_chunk_cache_hit_count` | +--- + +## chunk_cache_lifespan_ms + +Time an item remains in the chunk cache before being evicted. + +| | | +|---|---| +| **Name** | `chunk_cache_lifespan` | +| **Unit** | `ms` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_chunk_cache_lifespan_ms` | +--- + +## chunk_cache_miss_count + +Number of cache misses in the chunk cache + +| | | +|---|---| +| **Name** | `chunk_cache_miss` | +| **Unit** | `count` | +| **Type** | `counter` | +| **Fully Qualified Name** | `relay_chunk_cache_miss_count` | +--- + +## chunk_cache_miss_latency_ms + +Latency of cache misses in the chunk cache + +| | | +|---|---| +| **Name** | `chunk_cache_miss_latency` | +| **Unit** | `ms` | +| **Type** | `latency` | +| **Quantiles** | `0.500`, `0.900`, `0.990` | +| **Fully Qualified Name** | `relay_chunk_cache_miss_latency_ms` | +--- + +## get_blob_data_latency_ms + +Latency of the GetBlob RPC data retrieval + +| | | +|---|---| +| **Name** | `get_blob_data_latency` | +| **Unit** | `ms` | +| **Type** | `latency` | +| **Quantiles** | `0.500`, `0.900`, `0.990` | +| **Fully Qualified Name** | `relay_get_blob_data_latency_ms` | +--- + +## get_blob_data_size_bytes + +Data size of requested blobs. 
+ +| | | +|---|---| +| **Name** | `get_blob_data_size` | +| **Unit** | `bytes` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_get_blob_data_size_bytes` | +--- + +## get_blob_latency_ms + +Latency of the GetBlob RPC + +| | | +|---|---| +| **Name** | `get_blob_latency` | +| **Unit** | `ms` | +| **Type** | `latency` | +| **Quantiles** | `0.500`, `0.900`, `0.990` | +| **Fully Qualified Name** | `relay_get_blob_latency_ms` | +--- + +## get_blob_metadata_latency_ms + +Latency of the GetBlob RPC metadata retrieval + +| | | +|---|---| +| **Name** | `get_blob_metadata_latency` | +| **Unit** | `ms` | +| **Type** | `latency` | +| **Quantiles** | `0.500`, `0.900`, `0.990` | +| **Fully Qualified Name** | `relay_get_blob_metadata_latency_ms` | +--- + +## get_blob_rate_limited_count + +Number of GetBlob RPC rate limited + +| | | +|---|---| +| **Name** | `get_blob_rate_limited` | +| **Unit** | `count` | +| **Labels** | `reason` | +| **Type** | `counter` | +| **Fully Qualified Name** | `relay_get_blob_rate_limited_count` | +--- + +## get_chunks_auth_failure_count + +Number of GetChunks RPC authentication failures + +| | | +|---|---| +| **Name** | `get_chunks_auth_failure` | +| **Unit** | `count` | +| **Type** | `counter` | +| **Fully Qualified Name** | `relay_get_chunks_auth_failure_count` | +--- + +## get_chunks_authentication_latency_ms + +Latency of the GetChunks RPC client authentication + +| | | +|---|---| +| **Name** | `get_chunks_authentication_latency` | +| **Unit** | `ms` | +| **Type** | `latency` | +| **Quantiles** | `0.500`, `0.900`, `0.990` | +| **Fully Qualified Name** | `relay_get_chunks_authentication_latency_ms` | +--- + +## get_chunks_data_latency_ms + +Latency of the GetChunks RPC data retrieval + +| | | +|---|---| +| **Name** | `get_chunks_data_latency` | +| **Unit** | `ms` | +| **Type** | `latency` | +| **Quantiles** | `0.500`, `0.900`, `0.990` | +| **Fully Qualified Name** | `relay_get_chunks_data_latency_ms` | +--- + +## get_chunks_data_size_bytes + +Data size in a GetChunks request. + +| | | +|---|---| +| **Name** | `get_chunks_data_size` | +| **Unit** | `bytes` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_get_chunks_data_size_bytes` | +--- + +## get_chunks_key_count + +Number of keys in a GetChunks request. 
+ +| | | +|---|---| +| **Name** | `get_chunks_key` | +| **Unit** | `count` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_get_chunks_key_count` | +--- + +## get_chunks_latency_ms + +Latency of the GetChunks RPC + +| | | +|---|---| +| **Name** | `get_chunks_latency` | +| **Unit** | `ms` | +| **Type** | `latency` | +| **Quantiles** | `0.500`, `0.900`, `0.990` | +| **Fully Qualified Name** | `relay_get_chunks_latency_ms` | +--- + +## get_chunks_metadata_latency_ms + +Latency of the GetChunks RPC metadata retrieval + +| | | +|---|---| +| **Name** | `get_chunks_metadata_latency` | +| **Unit** | `ms` | +| **Type** | `latency` | +| **Quantiles** | `0.500`, `0.900`, `0.990` | +| **Fully Qualified Name** | `relay_get_chunks_metadata_latency_ms` | +--- + +## get_chunks_rate_limited_count + +Number of GetChunks RPC rate limited + +| | | +|---|---| +| **Name** | `get_chunks_rate_limited` | +| **Unit** | `count` | +| **Labels** | `reason` | +| **Type** | `counter` | +| **Fully Qualified Name** | `relay_get_chunks_rate_limited_count` | +--- + +## metadata_cache_size + +Number of items in the metadata cache + +| | | +|---|---| +| **Name** | `metadata_cache` | +| **Unit** | `size` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_metadata_cache_size` | +--- + +## metadata_cache_weight + +Total weight of items in the metadata cache + +| | | +|---|---| +| **Name** | `metadata_cache` | +| **Unit** | `weight` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_metadata_cache_weight` | +--- + +## metadata_cache_average_weight + +Average weight of items currently in the metadata cache + +| | | +|---|---| +| **Name** | `metadata_cache_average` | +| **Unit** | `weight` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_metadata_cache_average_weight` | +--- + +## metadata_cache_hit_count + +Number of cache hits in the metadata cache + +| | | +|---|---| +| **Name** | `metadata_cache_hit` | +| **Unit** | `count` | +| **Type** | `counter` | +| **Fully Qualified Name** | `relay_metadata_cache_hit_count` | +--- + +## metadata_cache_lifespan_ms + +Time an item remains in the metadata cache before being evicted. 
+ +| | | +|---|---| +| **Name** | `metadata_cache_lifespan` | +| **Unit** | `ms` | +| **Type** | `gauge` | +| **Fully Qualified Name** | `relay_metadata_cache_lifespan_ms` | +--- + +## metadata_cache_miss_count + +Number of cache misses in the metadata cache + +| | | +|---|---| +| **Name** | `metadata_cache_miss` | +| **Unit** | `count` | +| **Type** | `counter` | +| **Fully Qualified Name** | `relay_metadata_cache_miss_count` | +--- + +## metadata_cache_miss_latency_ms + +Latency of cache misses in the metadata cache + +| | | +|---|---| +| **Name** | `metadata_cache_miss_latency` | +| **Unit** | `ms` | +| **Type** | `latency` | +| **Quantiles** | `0.500`, `0.900`, `0.990` | +| **Fully Qualified Name** | `relay_metadata_cache_miss_latency_ms` | diff --git a/relay/metadata_provider.go b/relay/metadata_provider.go index e1f188bb9e..7a6ace83ae 100644 --- a/relay/metadata_provider.go +++ b/relay/metadata_provider.go @@ -58,7 +58,8 @@ func newMetadataProvider( maxIOConcurrency int, relayIDs []v2.RelayKey, fetchTimeout time.Duration, - blobParamsMap *v2.BlobVersionParameterMap) (*metadataProvider, error) { + blobParamsMap *v2.BlobVersionParameterMap, + metrics *cache.CacheAccessorMetrics) (*metadataProvider, error) { relayIDSet := make(map[v2.RelayKey]struct{}, len(relayIDs)) for _, id := range relayIDs { @@ -74,15 +75,11 @@ func newMetadataProvider( } server.blobParamsMap.Store(blobParamsMap) - c := cache.NewFIFOCache[v2.BlobKey, *blobMetadata](uint64(metadataCacheSize), - func(key v2.BlobKey, value *blobMetadata) uint64 { - return uint64(1) - }) - metadataCache, err := cache.NewCacheAccessor[v2.BlobKey, *blobMetadata]( - c, + cache.NewFIFOCache[v2.BlobKey, *blobMetadata](uint64(metadataCacheSize), nil), maxIOConcurrency, - server.fetchMetadata) + server.fetchMetadata, + metrics) if err != nil { return nil, fmt.Errorf("error creating metadata cache: %w", err) } diff --git a/relay/metadata_provider_test.go b/relay/metadata_provider_test.go index b48e157ec0..093cfd9f65 100644 --- a/relay/metadata_provider_test.go +++ b/relay/metadata_provider_test.go @@ -32,7 +32,8 @@ func TestGetNonExistentBlob(t *testing.T) { 32, nil, 10*time.Second, - v2.NewBlobVersionParameterMap(mockBlobParamsMap())) + v2.NewBlobVersionParameterMap(mockBlobParamsMap()), + nil) require.NoError(t, err) // Try to fetch a non-existent blobs @@ -98,7 +99,8 @@ func TestFetchingIndividualMetadata(t *testing.T) { 32, nil, 10*time.Second, - v2.NewBlobVersionParameterMap(mockBlobParamsMap())) + v2.NewBlobVersionParameterMap(mockBlobParamsMap()), + nil) require.NoError(t, err) @@ -183,7 +185,8 @@ func TestBatchedFetch(t *testing.T) { 32, nil, 10*time.Second, - v2.NewBlobVersionParameterMap(mockBlobParamsMap())) + v2.NewBlobVersionParameterMap(mockBlobParamsMap()), + nil) require.NoError(t, err) // Each iteration, choose a random subset of the keys to fetch @@ -289,7 +292,8 @@ func TestIndividualFetchWithSharding(t *testing.T) { 32, shardList, 10*time.Second, - v2.NewBlobVersionParameterMap(mockBlobParamsMap())) + v2.NewBlobVersionParameterMap(mockBlobParamsMap()), + nil) require.NoError(t, err) // Fetch the metadata from the server. @@ -421,7 +425,8 @@ func TestBatchedFetchWithSharding(t *testing.T) { 32, shardList, 10*time.Second, - v2.NewBlobVersionParameterMap(mockBlobParamsMap())) + v2.NewBlobVersionParameterMap(mockBlobParamsMap()), + nil) require.NoError(t, err) // Each iteration, choose two random keys to fetch. There will be a 25% chance that both blobs map to valid shards. 
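The `relay/metrics` package introduced in the next file gathers the metrics documented above into a single `RelayMetrics` struct. The following is a hypothetical standalone usage sketch, not part of this patch; the relay `Server` performs the equivalent steps internally in `NewServer` and `Start`, and the port value here is illustrative.

```go
// Hypothetical usage sketch of the exported RelayMetrics surface
// (NewRelayMetrics, Start/Stop, GetGRPCServerOption, labeled counters).
package main

import (
	"github.com/Layr-Labs/eigenda/common"
	"github.com/Layr-Labs/eigenda/relay/metrics"
	"google.golang.org/grpc"
)

func main() {
	logger, err := common.NewLogger(common.DefaultLoggerConfig())
	if err != nil {
		panic(err)
	}

	// 9101 matches the default of the new metrics-port flag; illustrative here.
	relayMetrics, err := metrics.NewRelayMetrics(logger, 9101)
	if err != nil {
		panic(err)
	}

	// Start serves the metrics endpoint; Stop shuts it down again.
	if err := relayMetrics.Start(); err != nil {
		panic(err)
	}
	defer func() { _ = relayMetrics.Stop() }()

	// Attach the automatic gRPC request metrics, as relay/server.go now does.
	grpcServer := grpc.NewServer(relayMetrics.GetGRPCServerOption())
	defer grpcServer.Stop()

	// Report into one of the exported metrics, e.g. a labeled rate-limit counter.
	relayMetrics.GetBlobRateLimited.Increment(metrics.RateLimitLabel{Reason: "global rate"})
}
```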
diff --git a/relay/metrics/metrics.go b/relay/metrics/metrics.go new file mode 100644 index 0000000000..93ba8da1e1 --- /dev/null +++ b/relay/metrics/metrics.go @@ -0,0 +1,228 @@ +package metrics + +import ( + "github.com/Layr-Labs/eigenda/common/metrics" + "github.com/Layr-Labs/eigenda/relay/cache" + "github.com/Layr-Labs/eigensdk-go/logging" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" + "google.golang.org/grpc" +) + +type RelayMetrics struct { + metricsServer metrics.Metrics + grpcServerOption grpc.ServerOption + + // Cache metrics + MetadataCacheMetrics *cache.CacheAccessorMetrics + ChunkCacheMetrics *cache.CacheAccessorMetrics + BlobCacheMetrics *cache.CacheAccessorMetrics + + // GetChunks metrics + GetChunksLatency metrics.LatencyMetric + GetChunksAuthenticationLatency metrics.LatencyMetric + GetChunksMetadataLatency metrics.LatencyMetric + GetChunksDataLatency metrics.LatencyMetric + GetChunksAuthFailures metrics.CountMetric + GetChunksRateLimited metrics.CountMetric + GetChunksKeyCount metrics.GaugeMetric + GetChunksDataSize metrics.GaugeMetric + + // GetBlob metrics + GetBlobLatency metrics.LatencyMetric + GetBlobMetadataLatency metrics.LatencyMetric + GetBlobDataLatency metrics.LatencyMetric + GetBlobRateLimited metrics.CountMetric + GetBlobDataSize metrics.GaugeMetric +} + +type RateLimitLabel struct { + Reason string +} + +// NewRelayMetrics creates a new RelayMetrics instance, which encapsulates all metrics related to the relay. +func NewRelayMetrics(logger logging.Logger, port int) (*RelayMetrics, error) { + + server := metrics.NewMetrics(logger, "relay", port) + + grpcMetrics := grpcprom.NewServerMetrics() + server.RegisterExternalMetrics(grpcMetrics) + grpcServerOption := grpc.UnaryInterceptor( + grpcMetrics.UnaryServerInterceptor(), + ) + + standardQuantiles := []*metrics.Quantile{ + metrics.NewQuantile(0.5), + metrics.NewQuantile(0.9), + metrics.NewQuantile(0.99), + } + + metadataCacheMetrics, err := cache.NewCacheAccessorMetrics(server, "metadata") + if err != nil { + return nil, err + } + + chunkCacheMetrics, err := cache.NewCacheAccessorMetrics(server, "chunk") + if err != nil { + return nil, err + } + + blobCacheMetrics, err := cache.NewCacheAccessorMetrics(server, "blob") + if err != nil { + return nil, err + } + + getChunksLatencyMetric, err := server.NewLatencyMetric( + "get_chunks_latency", + "Latency of the GetChunks RPC", + nil, + standardQuantiles...) + if err != nil { + return nil, err + } + + getChunksAuthenticationLatencyMetric, err := server.NewLatencyMetric( + "get_chunks_authentication_latency", + "Latency of the GetChunks RPC client authentication", + nil, + standardQuantiles...) + if err != nil { + return nil, err + } + + getChunksMetadataLatencyMetric, err := server.NewLatencyMetric( + "get_chunks_metadata_latency", + "Latency of the GetChunks RPC metadata retrieval", + nil, + standardQuantiles...) + if err != nil { + return nil, err + } + + getChunksDataLatencyMetric, err := server.NewLatencyMetric( + "get_chunks_data_latency", + "Latency of the GetChunks RPC data retrieval", + nil, + standardQuantiles...) 
+ if err != nil { + return nil, err + } + + getChunksAuthFailures, err := server.NewCountMetric( + "get_chunks_auth_failure", + "Number of GetChunks RPC authentication failures", + nil) + if err != nil { + return nil, err + } + + getChunksRateLimited, err := server.NewCountMetric( + "get_chunks_rate_limited", + "Number of GetChunks RPC rate limited", + RateLimitLabel{}) + if err != nil { + return nil, err + } + + getChunksKeyCount, err := server.NewGaugeMetric( + "get_chunks_key", + "count", + "Number of keys in a GetChunks request.", + nil) + if err != nil { + return nil, err + } + + getChunksDataSize, err := server.NewGaugeMetric( + "get_chunks_data_size", + "bytes", + "Data size in a GetChunks request.", + nil) + if err != nil { + return nil, err + } + + getBlobLatencyMetric, err := server.NewLatencyMetric( + "get_blob_latency", + "Latency of the GetBlob RPC", + nil, + standardQuantiles...) + if err != nil { + return nil, err + } + + getBlobMetadataLatencyMetric, err := server.NewLatencyMetric( + "get_blob_metadata_latency", + "Latency of the GetBlob RPC metadata retrieval", + nil, + standardQuantiles...) + if err != nil { + return nil, err + } + + getBlobDataLatencyMetric, err := server.NewLatencyMetric( + "get_blob_data_latency", + "Latency of the GetBlob RPC data retrieval", + nil, + standardQuantiles...) + if err != nil { + return nil, err + } + + getBlobRateLimited, err := server.NewCountMetric( + "get_blob_rate_limited", + "Number of GetBlob RPC rate limited", + RateLimitLabel{}) + if err != nil { + return nil, err + } + + getBlobDataSize, err := server.NewGaugeMetric( + "get_blob_data_size", + "bytes", + "Data size of requested blobs.", + nil) + if err != nil { + return nil, err + } + + return &RelayMetrics{ + metricsServer: server, + MetadataCacheMetrics: metadataCacheMetrics, + ChunkCacheMetrics: chunkCacheMetrics, + BlobCacheMetrics: blobCacheMetrics, + grpcServerOption: grpcServerOption, + GetChunksLatency: getChunksLatencyMetric, + GetChunksAuthenticationLatency: getChunksAuthenticationLatencyMetric, + GetChunksMetadataLatency: getChunksMetadataLatencyMetric, + GetChunksDataLatency: getChunksDataLatencyMetric, + GetChunksAuthFailures: getChunksAuthFailures, + GetChunksRateLimited: getChunksRateLimited, + GetChunksKeyCount: getChunksKeyCount, + GetChunksDataSize: getChunksDataSize, + GetBlobLatency: getBlobLatencyMetric, + GetBlobMetadataLatency: getBlobMetadataLatencyMetric, + GetBlobDataLatency: getBlobDataLatencyMetric, + GetBlobRateLimited: getBlobRateLimited, + GetBlobDataSize: getBlobDataSize, + }, nil +} + +// Start starts the metrics server. +func (m *RelayMetrics) Start() error { + return m.metricsServer.Start() +} + +// Stop stops the metrics server. +func (m *RelayMetrics) Stop() error { + return m.metricsServer.Stop() +} + +// GetGRPCServerOption returns the gRPC server option that enables automatic GRPC metrics collection. +func (m *RelayMetrics) GetGRPCServerOption() grpc.ServerOption { + return m.grpcServerOption +} + +// WriteMetricsDocumentation writes the metrics for the relay to a markdown file. 
+func (m *RelayMetrics) WriteMetricsDocumentation() error { + return m.metricsServer.WriteMetricsDocumentation("relay/mdoc/relay-metrics.md") +} diff --git a/relay/server.go b/relay/server.go index eb00709e9f..4a300762ed 100644 --- a/relay/server.go +++ b/relay/server.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/Layr-Labs/eigenda/relay/metrics" "net" "time" @@ -57,6 +58,9 @@ type Server struct { // chainReader is the core.Reader used to fetch blob parameters. chainReader core.Reader + + // metrics encapsulates the metrics for the relay server. + metrics *metrics.RelayMetrics } type Config struct { @@ -113,6 +117,9 @@ type Config struct { // OnchainStateRefreshInterval is the interval at which the onchain state is refreshed. OnchainStateRefreshInterval time.Duration + + // MetricsPort is the port that the relay metrics server listens on. + MetricsPort int } // NewServer creates a new relay Server. @@ -126,6 +133,7 @@ func NewServer( chainReader core.Reader, ics core.IndexedChainState, ) (*Server, error) { + if chainReader == nil { return nil, errors.New("chainReader is required") } @@ -135,6 +143,11 @@ func NewServer( return nil, fmt.Errorf("error fetching blob params: %w", err) } + relayMetrics, err := metrics.NewRelayMetrics(logger, config.MetricsPort) + if err != nil { + return nil, fmt.Errorf("error creating relayMetrics: %w", err) + } + mp, err := newMetadataProvider( ctx, logger, @@ -143,7 +156,8 @@ func NewServer( config.MetadataMaxConcurrency, config.RelayIDs, config.Timeouts.InternalGetMetadataTimeout, - v2.NewBlobVersionParameterMap(blobParams)) + v2.NewBlobVersionParameterMap(blobParams), + relayMetrics.MetadataCacheMetrics) if err != nil { return nil, fmt.Errorf("error creating metadata provider: %w", err) @@ -155,7 +169,8 @@ func NewServer( blobStore, config.BlobCacheBytes, config.BlobMaxConcurrency, - config.Timeouts.InternalGetBlobTimeout) + config.Timeouts.InternalGetBlobTimeout, + relayMetrics.BlobCacheMetrics) if err != nil { return nil, fmt.Errorf("error creating blob provider: %w", err) } @@ -167,7 +182,8 @@ func NewServer( config.ChunkCacheSize, config.ChunkMaxConcurrency, config.Timeouts.InternalGetProofsTimeout, - config.Timeouts.InternalGetCoefficientsTimeout) + config.Timeouts.InternalGetCoefficientsTimeout, + relayMetrics.ChunkCacheMetrics) if err != nil { return nil, fmt.Errorf("error creating chunk provider: %w", err) } @@ -190,14 +206,17 @@ func NewServer( metadataProvider: mp, blobProvider: bp, chunkProvider: cp, - blobRateLimiter: limiter.NewBlobRateLimiter(&config.RateLimits), - chunkRateLimiter: limiter.NewChunkRateLimiter(&config.RateLimits), + blobRateLimiter: limiter.NewBlobRateLimiter(&config.RateLimits, relayMetrics), + chunkRateLimiter: limiter.NewChunkRateLimiter(&config.RateLimits, relayMetrics), authenticator: authenticator, + metrics: relayMetrics, }, nil } // GetBlob retrieves a blob stored by the relay. 
func (s *Server) GetBlob(ctx context.Context, request *pb.GetBlobRequest) (*pb.GetBlobReply, error) { + start := time.Now() + if s.config.Timeouts.GetBlobTimeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, s.config.Timeouts.GetBlobTimeout) @@ -226,6 +245,9 @@ func (s *Server) GetBlob(ctx context.Context, request *pb.GetBlobRequest) (*pb.G return nil, fmt.Errorf("blob not found") } + finishedFetchingMetadata := time.Now() + s.metrics.GetBlobMetadataLatency.ReportLatency(finishedFetchingMetadata.Sub(start)) + err = s.blobRateLimiter.RequestGetBlobBandwidth(time.Now(), metadata.blobSizeBytes) if err != nil { return nil, err @@ -236,15 +258,20 @@ func (s *Server) GetBlob(ctx context.Context, request *pb.GetBlobRequest) (*pb.G return nil, fmt.Errorf("error fetching blob %s: %w", key.Hex(), err) } + s.metrics.GetBlobDataSize.Set(float64(len(data))) + s.metrics.GetBlobDataLatency.ReportLatency(time.Since(finishedFetchingMetadata)) + s.metrics.GetBlobLatency.ReportLatency(time.Since(start)) + reply := &pb.GetBlobReply{ Blob: data, } - return reply, nil } // GetChunks retrieves chunks from blobs stored by the relay. func (s *Server) GetChunks(ctx context.Context, request *pb.GetChunksRequest) (*pb.GetChunksReply, error) { + start := time.Now() + if s.config.Timeouts.GetChunksTimeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, s.config.Timeouts.GetChunksTimeout) @@ -258,6 +285,7 @@ func (s *Server) GetChunks(ctx context.Context, request *pb.GetChunksRequest) (* return nil, fmt.Errorf( "too many chunk requests provided, max is %d", s.config.MaxKeysPerGetChunksRequest) } + s.metrics.GetChunksKeyCount.Set(float64(len(request.ChunkRequests))) if s.authenticator != nil { client, ok := peer.FromContext(ctx) @@ -268,13 +296,18 @@ func (s *Server) GetChunks(ctx context.Context, request *pb.GetChunksRequest) (* err := s.authenticator.AuthenticateGetChunksRequest(ctx, clientAddress, request, time.Now()) if err != nil { + s.metrics.GetChunksAuthFailures.Increment() return nil, fmt.Errorf("auth failed: %w", err) } } + finishedAuthenticating := time.Now() + if s.authenticator != nil { + s.metrics.GetChunksAuthenticationLatency.ReportLatency(finishedAuthenticating.Sub(start)) + } + clientID := string(request.OperatorId) err := s.chunkRateLimiter.BeginGetChunkOperation(time.Now(), clientID) - if err != nil { return nil, err } @@ -292,6 +325,9 @@ func (s *Server) GetChunks(ctx context.Context, request *pb.GetChunksRequest) (* "error fetching metadata for blob, check if blob exists and is assigned to this relay: %w", err) } + finishedFetchingMetadata := time.Now() + s.metrics.GetChunksMetadataLatency.ReportLatency(finishedFetchingMetadata.Sub(finishedAuthenticating)) + requiredBandwidth, err := computeChunkRequestRequiredBandwidth(request, mMap) if err != nil { return nil, fmt.Errorf("error computing required bandwidth: %w", err) @@ -300,6 +336,7 @@ func (s *Server) GetChunks(ctx context.Context, request *pb.GetChunksRequest) (* if err != nil { return nil, err } + s.metrics.GetChunksDataSize.Set(float64(requiredBandwidth)) frames, err := s.chunkProvider.GetFrames(ctx, mMap) if err != nil { @@ -311,6 +348,9 @@ func (s *Server) GetChunks(ctx context.Context, request *pb.GetChunksRequest) (* return nil, fmt.Errorf("error gathering chunk data: %w", err) } + s.metrics.GetChunksDataLatency.ReportLatency(time.Since(finishedFetchingMetadata)) + s.metrics.GetChunksLatency.ReportLatency(time.Since(start)) + return &pb.GetChunksReply{ Data: bytesToSend, }, nil @@ 
-430,6 +470,11 @@ func computeChunkRequestRequiredBandwidth(request *pb.GetChunksRequest, mMap met // Start starts the server listening for requests. This method will block until the server is stopped. func (s *Server) Start(ctx context.Context) error { + err := s.metrics.Start() + if err != nil { + return fmt.Errorf("error starting metrics server: %w", err) + } + if s.chainReader != nil && s.metadataProvider != nil { go func() { _ = s.RefreshOnchainState(ctx) @@ -445,7 +490,7 @@ func (s *Server) Start(ctx context.Context) error { opt := grpc.MaxRecvMsgSize(s.config.MaxGRPCMessageSize) - s.grpcServer = grpc.NewServer(opt) + s.grpcServer = grpc.NewServer(opt, s.metrics.GetGRPCServerOption()) reflection.Register(s.grpcServer) pb.RegisterRelayServer(s.grpcServer, s) @@ -454,7 +499,6 @@ func (s *Server) Start(ctx context.Context) error { healthcheck.RegisterHealthServer(name, s.grpcServer) s.logger.Info("GRPC Listening", "port", s.config.GRPCPort, "address", listener.Addr().String()) - if err = s.grpcServer.Serve(listener); err != nil { return errors.New("could not start GRPC server") } @@ -483,8 +527,15 @@ func (s *Server) RefreshOnchainState(ctx context.Context) error { } // Stop stops the server. -func (s *Server) Stop() { +func (s *Server) Stop() error { if s.grpcServer != nil { s.grpcServer.Stop() } + + err := s.metrics.Stop() + if err != nil { + return fmt.Errorf("error stopping metrics server: %w", err) + } + + return nil } diff --git a/relay/server_test.go b/relay/server_test.go index 58b8893714..801ebeef87 100644 --- a/relay/server_test.go +++ b/relay/server_test.go @@ -56,6 +56,7 @@ func defaultConfig() *Config { InternalGetProofsTimeout: 10 * time.Second, InternalGetCoefficientsTimeout: 10 * time.Second, }, + MetricsPort: 9101, } } @@ -122,7 +123,10 @@ func TestReadWriteBlobs(t *testing.T) { err = server.Start(context.Background()) require.NoError(t, err) }() - defer server.Stop() + defer func() { + err = server.Stop() + require.NoError(t, err) + }() expectedData := make(map[v2.BlobKey][]byte) @@ -202,7 +206,10 @@ func TestReadNonExistentBlob(t *testing.T) { err = server.Start(context.Background()) require.NoError(t, err) }() - defer server.Stop() + defer func() { + err = server.Stop() + require.NoError(t, err) + }() for i := 0; i < 10; i++ { request := &pb.GetBlobRequest{ @@ -257,7 +264,10 @@ func TestReadWriteBlobsWithSharding(t *testing.T) { err = server.Start(context.Background()) require.NoError(t, err) }() - defer server.Stop() + defer func() { + err = server.Stop() + require.NoError(t, err) + }() expectedData := make(map[v2.BlobKey][]byte) shardMap := make(map[v2.BlobKey][]v2.RelayKey) @@ -377,7 +387,10 @@ func TestReadWriteChunks(t *testing.T) { err = server.Start(context.Background()) require.NoError(t, err) }() - defer server.Stop() + defer func() { + err = server.Stop() + require.NoError(t, err) + }() expectedData := make(map[v2.BlobKey][]*encoding.Frame) fragmentInfoMap := make(map[v2.BlobKey]*encoding.FragmentInfo) @@ -575,7 +588,10 @@ func TestBatchedReadWriteChunks(t *testing.T) { err = server.Start(context.Background()) require.NoError(t, err) }() - defer server.Stop() + defer func() { + err = server.Stop() + require.NoError(t, err) + }() expectedData := make(map[v2.BlobKey][]*encoding.Frame) fragmentInfoMap := make(map[v2.BlobKey]*encoding.FragmentInfo) @@ -703,7 +719,10 @@ func TestReadWriteChunksWithSharding(t *testing.T) { err = server.Start(context.Background()) require.NoError(t, err) }() - defer server.Stop() + defer func() { + err = server.Stop() + 
require.NoError(t, err) + }() expectedData := make(map[v2.BlobKey][]*encoding.Frame) fragmentInfoMap := make(map[v2.BlobKey]*encoding.FragmentInfo) @@ -980,7 +999,10 @@ func TestBatchedReadWriteChunksWithSharding(t *testing.T) { err = server.Start(context.Background()) require.NoError(t, err) }() - defer server.Stop() + defer func() { + err = server.Stop() + require.NoError(t, err) + }() expectedData := make(map[v2.BlobKey][]*encoding.Frame) fragmentInfoMap := make(map[v2.BlobKey]*encoding.FragmentInfo)