Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add caplin metrics for aggregation and block processing #12134

Merged
merged 20 commits into from
Sep 30, 2024
71 changes: 70 additions & 1 deletion cl/monitor/metrics.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
package monitor

import "github.com/erigontech/erigon-lib/metrics"
import (
"sync"
"time"

"github.com/erigontech/erigon-lib/metrics"
)

var (
// VALIDATOR METRICS

// metricAttestHit is the number of attestations that hit for those validators we observe within current_epoch-2
metricAttestHit = metrics.GetOrCreateCounter("validator_attestation_hit")
// metricAttestMiss is the number of attestations that miss for those validators we observe within current_epoch-2
Expand All @@ -11,4 +18,66 @@ var (
metricProposerHit = metrics.GetOrCreateCounter("validator_proposal_hit")
// metricProposerMiss is the number of proposals that miss for those validators we observe in previous slot
metricProposerMiss = metrics.GetOrCreateCounter("validator_proposal_miss")

// Block processing metrics
fullBlockProcessingTime = metrics.GetOrCreateGauge("full_block_processing_time")
attestationBlockProcessingTime = metrics.GetOrCreateGauge("attestation_block_processing_time")
batchVerificationThroughput = metrics.GetOrCreateGauge("aggregation_per_signature")

// Network metrics
gossipTopicsMetricCounterPrefix = "gossip_topics_seen"
gossipMetricsMap = sync.Map{}
)

type batchVerificationThroughputMetric struct {
totalVerified uint64
currentAverageSecs float64
mu sync.Mutex
}

var batchVerificationThroughputMetricStruct = &batchVerificationThroughputMetric{}

func (b *batchVerificationThroughputMetric) observe(t time.Duration, totalSigs int) float64 {
b.mu.Lock()
defer b.mu.Unlock()
elapsedInMillisecs := float64(t.Microseconds()) / 1000
if b.totalVerified == 0 {
b.currentAverageSecs = elapsedInMillisecs
} else {
b.currentAverageSecs = (b.currentAverageSecs*float64(b.totalVerified) + elapsedInMillisecs) / float64(b.totalVerified+uint64(totalSigs))
}
b.totalVerified += uint64(totalSigs)
return b.currentAverageSecs
}

func microToMilli(micros int64) float64 {
return float64(micros) / 1000
}

// ObserveAttestHit increments the attestation hit metric
func ObserveAttestationBlockProcessingTime(startTime time.Time) {
attestationBlockProcessingTime.Set(microToMilli(time.Since(startTime).Microseconds()))
}

// ObserveFullBlockProcessingTime increments the full block processing time metric
func ObserveFullBlockProcessingTime(startTime time.Time) {
fullBlockProcessingTime.Set(microToMilli(time.Since(startTime).Microseconds()))
}

// ObserveBatchVerificationThroughput increments the batch verification throughput metric
func ObserveBatchVerificationThroughput(d time.Duration, totalSigs int) {
batchVerificationThroughput.Set(batchVerificationThroughputMetricStruct.observe(d, totalSigs))
}

// ObserveGossipTopicSeen increments the gossip topic seen metric
func ObserveGossipTopicSeen(topic string, l int) {
var metric metrics.Counter
metricI, ok := gossipMetricsMap.LoadOrStore(topic, metrics.GetOrCreateCounter(gossipTopicsMetricCounterPrefix+"_"+topic))
if ok {
metric = metricI.(metrics.Counter)
} else {
metric = metrics.GetOrCreateCounter(gossipTopicsMetricCounterPrefix + "_" + topic)
gossipMetricsMap.Store(topic, metric)
}
metric.Add(float64(l))
}
4 changes: 0 additions & 4 deletions cl/phase1/core/state/ssz.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,21 @@ import (
)

func (b *CachingBeaconState) EncodeSSZ(buf []byte) ([]byte, error) {
h := metrics.NewHistTimer("encode_ssz_beacon_state_dur")
bts, err := b.BeaconState.EncodeSSZ(buf)
if err != nil {
return nil, err
}
h.PutSince()
sz := metrics.NewHistTimer("encode_ssz_beacon_state_size")
sz.Observe(float64(len(bts)))
return bts, err
}

func (b *CachingBeaconState) DecodeSSZ(buf []byte, version int) error {
h := metrics.NewHistTimer("decode_ssz_beacon_state_dur")
if err := b.BeaconState.DecodeSSZ(buf, version); err != nil {
return err
}
sz := metrics.NewHistTimer("decode_ssz_beacon_state_size")
sz.Observe(float64(len(buf)))
h.PutSince()
return b.InitBeaconState()
}

Expand Down
7 changes: 6 additions & 1 deletion cl/phase1/forkchoice/on_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/erigontech/erigon/cl/clparams"
"github.com/erigontech/erigon/cl/cltypes"
"github.com/erigontech/erigon/cl/cltypes/solid"
"github.com/erigontech/erigon/cl/monitor"
"github.com/erigontech/erigon/cl/phase1/core/state"
"github.com/erigontech/erigon/cl/phase1/execution_client"
"github.com/erigontech/erigon/cl/phase1/forkchoice/fork_graph"
Expand Down Expand Up @@ -145,10 +146,12 @@ func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeac
}
}
log.Trace("OnBlock: engine", "elapsed", time.Since(startEngine))
startStateProcess := time.Now()
lastProcessedState, status, err := f.forkGraph.AddChainSegment(block, fullValidation)
if err != nil {
return err
}
monitor.ObserveFullBlockProcessingTime(startStateProcess)
switch status {
case fork_graph.PreValidated:
return nil
Expand Down Expand Up @@ -209,11 +212,12 @@ func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeac
finalizedCheckpoint = lastProcessedState.FinalizedCheckpoint().Copy()
justificationBits = lastProcessedState.JustificationBits().Copy()
)
f.operationsPool.NotifyBlock(block.Block)

