diff --git a/node/pkg/p2p/gossip_cutover.go b/node/pkg/p2p/gossip_cutover.go index f907714dfb..8629644e84 100644 --- a/node/pkg/p2p/gossip_cutover.go +++ b/node/pkg/p2p/gossip_cutover.go @@ -64,7 +64,7 @@ func evaluateGossipCutOverImpl(logger *zap.Logger, cutOverTimeStr string, now ti return true, 0, nil } - // If we get here, we need to wait for the cutover and then force a restart. + // If we get here, we need to wait for the cutover and then switch the global flag. delay := cutOverTime.Sub(now) logger.Info("still waiting for cut over time", zap.Stringer("cutOverTime", cutOverTime), diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index 19bb740920..bf3cdfe3a2 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -372,7 +372,7 @@ func Run(params *RunParams) func(ctx context.Context) error { var controlSubscription, attestationSubscription, vaaSubscription *pubsub.Subscription // Set up the control channel. //////////////////////////////////////////////////////////////////// - if params.nodeName != "" || params.gossipControlSendC != nil || params.obsvReqSendC != nil || params.obsvReqC != nil || params.signedGovCfg != nil || params.signedGovSt != nil { + if params.nodeName != "" || params.gossipControlSendC != nil || params.obsvReqSendC != nil || params.obsvReqRecvC != nil || params.signedGovCfgRecvC != nil || params.signedGovStatusRecvC != nil { controlTopic := fmt.Sprintf("%s/%s", params.networkID, "control") logger.Info("joining the control topic", zap.String("topic", controlTopic)) controlPubsubTopic, err = ps.Join(controlTopic) @@ -386,7 +386,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } }() - if params.obsvReqC != nil || params.signedGovCfg != nil || params.signedGovSt != nil { + if params.obsvReqRecvC != nil || params.signedGovCfgRecvC != nil || params.signedGovStatusRecvC != nil { logger.Info("subscribing to the control topic", zap.String("topic", controlTopic)) controlSubscription, err = controlPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE)) if err != nil { @@ -397,7 +397,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } // Set up the attestation channel. //////////////////////////////////////////////////////////////////// - if params.gossipAttestationSendC != nil || params.obsvC != nil { + if params.gossipAttestationSendC != nil || params.obsvRecvC != nil { attestationTopic := fmt.Sprintf("%s/%s", params.networkID, "attestation") logger.Info("joining the attestation topic", zap.String("topic", attestationTopic)) attestationPubsubTopic, err = ps.Join(attestationTopic) @@ -411,7 +411,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } }() - if params.obsvC != nil { + if params.obsvRecvC != nil { logger.Info("subscribing to the attestation topic", zap.String("topic", attestationTopic)) attestationSubscription, err = attestationPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE)) if err != nil { @@ -422,7 +422,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } // Set up the VAA channel. //////////////////////////////////////////////////////////////////// - if params.gossipVaaSendC != nil || params.signedInC != nil || needOldTopic { + if params.gossipVaaSendC != nil || params.signedIncomingVaaRecvC != nil || needOldTopic { vaaTopic := fmt.Sprintf("%s/%s", params.networkID, "broadcast") logger.Info("joining the vaa topic", zap.String("topic", vaaTopic)) vaaPubsubTopic, err = ps.Join(vaaTopic) @@ -436,7 +436,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } }() - if params.signedInC != nil || needOldTopic { + if params.signedIncomingVaaRecvC != nil || needOldTopic { logger.Info("subscribing to the vaa topic", zap.String("topic", vaaTopic)) vaaSubscription, err = vaaPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE)) if err != nil { @@ -691,8 +691,8 @@ func Run(params *RunParams) func(ctx context.Context) error { } // Send to local observation request queue (the loopback message is ignored) - if params.obsvReqC != nil { - params.obsvReqC <- msg + if params.obsvReqRecvC != nil { + params.obsvReqRecvC <- msg } if GossipCutoverComplete() { @@ -819,7 +819,7 @@ func Run(params *RunParams) func(ctx context.Context) error { }() } case *gossipv1.GossipMessage_SignedObservationRequest: - if params.obsvReqC != nil { + if params.obsvReqRecvC != nil { s := m.SignedObservationRequest gs := params.gst.Get() if gs == nil { @@ -845,7 +845,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } select { - case params.obsvReqC <- r: + case params.obsvReqRecvC <- r: p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc() default: p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc() @@ -853,12 +853,12 @@ func Run(params *RunParams) func(ctx context.Context) error { } } case *gossipv1.GossipMessage_SignedChainGovernorConfig: - if params.signedGovCfg != nil { - params.signedGovCfg <- m.SignedChainGovernorConfig + if params.signedGovCfgRecvC != nil { + params.signedGovCfgRecvC <- m.SignedChainGovernorConfig } case *gossipv1.GossipMessage_SignedChainGovernorStatus: - if params.signedGovSt != nil { - params.signedGovSt <- m.SignedChainGovernorStatus + if params.signedGovStatusRecvC != nil { + params.signedGovStatusRecvC <- m.SignedChainGovernorStatus } default: p2pMessagesReceived.WithLabelValues("unknown").Inc() @@ -908,12 +908,12 @@ func Run(params *RunParams) func(ctx context.Context) error { switch m := msg.Message.(type) { case *gossipv1.GossipMessage_SignedObservation: - if params.obsvC != nil { - if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvC); err == nil { + if params.obsvRecvC != nil { + if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvRecvC); err == nil { p2pMessagesReceived.WithLabelValues("observation").Inc() } else { if params.components.WarnChannelOverflow { - logger.Warn("Ignoring SignedObservation because obsvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservation.Addr))) + logger.Warn("Ignoring SignedObservation because obsvRecvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservation.Addr))) } p2pReceiveChannelOverflow.WithLabelValues("observation").Inc() } @@ -1029,20 +1029,20 @@ func Run(params *RunParams) func(ctx context.Context) error { }() } case *gossipv1.GossipMessage_SignedObservation: // TODO: Get rid of this after the cutover. - if params.obsvC != nil { - if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvC); err == nil { + if params.obsvRecvC != nil { + if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvRecvC); err == nil { p2pMessagesReceived.WithLabelValues("observation").Inc() } else { if params.components.WarnChannelOverflow { - logger.Warn("Ignoring SignedObservation because obsvC is full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash))) + logger.Warn("Ignoring SignedObservation because obsvRecvC is full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash))) } p2pReceiveChannelOverflow.WithLabelValues("observation").Inc() } } case *gossipv1.GossipMessage_SignedVaaWithQuorum: - if params.signedInC != nil { + if params.signedIncomingVaaRecvC != nil { select { - case params.signedInC <- m.SignedVaaWithQuorum: + case params.signedIncomingVaaRecvC <- m.SignedVaaWithQuorum: p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc() default: if params.components.WarnChannelOverflow { @@ -1050,13 +1050,13 @@ func Run(params *RunParams) func(ctx context.Context) error { if vaa, err := vaa.Unmarshal(m.SignedVaaWithQuorum.Vaa); err == nil { hexStr = vaa.HexDigest() } - logger.Warn("Ignoring SignedVaaWithQuorum because signedInC full", zap.String("hash", hexStr)) + logger.Warn("Ignoring SignedVaaWithQuorum because signedIncomingVaaRecvC full", zap.String("hash", hexStr)) } p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Inc() } } case *gossipv1.GossipMessage_SignedObservationRequest: // TODO: Get rid of this after the cutover. - if params.obsvReqC != nil { + if params.obsvReqRecvC != nil { s := m.SignedObservationRequest gs := params.gst.Get() if gs == nil { @@ -1082,7 +1082,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } select { - case params.obsvReqC <- r: + case params.obsvReqRecvC <- r: p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc() default: p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc() @@ -1090,12 +1090,12 @@ func Run(params *RunParams) func(ctx context.Context) error { } } case *gossipv1.GossipMessage_SignedChainGovernorConfig: // TODO: Get rid of this after the cutover. - if params.signedGovCfg != nil { - params.signedGovCfg <- m.SignedChainGovernorConfig + if params.signedGovCfgRecvC != nil { + params.signedGovCfgRecvC <- m.SignedChainGovernorConfig } case *gossipv1.GossipMessage_SignedChainGovernorStatus: // TODO: Get rid of this after the cutover. - if params.signedGovSt != nil { - params.signedGovSt <- m.SignedChainGovernorStatus + if params.signedGovStatusRecvC != nil { + params.signedGovStatusRecvC <- m.SignedChainGovernorStatus } default: p2pMessagesReceived.WithLabelValues("unknown").Inc() diff --git a/node/pkg/p2p/run_params.go b/node/pkg/p2p/run_params.go index bb35992c4b..d4a0c0b0b5 100644 --- a/node/pkg/p2p/run_params.go +++ b/node/pkg/p2p/run_params.go @@ -23,20 +23,20 @@ type ( gst *common.GuardianSetState rootCtxCancel context.CancelFunc - // obsvC is optional and can be set with `WithSignedObservationListener`. - obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation] + // obsvRecvC is optional and can be set with `WithSignedObservationListener`. + obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation] - // obsvReqC is optional and can be set with `WithObservationRequestListener`. - obsvReqC chan<- *gossipv1.ObservationRequest + // obsvReqRecvC is optional and can be set with `WithObservationRequestListener`. + obsvReqRecvC chan<- *gossipv1.ObservationRequest - // signedInC is optional and can be set with `WithSignedVAAListener`. - signedInC chan<- *gossipv1.SignedVAAWithQuorum + // signedIncomingVaaRecvC is optional and can be set with `WithSignedVAAListener`. + signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum - // signedGovCfg is optional and can be set with `WithChainGovernorConfigListener`. - signedGovCfg chan *gossipv1.SignedChainGovernorConfig + // signedGovCfgRecvC is optional and can be set with `WithChainGovernorConfigListener`. + signedGovCfgRecvC chan *gossipv1.SignedChainGovernorConfig - // WithChainGovernorStatusListener is optional and can be set with `WithChainGovernorStatusListener`. - signedGovSt chan *gossipv1.SignedChainGovernorStatus + // signedGovStatusRecvC is optional and can be set with `WithChainGovernorStatusListener`. + signedGovStatusRecvC chan *gossipv1.SignedChainGovernorStatus // disableHeartbeatVerify is optional and can be set with `WithDisableHeartbeatVerify` or `WithGuardianOptions`. disableHeartbeatVerify bool @@ -98,41 +98,41 @@ func NewRunParams( } // WithSignedObservationListener is used to set the channel to receive `SignedObservation“ messages. -func WithSignedObservationListener(obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]) RunOpt { +func WithSignedObservationListener(obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]) RunOpt { return func(p *RunParams) error { - p.obsvC = obsvC + p.obsvRecvC = obsvRecvC return nil } } // WithSignedVAAListener is used to set the channel to receive `SignedVAAWithQuorum messages. -func WithSignedVAAListener(signedInC chan<- *gossipv1.SignedVAAWithQuorum) RunOpt { +func WithSignedVAAListener(signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum) RunOpt { return func(p *RunParams) error { - p.signedInC = signedInC + p.signedIncomingVaaRecvC = signedIncomingVaaRecvC return nil } } // WithObservationRequestListener is used to set the channel to receive `ObservationRequest messages. -func WithObservationRequestListener(obsvReqC chan<- *gossipv1.ObservationRequest) RunOpt { +func WithObservationRequestListener(obsvReqRecvC chan<- *gossipv1.ObservationRequest) RunOpt { return func(p *RunParams) error { - p.obsvReqC = obsvReqC + p.obsvReqRecvC = obsvReqRecvC return nil } } // WithChainGovernorConfigListener is used to set the channel to receive `SignedChainGovernorConfig messages. -func WithChainGovernorConfigListener(signedGovCfg chan *gossipv1.SignedChainGovernorConfig) RunOpt { +func WithChainGovernorConfigListener(signedGovCfgRecvC chan *gossipv1.SignedChainGovernorConfig) RunOpt { return func(p *RunParams) error { - p.signedGovCfg = signedGovCfg + p.signedGovCfgRecvC = signedGovCfgRecvC return nil } } // WithChainGovernorStatusListener is used to set the channel to receive `SignedChainGovernorStatus messages. -func WithChainGovernorStatusListener(signedGovSt chan *gossipv1.SignedChainGovernorStatus) RunOpt { +func WithChainGovernorStatusListener(signedGovStatusRecvC chan *gossipv1.SignedChainGovernorStatus) RunOpt { return func(p *RunParams) error { - p.signedGovSt = signedGovSt + p.signedGovStatusRecvC = signedGovStatusRecvC return nil } } @@ -149,9 +149,9 @@ func WithDisableHeartbeatVerify(disableHeartbeatVerify bool) RunOpt { func WithGuardianOptions( nodeName string, gk *ecdsa.PrivateKey, - obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], - signedInC chan<- *gossipv1.SignedVAAWithQuorum, - obsvReqC chan<- *gossipv1.ObservationRequest, + obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], + signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum, + obsvReqRecvC chan<- *gossipv1.ObservationRequest, gossipControlSendC chan []byte, gossipAttestationSendC chan []byte, gossipVaaSendC chan []byte, @@ -172,9 +172,9 @@ func WithGuardianOptions( return func(p *RunParams) error { p.nodeName = nodeName p.gk = gk - p.obsvC = obsvC - p.signedInC = signedInC - p.obsvReqC = obsvReqC + p.obsvRecvC = obsvRecvC + p.signedIncomingVaaRecvC = signedIncomingVaaRecvC + p.obsvReqRecvC = obsvReqRecvC p.gossipControlSendC = gossipControlSendC p.gossipAttestationSendC = gossipAttestationSendC p.gossipVaaSendC = gossipVaaSendC diff --git a/node/pkg/p2p/run_params_test.go b/node/pkg/p2p/run_params_test.go index e9432e8e2d..a1fd98cab7 100644 --- a/node/pkg/p2p/run_params_test.go +++ b/node/pkg/p2p/run_params_test.go @@ -195,9 +195,9 @@ func TestRunParamsWithGuardianOptions(t *testing.T) { require.NoError(t, err) require.NotNil(t, params) assert.Equal(t, nodeName, params.nodeName) - assert.Equal(t, obsvC, params.obsvC) - assert.Equal(t, signedInC, params.signedInC) - assert.Equal(t, obsvReqC, params.obsvReqC) + assert.Equal(t, obsvC, params.obsvRecvC) + assert.Equal(t, signedInC, params.signedIncomingVaaRecvC) + assert.Equal(t, obsvReqC, params.obsvReqRecvC) assert.Equal(t, gossipControlSendC, params.gossipControlSendC) assert.Equal(t, gossipAttestationSendC, params.gossipAttestationSendC) assert.Equal(t, gossipVaaSendC, params.gossipVaaSendC)