diff --git a/node/pkg/processor/broadcast.go b/node/pkg/processor/broadcast.go
index dd0e551690..3c5cf859c0 100644
--- a/node/pkg/processor/broadcast.go
+++ b/node/pkg/processor/broadcast.go
@@ -1,13 +1,10 @@
 package processor
 
 import (
-	"encoding/hex"
-	"time"
-
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 
-	ethcommon "github.com/ethereum/go-ethereum/common"
+	ethCommon "github.com/ethereum/go-ethereum/common"
 	"google.golang.org/protobuf/proto"
 
 	gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
@@ -21,12 +18,6 @@ var (
 			Help: "Total number of signed observations queued for broadcast",
 		})
 
-	observationsPostedInternally = promauto.NewCounter(
-		prometheus.CounterOpts{
-			Name: "wormhole_observations_posted_internally",
-			Help: "Total number of our observations posted internally",
-		})
-
 	signedVAAsBroadcast = promauto.NewCounter(
 		prometheus.CounterOpts{
 			Name: "wormhole_signed_vaas_queued_for_broadcast",
@@ -36,17 +27,18 @@ var (
 
 // broadcastSignature broadcasts the observation for something we observed locally.
+// It returns the signed observation and the serialized gossip message so that the caller can record them in its aggregation state.
 func (p *Processor) broadcastSignature(
-	o Observation,
-	signature []byte,
+	messageID string,
 	txhash []byte,
-) {
-	digest := o.SigningDigest()
+	digest ethCommon.Hash,
+	signature []byte,
+) (*gossipv1.SignedObservation, []byte) {
 	obsv := gossipv1.SignedObservation{
 		Addr:      p.ourAddr.Bytes(),
 		Hash:      digest.Bytes(),
 		Signature: signature,
 		TxHash:    txhash,
-		MessageId: o.MessageID(),
+		MessageId: messageID,
 	}
 
 	w := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedObservation{SignedObservation: &obsv}}
@@ -59,35 +51,10 @@ func (p *Processor) broadcastSignature(
 	// Broadcast the observation.
 	p.gossipSendC <- msg
 	observationsBroadcast.Inc()
-	observationsPostedInternally.Inc()
-
-	hash := hex.EncodeToString(digest.Bytes())
-
-	s := p.state.signatures[hash]
-	if s == nil {
-		s = &state{
-			firstObserved: time.Now(),
-			nextRetry:     time.Now().Add(nextRetryDuration(0)),
-			signatures:    map[ethcommon.Address][]byte{},
-			source:        "loopback",
-		}
-
-		p.state.signatures[hash] = s
-	}
-
-	s.ourObservation = o
-	s.ourMsg = msg
-	s.txHash = txhash
-	s.source = o.GetEmitterChain().String()
-	s.gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs
-	s.signatures[p.ourAddr] = signature
-
-	// Fast path for our own signature.
-	start := time.Now()
-	p.handleObservationAlreadyVerified(&obsv, s, s.gs, hash)
-	timeToHandleObservation.Observe(float64(time.Since(start).Microseconds()))
+	return &obsv, msg
 }
 
+// broadcastSignedVAA broadcasts a VAA to the gossip network.
 func (p *Processor) broadcastSignedVAA(v *vaa.VAA) {
 	b, err := v.Marshal()
 	if err != nil {
diff --git a/node/pkg/processor/cleanup.go b/node/pkg/processor/cleanup.go
index 14c72d1019..18843ffe33 100644
--- a/node/pkg/processor/cleanup.go
+++ b/node/pkg/processor/cleanup.go
@@ -290,9 +290,9 @@ func (p *Processor) signedVaaAlreadyInDB(hash string, s *state) (bool, error) {
 				)
 			}
 			return false, nil
-		} else {
-			return false, fmt.Errorf(`failed to look up message id "%s" in db: %w`, s.ourObservation.MessageID(), err)
 		}
+
+		return false, fmt.Errorf(`failed to look up message id "%s" in db: %w`, s.ourObservation.MessageID(), err)
 	}
 
 	v, err = vaa.Unmarshal(vb)
diff --git a/node/pkg/processor/message.go b/node/pkg/processor/message.go
index c5351bf363..c23c0a55e9 100644
--- a/node/pkg/processor/message.go
+++ b/node/pkg/processor/message.go
@@ -2,12 +2,14 @@ package processor
 
 import (
 	"encoding/hex"
+	"time"
 
 	"github.com/mr-tron/base58"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 
+	ethCommon "github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/crypto"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
@@ -26,13 +28,6 @@ var (
 			Help: "Total number of messages observed",
 		},
 		[]string{"emitter_chain"})
-
-	messagesSignedTotal = promauto.NewCounterVec(
-		prometheus.CounterOpts{
-			Name: "wormhole_message_observations_signed_total",
-			Help: "Total number of message observations that were successfully signed",
-		},
-		[]string{"emitter_chain"})
 )
 
 // handleMessage processes a message received from a chain and instantiates our deterministic copy of the VAA. An
@@ -48,18 +43,7 @@ func (p *Processor) handleMessage(k *common.MessagePublication) {
 		return
 	}
 
-	if p.logger.Core().Enabled(zapcore.DebugLevel) {
-		p.logger.Debug("message publication confirmed",
-			zap.String("message_id", k.MessageIDString()),
-			zap.Uint32("nonce", k.Nonce),
-			zap.Stringer("txhash", k.TxHash),
-			zap.Time("timestamp", k.Timestamp),
-		)
-	}
-
-	messagesObservedTotal.With(prometheus.Labels{
-		"emitter_chain": k.EmitterChain.String(),
-	}).Add(1)
+	messagesObservedTotal.WithLabelValues(k.EmitterChain.String()).Inc()
 
 	// All nodes will create the exact same VAA and sign its digest.
 	// Consensus is established on this digest.
@@ -83,9 +67,10 @@ func (p *Processor) handleMessage(k *common.MessagePublication) {
 
 	// Generate digest of the unsigned VAA.
 	digest := v.SigningDigest()
+	hash := hex.EncodeToString(digest.Bytes())
 
 	// Sign the digest using our node's guardian key.
-	s, err := crypto.Sign(digest.Bytes(), p.gk)
+	signature, err := crypto.Sign(digest.Bytes(), p.gk)
 	if err != nil {
 		panic(err)
 	}
@@ -95,16 +80,43 @@ func (p *Processor) handleMessage(k *common.MessagePublication) {
 		zap.String("message_id", k.MessageIDString()),
 		zap.Stringer("txhash", k.TxHash),
 		zap.String("txhash_b58", base58.Encode(k.TxHash.Bytes())),
-		zap.String("digest", hex.EncodeToString(digest.Bytes())),
+		zap.String("hash", hash),
 		zap.Uint32("nonce", k.Nonce),
+		zap.Time("timestamp", k.Timestamp),
 		zap.Uint8("consistency_level", k.ConsistencyLevel),
-		zap.String("signature", hex.EncodeToString(s)),
+		zap.String("signature", hex.EncodeToString(signature)),
 		zap.Bool("isReobservation", k.IsReobservation),
 		)
 	}
 
-	messagesSignedTotal.With(prometheus.Labels{
-		"emitter_chain": k.EmitterChain.String()}).Add(1)
+	// Broadcast the signature.
+	obsv, msg := p.broadcastSignature(v.MessageID(), k.TxHash.Bytes(), digest, signature)
 
-	p.broadcastSignature(v, s, k.TxHash.Bytes())
+	// Get / create our state entry.
+	s := p.state.signatures[hash]
+	if s == nil {
+		s = &state{
+			firstObserved: time.Now(),
+			nextRetry:     time.Now().Add(nextRetryDuration(0)),
+			signatures:    map[ethCommon.Address][]byte{},
+			source:        "loopback",
+		}
+
+		p.state.signatures[hash] = s
+	}
+
+	// Update our state.
+	s.ourObservation = v
+	s.txHash = k.TxHash.Bytes()
+	s.source = v.GetEmitterChain().String()
+	s.gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs
+	s.signatures[p.ourAddr] = signature
+	s.ourMsg = msg
+
+	// Fast path for our own signature.
+	if !s.submitted {
+		start := time.Now()
+		p.checkForQuorum(obsv, s, s.gs, hash)
+		timeToHandleObservation.Observe(float64(time.Since(start).Microseconds()))
+	}
 }
diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go
index 5a647cf758..31a34b98c3 100644
--- a/node/pkg/processor/observation.go
+++ b/node/pkg/processor/observation.go
@@ -209,74 +209,76 @@ func (p *Processor) handleObservation(obs *node_common.MsgWithTimeStamp[gossipv1
 	}
 
 	s.signatures[their_addr] = m.Signature
-	p.handleObservationAlreadyVerified(m, s, gs, hash)
-	timeToHandleObservation.Observe(float64(time.Since(start).Microseconds()))
-	observationTotalDelay.Observe(float64(time.Since(obs.Timestamp).Microseconds()))
-}
 
-// handleObservationAlreadyVerified handles an observation after it's validity has been confirmed. It is called both for local and external observations.
-func (p *Processor) handleObservationAlreadyVerified(m *gossipv1.SignedObservation, s *state, gs *node_common.GuardianSet, hash string) {
-	if s.submitted {
-		return
-	}
 	if s.ourObservation != nil {
-		// We have made this observation on chain!
-
-		// Check if we have more signatures than required for quorum.
-		// s.signatures may contain signatures from multiple guardian sets during guardian set updates
-		// Hence, if len(s.signatures) < quorum, then there is definitely no quorum and we can return early to save additional computation,
-		// but if len(s.signatures) >= quorum, there is not necessarily quorum for the active guardian set.
-		// We will later check for quorum again after assembling the VAA for a particular guardian set.
-		if len(s.signatures) < gs.Quorum() {
-			// no quorum yet, we're done here
-			if p.logger.Level().Enabled(zapcore.DebugLevel) {
-				p.logger.Debug("quorum not yet met",
-					zap.String("messageId", m.MessageId),
-					zap.String("digest", hash),
-				)
-			}
-			return
+		p.checkForQuorum(m, s, gs, hash)
+	} else {
+		if p.logger.Level().Enabled(zapcore.DebugLevel) {
+			p.logger.Debug("we have not yet seen this observation",
+				zap.String("messageId", m.MessageId),
+				zap.String("digest", hash),
+			)
 		}
+		// Keep going to update metrics.
+	}
 
-		// Now we *may* have quorum, depending on the guardian set in use.
-		// Let's construct the VAA and check if we actually have quorum.
-		sigsVaaFormat := signaturesToVaaFormat(s.signatures, gs.Keys)
+	timeToHandleObservation.Observe(float64(time.Since(start).Microseconds()))
+	observationTotalDelay.Observe(float64(time.Since(obs.Timestamp).Microseconds()))
+}
 
+// checkForQuorum checks for quorum after a valid signature has been added to the observation state. If quorum is met, it broadcasts the signed VAA.
+// This function is called for both local and external observations. It assumes that we have made the observation ourselves and have not already submitted the VAA.
+func (p *Processor) checkForQuorum(m *gossipv1.SignedObservation, s *state, gs *node_common.GuardianSet, hash string) {
+	// Check if we have more signatures than required for quorum.
+	// s.signatures may contain signatures from multiple guardian sets during guardian set updates.
+	// Hence, if len(s.signatures) < quorum, then there is definitely no quorum and we can return early to save additional computation,
+	// but if len(s.signatures) >= quorum, there is not necessarily quorum for the active guardian set.
+	// We will later check for quorum again after assembling the VAA for a particular guardian set.
+	if len(s.signatures) < gs.Quorum() {
+		// no quorum yet, we're done here
 		if p.logger.Level().Enabled(zapcore.DebugLevel) {
-			p.logger.Debug("aggregation state for observation", // 1.3M out of 3M info messages / hour / guardian
+			p.logger.Debug("quorum not yet met",
 				zap.String("messageId", m.MessageId),
 				zap.String("digest", hash),
-				zap.Any("set", gs.KeysAsHexStrings()),
-				zap.Uint32("index", gs.Index),
-				zap.Int("required_sigs", gs.Quorum()),
-				zap.Int("have_sigs", len(sigsVaaFormat)),
-				zap.Bool("quorum", len(sigsVaaFormat) >= gs.Quorum()),
 			)
 		}
+		return
+	}
 
-		if len(sigsVaaFormat) >= gs.Quorum() {
-			// we have reached quorum *with the active guardian set*
-			start := time.Now()
-			s.ourObservation.HandleQuorum(sigsVaaFormat, hash, p)
-			timeToHandleQuorum.Observe(float64(time.Since(start).Microseconds()))
-		} else {
-			if p.logger.Level().Enabled(zapcore.DebugLevel) {
-				p.logger.Debug("quorum not met, doing nothing",
-					zap.String("messageId", m.MessageId),
-					zap.String("digest", hash),
-				)
-			}
-		}
-	} else {
+	// Now we *may* have quorum, depending on the guardian set in use.
+	// Let's construct the VAA and check if we actually have quorum.
+	sigsVaaFormat := signaturesToVaaFormat(s.signatures, gs.Keys)
+
+	if p.logger.Level().Enabled(zapcore.DebugLevel) {
+		p.logger.Debug("aggregation state for observation", // 1.3M out of 3M info messages / hour / guardian
+			zap.String("messageId", m.MessageId),
+			zap.String("digest", hash),
+			zap.Any("set", gs.KeysAsHexStrings()),
+			zap.Uint32("index", gs.Index),
+			zap.Int("required_sigs", gs.Quorum()),
+			zap.Int("have_sigs", len(sigsVaaFormat)),
+			zap.Bool("quorum", len(sigsVaaFormat) >= gs.Quorum()),
+		)
+	}
+
+	if len(sigsVaaFormat) < gs.Quorum() {
 		if p.logger.Level().Enabled(zapcore.DebugLevel) {
-			p.logger.Debug("we have not yet seen this observation - temporarily storing signature", // 175K out of 3M info messages / hour / guardian
+			p.logger.Debug("quorum not met, doing nothing",
 				zap.String("messageId", m.MessageId),
 				zap.String("digest", hash),
 			)
 		}
+		return
 	}
+
+	// We have reached quorum *with the active guardian set*.
+	start := time.Now()
+	s.ourObservation.HandleQuorum(sigsVaaFormat, hash, p)
+	s.submitted = true
+	timeToHandleQuorum.Observe(float64(time.Since(start).Microseconds()))
 }
 
+// handleInboundSignedVAAWithQuorum handles a signed VAA received from the gossip network. If it is valid and we have not already seen it, we store it in the database.
 func (p *Processor) handleInboundSignedVAAWithQuorum(m *gossipv1.SignedVAAWithQuorum) {
 	v, err := vaa.Unmarshal(m.Vaa)
 	if err != nil {
diff --git a/node/pkg/processor/vaa.go b/node/pkg/processor/vaa.go
index 7d41bbc244..1f4a64fc86 100644
--- a/node/pkg/processor/vaa.go
+++ b/node/pkg/processor/vaa.go
@@ -11,6 +11,7 @@ type VAA struct {
 	Reobservation bool
 }
 
+// HandleQuorum is called when a VAA reaches quorum. It publishes the VAA to the gossip network and stores it in the database.
 func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
 	// Deep copy the observation and add signatures
 	signed := &vaa.VAA{
@@ -26,12 +27,12 @@ func (v *VAA) HandleQuorum(sigs []*vaa.Signature, hash string, p *Processor) {
 		ConsistencyLevel: v.ConsistencyLevel,
 	}
 
-	// Store signed VAA in database.
 	p.logger.Info("signed VAA with quorum",
 		zap.String("message_id", signed.MessageID()),
 		zap.String("digest", hash),
 	)
 
+	// Broadcast the VAA and store it in the database.
 	p.broadcastSignedVAA(signed)
 	p.storeSignedVAA(signed)
 }
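
Reviewer note: the sketch below condenses the control flow after this refactor. It is a self-contained toy, not the real code: signedObs, aggState, and processor are hypothetical stand-ins for gossipv1.SignedObservation, the processor's per-digest state entry, and Processor, and quorum counting is reduced to a plain signature count. It illustrates the split this diff introduces: broadcastSignature only builds and queues the gossip message and returns it, handleMessage owns the aggregation-state bookkeeping, and checkForQuorum is the single shared quorum path that marks the state submitted exactly once.

package main

import "fmt"

// signedObs is a stand-in for gossipv1.SignedObservation.
type signedObs struct {
	hash      string
	signature string
}

// aggState is a stand-in for the processor's per-digest aggregation state.
type aggState struct {
	signatures map[string]string // guardian address -> signature
	submitted  bool
}

type processor struct {
	states  map[string]*aggState
	ourAddr string
	quorum  int
	gossipC chan signedObs
}

// broadcastSignature mirrors the refactored helper: it only queues the
// observation for gossip and returns it; it no longer touches any state.
func (p *processor) broadcastSignature(hash, sig string) signedObs {
	o := signedObs{hash: hash, signature: sig}
	p.gossipC <- o
	return o
}

// checkForQuorum mirrors the shared quorum path: once enough signatures are
// collected, the VAA is handled and the state is marked submitted so later
// signatures for the same digest do not trigger it again.
func (p *processor) checkForQuorum(s *aggState, hash string) {
	if len(s.signatures) < p.quorum {
		return // no quorum yet, we're done here
	}
	s.submitted = true
	fmt.Printf("quorum reached for %s with %d signature(s)\n", hash, len(s.signatures))
}

// handleMessage mirrors the new local-observation path: broadcast first,
// then create or update the state entry, then take the fast path to quorum.
func (p *processor) handleMessage(hash, sig string) {
	p.broadcastSignature(hash, sig)

	s := p.states[hash]
	if s == nil {
		s = &aggState{signatures: map[string]string{}}
		p.states[hash] = s
	}
	s.signatures[p.ourAddr] = sig

	if !s.submitted {
		p.checkForQuorum(s, hash)
	}
}

func main() {
	p := &processor{
		states:  map[string]*aggState{},
		ourAddr: "guardian-0",
		quorum:  1, // single-guardian, devnet-style quorum for the demo
		gossipC: make(chan signedObs, 1),
	}
	p.handleMessage("0xabc", "sig-0")
	fmt.Println("queued for gossip:", <-p.gossipC)
}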