// Eagerly compute unrealized justification and finality
if err := statechange.ProcessJustificationBitsAndFinality(lastProcessedState, nil); err != nil {
return err
}
f.operationsPool.NotifyBlock(block.Block)
f.updateUnrealizedCheckpoints(lastProcessedState.CurrentJustifiedCheckpoint().Copy(), lastProcessedState.FinalizedCheckpoint().Copy())
// Set the changed value pre-simulation
lastProcessedState.SetPreviousJustifiedCheckpoint(previousJustifiedCheckpoint)
Expand Down Expand Up @@ -244,6 +248,7 @@ func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeac
if f.validatorMonitor != nil {
f.validatorMonitor.OnNewBlock(lastProcessedState, block.Block)
}

log.Trace("OnBlock", "elapsed", time.Since(start))
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions cl/phase1/network/gossip_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/erigontech/erigon/cl/cltypes"
"github.com/erigontech/erigon/cl/cltypes/solid"
"github.com/erigontech/erigon/cl/gossip"
"github.com/erigontech/erigon/cl/monitor"
"github.com/erigontech/erigon/cl/phase1/forkchoice"
"github.com/erigontech/erigon/cl/phase1/network/services"
"github.com/erigontech/erigon/cl/utils/eth_clock"
Expand Down Expand Up @@ -136,6 +137,7 @@ func (g *GossipManager) onRecv(ctx context.Context, data *sentinel.GossipData, l
SubnetId: data.SubnetId,
Data: common.CopyBytes(data.Data),
}
monitor.ObserveGossipTopicSeen(data.Name, len(data.Data))

if err := g.routeAndProcess(ctx, data); err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions cl/phase1/network/services/batch_signature_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/Giulio2002/bls"
sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/cl/monitor"
)

