From 4e24b442978438bb72e75267a9f380cfd384acaf Mon Sep 17 00:00:00 2001 From: Giulio rebuffo Date: Mon, 30 Sep 2024 18:45:48 +0200 Subject: [PATCH] Add caplin metrics for aggregation and block processing (#12134) --- cl/monitor/metrics.go | 71 ++++++++++++++++++- cl/phase1/core/state/ssz.go | 4 -- cl/phase1/forkchoice/on_block.go | 7 +- cl/phase1/network/gossip_manager.go | 2 + .../services/batch_signature_verification.go | 3 + cl/sentinel/gossip.go | 1 + cl/transition/impl/eth2/operations.go | 22 +----- cl/transition/machine/block.go | 3 - erigon-lib/state/state_changeset.go | 1 + eth/stagedsync/exec3.go | 8 ++- 10 files changed, 92 insertions(+), 30 deletions(-) diff --git a/cl/monitor/metrics.go b/cl/monitor/metrics.go index b7f07976c39..a3b012c111b 100644 --- a/cl/monitor/metrics.go +++ b/cl/monitor/metrics.go @@ -1,8 +1,15 @@ package monitor -import "github.com/erigontech/erigon-lib/metrics" +import ( + "sync" + "time" + + "github.com/erigontech/erigon-lib/metrics" +) var ( + // VALIDATOR METRICS + // metricAttestHit is the number of attestations that hit for those validators we observe within current_epoch-2 metricAttestHit = metrics.GetOrCreateCounter("validator_attestation_hit") // metricAttestMiss is the number of attestations that miss for those validators we observe within current_epoch-2 @@ -11,4 +18,66 @@ var ( metricProposerHit = metrics.GetOrCreateCounter("validator_proposal_hit") // metricProposerMiss is the number of proposals that miss for those validators we observe in previous slot metricProposerMiss = metrics.GetOrCreateCounter("validator_proposal_miss") + + // Block processing metrics + fullBlockProcessingTime = metrics.GetOrCreateGauge("full_block_processing_time") + attestationBlockProcessingTime = metrics.GetOrCreateGauge("attestation_block_processing_time") + batchVerificationThroughput = metrics.GetOrCreateGauge("aggregation_per_signature") + + // Network metrics + gossipTopicsMetricCounterPrefix = "gossip_topics_seen" + gossipMetricsMap = sync.Map{} ) + +type batchVerificationThroughputMetric struct { + totalVerified uint64 + currentAverageSecs float64 + mu sync.Mutex +} + +var batchVerificationThroughputMetricStruct = &batchVerificationThroughputMetric{} + +func (b *batchVerificationThroughputMetric) observe(t time.Duration, totalSigs int) float64 { + b.mu.Lock() + defer b.mu.Unlock() + elapsedInMillisecs := float64(t.Microseconds()) / 1000 + if b.totalVerified == 0 { + b.currentAverageSecs = elapsedInMillisecs + } else { + b.currentAverageSecs = (b.currentAverageSecs*float64(b.totalVerified) + elapsedInMillisecs) / float64(b.totalVerified+uint64(totalSigs)) + } + b.totalVerified += uint64(totalSigs) + return b.currentAverageSecs +} + +func microToMilli(micros int64) float64 { + return float64(micros) / 1000 +} + +// ObserveAttestHit increments the attestation hit metric +func ObserveAttestationBlockProcessingTime(startTime time.Time) { + attestationBlockProcessingTime.Set(microToMilli(time.Since(startTime).Microseconds())) +} + +// ObserveFullBlockProcessingTime increments the full block processing time metric +func ObserveFullBlockProcessingTime(startTime time.Time) { + fullBlockProcessingTime.Set(microToMilli(time.Since(startTime).Microseconds())) +} + +// ObserveBatchVerificationThroughput increments the batch verification throughput metric +func ObserveBatchVerificationThroughput(d time.Duration, totalSigs int) { + batchVerificationThroughput.Set(batchVerificationThroughputMetricStruct.observe(d, totalSigs)) +} + +// ObserveGossipTopicSeen increments the gossip topic seen metric +func ObserveGossipTopicSeen(topic string, l int) { + var metric metrics.Counter + metricI, ok := gossipMetricsMap.LoadOrStore(topic, metrics.GetOrCreateCounter(gossipTopicsMetricCounterPrefix+"_"+topic)) + if ok { + metric = metricI.(metrics.Counter) + } else { + metric = metrics.GetOrCreateCounter(gossipTopicsMetricCounterPrefix + "_" + topic) + gossipMetricsMap.Store(topic, metric) + } + metric.Add(float64(l)) +} diff --git a/cl/phase1/core/state/ssz.go b/cl/phase1/core/state/ssz.go index 5623ccf043d..1ada94bc535 100644 --- a/cl/phase1/core/state/ssz.go +++ b/cl/phase1/core/state/ssz.go @@ -22,25 +22,21 @@ import ( ) func (b *CachingBeaconState) EncodeSSZ(buf []byte) ([]byte, error) { - h := metrics.NewHistTimer("encode_ssz_beacon_state_dur") bts, err := b.BeaconState.EncodeSSZ(buf) if err != nil { return nil, err } - h.PutSince() sz := metrics.NewHistTimer("encode_ssz_beacon_state_size") sz.Observe(float64(len(bts))) return bts, err } func (b *CachingBeaconState) DecodeSSZ(buf []byte, version int) error { - h := metrics.NewHistTimer("decode_ssz_beacon_state_dur") if err := b.BeaconState.DecodeSSZ(buf, version); err != nil { return err } sz := metrics.NewHistTimer("decode_ssz_beacon_state_size") sz.Observe(float64(len(buf))) - h.PutSince() return b.InitBeaconState() } diff --git a/cl/phase1/forkchoice/on_block.go b/cl/phase1/forkchoice/on_block.go index e9f502f89d2..f9648968a8a 100644 --- a/cl/phase1/forkchoice/on_block.go +++ b/cl/phase1/forkchoice/on_block.go @@ -32,6 +32,7 @@ import ( "github.com/erigontech/erigon/cl/clparams" "github.com/erigontech/erigon/cl/cltypes" "github.com/erigontech/erigon/cl/cltypes/solid" + "github.com/erigontech/erigon/cl/monitor" "github.com/erigontech/erigon/cl/phase1/core/state" "github.com/erigontech/erigon/cl/phase1/execution_client" "github.com/erigontech/erigon/cl/phase1/forkchoice/fork_graph" @@ -145,10 +146,12 @@ func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeac } } log.Trace("OnBlock: engine", "elapsed", time.Since(startEngine)) + startStateProcess := time.Now() lastProcessedState, status, err := f.forkGraph.AddChainSegment(block, fullValidation) if err != nil { return err } + monitor.ObserveFullBlockProcessingTime(startStateProcess) switch status { case fork_graph.PreValidated: return nil @@ -209,11 +212,12 @@ func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeac finalizedCheckpoint = lastProcessedState.FinalizedCheckpoint().Copy() justificationBits = lastProcessedState.JustificationBits().Copy() ) + f.operationsPool.NotifyBlock(block.Block) + // Eagerly compute unrealized justification and finality if err := statechange.ProcessJustificationBitsAndFinality(lastProcessedState, nil); err != nil { return err } - f.operationsPool.NotifyBlock(block.Block) f.updateUnrealizedCheckpoints(lastProcessedState.CurrentJustifiedCheckpoint().Copy(), lastProcessedState.FinalizedCheckpoint().Copy()) // Set the changed value pre-simulation lastProcessedState.SetPreviousJustifiedCheckpoint(previousJustifiedCheckpoint) @@ -244,6 +248,7 @@ func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeac if f.validatorMonitor != nil { f.validatorMonitor.OnNewBlock(lastProcessedState, block.Block) } + log.Trace("OnBlock", "elapsed", time.Since(start)) return nil } diff --git a/cl/phase1/network/gossip_manager.go b/cl/phase1/network/gossip_manager.go index 8b9e09e9da5..604f4c65bf2 100644 --- a/cl/phase1/network/gossip_manager.go +++ b/cl/phase1/network/gossip_manager.go @@ -34,6 +34,7 @@ import ( "github.com/erigontech/erigon/cl/cltypes" "github.com/erigontech/erigon/cl/cltypes/solid" "github.com/erigontech/erigon/cl/gossip" + "github.com/erigontech/erigon/cl/monitor" "github.com/erigontech/erigon/cl/phase1/forkchoice" "github.com/erigontech/erigon/cl/phase1/network/services" "github.com/erigontech/erigon/cl/utils/eth_clock" @@ -136,6 +137,7 @@ func (g *GossipManager) onRecv(ctx context.Context, data *sentinel.GossipData, l SubnetId: data.SubnetId, Data: common.CopyBytes(data.Data), } + monitor.ObserveGossipTopicSeen(data.Name, len(data.Data)) if err := g.routeAndProcess(ctx, data); err != nil { return err diff --git a/cl/phase1/network/services/batch_signature_verification.go b/cl/phase1/network/services/batch_signature_verification.go index cd262000c50..3f6d83ae625 100644 --- a/cl/phase1/network/services/batch_signature_verification.go +++ b/cl/phase1/network/services/batch_signature_verification.go @@ -8,6 +8,7 @@ import ( "github.com/Giulio2002/bls" sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto" "github.com/erigontech/erigon-lib/log/v3" + "github.com/erigontech/erigon/cl/monitor" ) const ( @@ -163,10 +164,12 @@ func (b *BatchSignatureVerifier) handleIncorrectSignatures(aggregateVerification } func (b *BatchSignatureVerifier) runBatchVerification(signatures [][]byte, signRoots [][]byte, pks [][]byte, fns []func()) error { + start := time.Now() valid, err := blsVerifyMultipleSignatures(signatures, signRoots, pks) if err != nil { return errors.New("batch signature verification failed with the error: " + err.Error()) } + monitor.ObserveBatchVerificationThroughput(time.Since(start), len(signatures)) if !valid { return errors.New("batch invalid signature") diff --git a/cl/sentinel/gossip.go b/cl/sentinel/gossip.go index 44c44cd36e9..1c434f24ed3 100644 --- a/cl/sentinel/gossip.go +++ b/cl/sentinel/gossip.go @@ -629,6 +629,7 @@ func (s *GossipSubscription) run(ctx context.Context, sub *pubsub.Subscription, if msg.ReceivedFrom == s.host { continue } + s.ch <- &GossipMessage{ From: msg.ReceivedFrom, TopicName: topicName, diff --git a/cl/transition/impl/eth2/operations.go b/cl/transition/impl/eth2/operations.go index a0328bfda6d..37b35009912 100644 --- a/cl/transition/impl/eth2/operations.go +++ b/cl/transition/impl/eth2/operations.go @@ -23,9 +23,8 @@ import ( "slices" "time" - "github.com/erigontech/erigon-lib/metrics" - "github.com/erigontech/erigon/cl/abstract" + "github.com/erigontech/erigon/cl/monitor" "github.com/erigontech/erigon/cl/transition/impl/eth2/statechange" @@ -536,10 +535,8 @@ func (I *impl) ProcessAttestations( attestations *solid.ListSSZ[*solid.Attestation], ) error { attestingIndiciesSet := make([][]uint64, attestations.Len()) - h := metrics.NewHistTimer("beacon_process_attestations") baseRewardPerIncrement := s.BaseRewardPerIncrement() - c := h.Tag("attestation_step", "process") var err error if err := solid.RangeErr[*solid.Attestation](attestations, func(i int, a *solid.Attestation, _ int) error { if attestingIndiciesSet[i], err = I.processAttestation(s, a, baseRewardPerIncrement); err != nil { @@ -553,9 +550,8 @@ func (I *impl) ProcessAttestations( return err } var valid bool - c.PutSince() if I.FullValidation { - c = h.Tag("attestation_step", "validate") + start := time.Now() valid, err = verifyAttestations(s, attestations, attestingIndiciesSet) if err != nil { return err @@ -563,7 +559,7 @@ func (I *impl) ProcessAttestations( if !valid { return errors.New("ProcessAttestation: wrong bls data") } - c.PutSince() + monitor.ObserveAttestationBlockProcessingTime(start) } return nil @@ -579,9 +575,6 @@ func (I *impl) processAttestationPostAltair( stateSlot := s.Slot() beaconConfig := s.BeaconConfig() - h := metrics.NewHistTimer("beacon_process_attestation_post_altair") - - c := h.Tag("step", "get_participation_flag") participationFlagsIndicies, err := s.GetAttestationParticipationFlagIndicies( data, stateSlot-data.Slot(), @@ -590,22 +583,16 @@ func (I *impl) processAttestationPostAltair( if err != nil { return nil, err } - c.PutSince() - - c = h.Tag("step", "get_attesting_indices") attestingIndicies, err := s.GetAttestingIndicies(data, attestation.AggregationBits(), true) if err != nil { return nil, err } - c.PutSince() - var proposerRewardNumerator uint64 isCurrentEpoch := data.Target().Epoch() == currentEpoch - c = h.Tag("step", "update_attestation") for _, attesterIndex := range attestingIndicies { val, err := s.ValidatorEffectiveBalance(int(attesterIndex)) if err != nil { @@ -630,14 +617,11 @@ func (I *impl) processAttestationPostAltair( proposerRewardNumerator += baseReward * weight } } - c.PutSince() // Reward proposer - c = h.Tag("step", "get_proposer_index") proposer, err := s.GetBeaconProposerIndex() if err != nil { return nil, err } - c.PutSince() proposerRewardDenominator := (beaconConfig.WeightDenominator - beaconConfig.ProposerWeight) * beaconConfig.WeightDenominator / beaconConfig.ProposerWeight reward := proposerRewardNumerator / proposerRewardDenominator if I.BlockRewardsCollector != nil { diff --git a/cl/transition/machine/block.go b/cl/transition/machine/block.go index 262c9595187..608206f6a27 100644 --- a/cl/transition/machine/block.go +++ b/cl/transition/machine/block.go @@ -19,7 +19,6 @@ package machine import ( "fmt" - "github.com/erigontech/erigon-lib/metrics" "github.com/erigontech/erigon/cl/abstract" "github.com/erigontech/erigon/cl/phase1/core/state" "github.com/pkg/errors" @@ -44,7 +43,6 @@ func ProcessBlock(impl BlockProcessor, s abstract.BeaconState, block cltypes.Gen if block.Version() != version { return fmt.Errorf("processBlindedBlock: wrong state version for block at slot %d", block.GetSlot()) } - h := metrics.NewHistTimer("beacon_process_blinded_block") bodyRoot, err := body.HashSSZ() if err != nil { return errors.WithMessagef(err, "processBlindedBlock: failed to hash block body") @@ -92,7 +90,6 @@ func ProcessBlock(impl BlockProcessor, s abstract.BeaconState, block cltypes.Gen } } - h.PutSince() return nil } diff --git a/erigon-lib/state/state_changeset.go b/erigon-lib/state/state_changeset.go index 71827d0a03f..ae02a03a204 100644 --- a/erigon-lib/state/state_changeset.go +++ b/erigon-lib/state/state_changeset.go @@ -247,6 +247,7 @@ func (d *StateChangeSet) SerializeKeys(out []byte) []byte { ret := out tmp := make([]byte, 4) for i := range d.Diffs { + diffSet := d.Diffs[i].GetDiffSet() binary.BigEndian.PutUint32(tmp, uint32(SerializeDiffSetBufLen(diffSet))) ret = append(ret, tmp...) diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index cac480a120a..6b231c96f1d 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -74,7 +74,7 @@ var ( mxExecGas = metrics.NewCounter(`exec_gas`) mxExecBlocks = metrics.NewGauge("exec_blocks") - mxMgas = metrics.NewSummary(`exec_mgas`) + mxMgas = metrics.NewGauge(`exec_mgas`) ) const ( @@ -203,7 +203,11 @@ func ExecV3(ctx context.Context, chainConfig, genesis := cfg.chainConfig, cfg.genesis totalGasUsed := uint64(0) start := time.Now() - defer func() { mxMgas.Observe((float64(totalGasUsed) / 1e6) / time.Since(start).Seconds()) }() + defer func() { + if totalGasUsed > 0 { + mxMgas.Set((float64(totalGasUsed) / 1e6) / time.Since(start).Seconds()) + } + }() applyTx := txc.Tx useExternalTx := applyTx != nil