From b92af5ffa9c856c0e8d7671c8037fd289813108c Mon Sep 17 00:00:00 2001 From: Evan Gray Date: Mon, 17 Jun 2024 13:25:13 -0400 Subject: [PATCH 1/5] WIP: topic split --- node/pkg/node/node.go | 20 +- node/pkg/node/options.go | 5 +- node/pkg/p2p/p2p.go | 910 ++++++++++++++++++++---------- node/pkg/p2p/run_params.go | 5 + node/pkg/p2p/watermark_test.go | 14 +- node/pkg/processor/broadcast.go | 4 +- node/pkg/processor/cleanup.go | 5 +- node/pkg/processor/observation.go | 2 + node/pkg/processor/processor.go | 68 ++- 9 files changed, 721 insertions(+), 312 deletions(-) diff --git a/node/pkg/node/node.go b/node/pkg/node/node.go index d957db3218..92cff95d29 100644 --- a/node/pkg/node/node.go +++ b/node/pkg/node/node.go @@ -20,8 +20,14 @@ import ( ) const ( - // gossipSendBufferSize configures the size of the gossip network send buffer - gossipSendBufferSize = 5000 + // gossipControlSendBufferSize configures the size of the gossip network send buffer + gossipControlSendBufferSize = 100 + + // gossipAttestationSendBufferSize configures the size of the gossip network send buffer + gossipAttestationSendBufferSize = 5000 + + // gossipVaaSendBufferSize configures the size of the gossip network send buffer + gossipVaaSendBufferSize = 5000 // inboundObservationBufferSize configures the size of the obsvC channel that contains observations from other Guardians. // One observation takes roughly 0.1ms to process on one core, so the whole queue could be processed in 1s @@ -69,8 +75,10 @@ type G struct { runnables map[string]supervisor.Runnable // various channels - // Outbound gossip message queue (needs to be read/write because p2p needs read/write) - gossipSendC chan []byte + // Outbound gossip message queues (needs to be read/write because p2p needs read/write) + gossipControlSendC chan []byte + gossipAttestationSendC chan []byte + gossipVaaSendC chan []byte // Inbound observations. This is read/write because the processor also writes to it as a fast-path when handling locally made observations. obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation] // Finalized guardian observations aggregated across all chains @@ -109,7 +117,9 @@ func (g *G) initializeBasic(rootCtxCancel context.CancelFunc) { g.rootCtxCancel = rootCtxCancel // Setup various channels... - g.gossipSendC = make(chan []byte, gossipSendBufferSize) + g.gossipControlSendC = make(chan []byte, gossipControlSendBufferSize) + g.gossipAttestationSendC = make(chan []byte, gossipAttestationSendBufferSize) + g.gossipVaaSendC = make(chan []byte, gossipVaaSendBufferSize) g.obsvC = make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], inboundObservationBufferSize) g.msgC = makeChannelPair[*common.MessagePublication](0) g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup. diff --git a/node/pkg/node/options.go b/node/pkg/node/options.go index 7bd0d88be1..8bdef28724 100644 --- a/node/pkg/node/options.go +++ b/node/pkg/node/options.go @@ -65,7 +65,7 @@ func GuardianOptionP2P(p2pKey libp2p_crypto.PrivKey, networkId, bootstrapPeers, nodeName, g.gk, g.obsvC, - g.signedInC.writeC, + nil, // g.signedInC.writeC, g.obsvReqC.writeC, g.gossipSendC, g.obsvReqSendC.readC, @@ -564,7 +564,8 @@ func GuardianOptionProcessor() *GuardianOption { g.db, g.msgC.readC, g.setC.readC, - g.gossipSendC, + g.gossipAttestationSendC, + g.gossipVaaSendC, g.obsvC, g.obsvReqSendC.writeC, g.signedInC.readC, diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index ed353938b3..9b2bddc379 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -54,11 +54,11 @@ var ( Name: "wormhole_p2p_heartbeats_sent_total", Help: "Total number of p2p heartbeats sent", }) - p2pMessagesSent = promauto.NewCounter( + p2pMessagesSent = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "wormhole_p2p_broadcast_messages_sent_total", Help: "Total number of p2p pubsub broadcast messages sent", - }) + }, []string{"type"}) p2pMessagesReceived = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "wormhole_p2p_broadcast_messages_received_total", @@ -301,6 +301,9 @@ func Run(params *RunParams) func(ctx context.Context) error { } return func(ctx context.Context) error { + p2pMessagesSent.WithLabelValues("control").Add(0) + p2pMessagesSent.WithLabelValues("attestation").Add(0) + p2pMessagesSent.WithLabelValues("vaa").Add(0) p2pReceiveChannelOverflow.WithLabelValues("observation").Add(0) p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Add(0) p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Add(0) @@ -330,8 +333,6 @@ func Run(params *RunParams) func(ctx context.Context) error { panic(err) } - topic := fmt.Sprintf("%s/%s", params.networkID, "broadcast") - bootstrappers, bootstrapNode := BootstrapAddrs(logger, params.bootstrapPeers, h.ID()) if bootstrapNode { @@ -342,7 +343,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } } - logger.Info("Subscribing pubsub topic", zap.String("topic", topic)) + logger.Info("connecting to pubsub") ourTracer := &traceHandler{} ps, err := pubsub.NewGossipSub(ctx, h, pubsub.WithValidateQueueSize(P2P_VALIDATE_QUEUE_SIZE), @@ -355,24 +356,84 @@ func Run(params *RunParams) func(ctx context.Context) error { panic(err) } - th, err := ps.Join(topic) - if err != nil { - return fmt.Errorf("failed to join topic: %w", err) + // These will only be non-nil if the application plans to listen for or publish to that topic. + var controlPubsubTopic, attestationPubsubTopic, vaaPubsubTopic *pubsub.Topic + var controlSubscription, vaaSubscription, attestationSubscription *pubsub.Subscription + + // Set up the control channel. //////////////////////////////////////////////////////////////////// + if params.nodeName != "" || params.gossipControlSendC != nil || params.obsvReqSendC != nil || params.obsvReqC != nil || params.signedGovCfg != nil || params.signedGovSt != nil { + controlTopic := fmt.Sprintf("%s/%s", params.networkID, "control") + logger.Info("joining the control topic", zap.String("topic", controlTopic)) + controlPubsubTopic, err = ps.Join(controlTopic) + if err != nil { + return fmt.Errorf("failed to join the control topic: %w", err) + } + + defer func() { + if err := controlPubsubTopic.Close(); err != nil && !errors.Is(err, context.Canceled) { + logger.Error("Error closing the control topic", zap.Error(err)) + } + }() + + if params.obsvReqC != nil || params.signedGovCfg != nil || params.signedGovSt != nil { + logger.Info("subscribing to the control topic", zap.String("topic", controlTopic)) + controlSubscription, err = controlPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE)) + if err != nil { + return fmt.Errorf("failed to subscribe to the control topic: %w", err) + } + defer controlSubscription.Cancel() + } } - defer func() { - if err := th.Close(); err != nil && !errors.Is(err, context.Canceled) { - logger.Error("Error closing the topic", zap.Error(err)) + // Set up the attestation channel. //////////////////////////////////////////////////////////////////// + if params.gossipAttestationSendC != nil || params.obsvC != nil || params.batchObsvC != nil { + attestationTopic := fmt.Sprintf("%s/%s", params.networkID, "attestation") + logger.Info("joining the attestation topic", zap.String("topic", attestationTopic)) + attestationPubsubTopic, err = ps.Join(attestationTopic) + if err != nil { + return fmt.Errorf("failed to join the attestation topic: %w", err) } - }() - // Increase the buffer size to prevent failed delivery - // to slower subscribers - sub, err := th.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE)) - if err != nil { - return fmt.Errorf("failed to subscribe topic: %w", err) + defer func() { + if err := attestationPubsubTopic.Close(); err != nil && !errors.Is(err, context.Canceled) { + logger.Error("Error closing the attestation topic", zap.Error(err)) + } + }() + + if params.obsvC != nil || params.batchObsvC != nil { + logger.Info("subscribing to the attestation topic", zap.String("topic", attestationTopic)) + attestationSubscription, err = attestationPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE)) + if err != nil { + return fmt.Errorf("failed to subscribe to the attestation topic: %w", err) + } + defer attestationSubscription.Cancel() + } + } + + // Set up the VAA channel. //////////////////////////////////////////////////////////////////// + if params.gossipVaaSendC != nil || params.signedInC != nil { + vaaTopic := fmt.Sprintf("%s/%s", params.networkID, "broadcast") + logger.Info("joining the vaa topic", zap.String("topic", vaaTopic)) + vaaPubsubTopic, err = ps.Join(vaaTopic) + if err != nil { + return fmt.Errorf("failed to join the vaa topic: %w", err) + } + + defer func() { + if err := vaaPubsubTopic.Close(); err != nil && !errors.Is(err, context.Canceled) { + logger.Error("Error closing the vaa topic", zap.Error(err)) + } + }() + + if params.signedInC != nil { + logger.Info("subscribing to the vaa topic", zap.String("topic", vaaTopic)) + vaaSubscription, err = vaaPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE)) + if err != nil { + return fmt.Errorf("failed to subscribe to the vaa topic: %w", err) + } + defer vaaSubscription.Cancel() + } } - defer sub.Cancel() // Make sure we connect to at least 1 bootstrap node (this is particularly important in a local devnet and CI // as peer discovery can take a long time). @@ -426,336 +487,613 @@ func Run(params *RunParams) func(ctx context.Context) error { } }() - go func() { - // Disable heartbeat when no node name is provided (spy mode) - if params.nodeName == "" { - return - } - ourAddr := ethcrypto.PubkeyToAddress(params.gk.PublicKey) + // Start up heartbeating if it is enabled. + if params.nodeName != "" { + go func() { + ourAddr := ethcrypto.PubkeyToAddress(params.gk.PublicKey) + + ctr := int64(0) + // Guardians should send out their first heartbeat immediately to speed up test runs. + // But we also want to wait a little bit such that network connections can be established by then. + timer := time.NewTimer(time.Second * 2) + defer timer.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + timer.Reset(15 * time.Second) + + // create a heartbeat + b := func() []byte { + DefaultRegistry.mu.Lock() + defer DefaultRegistry.mu.Unlock() + networks := make([]*gossipv1.Heartbeat_Network, 0, len(DefaultRegistry.networkStats)) + for _, v := range DefaultRegistry.networkStats { + errCtr := DefaultRegistry.GetErrorCount(vaa.ChainID(v.Id)) + v.ErrorCount = errCtr + networks = append(networks, v) + } + + features := make([]string, 0) + if params.gov != nil { + if params.gov.IsFlowCancelEnabled() { + features = append(features, "governor:fc") + } else { + features = append(features, "governor") + } + } + if params.acct != nil { + features = append(features, params.acct.FeatureString()) + } + if params.ibcFeaturesFunc != nil { + ibcFlags := params.ibcFeaturesFunc() + if ibcFlags != "" { + features = append(features, ibcFlags) + } + } + if params.gatewayRelayerEnabled { + features = append(features, "gwrelayer") + } + if params.ccqEnabled { + features = append(features, "ccq") + } + + heartbeat := &gossipv1.Heartbeat{ + NodeName: params.nodeName, + Counter: ctr, + Timestamp: time.Now().UnixNano(), + Networks: networks, + Version: version.Version(), + GuardianAddr: ourAddr.String(), + BootTimestamp: bootTime.UnixNano(), + Features: features, + } + + if params.components.P2PIDInHeartbeat { + heartbeat.P2PNodeId = nodeIdBytes + } - ctr := int64(0) - // Guardians should send out their first heartbeat immediately to speed up test runs. - // But we also want to wait a little bit such that network connections can be established by then. - timer := time.NewTimer(time.Second * 2) - defer timer.Stop() + if err := params.gst.SetHeartbeat(ourAddr, h.ID(), heartbeat); err != nil { + panic(err) + } + collectNodeMetrics(ourAddr, h.ID(), heartbeat) + + if params.gov != nil { + params.gov.CollectMetrics(heartbeat, params.gossipControlSendC, params.gk, ourAddr) + } + + msg := gossipv1.GossipMessage{ + Message: &gossipv1.GossipMessage_SignedHeartbeat{ + SignedHeartbeat: createSignedHeartbeat(params.gk, heartbeat), + }, + } + + b, err := proto.Marshal(&msg) + if err != nil { + panic(err) + } + return b + }() + err = controlPubsubTopic.Publish(ctx, b) + p2pMessagesSent.WithLabelValues("control").Inc() + if err != nil { + logger.Warn("failed to publish heartbeat message", zap.Error(err)) + } + + p2pHeartbeatsSent.Inc() + ctr += 1 + } + } + }() + } + + // This routine processes messages received from the internal channels and publishes them to gossip. /////////////////// + // NOTE: The go specification says that it is safe to receive on a nil channel, it just blocks forever. + go func() { for { select { case <-ctx.Done(): return - case <-timer.C: - timer.Reset(15 * time.Second) - - // create a heartbeat - b := func() []byte { - DefaultRegistry.mu.Lock() - defer DefaultRegistry.mu.Unlock() - networks := make([]*gossipv1.Heartbeat_Network, 0, len(DefaultRegistry.networkStats)) - for _, v := range DefaultRegistry.networkStats { - errCtr := DefaultRegistry.GetErrorCount(vaa.ChainID(v.Id)) - v.ErrorCount = errCtr - networks = append(networks, v) - } - - features := make([]string, 0) - if params.gov != nil { - if params.gov.IsFlowCancelEnabled() { - features = append(features, "governor:fc") - } else { - features = append(features, "governor") - } - } - if params.acct != nil { - features = append(features, params.acct.FeatureString()) + case msg := <-params.gossipControlSendC: + if controlPubsubTopic != nil { + err := controlPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("control").Inc() + if err != nil { + logger.Error("failed to publish message from control queue", zap.Error(err)) } - if params.ibcFeaturesFunc != nil { - ibcFlags := params.ibcFeaturesFunc() - if ibcFlags != "" { - features = append(features, ibcFlags) - } + } else { + logger.Error("received a message on the control queue when we do not have a control topic") + } + case msg := <-params.gossipAttestationSendC: + if attestationPubsubTopic != nil { + err := attestationPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("attestation").Inc() + if err != nil { + logger.Error("failed to publish message from attestation queue", zap.Error(err)) } - if params.gatewayRelayerEnabled { - features = append(features, "gwrelayer") + } else { + logger.Error("received a message on the attestation queue when we do not have an attestation topic") + } + case msg := <-params.gossipVaaSendC: + if vaaPubsubTopic != nil { + err := vaaPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("vaa").Inc() + if err != nil { + logger.Error("failed to publish message from vaa queue", zap.Error(err)) } - if params.ccqEnabled { - features = append(features, "ccq") + } else { + logger.Error("received a message on the vaa queue when we do not have a vaa topic") + } + case msg := <-params.obsvReqSendC: + if controlPubsubTopic != nil { + b, err := proto.Marshal(msg) + if err != nil { + panic(err) } - heartbeat := &gossipv1.Heartbeat{ - NodeName: params.nodeName, - Counter: ctr, - Timestamp: time.Now().UnixNano(), - Networks: networks, - Version: version.Version(), - GuardianAddr: ourAddr.String(), - BootTimestamp: bootTime.UnixNano(), - Features: features, + // Sign the observation request using our node's guardian key. + digest := signedObservationRequestDigest(b) + sig, err := ethcrypto.Sign(digest.Bytes(), params.gk) + if err != nil { + panic(err) } - if params.components.P2PIDInHeartbeat { - heartbeat.P2PNodeId = nodeIdBytes + sReq := &gossipv1.SignedObservationRequest{ + ObservationRequest: b, + Signature: sig, + GuardianAddr: ethcrypto.PubkeyToAddress(params.gk.PublicKey).Bytes(), } - if err := params.gst.SetHeartbeat(ourAddr, h.ID(), heartbeat); err != nil { - panic(err) - } - collectNodeMetrics(ourAddr, h.ID(), heartbeat) + envelope := &gossipv1.GossipMessage{ + Message: &gossipv1.GossipMessage_SignedObservationRequest{ + SignedObservationRequest: sReq}} - if params.gov != nil { - params.gov.CollectMetrics(heartbeat, params.gossipSendC, params.gk, ourAddr) + b, err = proto.Marshal(envelope) + if err != nil { + panic(err) } - msg := gossipv1.GossipMessage{ - Message: &gossipv1.GossipMessage_SignedHeartbeat{ - SignedHeartbeat: createSignedHeartbeat(params.gk, heartbeat), - }, + // Send to local observation request queue (the loopback message is ignored) + if params.obsvReqC != nil { + params.obsvReqC <- msg } - b, err := proto.Marshal(&msg) + err = controlPubsubTopic.Publish(ctx, b) + p2pMessagesSent.WithLabelValues("control").Inc() if err != nil { - panic(err) + logger.Error("failed to publish observation request", zap.Error(err)) + } else { + logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq)) } - return b - }() - - err = th.Publish(ctx, b) - if err != nil { - logger.Warn("failed to publish heartbeat message", zap.Error(err)) } - - p2pHeartbeatsSent.Inc() - ctr += 1 } } }() - go func() { - for { - select { - case <-ctx.Done(): - return - case msg := <-params.gossipSendC: - err := th.Publish(ctx, msg) - p2pMessagesSent.Inc() + errC := make(chan error) + + // This routine processes control messages received from gossip. ////////////////////////////////////////////// + if controlSubscription != nil { + go func() { + for { + envelope, err := controlSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled if err != nil { - logger.Error("failed to publish message from queue", zap.Error(err)) + errC <- fmt.Errorf("failed to receive pubsub message on control subscription: %w", err) + return } - case msg := <-params.obsvReqSendC: - b, err := proto.Marshal(msg) + + var msg gossipv1.GossipMessage + err = proto.Unmarshal(envelope.Data, &msg) if err != nil { - panic(err) + logger.Info("received invalid message", + zap.Binary("data", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + p2pMessagesReceived.WithLabelValues("invalid").Inc() + continue } - // Sign the observation request using our node's guardian key. - digest := signedObservationRequestDigest(b) - sig, err := ethcrypto.Sign(digest.Bytes(), params.gk) - if err != nil { - panic(err) + if envelope.GetFrom() == h.ID() { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("received message from ourselves, ignoring", zap.Any("payload", msg.Message)) + } + p2pMessagesReceived.WithLabelValues("loopback").Inc() + continue } - sReq := &gossipv1.SignedObservationRequest{ - ObservationRequest: b, - Signature: sig, - GuardianAddr: ethcrypto.PubkeyToAddress(params.gk.PublicKey).Bytes(), + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("received message", + zap.Any("payload", msg.Message), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) } - envelope := &gossipv1.GossipMessage{ - Message: &gossipv1.GossipMessage_SignedObservationRequest{ - SignedObservationRequest: sReq}} + switch m := msg.Message.(type) { + case *gossipv1.GossipMessage_SignedHeartbeat: + s := m.SignedHeartbeat + gs := params.gst.Get() + if gs == nil { + // No valid guardian set yet - dropping heartbeat + logger.Log(params.components.SignedHeartbeatLogLevel, "skipping heartbeat - no guardian set", + zap.Any("value", s), + zap.String("from", envelope.GetFrom().String())) + break + } + if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, params.gst, params.disableHeartbeatVerify); err != nil { + p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc() + logger.Log(params.components.SignedHeartbeatLogLevel, "invalid signed heartbeat received", + zap.Error(err), + zap.Any("payload", msg.Message), + zap.Any("value", s), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + } else { + p2pMessagesReceived.WithLabelValues("valid_heartbeat").Inc() + logger.Log(params.components.SignedHeartbeatLogLevel, "valid signed heartbeat received", + zap.Any("value", heartbeat), + zap.String("from", envelope.GetFrom().String())) - b, err = proto.Marshal(envelope) - if err != nil { - panic(err) - } + func() { + if len(heartbeat.P2PNodeId) != 0 { + params.components.ProtectedHostByGuardianKeyLock.Lock() + defer params.components.ProtectedHostByGuardianKeyLock.Unlock() + var peerId peer.ID + if err = peerId.Unmarshal(heartbeat.P2PNodeId); err != nil { + logger.Error("p2p_node_id_in_heartbeat_invalid", + zap.Any("payload", msg.Message), + zap.Any("value", s), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + } else { + guardianAddr := eth_common.BytesToAddress(s.GuardianAddr) + if params.gk == nil || guardianAddr != ethcrypto.PubkeyToAddress(params.gk.PublicKey) { + prevPeerId, ok := params.components.ProtectedHostByGuardianKey[guardianAddr] + if ok { + if prevPeerId != peerId { + logger.Info("p2p_guardian_peer_changed", + zap.String("guardian_addr", guardianAddr.String()), + zap.String("prevPeerId", prevPeerId.String()), + zap.String("newPeerId", peerId.String()), + ) + params.components.ConnMgr.Unprotect(prevPeerId, "heartbeat") + params.components.ConnMgr.Protect(peerId, "heartbeat") + params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId + } + } else { + params.components.ConnMgr.Protect(peerId, "heartbeat") + params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId + } + } + } + } else { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("p2p_node_id_not_in_heartbeat", zap.Error(err), zap.Any("payload", heartbeat.NodeName)) + } + } + }() + } + case *gossipv1.GossipMessage_SignedObservationRequest: + if params.obsvReqC != nil { + s := m.SignedObservationRequest + gs := params.gst.Get() + if gs == nil { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("dropping SignedObservationRequest - no guardian set", zap.Any("value", s), zap.String("from", envelope.GetFrom().String())) + } + break + } + r, err := processSignedObservationRequest(s, gs) + if err != nil { + p2pMessagesReceived.WithLabelValues("invalid_signed_observation_request").Inc() + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("invalid signed observation request received", + zap.Error(err), + zap.Any("payload", msg.Message), + zap.Any("value", s), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + } + } else { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("valid signed observation request received", zap.Any("value", r), zap.String("from", envelope.GetFrom().String())) + } - // Send to local observation request queue (the loopback message is ignored) - if params.obsvReqC != nil { - params.obsvReqC <- msg + select { + case params.obsvReqC <- r: + p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc() + default: + p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc() + } + } + } + case *gossipv1.GossipMessage_SignedChainGovernorConfig: + if params.signedGovCfg != nil { + params.signedGovCfg <- m.SignedChainGovernorConfig + } + case *gossipv1.GossipMessage_SignedChainGovernorStatus: + if params.signedGovSt != nil { + params.signedGovSt <- m.SignedChainGovernorStatus + } + default: + p2pMessagesReceived.WithLabelValues("unknown").Inc() + logger.Warn("received unknown message type on control topic (running outdated software?)", + zap.Any("payload", msg.Message), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) } + } + }() + } - err = th.Publish(ctx, b) - p2pMessagesSent.Inc() + // This routine processes attestation messages received from gossip. ////////////////////////////////////////////// + if attestationSubscription != nil { + go func() { + for { + envelope, err := attestationSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled if err != nil { - logger.Error("failed to publish observation request", zap.Error(err)) - } else { - logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq)) + errC <- fmt.Errorf("failed to receive pubsub message on attestation subscription: %w", err) + return } - } - } - }() - for { - envelope, err := sub.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled - if err != nil { - return fmt.Errorf("failed to receive pubsub message: %w", err) - } - - var msg gossipv1.GossipMessage - err = proto.Unmarshal(envelope.Data, &msg) - if err != nil { - logger.Info("received invalid message", - zap.Binary("data", envelope.Data), - zap.String("from", envelope.GetFrom().String())) - p2pMessagesReceived.WithLabelValues("invalid").Inc() - continue - } + var msg gossipv1.GossipMessage + err = proto.Unmarshal(envelope.Data, &msg) + if err != nil { + logger.Info("received invalid message", + zap.Binary("data", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + p2pMessagesReceived.WithLabelValues("invalid").Inc() + continue + } - if envelope.GetFrom() == h.ID() { - if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("received message from ourselves, ignoring", zap.Any("payload", msg.Message)) - } - p2pMessagesReceived.WithLabelValues("loopback").Inc() - continue - } + if envelope.GetFrom() == h.ID() { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("received message from ourselves, ignoring", zap.Any("payload", msg.Message)) + } + p2pMessagesReceived.WithLabelValues("loopback").Inc() + continue + } - if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("received message", - zap.Any("payload", msg.Message), - zap.Binary("raw", envelope.Data), - zap.String("from", envelope.GetFrom().String())) - } + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("received message", + zap.Any("payload", msg.Message), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + } - switch m := msg.Message.(type) { - case *gossipv1.GossipMessage_SignedHeartbeat: - s := m.SignedHeartbeat - gs := params.gst.Get() - if gs == nil { - // No valid guardian set yet - dropping heartbeat - logger.Log(params.components.SignedHeartbeatLogLevel, "skipping heartbeat - no guardian set", - zap.Any("value", s), - zap.String("from", envelope.GetFrom().String())) - break - } - if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, params.gst, params.disableHeartbeatVerify); err != nil { - p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc() - logger.Log(params.components.SignedHeartbeatLogLevel, "invalid signed heartbeat received", - zap.Error(err), - zap.Any("payload", msg.Message), - zap.Any("value", s), - zap.Binary("raw", envelope.Data), - zap.String("from", envelope.GetFrom().String())) - } else { - p2pMessagesReceived.WithLabelValues("valid_heartbeat").Inc() - logger.Log(params.components.SignedHeartbeatLogLevel, "valid signed heartbeat received", - zap.Any("value", heartbeat), - zap.String("from", envelope.GetFrom().String())) - - func() { - if len(heartbeat.P2PNodeId) != 0 { - params.components.ProtectedHostByGuardianKeyLock.Lock() - defer params.components.ProtectedHostByGuardianKeyLock.Unlock() - var peerId peer.ID - if err = peerId.Unmarshal(heartbeat.P2PNodeId); err != nil { - logger.Error("p2p_node_id_in_heartbeat_invalid", - zap.Any("payload", msg.Message), - zap.Any("value", s), - zap.Binary("raw", envelope.Data), - zap.String("from", envelope.GetFrom().String())) + switch m := msg.Message.(type) { + case *gossipv1.GossipMessage_SignedObservation: + if params.obsvC != nil { + if err := common.PostMsgWithTimestamp[gossipv1.SignedObservation](m.SignedObservation, params.obsvC); err == nil { + p2pMessagesReceived.WithLabelValues("observation").Inc() } else { - guardianAddr := eth_common.BytesToAddress(s.GuardianAddr) - if params.gk == nil || guardianAddr != ethcrypto.PubkeyToAddress(params.gk.PublicKey) { - prevPeerId, ok := params.components.ProtectedHostByGuardianKey[guardianAddr] - if ok { - if prevPeerId != peerId { - logger.Info("p2p_guardian_peer_changed", - zap.String("guardian_addr", guardianAddr.String()), - zap.String("prevPeerId", prevPeerId.String()), - zap.String("newPeerId", peerId.String()), - ) - params.components.ConnMgr.Unprotect(prevPeerId, "heartbeat") - params.components.ConnMgr.Protect(peerId, "heartbeat") - params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId - } - } else { - params.components.ConnMgr.Protect(peerId, "heartbeat") - params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId - } + if params.components.WarnChannelOverflow { + logger.Warn("Ignoring SignedObservation because obsvC is full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash))) } - } - } else { - if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("p2p_node_id_not_in_heartbeat", zap.Error(err), zap.Any("payload", heartbeat.NodeName)) + p2pReceiveChannelOverflow.WithLabelValues("observation").Inc() } } - }() - } - case *gossipv1.GossipMessage_SignedObservation: - if params.obsvC != nil { - if err := common.PostMsgWithTimestamp[gossipv1.SignedObservation](m.SignedObservation, params.obsvC); err == nil { - p2pMessagesReceived.WithLabelValues("observation").Inc() - } else { - if params.components.WarnChannelOverflow { - logger.Warn("Ignoring SignedObservation because obsvC full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash))) - } - p2pReceiveChannelOverflow.WithLabelValues("observation").Inc() - } - } - case *gossipv1.GossipMessage_SignedVaaWithQuorum: - if params.signedInC != nil { - select { - case params.signedInC <- m.SignedVaaWithQuorum: - p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc() - default: - if params.components.WarnChannelOverflow { - // TODO do not log this in production - var hexStr string - if vaa, err := vaa.Unmarshal(m.SignedVaaWithQuorum.Vaa); err == nil { - hexStr = vaa.HexDigest() + case *gossipv1.GossipMessage_SignedObservationBatch: + if params.batchObsvC != nil { + if err := common.PostMsgWithTimestamp[gossipv1.SignedObservationBatch](m.SignedObservationBatch, params.batchObsvC); err == nil { + p2pMessagesReceived.WithLabelValues("batch_observation").Inc() + } else { + if params.components.WarnChannelOverflow { + logger.Warn("Ignoring SignedObservationBatch because batchObsvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservationBatch.Addr))) + } + p2pReceiveChannelOverflow.WithLabelValues("batch_observation").Inc() } - logger.Warn("Ignoring SignedVaaWithQuorum because signedInC full", zap.String("hash", hexStr)) } - p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Inc() + default: + p2pMessagesReceived.WithLabelValues("unknown").Inc() + logger.Warn("received unknown message type on attestation topic (running outdated software?)", + zap.Any("payload", msg.Message), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) } } - case *gossipv1.GossipMessage_SignedObservationRequest: - if params.obsvReqC != nil { - s := m.SignedObservationRequest - gs := params.gst.Get() - if gs == nil { - if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("dropping SignedObservationRequest - no guardian set", zap.Any("value", s), zap.String("from", envelope.GetFrom().String())) - } - break + }() + } + + // This routine processes signed VAA messages received from gossip. ////////////////////////////////////////////// + if vaaSubscription != nil { + go func() { + for { + envelope, err := vaaSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled + if err != nil { + errC <- fmt.Errorf("failed to receive pubsub message on vaa subscription: %w", err) + errC <- fmt.Errorf("failed to receive pubsub message on vaa subscription: %w", err) + return } - r, err := processSignedObservationRequest(s, gs) + + var msg gossipv1.GossipMessage + err = proto.Unmarshal(envelope.Data, &msg) if err != nil { - p2pMessagesReceived.WithLabelValues("invalid_signed_observation_request").Inc() - if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("invalid signed observation request received", - zap.Error(err), - zap.Any("payload", msg.Message), - zap.Any("value", s), - zap.Binary("raw", envelope.Data), - zap.String("from", envelope.GetFrom().String())) - } - } else { + logger.Info("received invalid message", + zap.Binary("data", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + p2pMessagesReceived.WithLabelValues("invalid").Inc() + continue + } + + if envelope.GetFrom() == h.ID() { if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("valid signed observation request received", zap.Any("value", r), zap.String("from", envelope.GetFrom().String())) + logger.Debug("received message from ourselves, ignoring", zap.Any("payload", msg.Message)) } + p2pMessagesReceived.WithLabelValues("loopback").Inc() + continue + } + + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("received message", + zap.Any("payload", msg.Message), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + } - select { - case params.obsvReqC <- r: - p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc() - default: - p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc() + switch m := msg.Message.(type) { + // case *gossipv1.GossipMessage_SignedHeartbeat: + // s := m.SignedHeartbeat + // gs := gst.Get() + // if gs == nil { + // // No valid guardian set yet - dropping heartbeat + // logger.Log(components.SignedHeartbeatLogLevel, "skipping heartbeat - no guardian set", + // zap.Any("value", s), + // zap.String("from", envelope.GetFrom().String())) + // break + // } + // if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, gst, disableHeartbeatVerify); err != nil { + // p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc() + // logger.Log(components.SignedHeartbeatLogLevel, "invalid signed heartbeat received", + // zap.Error(err), + // zap.Any("payload", msg.Message), + // zap.Any("value", s), + // zap.Binary("raw", envelope.Data), + // zap.String("from", envelope.GetFrom().String())) + // } else { + // p2pMessagesReceived.WithLabelValues("valid_heartbeat").Inc() + // logger.Log(components.SignedHeartbeatLogLevel, "valid signed heartbeat received", + // zap.Any("value", heartbeat), + // zap.String("from", envelope.GetFrom().String())) + + // func() { + // if len(heartbeat.P2PNodeId) != 0 { + // components.ProtectedHostByGuardianKeyLock.Lock() + // defer components.ProtectedHostByGuardianKeyLock.Unlock() + // var peerId peer.ID + // if err = peerId.Unmarshal(heartbeat.P2PNodeId); err != nil { + // logger.Error("p2p_node_id_in_heartbeat_invalid", + // zap.Any("payload", msg.Message), + // zap.Any("value", s), + // zap.Binary("raw", envelope.Data), + // zap.String("from", envelope.GetFrom().String())) + // } else { + // guardianAddr := eth_common.BytesToAddress(s.GuardianAddr) + // if gk == nil || guardianAddr != ethcrypto.PubkeyToAddress(gk.PublicKey) { + // prevPeerId, ok := components.ProtectedHostByGuardianKey[guardianAddr] + // if ok { + // if prevPeerId != peerId { + // logger.Info("p2p_guardian_peer_changed", + // zap.String("guardian_addr", guardianAddr.String()), + // zap.String("prevPeerId", prevPeerId.String()), + // zap.String("newPeerId", peerId.String()), + // ) + // components.ConnMgr.Unprotect(prevPeerId, "heartbeat") + // components.ConnMgr.Protect(peerId, "heartbeat") + // components.ProtectedHostByGuardianKey[guardianAddr] = peerId + // } + // } else { + // components.ConnMgr.Protect(peerId, "heartbeat") + // components.ProtectedHostByGuardianKey[guardianAddr] = peerId + // } + // } + // } + // } else { + // if logger.Level().Enabled(zapcore.DebugLevel) { + // logger.Debug("p2p_node_id_not_in_heartbeat", zap.Error(err), zap.Any("payload", heartbeat.NodeName)) + // } + // } + // }() + // } + // case *gossipv1.GossipMessage_SignedObservation: + // if obsvC != nil { + // if err := common.PostMsgWithTimestamp[gossipv1.SignedObservation](m.SignedObservation, obsvC); err == nil { + // p2pMessagesReceived.WithLabelValues("observation").Inc() + // } else { + // if components.WarnChannelOverflow { + // logger.Warn("Ignoring SignedObservation because obsvC is full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash))) + // } + // p2pReceiveChannelOverflow.WithLabelValues("observation").Inc() + // } + // } + // case *gossipv1.GossipMessage_SignedObservationBatch: + // if batchObsvC != nil { + // if err := common.PostMsgWithTimestamp[gossipv1.SignedObservationBatch](m.SignedObservationBatch, batchObsvC); err == nil { + // p2pMessagesReceived.WithLabelValues("batch_observation").Inc() + // } else { + // if components.WarnChannelOverflow { + // logger.Warn("Ignoring SignedObservationBatch because batchObsvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservationBatch.Addr))) + // } + // p2pReceiveChannelOverflow.WithLabelValues("batch_observation").Inc() + // } + // } + case *gossipv1.GossipMessage_SignedVaaWithQuorum: + if params.signedInC != nil { + select { + case params.signedInC <- m.SignedVaaWithQuorum: + p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc() + default: + if params.components.WarnChannelOverflow { + // TODO do not log this in production + var hexStr string + if vaa, err := vaa.Unmarshal(m.SignedVaaWithQuorum.Vaa); err == nil { + hexStr = vaa.HexDigest() + } + logger.Warn("Ignoring SignedVaaWithQuorum because signedInC full", zap.String("hash", hexStr)) + } + p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Inc() + } } + // case *gossipv1.GossipMessage_SignedObservationRequest: + // if obsvReqC != nil { + // s := m.SignedObservationRequest + // gs := gst.Get() + // if gs == nil { + // if logger.Level().Enabled(zapcore.DebugLevel) { + // logger.Debug("dropping SignedObservationRequest - no guardian set", zap.Any("value", s), zap.String("from", envelope.GetFrom().String())) + // } + // break + // } + // r, err := processSignedObservationRequest(s, gs) + // if err != nil { + // p2pMessagesReceived.WithLabelValues("invalid_signed_observation_request").Inc() + // if logger.Level().Enabled(zapcore.DebugLevel) { + // logger.Debug("invalid signed observation request received", + // zap.Error(err), + // zap.Any("payload", msg.Message), + // zap.Any("value", s), + // zap.Binary("raw", envelope.Data), + // zap.String("from", envelope.GetFrom().String())) + // } + // } else { + // if logger.Level().Enabled(zapcore.DebugLevel) { + // logger.Debug("valid signed observation request received", zap.Any("value", r), zap.String("from", envelope.GetFrom().String())) + // } + + // select { + // case obsvReqC <- r: + // p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc() + // default: + // p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc() + // } + // } + // } + // case *gossipv1.GossipMessage_SignedChainGovernorConfig: + // if signedGovCfg != nil { + // signedGovCfg <- m.SignedChainGovernorConfig + // } + // case *gossipv1.GossipMessage_SignedChainGovernorStatus: + // if signedGovSt != nil { + // signedGovSt <- m.SignedChainGovernorStatus + // } + default: + p2pMessagesReceived.WithLabelValues("unknown").Inc() + logger.Warn("received unknown message type on vaa topic (running outdated software?)", + zap.Any("payload", msg.Message), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) } } - case *gossipv1.GossipMessage_SignedChainGovernorConfig: - if params.signedGovCfg != nil { - params.signedGovCfg <- m.SignedChainGovernorConfig - } - case *gossipv1.GossipMessage_SignedChainGovernorStatus: - if params.signedGovSt != nil { - params.signedGovSt <- m.SignedChainGovernorStatus - } - default: - p2pMessagesReceived.WithLabelValues("unknown").Inc() - logger.Warn("received unknown message type (running outdated software?)", - zap.Any("payload", msg.Message), - zap.Binary("raw", envelope.Data), - zap.String("from", envelope.GetFrom().String())) - } + }() + } + + // Wait for either a shutdown or a fatal error from a pubsub subscription. + select { + case <-ctx.Done(): + return nil + case err := <-errC: + return err } } } diff --git a/node/pkg/p2p/run_params.go b/node/pkg/p2p/run_params.go index 59652f21ca..f5a1ac310b 100644 --- a/node/pkg/p2p/run_params.go +++ b/node/pkg/p2p/run_params.go @@ -57,6 +57,11 @@ type ( ccqBootstrapPeers string ccqPort uint ccqAllowedPeers string + + // This is junk: + gossipControlSendC chan []byte + gossipAttestationSendC chan []byte + gossipVaaSendC chan []byte } // RunOpt is used to specify optional parameters. diff --git a/node/pkg/p2p/watermark_test.go b/node/pkg/p2p/watermark_test.go index 661decf30e..62cc28aa63 100644 --- a/node/pkg/p2p/watermark_test.go +++ b/node/pkg/p2p/watermark_test.go @@ -30,7 +30,9 @@ type G struct { obsvC chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation] obsvReqC chan *gossipv1.ObservationRequest obsvReqSendC chan *gossipv1.ObservationRequest - sendC chan []byte + controlSendC chan []byte + attestationSendC chan []byte + vaaSendC chan []byte signedInC chan *gossipv1.SignedVAAWithQuorum priv p2pcrypto.PrivKey gk *ecdsa.PrivateKey @@ -67,7 +69,9 @@ func NewG(t *testing.T, nodeName string) *G { obsvC: make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], cs), obsvReqC: make(chan *gossipv1.ObservationRequest, cs), obsvReqSendC: make(chan *gossipv1.ObservationRequest, cs), - sendC: make(chan []byte, cs), + controlSendC: make(chan []byte, cs), + attestationSendC: make(chan []byte, cs), + vaaSendC: make(chan []byte, cs), signedInC: make(chan *gossipv1.SignedVAAWithQuorum, cs), priv: p2ppriv, gk: guardianpriv, @@ -91,7 +95,9 @@ func NewG(t *testing.T, nodeName string) *G { case <-g.signedInC: case <-g.signedGovCfg: case <-g.signedGovSt: - case <-g.sendC: + case <-g.controlSendC: + case <-g.attestationSendC: + case <-g.vaaSendC: } }() @@ -178,7 +184,7 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) { g.obsvC, g.signedInC, g.obsvReqC, - g.sendC, + g.attestationSendC, g.obsvReqSendC, g.acct, g.gov, diff --git a/node/pkg/processor/broadcast.go b/node/pkg/processor/broadcast.go index 641332de33..a35f8a773c 100644 --- a/node/pkg/processor/broadcast.go +++ b/node/pkg/processor/broadcast.go @@ -57,7 +57,7 @@ func (p *Processor) broadcastSignature( } // Broadcast the observation. - p.gossipSendC <- msg + p.gossipAttestationSendC <- msg observationsBroadcast.Inc() hash := hex.EncodeToString(digest.Bytes()) @@ -106,7 +106,7 @@ func (p *Processor) broadcastSignedVAA(v *vaa.VAA) { } // Broadcast the signed VAA. - p.gossipSendC <- msg + p.gossipVaaSendC <- msg signedVAAsBroadcast.Inc() if p.gatewayRelayer != nil { diff --git a/node/pkg/processor/cleanup.go b/node/pkg/processor/cleanup.go index bcc5e6cd6d..a36017750e 100644 --- a/node/pkg/processor/cleanup.go +++ b/node/pkg/processor/cleanup.go @@ -228,7 +228,10 @@ func (p *Processor) handleCleanup(ctx context.Context) { if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil { p.logger.Warn("failed to broadcast re-observation request", zap.String("message_id", s.LoggingID()), zap.Error(err)) } - p.gossipSendC <- s.ourMsg + p.postObservationToBatch(s.ourObs) + if s.ourMsg != nil { + p.gossipAttestationSendC <- s.ourMsg // TODO: Get rid of this + } s.retryCtr++ s.nextRetry = time.Now().Add(nextRetryDuration(s.retryCtr)) aggregationStateRetries.Inc() diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index e1545ce49c..ac94d046bc 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -258,7 +258,9 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW if len(sigsVaaFormat) >= gs.Quorum() { // we have reached quorum *with the active guardian set* + start := time.Now() s.ourObservation.HandleQuorum(sigsVaaFormat, hash, p) + timeToHandleQuorum.Observe(float64(time.Since(start).Microseconds())) } else { if p.logger.Level().Enabled(zapcore.DebugLevel) { p.logger.Debug("quorum not met, doing nothing", diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index b1e88268ef..1c76b67d6b 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -103,8 +103,13 @@ type Processor struct { msgC <-chan *common.MessagePublication // setC is a channel of guardian set updates setC <-chan *common.GuardianSet - // gossipSendC is a channel of outbound messages to broadcast on p2p - gossipSendC chan<- []byte + + // gossipAttestationSendC is a channel of outbound observation messages to broadcast on p2p + gossipAttestationSendC chan<- []byte + + // gossipVaaSendC is a channel of outbound VAA messages to broadcast on p2p + gossipVaaSendC chan<- []byte + // obsvC is a channel of inbound decoded observations from p2p obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation] @@ -155,14 +160,52 @@ var ( Help: "Latency histogram for total time to process signed observations", Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0}, }) + + timeToHandleObservation = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "wormhole_time_to_handle_observation_us", + Help: "Latency histogram for total time to handle observation on an observation", + Buckets: []float64{100.0, 200.0, 300.0, 400.0, 500.0, 750.0, 1000.0, 5000.0, 10_000.0}, + }) + + timeToHandleQuorum = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "wormhole_time_to_handle_quorum_us", + Help: "Latency histogram for total time to handle quorum on an observation", + Buckets: []float64{25.0, 50.0, 75.0, 100.0, 200.0, 300.0, 400.0, 500.0, 750.0, 1000.0, 5000.0, 10_000.0}, + }) + + batchObservationChanDelay = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "wormhole_batch_observation_channel_delay_us", + Help: "Latency histogram for delay of batched observations in channel", + Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0}, + }) + + batchObservationTotalDelay = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "wormhole_batch_observation_total_delay_us", + Help: "Latency histogram for total time to process batched observations", + Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0}, + }) + + batchObservationChannelOverflow = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "wormhole_batch_observation_channel_overflow", + Help: "Total number of times a write to the batch observation publish channel failed", + }, []string{"channel"}) ) +// batchObsvPubChanSize specifies the size of the channel used to publish observation batches. Allow five seconds worth. +const batchObsvPubChanSize = p2p.MaxObservationBatchSize * 5 + func NewProcessor( ctx context.Context, db *db.Database, msgC <-chan *common.MessagePublication, setC <-chan *common.GuardianSet, - gossipSendC chan<- []byte, + gossipAttestationSendC chan<- []byte, + gossipVaaSendC chan<- []byte, obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], obsvReqSendC chan<- *gossipv1.ObservationRequest, signedInC <-chan *gossipv1.SignedVAAWithQuorum, @@ -175,15 +218,16 @@ func NewProcessor( ) *Processor { return &Processor{ - msgC: msgC, - setC: setC, - gossipSendC: gossipSendC, - obsvC: obsvC, - obsvReqSendC: obsvReqSendC, - signedInC: signedInC, - gk: gk, - gst: gst, - db: db, + msgC: msgC, + setC: setC, + gossipAttestationSendC: gossipAttestationSendC, + gossipVaaSendC: gossipVaaSendC, + obsvC: obsvC, + obsvReqSendC: obsvReqSendC, + signedInC: signedInC, + gk: gk, + gst: gst, + db: db, logger: supervisor.Logger(ctx), state: &aggregationState{observationMap{}}, From d893cc2c1ba70a2e700b582b1808067e24b47fde Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Wed, 26 Jun 2024 15:41:54 -0500 Subject: [PATCH 2/5] Add cutover support --- node/pkg/node/options.go | 4 +- node/pkg/p2p/gossip_cutover.go | 101 ++++++++ node/pkg/p2p/gossip_cutover_test.go | 81 ++++++ node/pkg/p2p/p2p.go | 371 +++++++++++++++------------- node/pkg/p2p/run_params.go | 39 +-- node/pkg/p2p/run_params_test.go | 12 +- node/pkg/p2p/watermark_test.go | 2 + node/pkg/processor/cleanup.go | 6 +- node/pkg/processor/observation.go | 6 +- node/pkg/processor/processor.go | 27 +- 10 files changed, 420 insertions(+), 229 deletions(-) create mode 100644 node/pkg/p2p/gossip_cutover.go create mode 100644 node/pkg/p2p/gossip_cutover_test.go diff --git a/node/pkg/node/options.go b/node/pkg/node/options.go index 8bdef28724..477c71ea36 100644 --- a/node/pkg/node/options.go +++ b/node/pkg/node/options.go @@ -67,7 +67,9 @@ func GuardianOptionP2P(p2pKey libp2p_crypto.PrivKey, networkId, bootstrapPeers, g.obsvC, nil, // g.signedInC.writeC, g.obsvReqC.writeC, - g.gossipSendC, + g.gossipControlSendC, + g.gossipAttestationSendC, + g.gossipVaaSendC, g.obsvReqSendC.readC, g.acct, g.gov, diff --git a/node/pkg/p2p/gossip_cutover.go b/node/pkg/p2p/gossip_cutover.go new file mode 100644 index 0000000000..f5f3ebac40 --- /dev/null +++ b/node/pkg/p2p/gossip_cutover.go @@ -0,0 +1,101 @@ +package p2p + +import ( + "fmt" + "strings" + "sync/atomic" + "time" + + "go.uber.org/zap" +) + +// The format of this time is very picky. Please use the exact format specified by cutOverFmtStr! +const mainnetCutOverTimeStr = "" +const testnetCutOverTimeStr = "" +const devnetCutOverTimeStr = "" +const cutOverFmtStr = "2006-01-02T15:04:05-0700" + +// gossipCutoverCompleteFlag indicates if the cutover time has passed, meaning we should publish only on the new topics. +var gossipCutoverCompleteFlag atomic.Bool + +// GossipCutoverComplete returns true if the cutover time has passed, meaning we should publish on the new topic. +func GossipCutoverComplete() bool { + return gossipCutoverCompleteFlag.Load() +} + +// evaluateCutOver determines if the gossip cutover time has passed yet and sets the global flag accordingly. If the time has +// not yet passed, it creates a go routine to wait for that time and then set the flag. +func evaluateGossipCutOver(logger *zap.Logger, networkID string) error { + cutOverTimeStr := getCutOverTimeStr(networkID) + + sco, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, time.Now()) + if err != nil { + return err + } + + gossipCutoverCompleteFlag.Store(sco) + logger.Info("evaluated cutover flag", zap.Bool("cutOverFlag", GossipCutoverComplete()), zap.String("cutOverTime", cutOverTimeStr), zap.String("component", "p2pco")) + + if delay != time.Duration(0) { + // Wait for the cut over time and then update the flag. + go func() { + time.Sleep(delay) + logger.Info("time to cut over to new gossip topics", zap.String("cutOverTime", cutOverTimeStr), zap.String("component", "p2pco")) + gossipCutoverCompleteFlag.Store(true) + }() + } + + return nil +} + +// evaluateGossipCutOverImpl performs the actual cut over check. It is a separate function for testing purposes. +func evaluateGossipCutOverImpl(logger *zap.Logger, cutOverTimeStr string, now time.Time) (bool, time.Duration, error) { + if cutOverTimeStr == "" { + return false, 0, nil + } + + cutOverTime, err := time.Parse(cutOverFmtStr, cutOverTimeStr) + if err != nil { + return false, 0, fmt.Errorf(`failed to parse cut over time: %w`, err) + } + + if cutOverTime.Before(now) { + logger.Info("cut over time has passed, should use new gossip topics", zap.String("cutOverTime", cutOverTime.Format(cutOverFmtStr)), zap.String("now", now.Format(cutOverFmtStr)), zap.String("component", "p2pco")) + return true, 0, nil + } + + // If we get here, we need to wait for the cutover and then force a restart. + delay := cutOverTime.Sub(now) + logger.Info("still waiting for cut over time", + zap.Stringer("cutOverTime", cutOverTime), + zap.String("now", now.Format(cutOverFmtStr)), + zap.Stringer("delay", delay), + zap.String("component", "p2pco")) + + return false, delay, nil +} + +// getCutOverTimeStr returns the cut over time string based on the network ID passed in. +func getCutOverTimeStr(networkID string) string { //nolint:unparam + if strings.Contains(networkID, "/mainnet/") { + return mainnetCutOverTimeStr + } + if strings.Contains(networkID, "/testnet/") { + return testnetCutOverTimeStr + } + return devnetCutOverTimeStr +} + +// GossipAttestationMsg is the payload of the `gossipAttestationSendC` channel. This will be used instead of just `[]byte` +// until after the cutover is complete and support for publishing `SignedObservations` is removed. Then this can be deleted. +type GossipAttestationMsg struct { + MsgType GossipAttestationMsgType + Msg []byte +} + +type GossipAttestationMsgType uint8 + +const ( + GossipAttestationSignedObservation GossipAttestationMsgType = iota + GossipAttestationSignedObservationBatch +) diff --git a/node/pkg/p2p/gossip_cutover_test.go b/node/pkg/p2p/gossip_cutover_test.go new file mode 100644 index 0000000000..f398985b08 --- /dev/null +++ b/node/pkg/p2p/gossip_cutover_test.go @@ -0,0 +1,81 @@ +package p2p + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestVerifyCutOverTime(t *testing.T) { + if mainnetCutOverTimeStr != "" { + _, err := time.Parse(cutOverFmtStr, mainnetCutOverTimeStr) + require.NoError(t, err) + } + if testnetCutOverTimeStr != "" { + _, err := time.Parse(cutOverFmtStr, testnetCutOverTimeStr) + require.NoError(t, err) + } + if devnetCutOverTimeStr != "" { + _, err := time.Parse(cutOverFmtStr, devnetCutOverTimeStr) + require.NoError(t, err) + } +} + +func TestGetCutOverTimeStr(t *testing.T) { + assert.Equal(t, mainnetCutOverTimeStr, getCutOverTimeStr("blah/blah/mainnet/blah")) + assert.Equal(t, testnetCutOverTimeStr, getCutOverTimeStr("blah/blah/testnet/blah")) + assert.Equal(t, devnetCutOverTimeStr, getCutOverTimeStr("blah/blah/devnet/blah")) +} + +func TestCutOverDisabled(t *testing.T) { + logger := zap.NewNop() + + cutOverTimeStr := "" + now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000") + require.NoError(t, err) + + cuttingOver, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, now) + require.NoError(t, err) + assert.False(t, cuttingOver) + assert.Equal(t, time.Duration(0), delay) +} + +func TestCutOverInvalidTime(t *testing.T) { + logger := zap.NewNop() + + cutOverTimeStr := "Hello World" + now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000") + require.NoError(t, err) + + _, _, err = evaluateGossipCutOverImpl(logger, cutOverTimeStr, now) + require.EqualError(t, err, `failed to parse cut over time: parsing time "Hello World" as "2006-01-02T15:04:05-0700": cannot parse "Hello World" as "2006"`) +} + +func TestCutOverAlreadyHappened(t *testing.T) { + logger := zap.NewNop() + + cutOverTimeStr := "2023-10-06T18:18:00-0000" + now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000") + require.NoError(t, err) + + cuttingOver, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, now) + require.NoError(t, err) + assert.True(t, cuttingOver) + assert.Equal(t, time.Duration(0), delay) +} + +func TestCutOverDelayRequired(t *testing.T) { + logger := zap.NewNop() + + cutOverTimeStr := "2023-10-06T18:18:00-0000" + now, err := time.Parse(cutOverFmtStr, "2023-10-06T17:18:00-0000") + require.NoError(t, err) + + cuttingOver, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, now) + require.NoError(t, err) + assert.False(t, cuttingOver) + assert.Equal(t, time.Duration(60*time.Minute), delay) +} diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index 9b2bddc379..49736e33e1 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -310,6 +310,16 @@ func Run(params *RunParams) func(ctx context.Context) error { logger := supervisor.Logger(ctx) + // Evaluate the gossip cutover time. If it has passed, then the flag will be set to make us publish on the new topics. + // If not, a routine will be started to wait for that time before starting to publish on the new topics. + cutoverErr := evaluateGossipCutOver(logger, params.networkID) + if cutoverErr != nil { + panic(cutoverErr) + } + + // If the cutover has not happened yet, we need to join and subscribe to the VAA topic because it is also the old topic. + needOldTopic := !GossipCutoverComplete() + defer func() { // TODO: Right now we're canceling the root context because it used to be the case that libp2p cannot be cleanly restarted. // But that seems to no longer be the case. We may want to revisit this. See (https://github.com/libp2p/go-libp2p/issues/992) for background. @@ -358,7 +368,7 @@ func Run(params *RunParams) func(ctx context.Context) error { // These will only be non-nil if the application plans to listen for or publish to that topic. var controlPubsubTopic, attestationPubsubTopic, vaaPubsubTopic *pubsub.Topic - var controlSubscription, vaaSubscription, attestationSubscription *pubsub.Subscription + var controlSubscription, attestationSubscription, vaaSubscription *pubsub.Subscription // Set up the control channel. //////////////////////////////////////////////////////////////////// if params.nodeName != "" || params.gossipControlSendC != nil || params.obsvReqSendC != nil || params.obsvReqC != nil || params.signedGovCfg != nil || params.signedGovSt != nil { @@ -386,7 +396,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } // Set up the attestation channel. //////////////////////////////////////////////////////////////////// - if params.gossipAttestationSendC != nil || params.obsvC != nil || params.batchObsvC != nil { + if params.gossipAttestationSendC != nil || params.obsvC != nil { attestationTopic := fmt.Sprintf("%s/%s", params.networkID, "attestation") logger.Info("joining the attestation topic", zap.String("topic", attestationTopic)) attestationPubsubTopic, err = ps.Join(attestationTopic) @@ -400,7 +410,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } }() - if params.obsvC != nil || params.batchObsvC != nil { + if params.obsvC != nil { logger.Info("subscribing to the attestation topic", zap.String("topic", attestationTopic)) attestationSubscription, err = attestationPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE)) if err != nil { @@ -411,7 +421,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } // Set up the VAA channel. //////////////////////////////////////////////////////////////////// - if params.gossipVaaSendC != nil || params.signedInC != nil { + if params.gossipVaaSendC != nil || params.signedInC != nil || needOldTopic { vaaTopic := fmt.Sprintf("%s/%s", params.networkID, "broadcast") logger.Info("joining the vaa topic", zap.String("topic", vaaTopic)) vaaPubsubTopic, err = ps.Join(vaaTopic) @@ -425,7 +435,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } }() - if params.signedInC != nil { + if params.signedInC != nil || needOldTopic { logger.Info("subscribing to the vaa topic", zap.String("topic", vaaTopic)) vaaSubscription, err = vaaPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE)) if err != nil { @@ -577,10 +587,18 @@ func Run(params *RunParams) func(ctx context.Context) error { return b }() - err = controlPubsubTopic.Publish(ctx, b) - p2pMessagesSent.WithLabelValues("control").Inc() - if err != nil { - logger.Warn("failed to publish heartbeat message", zap.Error(err)) + if GossipCutoverComplete() { + err = controlPubsubTopic.Publish(ctx, b) + p2pMessagesSent.WithLabelValues("control").Inc() + if err != nil { + logger.Warn("failed to publish heartbeat message", zap.Error(err)) + } + } else if vaaPubsubTopic != nil { + err = vaaPubsubTopic.Publish(ctx, b) + p2pMessagesSent.WithLabelValues("old_control").Inc() + if err != nil { + logger.Warn("failed to publish heartbeat message to old topic", zap.Error(err)) + } } p2pHeartbeatsSent.Inc() @@ -599,20 +617,36 @@ func Run(params *RunParams) func(ctx context.Context) error { return case msg := <-params.gossipControlSendC: if controlPubsubTopic != nil { - err := controlPubsubTopic.Publish(ctx, msg) - p2pMessagesSent.WithLabelValues("control").Inc() - if err != nil { - logger.Error("failed to publish message from control queue", zap.Error(err)) + if GossipCutoverComplete() { + err := controlPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("control").Inc() + if err != nil { + logger.Error("failed to publish message from control queue", zap.Error(err)) + } + } else if vaaPubsubTopic != nil { + err := vaaPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("old_control").Inc() + if err != nil { + logger.Error("failed to publish message from control queue to old topic", zap.Error(err)) + } } } else { logger.Error("received a message on the control queue when we do not have a control topic") } case msg := <-params.gossipAttestationSendC: if attestationPubsubTopic != nil { - err := attestationPubsubTopic.Publish(ctx, msg) - p2pMessagesSent.WithLabelValues("attestation").Inc() - if err != nil { - logger.Error("failed to publish message from attestation queue", zap.Error(err)) + if GossipCutoverComplete() { + err := attestationPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("attestation").Inc() + if err != nil { + logger.Error("failed to publish message from attestation queue", zap.Error(err)) + } + } else if vaaPubsubTopic != nil { + err := vaaPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("old_attestation").Inc() + if err != nil { + logger.Error("failed to publish message from attestation queue to old topic", zap.Error(err)) + } } } else { logger.Error("received a message on the attestation queue when we do not have an attestation topic") @@ -661,12 +695,20 @@ func Run(params *RunParams) func(ctx context.Context) error { params.obsvReqC <- msg } - err = controlPubsubTopic.Publish(ctx, b) - p2pMessagesSent.WithLabelValues("control").Inc() - if err != nil { - logger.Error("failed to publish observation request", zap.Error(err)) - } else { - logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq)) + if GossipCutoverComplete() { + err = controlPubsubTopic.Publish(ctx, b) + p2pMessagesSent.WithLabelValues("control").Inc() + if err != nil { + logger.Error("failed to publish observation request", zap.Error(err)) + } else { + logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq)) + } + } else if vaaPubsubTopic != nil { + err = vaaPubsubTopic.Publish(ctx, b) + p2pMessagesSent.WithLabelValues("old_control").Inc() + if err != nil { + logger.Error("failed to publish observation request to old topic", zap.Error(err)) + } } } } @@ -681,14 +723,14 @@ func Run(params *RunParams) func(ctx context.Context) error { for { envelope, err := controlSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled if err != nil { - errC <- fmt.Errorf("failed to receive pubsub message on control subscription: %w", err) + errC <- fmt.Errorf("failed to receive pubsub message on control topic: %w", err) return } var msg gossipv1.GossipMessage err = proto.Unmarshal(envelope.Data, &msg) if err != nil { - logger.Info("received invalid message", + logger.Info("received invalid message on control topic", zap.Binary("data", envelope.Data), zap.String("from", envelope.GetFrom().String())) p2pMessagesReceived.WithLabelValues("invalid").Inc() @@ -697,14 +739,14 @@ func Run(params *RunParams) func(ctx context.Context) error { if envelope.GetFrom() == h.ID() { if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("received message from ourselves, ignoring", zap.Any("payload", msg.Message)) + logger.Debug("received message from ourselves on control topic, ignoring", zap.Any("payload", msg.Message)) } p2pMessagesReceived.WithLabelValues("loopback").Inc() continue } if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("received message", + logger.Debug("received message on control topic", zap.Any("payload", msg.Message), zap.Binary("raw", envelope.Data), zap.String("from", envelope.GetFrom().String())) @@ -833,14 +875,14 @@ func Run(params *RunParams) func(ctx context.Context) error { for { envelope, err := attestationSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled if err != nil { - errC <- fmt.Errorf("failed to receive pubsub message on attestation subscription: %w", err) + errC <- fmt.Errorf("failed to receive pubsub message on attestation topic: %w", err) return } var msg gossipv1.GossipMessage err = proto.Unmarshal(envelope.Data, &msg) if err != nil { - logger.Info("received invalid message", + logger.Info("received invalid message on attestation topic", zap.Binary("data", envelope.Data), zap.String("from", envelope.GetFrom().String())) p2pMessagesReceived.WithLabelValues("invalid").Inc() @@ -849,14 +891,14 @@ func Run(params *RunParams) func(ctx context.Context) error { if envelope.GetFrom() == h.ID() { if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("received message from ourselves, ignoring", zap.Any("payload", msg.Message)) + logger.Debug("received message from ourselves on attestation topic, ignoring", zap.Any("payload", msg.Message)) } p2pMessagesReceived.WithLabelValues("loopback").Inc() continue } if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("received message", + logger.Debug("received message on attestation topic", zap.Any("payload", msg.Message), zap.Binary("raw", envelope.Data), zap.String("from", envelope.GetFrom().String())) @@ -865,26 +907,15 @@ func Run(params *RunParams) func(ctx context.Context) error { switch m := msg.Message.(type) { case *gossipv1.GossipMessage_SignedObservation: if params.obsvC != nil { - if err := common.PostMsgWithTimestamp[gossipv1.SignedObservation](m.SignedObservation, params.obsvC); err == nil { + if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvC); err == nil { p2pMessagesReceived.WithLabelValues("observation").Inc() } else { if params.components.WarnChannelOverflow { - logger.Warn("Ignoring SignedObservation because obsvC is full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash))) + logger.Warn("Ignoring SignedObservation because obsvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservation.Addr))) } p2pReceiveChannelOverflow.WithLabelValues("observation").Inc() } } - case *gossipv1.GossipMessage_SignedObservationBatch: - if params.batchObsvC != nil { - if err := common.PostMsgWithTimestamp[gossipv1.SignedObservationBatch](m.SignedObservationBatch, params.batchObsvC); err == nil { - p2pMessagesReceived.WithLabelValues("batch_observation").Inc() - } else { - if params.components.WarnChannelOverflow { - logger.Warn("Ignoring SignedObservationBatch because batchObsvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservationBatch.Addr))) - } - p2pReceiveChannelOverflow.WithLabelValues("batch_observation").Inc() - } - } default: p2pMessagesReceived.WithLabelValues("unknown").Inc() logger.Warn("received unknown message type on attestation topic (running outdated software?)", @@ -902,15 +933,14 @@ func Run(params *RunParams) func(ctx context.Context) error { for { envelope, err := vaaSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled if err != nil { - errC <- fmt.Errorf("failed to receive pubsub message on vaa subscription: %w", err) - errC <- fmt.Errorf("failed to receive pubsub message on vaa subscription: %w", err) + errC <- fmt.Errorf("failed to receive pubsub message on vaa topic: %w", err) return } var msg gossipv1.GossipMessage err = proto.Unmarshal(envelope.Data, &msg) if err != nil { - logger.Info("received invalid message", + logger.Info("received invalid message on vaa topic", zap.Binary("data", envelope.Data), zap.String("from", envelope.GetFrom().String())) p2pMessagesReceived.WithLabelValues("invalid").Inc() @@ -919,105 +949,94 @@ func Run(params *RunParams) func(ctx context.Context) error { if envelope.GetFrom() == h.ID() { if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("received message from ourselves, ignoring", zap.Any("payload", msg.Message)) + logger.Debug("received message from ourselves on vaa topic, ignoring", zap.Any("payload", msg.Message)) } p2pMessagesReceived.WithLabelValues("loopback").Inc() continue } if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("received message", + logger.Debug("received message on vaa topic", zap.Any("payload", msg.Message), zap.Binary("raw", envelope.Data), zap.String("from", envelope.GetFrom().String())) } switch m := msg.Message.(type) { - // case *gossipv1.GossipMessage_SignedHeartbeat: - // s := m.SignedHeartbeat - // gs := gst.Get() - // if gs == nil { - // // No valid guardian set yet - dropping heartbeat - // logger.Log(components.SignedHeartbeatLogLevel, "skipping heartbeat - no guardian set", - // zap.Any("value", s), - // zap.String("from", envelope.GetFrom().String())) - // break - // } - // if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, gst, disableHeartbeatVerify); err != nil { - // p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc() - // logger.Log(components.SignedHeartbeatLogLevel, "invalid signed heartbeat received", - // zap.Error(err), - // zap.Any("payload", msg.Message), - // zap.Any("value", s), - // zap.Binary("raw", envelope.Data), - // zap.String("from", envelope.GetFrom().String())) - // } else { - // p2pMessagesReceived.WithLabelValues("valid_heartbeat").Inc() - // logger.Log(components.SignedHeartbeatLogLevel, "valid signed heartbeat received", - // zap.Any("value", heartbeat), - // zap.String("from", envelope.GetFrom().String())) - - // func() { - // if len(heartbeat.P2PNodeId) != 0 { - // components.ProtectedHostByGuardianKeyLock.Lock() - // defer components.ProtectedHostByGuardianKeyLock.Unlock() - // var peerId peer.ID - // if err = peerId.Unmarshal(heartbeat.P2PNodeId); err != nil { - // logger.Error("p2p_node_id_in_heartbeat_invalid", - // zap.Any("payload", msg.Message), - // zap.Any("value", s), - // zap.Binary("raw", envelope.Data), - // zap.String("from", envelope.GetFrom().String())) - // } else { - // guardianAddr := eth_common.BytesToAddress(s.GuardianAddr) - // if gk == nil || guardianAddr != ethcrypto.PubkeyToAddress(gk.PublicKey) { - // prevPeerId, ok := components.ProtectedHostByGuardianKey[guardianAddr] - // if ok { - // if prevPeerId != peerId { - // logger.Info("p2p_guardian_peer_changed", - // zap.String("guardian_addr", guardianAddr.String()), - // zap.String("prevPeerId", prevPeerId.String()), - // zap.String("newPeerId", peerId.String()), - // ) - // components.ConnMgr.Unprotect(prevPeerId, "heartbeat") - // components.ConnMgr.Protect(peerId, "heartbeat") - // components.ProtectedHostByGuardianKey[guardianAddr] = peerId - // } - // } else { - // components.ConnMgr.Protect(peerId, "heartbeat") - // components.ProtectedHostByGuardianKey[guardianAddr] = peerId - // } - // } - // } - // } else { - // if logger.Level().Enabled(zapcore.DebugLevel) { - // logger.Debug("p2p_node_id_not_in_heartbeat", zap.Error(err), zap.Any("payload", heartbeat.NodeName)) - // } - // } - // }() - // } - // case *gossipv1.GossipMessage_SignedObservation: - // if obsvC != nil { - // if err := common.PostMsgWithTimestamp[gossipv1.SignedObservation](m.SignedObservation, obsvC); err == nil { - // p2pMessagesReceived.WithLabelValues("observation").Inc() - // } else { - // if components.WarnChannelOverflow { - // logger.Warn("Ignoring SignedObservation because obsvC is full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash))) - // } - // p2pReceiveChannelOverflow.WithLabelValues("observation").Inc() - // } - // } - // case *gossipv1.GossipMessage_SignedObservationBatch: - // if batchObsvC != nil { - // if err := common.PostMsgWithTimestamp[gossipv1.SignedObservationBatch](m.SignedObservationBatch, batchObsvC); err == nil { - // p2pMessagesReceived.WithLabelValues("batch_observation").Inc() - // } else { - // if components.WarnChannelOverflow { - // logger.Warn("Ignoring SignedObservationBatch because batchObsvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservationBatch.Addr))) - // } - // p2pReceiveChannelOverflow.WithLabelValues("batch_observation").Inc() - // } - // } + case *gossipv1.GossipMessage_SignedHeartbeat: // TODO: Get rid of this after the cutover. + s := m.SignedHeartbeat + gs := params.gst.Get() + if gs == nil { + // No valid guardian set yet - dropping heartbeat + logger.Log(params.components.SignedHeartbeatLogLevel, "skipping heartbeat - no guardian set", + zap.Any("value", s), + zap.String("from", envelope.GetFrom().String())) + break + } + if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, params.gst, params.disableHeartbeatVerify); err != nil { + p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc() + logger.Log(params.components.SignedHeartbeatLogLevel, "invalid signed heartbeat received", + zap.Error(err), + zap.Any("payload", msg.Message), + zap.Any("value", s), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + } else { + p2pMessagesReceived.WithLabelValues("valid_heartbeat").Inc() + logger.Log(params.components.SignedHeartbeatLogLevel, "valid signed heartbeat received", + zap.Any("value", heartbeat), + zap.String("from", envelope.GetFrom().String())) + + func() { + if len(heartbeat.P2PNodeId) != 0 { + params.components.ProtectedHostByGuardianKeyLock.Lock() + defer params.components.ProtectedHostByGuardianKeyLock.Unlock() + var peerId peer.ID + if err = peerId.Unmarshal(heartbeat.P2PNodeId); err != nil { + logger.Error("p2p_node_id_in_heartbeat_invalid", + zap.Any("payload", msg.Message), + zap.Any("value", s), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + } else { + guardianAddr := eth_common.BytesToAddress(s.GuardianAddr) + if params.gk == nil || guardianAddr != ethcrypto.PubkeyToAddress(params.gk.PublicKey) { + prevPeerId, ok := params.components.ProtectedHostByGuardianKey[guardianAddr] + if ok { + if prevPeerId != peerId { + logger.Info("p2p_guardian_peer_changed", + zap.String("guardian_addr", guardianAddr.String()), + zap.String("prevPeerId", prevPeerId.String()), + zap.String("newPeerId", peerId.String()), + ) + params.components.ConnMgr.Unprotect(prevPeerId, "heartbeat") + params.components.ConnMgr.Protect(peerId, "heartbeat") + params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId + } + } else { + params.components.ConnMgr.Protect(peerId, "heartbeat") + params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId + } + } + } + } else { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("p2p_node_id_not_in_heartbeat", zap.Error(err), zap.Any("payload", heartbeat.NodeName)) + } + } + }() + } + case *gossipv1.GossipMessage_SignedObservation: // TODO: Get rid of this after the cutover. + if params.obsvC != nil { + if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvC); err == nil { + p2pMessagesReceived.WithLabelValues("observation").Inc() + } else { + if params.components.WarnChannelOverflow { + logger.Warn("Ignoring SignedObservation because obsvC is full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash))) + } + p2pReceiveChannelOverflow.WithLabelValues("observation").Inc() + } + } case *gossipv1.GossipMessage_SignedVaaWithQuorum: if params.signedInC != nil { select { @@ -1035,48 +1054,48 @@ func Run(params *RunParams) func(ctx context.Context) error { p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Inc() } } - // case *gossipv1.GossipMessage_SignedObservationRequest: - // if obsvReqC != nil { - // s := m.SignedObservationRequest - // gs := gst.Get() - // if gs == nil { - // if logger.Level().Enabled(zapcore.DebugLevel) { - // logger.Debug("dropping SignedObservationRequest - no guardian set", zap.Any("value", s), zap.String("from", envelope.GetFrom().String())) - // } - // break - // } - // r, err := processSignedObservationRequest(s, gs) - // if err != nil { - // p2pMessagesReceived.WithLabelValues("invalid_signed_observation_request").Inc() - // if logger.Level().Enabled(zapcore.DebugLevel) { - // logger.Debug("invalid signed observation request received", - // zap.Error(err), - // zap.Any("payload", msg.Message), - // zap.Any("value", s), - // zap.Binary("raw", envelope.Data), - // zap.String("from", envelope.GetFrom().String())) - // } - // } else { - // if logger.Level().Enabled(zapcore.DebugLevel) { - // logger.Debug("valid signed observation request received", zap.Any("value", r), zap.String("from", envelope.GetFrom().String())) - // } - - // select { - // case obsvReqC <- r: - // p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc() - // default: - // p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc() - // } - // } - // } - // case *gossipv1.GossipMessage_SignedChainGovernorConfig: - // if signedGovCfg != nil { - // signedGovCfg <- m.SignedChainGovernorConfig - // } - // case *gossipv1.GossipMessage_SignedChainGovernorStatus: - // if signedGovSt != nil { - // signedGovSt <- m.SignedChainGovernorStatus - // } + case *gossipv1.GossipMessage_SignedObservationRequest: // TODO: Get rid of this after the cutover. + if params.obsvReqC != nil { + s := m.SignedObservationRequest + gs := params.gst.Get() + if gs == nil { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("dropping SignedObservationRequest - no guardian set", zap.Any("value", s), zap.String("from", envelope.GetFrom().String())) + } + break + } + r, err := processSignedObservationRequest(s, gs) + if err != nil { + p2pMessagesReceived.WithLabelValues("invalid_signed_observation_request").Inc() + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("invalid signed observation request received", + zap.Error(err), + zap.Any("payload", msg.Message), + zap.Any("value", s), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + } + } else { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("valid signed observation request received", zap.Any("value", r), zap.String("from", envelope.GetFrom().String())) + } + + select { + case params.obsvReqC <- r: + p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc() + default: + p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc() + } + } + } + case *gossipv1.GossipMessage_SignedChainGovernorConfig: // TODO: Get rid of this after the cutover. + if params.signedGovCfg != nil { + params.signedGovCfg <- m.SignedChainGovernorConfig + } + case *gossipv1.GossipMessage_SignedChainGovernorStatus: // TODO: Get rid of this after the cutover. + if params.signedGovSt != nil { + params.signedGovSt <- m.SignedChainGovernorStatus + } default: p2pMessagesReceived.WithLabelValues("unknown").Inc() logger.Warn("received unknown message type on vaa topic (running outdated software?)", diff --git a/node/pkg/p2p/run_params.go b/node/pkg/p2p/run_params.go index f5a1ac310b..bb35992c4b 100644 --- a/node/pkg/p2p/run_params.go +++ b/node/pkg/p2p/run_params.go @@ -42,26 +42,23 @@ type ( disableHeartbeatVerify bool // The following options are guardian specific. Set with `WithGuardianOptions`. - nodeName string - gk *ecdsa.PrivateKey - gossipSendC chan []byte - obsvReqSendC <-chan *gossipv1.ObservationRequest - acct *accountant.Accountant - gov *governor.ChainGovernor - components *Components - ibcFeaturesFunc func() string - gatewayRelayerEnabled bool - ccqEnabled bool - signedQueryReqC chan<- *gossipv1.SignedQueryRequest - queryResponseReadC <-chan *query.QueryResponsePublication - ccqBootstrapPeers string - ccqPort uint - ccqAllowedPeers string - - // This is junk: + nodeName string + gk *ecdsa.PrivateKey gossipControlSendC chan []byte gossipAttestationSendC chan []byte gossipVaaSendC chan []byte + obsvReqSendC <-chan *gossipv1.ObservationRequest + acct *accountant.Accountant + gov *governor.ChainGovernor + components *Components + ibcFeaturesFunc func() string + gatewayRelayerEnabled bool + ccqEnabled bool + signedQueryReqC chan<- *gossipv1.SignedQueryRequest + queryResponseReadC <-chan *query.QueryResponsePublication + ccqBootstrapPeers string + ccqPort uint + ccqAllowedPeers string } // RunOpt is used to specify optional parameters. @@ -155,7 +152,9 @@ func WithGuardianOptions( obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], signedInC chan<- *gossipv1.SignedVAAWithQuorum, obsvReqC chan<- *gossipv1.ObservationRequest, - gossipSendC chan []byte, + gossipControlSendC chan []byte, + gossipAttestationSendC chan []byte, + gossipVaaSendC chan []byte, obsvReqSendC <-chan *gossipv1.ObservationRequest, acct *accountant.Accountant, gov *governor.ChainGovernor, @@ -176,7 +175,9 @@ func WithGuardianOptions( p.obsvC = obsvC p.signedInC = signedInC p.obsvReqC = obsvReqC - p.gossipSendC = gossipSendC + p.gossipControlSendC = gossipControlSendC + p.gossipAttestationSendC = gossipAttestationSendC + p.gossipVaaSendC = gossipVaaSendC p.obsvReqSendC = obsvReqSendC p.acct = acct p.gov = gov diff --git a/node/pkg/p2p/run_params_test.go b/node/pkg/p2p/run_params_test.go index aebebd952c..e9432e8e2d 100644 --- a/node/pkg/p2p/run_params_test.go +++ b/node/pkg/p2p/run_params_test.go @@ -143,7 +143,9 @@ func TestRunParamsWithGuardianOptions(t *testing.T) { obsvC := make(chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], 42) signedInC := make(chan<- *gossipv1.SignedVAAWithQuorum, 42) obsvReqC := make(chan<- *gossipv1.ObservationRequest, 42) - gossipSendC := make(chan []byte, 42) + gossipControlSendC := make(chan []byte, 42) + gossipAttestationSendC := make(chan []byte, 42) + gossipVaaSendC := make(chan []byte, 42) obsvReqSendC := make(<-chan *gossipv1.ObservationRequest, 42) acct := &accountant.Accountant{} @@ -172,7 +174,9 @@ func TestRunParamsWithGuardianOptions(t *testing.T) { obsvC, signedInC, obsvReqC, - gossipSendC, + gossipControlSendC, + gossipAttestationSendC, + gossipVaaSendC, obsvReqSendC, acct, gov, @@ -194,7 +198,9 @@ func TestRunParamsWithGuardianOptions(t *testing.T) { assert.Equal(t, obsvC, params.obsvC) assert.Equal(t, signedInC, params.signedInC) assert.Equal(t, obsvReqC, params.obsvReqC) - assert.Equal(t, gossipSendC, params.gossipSendC) + assert.Equal(t, gossipControlSendC, params.gossipControlSendC) + assert.Equal(t, gossipAttestationSendC, params.gossipAttestationSendC) + assert.Equal(t, gossipVaaSendC, params.gossipVaaSendC) assert.Equal(t, obsvReqSendC, params.obsvReqSendC) assert.Equal(t, acct, params.acct) assert.Equal(t, gov, params.gov) diff --git a/node/pkg/p2p/watermark_test.go b/node/pkg/p2p/watermark_test.go index 62cc28aa63..c8e8c98286 100644 --- a/node/pkg/p2p/watermark_test.go +++ b/node/pkg/p2p/watermark_test.go @@ -184,7 +184,9 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) { g.obsvC, g.signedInC, g.obsvReqC, + g.controlSendC, g.attestationSendC, + g.vaaSendC, g.obsvReqSendC, g.acct, g.gov, diff --git a/node/pkg/processor/cleanup.go b/node/pkg/processor/cleanup.go index a36017750e..014b76344b 100644 --- a/node/pkg/processor/cleanup.go +++ b/node/pkg/processor/cleanup.go @@ -228,10 +228,8 @@ func (p *Processor) handleCleanup(ctx context.Context) { if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil { p.logger.Warn("failed to broadcast re-observation request", zap.String("message_id", s.LoggingID()), zap.Error(err)) } - p.postObservationToBatch(s.ourObs) - if s.ourMsg != nil { - p.gossipAttestationSendC <- s.ourMsg // TODO: Get rid of this - } + + p.gossipAttestationSendC <- s.ourMsg s.retryCtr++ s.nextRetry = time.Now().Add(nextRetryDuration(s.retryCtr)) aggregationStateRetries.Inc() diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index ac94d046bc..c45fae1d57 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -74,12 +74,13 @@ func signaturesToVaaFormat(signatures map[common.Address][]byte, gsKeys []common // handleObservation processes a remote VAA observation, verifies it, checks whether the VAA has met quorum, // and assembles and submits a valid VAA if possible. -func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]) { +func (p *Processor) handleObservation(obs *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]) { // SECURITY: at this point, observations received from the p2p network are fully untrusted (all fields!) // // Note that observations are never tied to the (verified) p2p identity key - the p2p network // identity is completely decoupled from the guardian identity, p2p is just transport. + start := time.Now() observationsReceivedTotal.Inc() m := obs.Msg @@ -87,6 +88,7 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW s := p.state.signatures[hash] if s != nil && s.submitted { // already submitted; ignoring additional signatures for it. + timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) return } @@ -236,6 +238,7 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW zap.String("digest", hash), ) } + timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) return } @@ -279,6 +282,7 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW } observationTotalDelay.Observe(float64(time.Since(obs.Timestamp).Microseconds())) + timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) } func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gossipv1.SignedVAAWithQuorum) { diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index 1c76b67d6b..3479b4c676 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -165,7 +165,7 @@ var ( prometheus.HistogramOpts{ Name: "wormhole_time_to_handle_observation_us", Help: "Latency histogram for total time to handle observation on an observation", - Buckets: []float64{100.0, 200.0, 300.0, 400.0, 500.0, 750.0, 1000.0, 5000.0, 10_000.0}, + Buckets: []float64{25.0, 50.0, 75.0, 100.0, 200.0, 300.0, 400.0, 500.0, 750.0, 1000.0, 5000.0, 10_000.0}, }) timeToHandleQuorum = promauto.NewHistogram( @@ -174,31 +174,8 @@ var ( Help: "Latency histogram for total time to handle quorum on an observation", Buckets: []float64{25.0, 50.0, 75.0, 100.0, 200.0, 300.0, 400.0, 500.0, 750.0, 1000.0, 5000.0, 10_000.0}, }) - - batchObservationChanDelay = promauto.NewHistogram( - prometheus.HistogramOpts{ - Name: "wormhole_batch_observation_channel_delay_us", - Help: "Latency histogram for delay of batched observations in channel", - Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0}, - }) - - batchObservationTotalDelay = promauto.NewHistogram( - prometheus.HistogramOpts{ - Name: "wormhole_batch_observation_total_delay_us", - Help: "Latency histogram for total time to process batched observations", - Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0}, - }) - - batchObservationChannelOverflow = promauto.NewCounterVec( - prometheus.CounterOpts{ - Name: "wormhole_batch_observation_channel_overflow", - Help: "Total number of times a write to the batch observation publish channel failed", - }, []string{"channel"}) ) -// batchObsvPubChanSize specifies the size of the channel used to publish observation batches. Allow five seconds worth. -const batchObsvPubChanSize = p2p.MaxObservationBatchSize * 5 - func NewProcessor( ctx context.Context, db *db.Database, @@ -298,7 +275,7 @@ func (p *Processor) Run(ctx context.Context) error { p.handleMessage(k) case m := <-p.obsvC: observationChanDelay.Observe(float64(time.Since(m.Timestamp).Microseconds())) - p.handleObservation(ctx, m) + p.handleObservation(m) case m := <-p.signedInC: p.handleInboundSignedVAAWithQuorum(ctx, m) case <-cleanup.C: From a89e3aadf6c8504ce1b6fb24c1efee0fecd70034 Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Fri, 5 Jul 2024 15:18:52 -0500 Subject: [PATCH 3/5] Remove measurements that were moved to PR#3988 --- node/pkg/node/node.go | 2 +- node/pkg/p2p/gossip_cutover.go | 14 -------------- node/pkg/processor/observation.go | 8 +------- node/pkg/processor/processor.go | 16 +--------------- 4 files changed, 3 insertions(+), 37 deletions(-) diff --git a/node/pkg/node/node.go b/node/pkg/node/node.go index 92cff95d29..b94e094a03 100644 --- a/node/pkg/node/node.go +++ b/node/pkg/node/node.go @@ -75,7 +75,7 @@ type G struct { runnables map[string]supervisor.Runnable // various channels - // Outbound gossip message queues (needs to be read/write because p2p needs read/write) + // Outbound gossip message queues (need to be read/write because p2p needs read/write) gossipControlSendC chan []byte gossipAttestationSendC chan []byte gossipVaaSendC chan []byte diff --git a/node/pkg/p2p/gossip_cutover.go b/node/pkg/p2p/gossip_cutover.go index f5f3ebac40..f907714dfb 100644 --- a/node/pkg/p2p/gossip_cutover.go +++ b/node/pkg/p2p/gossip_cutover.go @@ -85,17 +85,3 @@ func getCutOverTimeStr(networkID string) string { //nolint:unparam } return devnetCutOverTimeStr } - -// GossipAttestationMsg is the payload of the `gossipAttestationSendC` channel. This will be used instead of just `[]byte` -// until after the cutover is complete and support for publishing `SignedObservations` is removed. Then this can be deleted. -type GossipAttestationMsg struct { - MsgType GossipAttestationMsgType - Msg []byte -} - -type GossipAttestationMsgType uint8 - -const ( - GossipAttestationSignedObservation GossipAttestationMsgType = iota - GossipAttestationSignedObservationBatch -) diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index c45fae1d57..e1545ce49c 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -74,13 +74,12 @@ func signaturesToVaaFormat(signatures map[common.Address][]byte, gsKeys []common // handleObservation processes a remote VAA observation, verifies it, checks whether the VAA has met quorum, // and assembles and submits a valid VAA if possible. -func (p *Processor) handleObservation(obs *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]) { +func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]) { // SECURITY: at this point, observations received from the p2p network are fully untrusted (all fields!) // // Note that observations are never tied to the (verified) p2p identity key - the p2p network // identity is completely decoupled from the guardian identity, p2p is just transport. - start := time.Now() observationsReceivedTotal.Inc() m := obs.Msg @@ -88,7 +87,6 @@ func (p *Processor) handleObservation(obs *node_common.MsgWithTimeStamp[gossipv1 s := p.state.signatures[hash] if s != nil && s.submitted { // already submitted; ignoring additional signatures for it. - timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) return } @@ -238,7 +236,6 @@ func (p *Processor) handleObservation(obs *node_common.MsgWithTimeStamp[gossipv1 zap.String("digest", hash), ) } - timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) return } @@ -261,9 +258,7 @@ func (p *Processor) handleObservation(obs *node_common.MsgWithTimeStamp[gossipv1 if len(sigsVaaFormat) >= gs.Quorum() { // we have reached quorum *with the active guardian set* - start := time.Now() s.ourObservation.HandleQuorum(sigsVaaFormat, hash, p) - timeToHandleQuorum.Observe(float64(time.Since(start).Microseconds())) } else { if p.logger.Level().Enabled(zapcore.DebugLevel) { p.logger.Debug("quorum not met, doing nothing", @@ -282,7 +277,6 @@ func (p *Processor) handleObservation(obs *node_common.MsgWithTimeStamp[gossipv1 } observationTotalDelay.Observe(float64(time.Since(obs.Timestamp).Microseconds())) - timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) } func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gossipv1.SignedVAAWithQuorum) { diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index 3479b4c676..aee81bca88 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -160,20 +160,6 @@ var ( Help: "Latency histogram for total time to process signed observations", Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0}, }) - - timeToHandleObservation = promauto.NewHistogram( - prometheus.HistogramOpts{ - Name: "wormhole_time_to_handle_observation_us", - Help: "Latency histogram for total time to handle observation on an observation", - Buckets: []float64{25.0, 50.0, 75.0, 100.0, 200.0, 300.0, 400.0, 500.0, 750.0, 1000.0, 5000.0, 10_000.0}, - }) - - timeToHandleQuorum = promauto.NewHistogram( - prometheus.HistogramOpts{ - Name: "wormhole_time_to_handle_quorum_us", - Help: "Latency histogram for total time to handle quorum on an observation", - Buckets: []float64{25.0, 50.0, 75.0, 100.0, 200.0, 300.0, 400.0, 500.0, 750.0, 1000.0, 5000.0, 10_000.0}, - }) ) func NewProcessor( @@ -275,7 +261,7 @@ func (p *Processor) Run(ctx context.Context) error { p.handleMessage(k) case m := <-p.obsvC: observationChanDelay.Observe(float64(time.Since(m.Timestamp).Microseconds())) - p.handleObservation(m) + p.handleObservation(ctx, m) case m := <-p.signedInC: p.handleInboundSignedVAAWithQuorum(ctx, m) case <-cleanup.C: From 8190b778d26fdae3f9c56e4a4881e33512631de0 Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Tue, 9 Jul 2024 12:12:19 -0500 Subject: [PATCH 4/5] Code review rework --- node/pkg/node/options.go | 2 +- node/pkg/p2p/p2p.go | 159 ++++++++++++++++++++------------------- 2 files changed, 81 insertions(+), 80 deletions(-) diff --git a/node/pkg/node/options.go b/node/pkg/node/options.go index 477c71ea36..e71fc7b5af 100644 --- a/node/pkg/node/options.go +++ b/node/pkg/node/options.go @@ -65,7 +65,7 @@ func GuardianOptionP2P(p2pKey libp2p_crypto.PrivKey, networkId, bootstrapPeers, nodeName, g.gk, g.obsvC, - nil, // g.signedInC.writeC, + g.signedInC.writeC, g.obsvReqC.writeC, g.gossipControlSendC, g.gossipAttestationSendC, diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index 49736e33e1..19bb740920 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -110,6 +110,7 @@ type Components struct { // is only accessed by a single routine at any given time in a running Guardian. ProtectedHostByGuardianKeyLock sync.Mutex // WarnChannelOverflow: If true, errors due to overflowing channels will produce logger.Warn + // WARNING: This should not be enabled in production. It is only used in node tests to watch for overflows. WarnChannelOverflow bool // SignedHeartbeatLogLevel is the log level at which SignedHeartbeatReceived events will be logged. SignedHeartbeatLogLevel zapcore.Level @@ -588,6 +589,9 @@ func Run(params *RunParams) func(ctx context.Context) error { }() if GossipCutoverComplete() { + if controlPubsubTopic == nil { + panic("controlPubsubTopic should not be nil when nodeName is set") + } err = controlPubsubTopic.Publish(ctx, b) p2pMessagesSent.WithLabelValues("control").Inc() if err != nil { @@ -616,99 +620,97 @@ func Run(params *RunParams) func(ctx context.Context) error { case <-ctx.Done(): return case msg := <-params.gossipControlSendC: - if controlPubsubTopic != nil { - if GossipCutoverComplete() { - err := controlPubsubTopic.Publish(ctx, msg) - p2pMessagesSent.WithLabelValues("control").Inc() - if err != nil { - logger.Error("failed to publish message from control queue", zap.Error(err)) - } - } else if vaaPubsubTopic != nil { - err := vaaPubsubTopic.Publish(ctx, msg) - p2pMessagesSent.WithLabelValues("old_control").Inc() - if err != nil { - logger.Error("failed to publish message from control queue to old topic", zap.Error(err)) - } + if GossipCutoverComplete() { + if controlPubsubTopic == nil { + panic("controlPubsubTopic should not be nil when gossipControlSendC is set") } - } else { - logger.Error("received a message on the control queue when we do not have a control topic") - } - case msg := <-params.gossipAttestationSendC: - if attestationPubsubTopic != nil { - if GossipCutoverComplete() { - err := attestationPubsubTopic.Publish(ctx, msg) - p2pMessagesSent.WithLabelValues("attestation").Inc() - if err != nil { - logger.Error("failed to publish message from attestation queue", zap.Error(err)) - } - } else if vaaPubsubTopic != nil { - err := vaaPubsubTopic.Publish(ctx, msg) - p2pMessagesSent.WithLabelValues("old_attestation").Inc() - if err != nil { - logger.Error("failed to publish message from attestation queue to old topic", zap.Error(err)) - } + err := controlPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("control").Inc() + if err != nil { + logger.Error("failed to publish message from control queue", zap.Error(err)) } - } else { - logger.Error("received a message on the attestation queue when we do not have an attestation topic") - } - case msg := <-params.gossipVaaSendC: - if vaaPubsubTopic != nil { + } else if vaaPubsubTopic != nil { err := vaaPubsubTopic.Publish(ctx, msg) - p2pMessagesSent.WithLabelValues("vaa").Inc() + p2pMessagesSent.WithLabelValues("old_control").Inc() if err != nil { - logger.Error("failed to publish message from vaa queue", zap.Error(err)) + logger.Error("failed to publish message from control queue to old topic", zap.Error(err)) } - } else { - logger.Error("received a message on the vaa queue when we do not have a vaa topic") } - case msg := <-params.obsvReqSendC: - if controlPubsubTopic != nil { - b, err := proto.Marshal(msg) + case msg := <-params.gossipAttestationSendC: + if GossipCutoverComplete() { + if attestationPubsubTopic == nil { + panic("attestationPubsubTopic should not be nil when gossipAttestationSendC is set") + } + err := attestationPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("attestation").Inc() if err != nil { - panic(err) + logger.Error("failed to publish message from attestation queue", zap.Error(err)) } - - // Sign the observation request using our node's guardian key. - digest := signedObservationRequestDigest(b) - sig, err := ethcrypto.Sign(digest.Bytes(), params.gk) + } else if vaaPubsubTopic != nil { + err := vaaPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("old_attestation").Inc() if err != nil { - panic(err) + logger.Error("failed to publish message from attestation queue to old topic", zap.Error(err)) } + } + case msg := <-params.gossipVaaSendC: + if vaaPubsubTopic == nil { + panic("vaaPubsubTopic should not be nil when gossipVaaSendC is set") + } + err := vaaPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("vaa").Inc() + if err != nil { + logger.Error("failed to publish message from vaa queue", zap.Error(err)) + } + case msg := <-params.obsvReqSendC: + b, err := proto.Marshal(msg) + if err != nil { + panic(err) + } - sReq := &gossipv1.SignedObservationRequest{ - ObservationRequest: b, - Signature: sig, - GuardianAddr: ethcrypto.PubkeyToAddress(params.gk.PublicKey).Bytes(), - } + // Sign the observation request using our node's guardian key. + digest := signedObservationRequestDigest(b) + sig, err := ethcrypto.Sign(digest.Bytes(), params.gk) + if err != nil { + panic(err) + } - envelope := &gossipv1.GossipMessage{ - Message: &gossipv1.GossipMessage_SignedObservationRequest{ - SignedObservationRequest: sReq}} + sReq := &gossipv1.SignedObservationRequest{ + ObservationRequest: b, + Signature: sig, + GuardianAddr: ethcrypto.PubkeyToAddress(params.gk.PublicKey).Bytes(), + } - b, err = proto.Marshal(envelope) - if err != nil { - panic(err) - } + envelope := &gossipv1.GossipMessage{ + Message: &gossipv1.GossipMessage_SignedObservationRequest{ + SignedObservationRequest: sReq}} - // Send to local observation request queue (the loopback message is ignored) - if params.obsvReqC != nil { - params.obsvReqC <- msg - } + b, err = proto.Marshal(envelope) + if err != nil { + panic(err) + } - if GossipCutoverComplete() { - err = controlPubsubTopic.Publish(ctx, b) - p2pMessagesSent.WithLabelValues("control").Inc() - if err != nil { - logger.Error("failed to publish observation request", zap.Error(err)) - } else { - logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq)) - } - } else if vaaPubsubTopic != nil { - err = vaaPubsubTopic.Publish(ctx, b) - p2pMessagesSent.WithLabelValues("old_control").Inc() - if err != nil { - logger.Error("failed to publish observation request to old topic", zap.Error(err)) - } + // Send to local observation request queue (the loopback message is ignored) + if params.obsvReqC != nil { + params.obsvReqC <- msg + } + + if GossipCutoverComplete() { + if controlPubsubTopic == nil { + panic("controlPubsubTopic should not be nil when obsvReqSendC is set") + } + err = controlPubsubTopic.Publish(ctx, b) + p2pMessagesSent.WithLabelValues("control").Inc() + if err != nil { + logger.Error("failed to publish observation request", zap.Error(err)) + } else { + logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq)) + } + } else if vaaPubsubTopic != nil { + err = vaaPubsubTopic.Publish(ctx, b) + p2pMessagesSent.WithLabelValues("old_control").Inc() + if err != nil { + logger.Error("failed to publish observation request to old topic", zap.Error(err)) } } } @@ -1044,7 +1046,6 @@ func Run(params *RunParams) func(ctx context.Context) error { p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc() default: if params.components.WarnChannelOverflow { - // TODO do not log this in production var hexStr string if vaa, err := vaa.Unmarshal(m.SignedVaaWithQuorum.Vaa); err == nil { hexStr = vaa.HexDigest() From 6a0974bc34f1aab9b77050ad53539aeec1a25d49 Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Fri, 19 Jul 2024 10:16:28 -0500 Subject: [PATCH 5/5] Code review rework --- node/pkg/p2p/gossip_cutover.go | 2 +- node/pkg/p2p/p2p.go | 58 ++++++++++++++++----------------- node/pkg/p2p/run_params.go | 52 ++++++++++++++--------------- node/pkg/p2p/run_params_test.go | 6 ++-- 4 files changed, 59 insertions(+), 59 deletions(-) diff --git a/node/pkg/p2p/gossip_cutover.go b/node/pkg/p2p/gossip_cutover.go index f907714dfb..8629644e84 100644 --- a/node/pkg/p2p/gossip_cutover.go +++ b/node/pkg/p2p/gossip_cutover.go @@ -64,7 +64,7 @@ func evaluateGossipCutOverImpl(logger *zap.Logger, cutOverTimeStr string, now ti return true, 0, nil } - // If we get here, we need to wait for the cutover and then force a restart. + // If we get here, we need to wait for the cutover and then switch the global flag. delay := cutOverTime.Sub(now) logger.Info("still waiting for cut over time", zap.Stringer("cutOverTime", cutOverTime), diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index 19bb740920..bf3cdfe3a2 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -372,7 +372,7 @@ func Run(params *RunParams) func(ctx context.Context) error { var controlSubscription, attestationSubscription, vaaSubscription *pubsub.Subscription // Set up the control channel. //////////////////////////////////////////////////////////////////// - if params.nodeName != "" || params.gossipControlSendC != nil || params.obsvReqSendC != nil || params.obsvReqC != nil || params.signedGovCfg != nil || params.signedGovSt != nil { + if params.nodeName != "" || params.gossipControlSendC != nil || params.obsvReqSendC != nil || params.obsvReqRecvC != nil || params.signedGovCfgRecvC != nil || params.signedGovStatusRecvC != nil { controlTopic := fmt.Sprintf("%s/%s", params.networkID, "control") logger.Info("joining the control topic", zap.String("topic", controlTopic)) controlPubsubTopic, err = ps.Join(controlTopic) @@ -386,7 +386,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } }() - if params.obsvReqC != nil || params.signedGovCfg != nil || params.signedGovSt != nil { + if params.obsvReqRecvC != nil || params.signedGovCfgRecvC != nil || params.signedGovStatusRecvC != nil { logger.Info("subscribing to the control topic", zap.String("topic", controlTopic)) controlSubscription, err = controlPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE)) if err != nil { @@ -397,7 +397,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } // Set up the attestation channel. //////////////////////////////////////////////////////////////////// - if params.gossipAttestationSendC != nil || params.obsvC != nil { + if params.gossipAttestationSendC != nil || params.obsvRecvC != nil { attestationTopic := fmt.Sprintf("%s/%s", params.networkID, "attestation") logger.Info("joining the attestation topic", zap.String("topic", attestationTopic)) attestationPubsubTopic, err = ps.Join(attestationTopic) @@ -411,7 +411,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } }() - if params.obsvC != nil { + if params.obsvRecvC != nil { logger.Info("subscribing to the attestation topic", zap.String("topic", attestationTopic)) attestationSubscription, err = attestationPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE)) if err != nil { @@ -422,7 +422,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } // Set up the VAA channel. //////////////////////////////////////////////////////////////////// - if params.gossipVaaSendC != nil || params.signedInC != nil || needOldTopic { + if params.gossipVaaSendC != nil || params.signedIncomingVaaRecvC != nil || needOldTopic { vaaTopic := fmt.Sprintf("%s/%s", params.networkID, "broadcast") logger.Info("joining the vaa topic", zap.String("topic", vaaTopic)) vaaPubsubTopic, err = ps.Join(vaaTopic) @@ -436,7 +436,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } }() - if params.signedInC != nil || needOldTopic { + if params.signedIncomingVaaRecvC != nil || needOldTopic { logger.Info("subscribing to the vaa topic", zap.String("topic", vaaTopic)) vaaSubscription, err = vaaPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE)) if err != nil { @@ -691,8 +691,8 @@ func Run(params *RunParams) func(ctx context.Context) error { } // Send to local observation request queue (the loopback message is ignored) - if params.obsvReqC != nil { - params.obsvReqC <- msg + if params.obsvReqRecvC != nil { + params.obsvReqRecvC <- msg } if GossipCutoverComplete() { @@ -819,7 +819,7 @@ func Run(params *RunParams) func(ctx context.Context) error { }() } case *gossipv1.GossipMessage_SignedObservationRequest: - if params.obsvReqC != nil { + if params.obsvReqRecvC != nil { s := m.SignedObservationRequest gs := params.gst.Get() if gs == nil { @@ -845,7 +845,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } select { - case params.obsvReqC <- r: + case params.obsvReqRecvC <- r: p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc() default: p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc() @@ -853,12 +853,12 @@ func Run(params *RunParams) func(ctx context.Context) error { } } case *gossipv1.GossipMessage_SignedChainGovernorConfig: - if params.signedGovCfg != nil { - params.signedGovCfg <- m.SignedChainGovernorConfig + if params.signedGovCfgRecvC != nil { + params.signedGovCfgRecvC <- m.SignedChainGovernorConfig } case *gossipv1.GossipMessage_SignedChainGovernorStatus: - if params.signedGovSt != nil { - params.signedGovSt <- m.SignedChainGovernorStatus + if params.signedGovStatusRecvC != nil { + params.signedGovStatusRecvC <- m.SignedChainGovernorStatus } default: p2pMessagesReceived.WithLabelValues("unknown").Inc() @@ -908,12 +908,12 @@ func Run(params *RunParams) func(ctx context.Context) error { switch m := msg.Message.(type) { case *gossipv1.GossipMessage_SignedObservation: - if params.obsvC != nil { - if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvC); err == nil { + if params.obsvRecvC != nil { + if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvRecvC); err == nil { p2pMessagesReceived.WithLabelValues("observation").Inc() } else { if params.components.WarnChannelOverflow { - logger.Warn("Ignoring SignedObservation because obsvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservation.Addr))) + logger.Warn("Ignoring SignedObservation because obsvRecvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservation.Addr))) } p2pReceiveChannelOverflow.WithLabelValues("observation").Inc() } @@ -1029,20 +1029,20 @@ func Run(params *RunParams) func(ctx context.Context) error { }() } case *gossipv1.GossipMessage_SignedObservation: // TODO: Get rid of this after the cutover. - if params.obsvC != nil { - if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvC); err == nil { + if params.obsvRecvC != nil { + if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvRecvC); err == nil { p2pMessagesReceived.WithLabelValues("observation").Inc() } else { if params.components.WarnChannelOverflow { - logger.Warn("Ignoring SignedObservation because obsvC is full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash))) + logger.Warn("Ignoring SignedObservation because obsvRecvC is full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash))) } p2pReceiveChannelOverflow.WithLabelValues("observation").Inc() } } case *gossipv1.GossipMessage_SignedVaaWithQuorum: - if params.signedInC != nil { + if params.signedIncomingVaaRecvC != nil { select { - case params.signedInC <- m.SignedVaaWithQuorum: + case params.signedIncomingVaaRecvC <- m.SignedVaaWithQuorum: p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc() default: if params.components.WarnChannelOverflow { @@ -1050,13 +1050,13 @@ func Run(params *RunParams) func(ctx context.Context) error { if vaa, err := vaa.Unmarshal(m.SignedVaaWithQuorum.Vaa); err == nil { hexStr = vaa.HexDigest() } - logger.Warn("Ignoring SignedVaaWithQuorum because signedInC full", zap.String("hash", hexStr)) + logger.Warn("Ignoring SignedVaaWithQuorum because signedIncomingVaaRecvC full", zap.String("hash", hexStr)) } p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Inc() } } case *gossipv1.GossipMessage_SignedObservationRequest: // TODO: Get rid of this after the cutover. - if params.obsvReqC != nil { + if params.obsvReqRecvC != nil { s := m.SignedObservationRequest gs := params.gst.Get() if gs == nil { @@ -1082,7 +1082,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } select { - case params.obsvReqC <- r: + case params.obsvReqRecvC <- r: p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc() default: p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc() @@ -1090,12 +1090,12 @@ func Run(params *RunParams) func(ctx context.Context) error { } } case *gossipv1.GossipMessage_SignedChainGovernorConfig: // TODO: Get rid of this after the cutover. - if params.signedGovCfg != nil { - params.signedGovCfg <- m.SignedChainGovernorConfig + if params.signedGovCfgRecvC != nil { + params.signedGovCfgRecvC <- m.SignedChainGovernorConfig } case *gossipv1.GossipMessage_SignedChainGovernorStatus: // TODO: Get rid of this after the cutover. - if params.signedGovSt != nil { - params.signedGovSt <- m.SignedChainGovernorStatus + if params.signedGovStatusRecvC != nil { + params.signedGovStatusRecvC <- m.SignedChainGovernorStatus } default: p2pMessagesReceived.WithLabelValues("unknown").Inc() diff --git a/node/pkg/p2p/run_params.go b/node/pkg/p2p/run_params.go index bb35992c4b..d4a0c0b0b5 100644 --- a/node/pkg/p2p/run_params.go +++ b/node/pkg/p2p/run_params.go @@ -23,20 +23,20 @@ type ( gst *common.GuardianSetState rootCtxCancel context.CancelFunc - // obsvC is optional and can be set with `WithSignedObservationListener`. - obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation] + // obsvRecvC is optional and can be set with `WithSignedObservationListener`. + obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation] - // obsvReqC is optional and can be set with `WithObservationRequestListener`. - obsvReqC chan<- *gossipv1.ObservationRequest + // obsvReqRecvC is optional and can be set with `WithObservationRequestListener`. + obsvReqRecvC chan<- *gossipv1.ObservationRequest - // signedInC is optional and can be set with `WithSignedVAAListener`. - signedInC chan<- *gossipv1.SignedVAAWithQuorum + // signedIncomingVaaRecvC is optional and can be set with `WithSignedVAAListener`. + signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum - // signedGovCfg is optional and can be set with `WithChainGovernorConfigListener`. - signedGovCfg chan *gossipv1.SignedChainGovernorConfig + // signedGovCfgRecvC is optional and can be set with `WithChainGovernorConfigListener`. + signedGovCfgRecvC chan *gossipv1.SignedChainGovernorConfig - // WithChainGovernorStatusListener is optional and can be set with `WithChainGovernorStatusListener`. - signedGovSt chan *gossipv1.SignedChainGovernorStatus + // signedGovStatusRecvC is optional and can be set with `WithChainGovernorStatusListener`. + signedGovStatusRecvC chan *gossipv1.SignedChainGovernorStatus // disableHeartbeatVerify is optional and can be set with `WithDisableHeartbeatVerify` or `WithGuardianOptions`. disableHeartbeatVerify bool @@ -98,41 +98,41 @@ func NewRunParams( } // WithSignedObservationListener is used to set the channel to receive `SignedObservation“ messages. -func WithSignedObservationListener(obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]) RunOpt { +func WithSignedObservationListener(obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]) RunOpt { return func(p *RunParams) error { - p.obsvC = obsvC + p.obsvRecvC = obsvRecvC return nil } } // WithSignedVAAListener is used to set the channel to receive `SignedVAAWithQuorum messages. -func WithSignedVAAListener(signedInC chan<- *gossipv1.SignedVAAWithQuorum) RunOpt { +func WithSignedVAAListener(signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum) RunOpt { return func(p *RunParams) error { - p.signedInC = signedInC + p.signedIncomingVaaRecvC = signedIncomingVaaRecvC return nil } } // WithObservationRequestListener is used to set the channel to receive `ObservationRequest messages. -func WithObservationRequestListener(obsvReqC chan<- *gossipv1.ObservationRequest) RunOpt { +func WithObservationRequestListener(obsvReqRecvC chan<- *gossipv1.ObservationRequest) RunOpt { return func(p *RunParams) error { - p.obsvReqC = obsvReqC + p.obsvReqRecvC = obsvReqRecvC return nil } } // WithChainGovernorConfigListener is used to set the channel to receive `SignedChainGovernorConfig messages. -func WithChainGovernorConfigListener(signedGovCfg chan *gossipv1.SignedChainGovernorConfig) RunOpt { +func WithChainGovernorConfigListener(signedGovCfgRecvC chan *gossipv1.SignedChainGovernorConfig) RunOpt { return func(p *RunParams) error { - p.signedGovCfg = signedGovCfg + p.signedGovCfgRecvC = signedGovCfgRecvC return nil } } // WithChainGovernorStatusListener is used to set the channel to receive `SignedChainGovernorStatus messages. -func WithChainGovernorStatusListener(signedGovSt chan *gossipv1.SignedChainGovernorStatus) RunOpt { +func WithChainGovernorStatusListener(signedGovStatusRecvC chan *gossipv1.SignedChainGovernorStatus) RunOpt { return func(p *RunParams) error { - p.signedGovSt = signedGovSt + p.signedGovStatusRecvC = signedGovStatusRecvC return nil } } @@ -149,9 +149,9 @@ func WithDisableHeartbeatVerify(disableHeartbeatVerify bool) RunOpt { func WithGuardianOptions( nodeName string, gk *ecdsa.PrivateKey, - obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], - signedInC chan<- *gossipv1.SignedVAAWithQuorum, - obsvReqC chan<- *gossipv1.ObservationRequest, + obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], + signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum, + obsvReqRecvC chan<- *gossipv1.ObservationRequest, gossipControlSendC chan []byte, gossipAttestationSendC chan []byte, gossipVaaSendC chan []byte, @@ -172,9 +172,9 @@ func WithGuardianOptions( return func(p *RunParams) error { p.nodeName = nodeName p.gk = gk - p.obsvC = obsvC - p.signedInC = signedInC - p.obsvReqC = obsvReqC + p.obsvRecvC = obsvRecvC + p.signedIncomingVaaRecvC = signedIncomingVaaRecvC + p.obsvReqRecvC = obsvReqRecvC p.gossipControlSendC = gossipControlSendC p.gossipAttestationSendC = gossipAttestationSendC p.gossipVaaSendC = gossipVaaSendC diff --git a/node/pkg/p2p/run_params_test.go b/node/pkg/p2p/run_params_test.go index e9432e8e2d..a1fd98cab7 100644 --- a/node/pkg/p2p/run_params_test.go +++ b/node/pkg/p2p/run_params_test.go @@ -195,9 +195,9 @@ func TestRunParamsWithGuardianOptions(t *testing.T) { require.NoError(t, err) require.NotNil(t, params) assert.Equal(t, nodeName, params.nodeName) - assert.Equal(t, obsvC, params.obsvC) - assert.Equal(t, signedInC, params.signedInC) - assert.Equal(t, obsvReqC, params.obsvReqC) + assert.Equal(t, obsvC, params.obsvRecvC) + assert.Equal(t, signedInC, params.signedIncomingVaaRecvC) + assert.Equal(t, obsvReqC, params.obsvReqRecvC) assert.Equal(t, gossipControlSendC, params.gossipControlSendC) assert.Equal(t, gossipAttestationSendC, params.gossipAttestationSendC) assert.Equal(t, gossipVaaSendC, params.gossipVaaSendC)