const (
Expand Down Expand Up @@ -163,10 +164,12 @@ func (b *BatchSignatureVerifier) handleIncorrectSignatures(aggregateVerification
}

func (b *BatchSignatureVerifier) runBatchVerification(signatures [][]byte, signRoots [][]byte, pks [][]byte, fns []func()) error {
start := time.Now()
valid, err := blsVerifyMultipleSignatures(signatures, signRoots, pks)
if err != nil {
return errors.New("batch signature verification failed with the error: " + err.Error())
}
monitor.ObserveBatchVerificationThroughput(time.Since(start), len(signatures))

if !valid {
return errors.New("batch invalid signature")
Expand Down
1 change: 1 addition & 0 deletions cl/sentinel/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ func (s *GossipSubscription) run(ctx context.Context, sub *pubsub.Subscription,
if msg.ReceivedFrom == s.host {
continue
}

s.ch <- &GossipMessage{
From: msg.ReceivedFrom,
TopicName: topicName,
Expand Down
22 changes: 3 additions & 19 deletions cl/transition/impl/eth2/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import (
"slices"
"time"

"github.com/erigontech/erigon-lib/metrics"

"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/monitor"

"github.com/erigontech/erigon/cl/transition/impl/eth2/statechange"

Expand Down Expand Up @@ -536,10 +535,8 @@ func (I *impl) ProcessAttestations(
attestations *solid.ListSSZ[*solid.Attestation],
) error {
attestingIndiciesSet := make([][]uint64, attestations.Len())
h := metrics.NewHistTimer("beacon_process_attestations")
baseRewardPerIncrement := s.BaseRewardPerIncrement()

c := h.Tag("attestation_step", "process")
var err error
if err := solid.RangeErr[*solid.Attestation](attestations, func(i int, a *solid.Attestation, _ int) error {
if attestingIndiciesSet[i], err = I.processAttestation(s, a, baseRewardPerIncrement); err != nil {
Expand All @@ -553,17 +550,16 @@ func (I *impl) ProcessAttestations(
return err
}
var valid bool
c.PutSince()
if I.FullValidation {
c = h.Tag("attestation_step", "validate")
start := time.Now()
valid, err = verifyAttestations(s, attestations, attestingIndiciesSet)
if err != nil {
return err
}
if !valid {
return errors.New("ProcessAttestation: wrong bls data")
}
c.PutSince()
monitor.ObserveAttestationBlockProcessingTime(start)
}

return nil
Expand All @@ -579,9 +575,6 @@ func (I *impl) processAttestationPostAltair(
stateSlot := s.Slot()
beaconConfig := s.BeaconConfig()

h := metrics.NewHistTimer("beacon_process_attestation_post_altair")

c := h.Tag("step", "get_participation_flag")
participationFlagsIndicies, err := s.GetAttestationParticipationFlagIndicies(
data,
stateSlot-data.Slot(),
Expand All @@ -590,22 +583,16 @@ func (I *impl) processAttestationPostAltair(
if err != nil {
return nil, err
}
c.PutSince()

c = h.Tag("step", "get_attesting_indices")

attestingIndicies, err := s.GetAttestingIndicies(data, attestation.AggregationBits(), true)
if err != nil {
return nil, err
}

c.PutSince()

var proposerRewardNumerator uint64

isCurrentEpoch := data.Target().Epoch() == currentEpoch

c = h.Tag("step", "update_attestation")
for _, attesterIndex := range attestingIndicies {
val, err := s.ValidatorEffectiveBalance(int(attesterIndex))
if err != nil {
Expand All @@ -630,14 +617,11 @@ func (I *impl) processAttestationPostAltair(
proposerRewardNumerator += baseReward * weight
}
}
c.PutSince()
// Reward proposer
c = h.Tag("step", "get_proposer_index")
proposer, err := s.GetBeaconProposerIndex()
if err != nil {
return nil, err
}
c.PutSince()
proposerRewardDenominator := (beaconConfig.WeightDenominator - beaconConfig.ProposerWeight) * beaconConfig.WeightDenominator / beaconConfig.ProposerWeight
reward := proposerRewardNumerator / proposerRewardDenominator
if I.BlockRewardsCollector != nil {
Expand Down
3 changes: 0 additions & 3 deletions cl/transition/machine/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package machine
import (
"fmt"

"github.com/erigontech/erigon-lib/metrics"
"github.com/erigontech/erigon/cl/abstract"
"github.com/erigontech/erigon/cl/phase1/core/state"
"github.com/pkg/errors"
Expand All @@ -44,7 +43,6 @@ func ProcessBlock(impl BlockProcessor, s abstract.BeaconState, block cltypes.Gen
if block.Version() != version {
return fmt.Errorf("processBlindedBlock: wrong state version for block at slot %d", block.GetSlot())
}
h := metrics.NewHistTimer("beacon_process_blinded_block")
bodyRoot, err := body.HashSSZ()
if err != nil {
return errors.WithMessagef(err, "processBlindedBlock: failed to hash block body")
Expand Down Expand Up @@ -92,7 +90,6 @@ func ProcessBlock(impl BlockProcessor, s abstract.BeaconState, block cltypes.Gen
}
}

h.PutSince()
return nil
}

Expand Down
1 change: 1 addition & 0 deletions erigon-lib/state/state_changeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ func (d *StateChangeSet) SerializeKeys(out []byte) []byte {
ret := out
tmp := make([]byte, 4)
for i := range d.Diffs {

diffSet := d.Diffs[i].GetDiffSet()
binary.BigEndian.PutUint32(tmp, uint32(SerializeDiffSetBufLen(diffSet)))
ret = append(ret, tmp...)
Expand Down
8 changes: 6 additions & 2 deletions eth/stagedsync/exec3.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ var (
mxExecGas = metrics.NewCounter(`exec_gas`)
mxExecBlocks = metrics.NewGauge("exec_blocks")

mxMgas = metrics.NewSummary(`exec_mgas`)
mxMgas = metrics.NewGauge(`exec_mgas`)
)

const (
Expand Down Expand Up @@ -203,7 +203,11 @@ func ExecV3(ctx context.Context,
chainConfig, genesis := cfg.chainConfig, cfg.genesis
totalGasUsed := uint64(0)
start := time.Now()
defer func() { mxMgas.Observe((float64(totalGasUsed) / 1e6) / time.Since(start).Seconds()) }()
defer func() {
if totalGasUsed > 0 {
mxMgas.Set((float64(totalGasUsed) / 1e6) / time.Since(start).Seconds())
}
}()

applyTx := txc.Tx
useExternalTx := applyTx != nil
Expand Down
Loading