Skip to content

Commit

Permalink
op-supervisor: Create clients and monitor chain heads for each L2 cha…
Browse files Browse the repository at this point in the history
…in (#11009)

* op-supervisor: Create clients and monitor chain heads for each L2 chain

* op-supervisor: Remove rpc url from log message

* op-supervisor: Update tickets in TODOs
  • Loading branch information
ajsutton authored Jun 27, 2024
1 parent cce7f9c commit 379973e
Show file tree
Hide file tree
Showing 9 changed files with 582 additions and 9 deletions.
8 changes: 6 additions & 2 deletions op-service/sources/l1_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ type L1ClientConfig struct {
// L1ClientDefaultConfig derives an L1 client configuration from the rollup
// config. The cache size handed to L1ClientSimpleConfig is 3/2 of the
// configured sequencing window size.
func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProviderKind) *L1ClientConfig {
	// Cache 3/2 worth of sequencing window of receipts and txs
	span := int(config.SeqWindowSize) * 3 / 2
	return L1ClientSimpleConfig(trustRPC, kind, span)
}

func L1ClientSimpleConfig(trustRPC bool, kind RPCProviderKind, cacheSize int) *L1ClientConfig {
span := cacheSize
if span > 1000 { // sanity cap. If a large sequencing window is configured, do not make the cache too large
span = 1000
}
Expand All @@ -44,7 +48,7 @@ func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProvide
MethodResetDuration: time.Minute,
},
// Not bounded by span, to cover find-sync-start range fully for speedy recovery after errors.
L1BlockRefsCacheSize: fullSpan,
L1BlockRefsCacheSize: cacheSize,
}
}

Expand Down
55 changes: 55 additions & 0 deletions op-supervisor/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package metrics

import (
"math/big"

"github.com/prometheus/client_golang/prometheus"

opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
Expand All @@ -14,6 +16,9 @@ type Metricer interface {

opmetrics.RPCMetricer

CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool)
CacheGet(chainID *big.Int, label string, hit bool)

Document() []opmetrics.DocumentedMetric
}

Expand All @@ -24,6 +29,10 @@ type Metrics struct {

opmetrics.RPCMetrics

SizeVec *prometheus.GaugeVec
GetVec *prometheus.CounterVec
AddVec *prometheus.CounterVec

info prometheus.GaugeVec
up prometheus.Gauge
}
Expand Down Expand Up @@ -61,6 +70,33 @@ func NewMetrics(procName string) *Metrics {
Name: "up",
Help: "1 if the op-supervisor has finished starting up",
}),

SizeVec: factory.NewGaugeVec(prometheus.GaugeOpts{
Namespace: ns,
Name: "source_rpc_cache_size",
Help: "source rpc cache cache size",
}, []string{
"chain",
"type",
}),
GetVec: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Name: "source_rpc_cache_get",
Help: "source rpc cache lookups, hitting or not",
}, []string{
"chain",
"type",
"hit",
}),
AddVec: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Name: "source_rpc_cache_add",
Help: "source rpc cache additions, evicting previous values or not",
}, []string{
"chain",
"type",
"evicted",
}),
}
}

Expand All @@ -82,3 +118,22 @@ func (m *Metrics) RecordUp() {
prometheus.MustRegister()
m.up.Set(1)
}

// CacheAdd records an addition to the cache identified by label on the given
// chain. The size gauge is set to cacheSize, and the add counter is incremented
// with the "evicted" label value "true" or "false".
func (m *Metrics) CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool) {
	chainLabel := chainID.String()
	evictedLabel := "false"
	if evicted {
		evictedLabel = "true"
	}
	m.SizeVec.WithLabelValues(chainLabel, label).Set(float64(cacheSize))
	m.AddVec.WithLabelValues(chainLabel, label, evictedLabel).Inc()
}

// CacheGet records a lookup in the cache identified by label on the given
// chain by incrementing the get counter with the "hit" label value "true" or
// "false".
func (m *Metrics) CacheGet(chainID *big.Int, label string, hit bool) {
	chainLabel := chainID.String()
	hitLabel := "false"
	if hit {
		hitLabel = "true"
	}
	m.GetVec.WithLabelValues(chainLabel, label, hitLabel).Inc()
}
5 changes: 5 additions & 0 deletions op-supervisor/metrics/noop.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package metrics

import (
"math/big"

opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
)

Expand All @@ -14,3 +16,6 @@ func (*noopMetrics) Document() []opmetrics.DocumentedMetric { return nil }

func (*noopMetrics) RecordInfo(version string) {}
func (*noopMetrics) RecordUp() {}

// CacheAdd does nothing; the cache-addition event is discarded.
func (*noopMetrics) CacheAdd(*big.Int, string, int, bool) {}
// CacheGet does nothing; the cache-lookup event is discarded.
func (*noopMetrics) CacheGet(*big.Int, string, bool) {}
39 changes: 36 additions & 3 deletions op-supervisor/supervisor/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,29 @@ package backend
import (
"context"
"errors"
"fmt"
"io"
"sync/atomic"

"github.com/ethereum-optimism/optimism/op-supervisor/config"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/backend/source"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/frontend"
"github.com/ethereum-optimism/optimism/op-supervisor/supervisor/types"
)

