Skip to content

Commit

Permalink
Relay metrics (#938)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <cody@eigenlabs.org>
  • Loading branch information
cody-littley authored Dec 6, 2024
1 parent b8deee5 commit 9f68f74
Show file tree
Hide file tree
Showing 30 changed files with 1,045 additions and 132 deletions.
10 changes: 0 additions & 10 deletions common/metrics/config.go

This file was deleted.

8 changes: 7 additions & 1 deletion common/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package metrics

import "time"
import (
"github.com/prometheus/client_golang/prometheus"
"time"
)

// Metrics provides a convenient interface for reporting metrics.
type Metrics interface {
Expand Down Expand Up @@ -62,6 +65,9 @@ type Metrics interface {
pollPeriod time.Duration,
source func() float64,
label ...any) error

// RegisterExternalMetrics registers prometheus collectors created outside the metrics framework.
RegisterExternalMetrics(collectors ...prometheus.Collector)
}

// Metric represents a metric that can be reported.
Expand Down
26 changes: 15 additions & 11 deletions common/metrics/metrics_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ type metrics struct {
// logger is the logger used to log messages.
logger logging.Logger

// config is the configuration for the metrics.
config *Config
// namespace is prepended to all metric names.
namespace string

// registry is the prometheus registry used to report metrics.
registry *prometheus.Registry
Expand Down Expand Up @@ -55,13 +55,13 @@ type metrics struct {
}

// NewMetrics creates a new Metrics instance.
func NewMetrics(logger logging.Logger, config *Config) Metrics {
func NewMetrics(logger logging.Logger, namespace string, port int) Metrics {
reg := prometheus.NewRegistry()
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
reg.MustRegister(collectors.NewGoCollector())

logger.Infof("Starting metrics server at port %d", config.HTTPPort)
addr := fmt.Sprintf(":%d", config.HTTPPort)
logger.Infof("Starting metrics server at port %d", port)
addr := fmt.Sprintf(":%d", port)
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(
reg,
Expand All @@ -74,7 +74,7 @@ func NewMetrics(logger logging.Logger, config *Config) Metrics {

m := &metrics{
logger: logger,
config: config,
namespace: namespace,
registry: reg,
metricMap: make(map[metricID]Metric),
isAlive: atomic.Bool{},
Expand Down Expand Up @@ -198,7 +198,7 @@ func (m *metrics) NewLatencyMetric(
metric, err := newLatencyMetric(
m.logger,
m.registry,
m.config.Namespace,
m.namespace,
name,
description,
objectives,
Expand Down Expand Up @@ -238,7 +238,7 @@ func (m *metrics) NewCountMetric(
metric, err := newCountMetric(
m.logger,
m.registry,
m.config.Namespace,
m.namespace,
name, description,
labelTemplate)

Expand Down Expand Up @@ -287,7 +287,7 @@ func (m *metrics) newGaugeMetricUnsafe(
metric, err := newGaugeMetric(
m.logger,
m.registry,
m.config.Namespace,
m.namespace,
name,
unit,
description,
Expand Down Expand Up @@ -368,7 +368,7 @@ func (m *metrics) GenerateMetricsDocumentation() string {
}
slices.SortFunc(metricIDs, sortFunc)

sb.Write([]byte(fmt.Sprintf("# Metrics Documentation for namespace '%s'\n\n", m.config.Namespace)))
sb.Write([]byte(fmt.Sprintf("# Metrics Documentation for namespace '%s'\n\n", m.namespace)))
sb.Write([]byte(fmt.Sprintf("This documentation was automatically generated at time `%s`\n\n",
time.Now().Format(time.RFC3339))))

Expand Down Expand Up @@ -402,7 +402,7 @@ func (m *metrics) GenerateMetricsDocumentation() string {
sb.Write([]byte(fmt.Sprintf("| **Quantiles** | %s |\n", m.quantilesMap[*id])))
}
sb.Write([]byte(fmt.Sprintf("| **Fully Qualified Name** | `%s_%s_%s` |\n",
m.config.Namespace, id.name, id.unit)))
m.namespace, id.name, id.unit)))
}

return sb.String()
Expand All @@ -428,3 +428,7 @@ func (m *metrics) WriteMetricsDocumentation(fileName string) error {

return nil
}

func (m *metrics) RegisterExternalMetrics(collectors ...prometheus.Collector) {
m.registry.MustRegister(collectors...)
}
5 changes: 5 additions & 0 deletions common/metrics/mock_metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package metrics

import (
"github.com/prometheus/client_golang/prometheus"
"time"
)

Expand Down Expand Up @@ -62,6 +63,10 @@ func (m *mockMetrics) NewAutoGauge(
return nil
}

func (m *mockMetrics) RegisterExternalMetrics(collectors ...prometheus.Collector) {

}

var _ CountMetric = &mockCountMetric{}

type mockCountMetric struct {
Expand Down
8 changes: 1 addition & 7 deletions common/metrics/test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,12 @@ type LabelType2 struct {
}

func main() {

metricsConfig := &metrics.Config{
Namespace: "test",
HTTPPort: 9101,
}

logger, err := common.NewLogger(common.DefaultLoggerConfig())
if err != nil {
panic(err)
}

metricsServer := metrics.NewMetrics(logger, metricsConfig)
metricsServer := metrics.NewMetrics(logger, "test", 9101)

l1, err := metricsServer.NewLatencyMetric(
"l1",
Expand Down
3 changes: 2 additions & 1 deletion metrics.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# EigenDA Metrics Documentation

- [churner](operators/churner/churner-metrics.md)
- [churner](operators/churner/mdoc/churner-metrics.md)
- [relay](relay/mdoc/relay-metrics.md)

File renamed without changes.
7 changes: 2 additions & 5 deletions operators/churner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,7 @@ func NewMetrics(httpPort int, logger logging.Logger) (*Metrics, error) {
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
reg.MustRegister(collectors.NewGoCollector())

metricsServer := metrics.NewMetrics(logger, &metrics.Config{
Namespace: "eigenda_churner",
HTTPPort: httpPort,
})
metricsServer := metrics.NewMetrics(logger, "eigenda_churner", httpPort)

numRequests, err := metricsServer.NewCountMetric(
"request",
Expand Down Expand Up @@ -100,7 +97,7 @@ func NewMetrics(httpPort int, logger logging.Logger) (*Metrics, error) {

// WriteMetricsDocumentation writes the metrics for the churner to a markdown file.
func (g *Metrics) WriteMetricsDocumentation() error {
return g.metricsServer.WriteMetricsDocumentation("operators/churner/churner-metrics.md")
return g.metricsServer.WriteMetricsDocumentation("operators/churner/mdoc/churner-metrics.md")
}

// ObserveLatency observes the latency of a stage
Expand Down
10 changes: 7 additions & 3 deletions relay/blob_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func newBlobProvider(
blobStore *blobstore.BlobStore,
blobCacheSize uint64,
maxIOConcurrency int,
fetchTimeout time.Duration) (*blobProvider, error) {
fetchTimeout time.Duration,
metrics *cache.CacheAccessorMetrics) (*blobProvider, error) {

server := &blobProvider{
ctx: ctx,
Expand All @@ -42,9 +43,12 @@ func newBlobProvider(
fetchTimeout: fetchTimeout,
}

c := cache.NewFIFOCache[v2.BlobKey, []byte](blobCacheSize, computeBlobCacheWeight)
cacheAccessor, err := cache.NewCacheAccessor[v2.BlobKey, []byte](
cache.NewFIFOCache[v2.BlobKey, []byte](blobCacheSize, computeBlobCacheWeight),
maxIOConcurrency,
server.fetchBlob,
metrics)

cacheAccessor, err := cache.NewCacheAccessor[v2.BlobKey, []byte](c, maxIOConcurrency, server.fetchBlob)
if err != nil {
return nil, fmt.Errorf("error creating blob cache: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions relay/blob_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func TestReadWrite(t *testing.T) {
blobStore,
1024*1024*32,
32,
10*time.Second)
10*time.Second,
nil)
require.NoError(t, err)

// Read the blobs back.
Expand Down Expand Up @@ -78,7 +79,8 @@ func TestNonExistentBlob(t *testing.T) {
blobStore,
1024*1024*32,
32,
10*time.Second)
10*time.Second,
nil)
require.NoError(t, err)

for i := 0; i < 10; i++ {
Expand Down
48 changes: 42 additions & 6 deletions relay/cache/cache_accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"golang.org/x/sync/semaphore"
"sync"
"time"
)

// CacheAccessor is an interface for accessing a resource that is cached. It assumes that cache misses
Expand Down Expand Up @@ -55,17 +56,24 @@ type cacheAccessor[K comparable, V any] struct {

// accessor is the function used to fetch values that are not in the cache.
accessor Accessor[K, V]

// metrics is used to record metrics about the cache accessor's performance.
metrics *CacheAccessorMetrics
}

// NewCacheAccessor creates a new CacheAccessor. The cacheSize parameter specifies the maximum number of items
// that can be stored in the cache. The concurrencyLimit parameter specifies the maximum number of concurrent
// lookups that can be in progress at any given time. If a greater number of lookups are requested, the excess
// lookups will block until a lookup completes. If concurrencyLimit is zero, then no limits are imposed. The accessor
// parameter is the function used to fetch values that are not in the cache.
// NewCacheAccessor creates a new CacheAccessor.
//
// The concurrencyLimit parameter specifies the maximum number of concurrent lookups that can be in progress at any
// given time. If a greater number of lookups are requested, the excess lookups will block until a lookup completes.
// If concurrencyLimit is zero, then no limits are imposed. The accessor parameter is the function used to fetch values that are not in the cache.
//
// If metrics is not nil, it will be used to record metrics about the cache accessor's performance.
// If nil, no metrics will be recorded.
func NewCacheAccessor[K comparable, V any](
cache Cache[K, V],
concurrencyLimit int,
accessor Accessor[K, V]) (CacheAccessor[K, V], error) {
accessor Accessor[K, V],
metrics *CacheAccessorMetrics) (CacheAccessor[K, V], error) {

lookupsInProgress := make(map[K]*accessResult[V])

Expand All @@ -79,6 +87,7 @@ func NewCacheAccessor[K comparable, V any](
concurrencyLimiter: concurrencyLimiter,
accessor: accessor,
lookupsInProgress: lookupsInProgress,
metrics: metrics,
}, nil
}

Expand All @@ -97,6 +106,10 @@ func (c *cacheAccessor[K, V]) Get(ctx context.Context, key K) (V, error) {
v, ok := c.cache.Get(key)
if ok {
c.cacheLock.Unlock()

if c.metrics != nil {
c.metrics.cacheHits.Increment()
}
return v, nil
}

Expand All @@ -109,6 +122,10 @@ func (c *cacheAccessor[K, V]) Get(ctx context.Context, key K) (V, error) {

c.cacheLock.Unlock()

if c.metrics != nil {
c.metrics.cacheMisses.Increment()
}

if alreadyLoading {
// The result is being fetched on another goroutine. Wait for it to finish.
return c.waitForResult(ctx, result)
Expand Down Expand Up @@ -150,11 +167,30 @@ func (c *cacheAccessor[K, V]) fetchResult(ctx context.Context, key K, result *ac
<-c.concurrencyLimiter
}

if c.metrics != nil {
start := time.Now()
defer func() {
c.metrics.cacheMissLatency.ReportLatency(time.Since(start))
}()
}

c.cacheLock.Lock()

// Update the cache if the fetch was successful.
if err == nil {
c.cache.Put(key, value)

if c.metrics != nil {
size := c.cache.Size()
weight := c.cache.Weight()
c.metrics.size.Set(float64(size))
c.metrics.weight.Set(float64(weight))
var averageWeight float64
if size > 0 {
averageWeight = float64(weight) / float64(size)
}
c.metrics.averageWeight.Set(averageWeight)
}
}

// Provide the result to all other goroutines that may be waiting for it.
Expand Down
Loading

0 comments on commit 9f68f74

Please sign in to comment.