// Metrics is the metrics interface required by the supervisor backend. Its
// method set is exactly that of the embedded source.Metrics.
type Metrics interface {
source.Metrics
}

type SupervisorBackend struct {
started atomic.Bool
logger log.Logger

chainMonitors []*source.ChainMonitor

// TODO(protocol-quest#287): collection of logdbs per chain
// TODO(protocol-quest#288): collection of logdb updating services per chain
Expand All @@ -24,15 +35,31 @@ var _ frontend.Backend = (*SupervisorBackend)(nil)

var _ io.Closer = (*SupervisorBackend)(nil)

func NewSupervisorBackend() *SupervisorBackend {
return &SupervisorBackend{}
// NewSupervisorBackend builds a SupervisorBackend holding one chain monitor per
// entry in cfg.L2RPCs, created in order. It returns a wrapped error as soon as
// any monitor cannot be created. The monitors are only constructed here; they
// are started by Start.
func NewSupervisorBackend(ctx context.Context, logger log.Logger, m Metrics, cfg *config.Config) (*SupervisorBackend, error) {
	monitors := make([]*source.ChainMonitor, 0, len(cfg.L2RPCs))
	for _, rpc := range cfg.L2RPCs {
		monitor, err := source.NewChainMonitor(ctx, logger, m, rpc)
		if err != nil {
			return nil, fmt.Errorf("failed to create monitor for rpc %v: %w", rpc, err)
		}
		monitors = append(monitors, monitor)
	}
	backend := &SupervisorBackend{
		logger:        logger,
		chainMonitors: monitors,
	}
	return backend, nil
}

// Start flips the backend into the started state and then starts each chain
// monitor in order. It returns an error if the backend was already started, or
// the wrapped error of the first monitor that fails to start. Monitors started
// before the failing one are not stopped here.
func (su *SupervisorBackend) Start(ctx context.Context) error {
	if swapped := su.started.CompareAndSwap(false, true); !swapped {
		return errors.New("already started")
	}
	// TODO(protocol-quest#288): start logdb updating services of all chains
	for i := range su.chainMonitors {
		err := su.chainMonitors[i].Start()
		if err != nil {
			return fmt.Errorf("failed to start chain monitor: %w", err)
		}
	}
	return nil
}

Expand All @@ -41,7 +68,13 @@ func (su *SupervisorBackend) Stop(ctx context.Context) error {
return errors.New("already stopped")
}
// TODO(protocol-quest#288): stop logdb updating services of all chains
return nil
var errs error
for _, monitor := range su.chainMonitors {
if err := monitor.Stop(); err != nil {
errs = errors.Join(errs, fmt.Errorf("failed to stop chain monitor: %w", err))
}
}
return errs
}

func (su *SupervisorBackend) Close() error {
Expand Down
94 changes: 94 additions & 0 deletions op-supervisor/supervisor/backend/source/chain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package source

import (
"context"
"fmt"
"math/big"
"time"

"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/sources"
"github.com/ethereum-optimism/optimism/op-service/sources/caching"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
)

// TODO(optimism#11032) Make these configurable and a sensible default
const (
	// epochPollInterval is the interval handed to the head monitor.
	epochPollInterval = 30 * time.Second
	// pollInterval is the poll rate handed to the RPC client.
	pollInterval = 2 * time.Second
	// trustRpc is the trust-RPC flag used for the client configuration.
	trustRpc = false
	// rpcKind is the RPC provider kind used for the client configuration.
	rpcKind = sources.RPCKindStandard
)

// Metrics is the chain-aware cache metrics sink used by this package. Every
// event carries the chain ID of the source chain it belongs to.
type Metrics interface {
// CacheAdd reports an addition to the cache named label, the resulting cache
// size, and whether the addition evicted a previous value.
CacheAdd(chainID *big.Int, label string, cacheSize int, evicted bool)
// CacheGet reports a lookup in the cache named label and whether it hit.
CacheGet(chainID *big.Int, label string, hit bool)
}

// ChainMonitor monitors a source L2 chain, retrieving the data required to populate the database and perform
// interop consolidation. It detects and notifies when reorgs occur.
type ChainMonitor struct {
// headMonitor is the head monitor that Start and Stop delegate to.
headMonitor *HeadMonitor
}

// NewChainMonitor connects to the chain behind the given RPC endpoint and
// builds a ChainMonitor for it.
//
// It first dials a plain eth client (passing a 10 second timeout to the dialer)
// and reads the chain ID. The chain ID is attached to the logger and used to
// build the per-chain metrics adapter. The underlying RPC connection of that
// eth client is then reused for the client handed to the head monitor.
//
// If a step after the dial fails, the dialed client is closed before returning
// so the connection is not leaked. Use Start and Stop on the returned monitor
// to control the underlying head monitor.
func NewChainMonitor(ctx context.Context, logger log.Logger, genericMetrics Metrics, rpc string) (*ChainMonitor, error) {
	// First dial a simple client and get the chain ID so we have a simple identifier for the chain.
	ethClient, err := dial.DialEthClientWithTimeout(ctx, 10*time.Second, logger, rpc)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to rpc %v: %w", rpc, err)
	}
	chainID, err := ethClient.ChainID(ctx)
	if err != nil {
		// Nothing else holds the connection yet, so release it here.
		ethClient.Close()
		return nil, fmt.Errorf("failed to load chain id for rpc %v: %w", rpc, err)
	}
	logger = logger.New("chainID", chainID)
	m := newChainMetrics(chainID, genericMetrics)
	cl, err := newClient(ctx, logger, m, rpc, ethClient.Client(), pollInterval, trustRpc, rpcKind)
	if err != nil {
		// The wrapping client was not created, so the shared connection has no owner.
		ethClient.Close()
		return nil, err
	}
	logger.Info("Monitoring chain")
	headMonitor := NewHeadMonitor(logger, epochPollInterval, cl, &loggingCallback{logger})
	return &ChainMonitor{
		headMonitor: headMonitor,
	}, nil
}

// Start starts the underlying head monitor and returns its error, if any.
func (c *ChainMonitor) Start() error {
return c.headMonitor.Start()
}

// Stop stops the underlying head monitor and returns its error, if any.
func (c *ChainMonitor) Stop() error {
return c.headMonitor.Stop()
}

// loggingCallback is a placeholder head monitor callback: each head update is
// written to the log at info level and nothing else is done with it.
type loggingCallback struct {
	log log.Logger
}

// OnNewUnsafeHead logs the new unsafe head block.
func (cb *loggingCallback) OnNewUnsafeHead(_ context.Context, block eth.L1BlockRef) {
	cb.log.Info("New unsafe head", "block", block)
}

// OnNewSafeHead logs the new safe head block.
func (cb *loggingCallback) OnNewSafeHead(_ context.Context, block eth.L1BlockRef) {
	cb.log.Info("New safe head", "block", block)
}

// OnNewFinalizedHead logs the new finalized head block.
func (cb *loggingCallback) OnNewFinalizedHead(_ context.Context, block eth.L1BlockRef) {
	cb.log.Info("New finalized head", "block", block)
}

// newClient builds a sources.L1Client on top of an already-dialed rpcClient.
// The connection is wrapped as a base RPC client and passed through
// client.NewRPCWithClient together with the endpoint address and pollRate. The
// L1 client is then created with a simple config using a cache size of 100.
// Errors from either step are returned wrapped.
func newClient(ctx context.Context, logger log.Logger, m caching.Metrics, rpc string, rpcClient *rpc.Client, pollRate time.Duration, trustRPC bool, kind sources.RPCProviderKind) (*sources.L1Client, error) {
	baseRPC := client.NewBaseRPCClient(rpcClient)
	wrapped, err := client.NewRPCWithClient(ctx, logger, rpc, baseRPC, pollRate)
	if err != nil {
		return nil, fmt.Errorf("failed to create new RPC client: %w", err)
	}

	clientCfg := sources.L1ClientSimpleConfig(trustRPC, kind, 100)
	chainClient, err := sources.NewL1Client(wrapped, logger, m, clientCfg)
	if err != nil {
		return nil, fmt.Errorf("failed to connect client: %w", err)
	}
	return chainClient, nil
}
31 changes: 31 additions & 0 deletions op-supervisor/supervisor/backend/source/chain_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package source

import (
"math/big"

"github.com/ethereum-optimism/optimism/op-service/sources/caching"
)

// chainMetrics is an adapter between the metrics API expected by clients that assume there's only a single chain
// and the actual metrics implementation which requires a chain ID to identify the source chain.
type chainMetrics struct {
// chainID is the chain ID added to every event forwarded to delegate.
chainID *big.Int
// delegate is the chain-aware metrics implementation that receives the events.
delegate Metrics
}

// newChainMetrics returns a chainMetrics adapter that forwards cache events to
// delegate, tagging each one with chainID.
func newChainMetrics(chainID *big.Int, delegate Metrics) *chainMetrics {
	adapter := new(chainMetrics)
	adapter.chainID = chainID
	adapter.delegate = delegate
	return adapter
}

// CacheAdd forwards the cache-addition event to the delegate, adding this
// adapter's chain ID as the first argument.
func (c *chainMetrics) CacheAdd(label string, cacheSize int, evicted bool) {
c.delegate.CacheAdd(c.chainID, label, cacheSize, evicted)
}

// CacheGet forwards the cache-lookup event to the delegate, adding this
// adapter's chain ID as the first argument.
func (c *chainMetrics) CacheGet(label string, hit bool) {
c.delegate.CacheGet(c.chainID, label, hit)
}

// Compile-time assertion that *chainMetrics implements caching.Metrics.
var _ caching.Metrics = (*chainMetrics)(nil)
Loading

0 comments on commit 379973e

Please sign in to comment.