Skip to content

Commit

Permalink
Code review rework
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Jul 30, 2024
1 parent 8190b77 commit 6a0974b
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 59 deletions.
2 changes: 1 addition & 1 deletion node/pkg/p2p/gossip_cutover.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func evaluateGossipCutOverImpl(logger *zap.Logger, cutOverTimeStr string, now ti
return true, 0, nil
}

// If we get here, we need to wait for the cutover and then force a restart.
// If we get here, we need to wait for the cutover and then switch the global flag.
delay := cutOverTime.Sub(now)
logger.Info("still waiting for cut over time",
zap.Stringer("cutOverTime", cutOverTime),
Expand Down
58 changes: 29 additions & 29 deletions node/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
var controlSubscription, attestationSubscription, vaaSubscription *pubsub.Subscription

// Set up the control channel. ////////////////////////////////////////////////////////////////////
if params.nodeName != "" || params.gossipControlSendC != nil || params.obsvReqSendC != nil || params.obsvReqC != nil || params.signedGovCfg != nil || params.signedGovSt != nil {
if params.nodeName != "" || params.gossipControlSendC != nil || params.obsvReqSendC != nil || params.obsvReqRecvC != nil || params.signedGovCfgRecvC != nil || params.signedGovStatusRecvC != nil {
controlTopic := fmt.Sprintf("%s/%s", params.networkID, "control")
logger.Info("joining the control topic", zap.String("topic", controlTopic))
controlPubsubTopic, err = ps.Join(controlTopic)
Expand All @@ -386,7 +386,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
}
}()

if params.obsvReqC != nil || params.signedGovCfg != nil || params.signedGovSt != nil {
if params.obsvReqRecvC != nil || params.signedGovCfgRecvC != nil || params.signedGovStatusRecvC != nil {
logger.Info("subscribing to the control topic", zap.String("topic", controlTopic))
controlSubscription, err = controlPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE))
if err != nil {
Expand All @@ -397,7 +397,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
}

// Set up the attestation channel. ////////////////////////////////////////////////////////////////////
if params.gossipAttestationSendC != nil || params.obsvC != nil {
if params.gossipAttestationSendC != nil || params.obsvRecvC != nil {
attestationTopic := fmt.Sprintf("%s/%s", params.networkID, "attestation")
logger.Info("joining the attestation topic", zap.String("topic", attestationTopic))
attestationPubsubTopic, err = ps.Join(attestationTopic)
Expand All @@ -411,7 +411,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
}
}()

if params.obsvC != nil {
if params.obsvRecvC != nil {
logger.Info("subscribing to the attestation topic", zap.String("topic", attestationTopic))
attestationSubscription, err = attestationPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE))
if err != nil {
Expand All @@ -422,7 +422,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
}

// Set up the VAA channel. ////////////////////////////////////////////////////////////////////
if params.gossipVaaSendC != nil || params.signedInC != nil || needOldTopic {
if params.gossipVaaSendC != nil || params.signedIncomingVaaRecvC != nil || needOldTopic {
vaaTopic := fmt.Sprintf("%s/%s", params.networkID, "broadcast")
logger.Info("joining the vaa topic", zap.String("topic", vaaTopic))
vaaPubsubTopic, err = ps.Join(vaaTopic)
Expand All @@ -436,7 +436,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
}
}()

if params.signedInC != nil || needOldTopic {
if params.signedIncomingVaaRecvC != nil || needOldTopic {
logger.Info("subscribing to the vaa topic", zap.String("topic", vaaTopic))
vaaSubscription, err = vaaPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE))
if err != nil {
Expand Down Expand Up @@ -691,8 +691,8 @@ func Run(params *RunParams) func(ctx context.Context) error {
}

// Send to local observation request queue (the loopback message is ignored)
if params.obsvReqC != nil {
params.obsvReqC <- msg
if params.obsvReqRecvC != nil {
params.obsvReqRecvC <- msg
}

if GossipCutoverComplete() {
Expand Down Expand Up @@ -819,7 +819,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
}()
}
case *gossipv1.GossipMessage_SignedObservationRequest:
if params.obsvReqC != nil {
if params.obsvReqRecvC != nil {
s := m.SignedObservationRequest
gs := params.gst.Get()
if gs == nil {
Expand All @@ -845,20 +845,20 @@ func Run(params *RunParams) func(ctx context.Context) error {
}

select {
case params.obsvReqC <- r:
case params.obsvReqRecvC <- r:
p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc()
default:
p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc()
}
}
}
case *gossipv1.GossipMessage_SignedChainGovernorConfig:
if params.signedGovCfg != nil {
params.signedGovCfg <- m.SignedChainGovernorConfig
if params.signedGovCfgRecvC != nil {
params.signedGovCfgRecvC <- m.SignedChainGovernorConfig
}
case *gossipv1.GossipMessage_SignedChainGovernorStatus:
if params.signedGovSt != nil {
params.signedGovSt <- m.SignedChainGovernorStatus
if params.signedGovStatusRecvC != nil {
params.signedGovStatusRecvC <- m.SignedChainGovernorStatus
}
default:
p2pMessagesReceived.WithLabelValues("unknown").Inc()
Expand Down Expand Up @@ -908,12 +908,12 @@ func Run(params *RunParams) func(ctx context.Context) error {

switch m := msg.Message.(type) {
case *gossipv1.GossipMessage_SignedObservation:
if params.obsvC != nil {
if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvC); err == nil {
if params.obsvRecvC != nil {
if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvRecvC); err == nil {
p2pMessagesReceived.WithLabelValues("observation").Inc()
} else {
if params.components.WarnChannelOverflow {
logger.Warn("Ignoring SignedObservation because obsvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservation.Addr)))
logger.Warn("Ignoring SignedObservation because obsvRecvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservation.Addr)))
}
p2pReceiveChannelOverflow.WithLabelValues("observation").Inc()
}
Expand Down Expand Up @@ -1029,34 +1029,34 @@ func Run(params *RunParams) func(ctx context.Context) error {
}()
}
case *gossipv1.GossipMessage_SignedObservation: // TODO: Get rid of this after the cutover.
if params.obsvC != nil {
if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvC); err == nil {
if params.obsvRecvC != nil {
if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvRecvC); err == nil {
p2pMessagesReceived.WithLabelValues("observation").Inc()
} else {
if params.components.WarnChannelOverflow {
logger.Warn("Ignoring SignedObservation because obsvC is full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash)))
logger.Warn("Ignoring SignedObservation because obsvRecvC is full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash)))
}
p2pReceiveChannelOverflow.WithLabelValues("observation").Inc()
}
}
case *gossipv1.GossipMessage_SignedVaaWithQuorum:
if params.signedInC != nil {
if params.signedIncomingVaaRecvC != nil {
select {
case params.signedInC <- m.SignedVaaWithQuorum:
case params.signedIncomingVaaRecvC <- m.SignedVaaWithQuorum:
p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc()
default:
if params.components.WarnChannelOverflow {
var hexStr string
if vaa, err := vaa.Unmarshal(m.SignedVaaWithQuorum.Vaa); err == nil {
hexStr = vaa.HexDigest()
}
logger.Warn("Ignoring SignedVaaWithQuorum because signedInC full", zap.String("hash", hexStr))
logger.Warn("Ignoring SignedVaaWithQuorum because signedIncomingVaaRecvC full", zap.String("hash", hexStr))
}
p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Inc()
}
}
case *gossipv1.GossipMessage_SignedObservationRequest: // TODO: Get rid of this after the cutover.
if params.obsvReqC != nil {
if params.obsvReqRecvC != nil {
s := m.SignedObservationRequest
gs := params.gst.Get()
if gs == nil {
Expand All @@ -1082,20 +1082,20 @@ func Run(params *RunParams) func(ctx context.Context) error {
}

select {
case params.obsvReqC <- r:
case params.obsvReqRecvC <- r:
p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc()
default:
p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc()
}
}
}
case *gossipv1.GossipMessage_SignedChainGovernorConfig: // TODO: Get rid of this after the cutover.
if params.signedGovCfg != nil {
params.signedGovCfg <- m.SignedChainGovernorConfig
if params.signedGovCfgRecvC != nil {
params.signedGovCfgRecvC <- m.SignedChainGovernorConfig
}
case *gossipv1.GossipMessage_SignedChainGovernorStatus: // TODO: Get rid of this after the cutover.
if params.signedGovSt != nil {
params.signedGovSt <- m.SignedChainGovernorStatus
if params.signedGovStatusRecvC != nil {
params.signedGovStatusRecvC <- m.SignedChainGovernorStatus
}
default:
p2pMessagesReceived.WithLabelValues("unknown").Inc()
Expand Down
52 changes: 26 additions & 26 deletions node/pkg/p2p/run_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@ type (
gst *common.GuardianSetState
rootCtxCancel context.CancelFunc

// obsvC is optional and can be set with `WithSignedObservationListener`.
obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]
// obsvRecvC is optional and can be set with `WithSignedObservationListener`.
obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]

// obsvReqC is optional and can be set with `WithObservationRequestListener`.
obsvReqC chan<- *gossipv1.ObservationRequest
// obsvReqRecvC is optional and can be set with `WithObservationRequestListener`.
obsvReqRecvC chan<- *gossipv1.ObservationRequest

// signedInC is optional and can be set with `WithSignedVAAListener`.
signedInC chan<- *gossipv1.SignedVAAWithQuorum
// signedIncomingVaaRecvC is optional and can be set with `WithSignedVAAListener`.
signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum

// signedGovCfg is optional and can be set with `WithChainGovernorConfigListener`.
signedGovCfg chan *gossipv1.SignedChainGovernorConfig
// signedGovCfgRecvC is optional and can be set with `WithChainGovernorConfigListener`.
signedGovCfgRecvC chan *gossipv1.SignedChainGovernorConfig

// WithChainGovernorStatusListener is optional and can be set with `WithChainGovernorStatusListener`.
signedGovSt chan *gossipv1.SignedChainGovernorStatus
// signedGovStatusRecvC is optional and can be set with `WithChainGovernorStatusListener`.
signedGovStatusRecvC chan *gossipv1.SignedChainGovernorStatus

// disableHeartbeatVerify is optional and can be set with `WithDisableHeartbeatVerify` or `WithGuardianOptions`.
disableHeartbeatVerify bool
Expand Down Expand Up @@ -98,41 +98,41 @@ func NewRunParams(
}

// WithSignedObservationListener is used to set the channel to receive `SignedObservation“ messages.
func WithSignedObservationListener(obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]) RunOpt {
func WithSignedObservationListener(obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]) RunOpt {
return func(p *RunParams) error {
p.obsvC = obsvC
p.obsvRecvC = obsvRecvC
return nil
}
}

// WithSignedVAAListener is used to set the channel to receive `SignedVAAWithQuorum messages.
func WithSignedVAAListener(signedInC chan<- *gossipv1.SignedVAAWithQuorum) RunOpt {
func WithSignedVAAListener(signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum) RunOpt {
return func(p *RunParams) error {
p.signedInC = signedInC
p.signedIncomingVaaRecvC = signedIncomingVaaRecvC
return nil
}
}

// WithObservationRequestListener is used to set the channel to receive `ObservationRequest messages.
func WithObservationRequestListener(obsvReqC chan<- *gossipv1.ObservationRequest) RunOpt {
func WithObservationRequestListener(obsvReqRecvC chan<- *gossipv1.ObservationRequest) RunOpt {
return func(p *RunParams) error {
p.obsvReqC = obsvReqC
p.obsvReqRecvC = obsvReqRecvC
return nil
}
}

// WithChainGovernorConfigListener is used to set the channel to receive `SignedChainGovernorConfig messages.
func WithChainGovernorConfigListener(signedGovCfg chan *gossipv1.SignedChainGovernorConfig) RunOpt {
func WithChainGovernorConfigListener(signedGovCfgRecvC chan *gossipv1.SignedChainGovernorConfig) RunOpt {
return func(p *RunParams) error {
p.signedGovCfg = signedGovCfg
p.signedGovCfgRecvC = signedGovCfgRecvC
return nil
}
}

// WithChainGovernorStatusListener is used to set the channel to receive `SignedChainGovernorStatus messages.
func WithChainGovernorStatusListener(signedGovSt chan *gossipv1.SignedChainGovernorStatus) RunOpt {
func WithChainGovernorStatusListener(signedGovStatusRecvC chan *gossipv1.SignedChainGovernorStatus) RunOpt {
return func(p *RunParams) error {
p.signedGovSt = signedGovSt
p.signedGovStatusRecvC = signedGovStatusRecvC
return nil
}
}
Expand All @@ -149,9 +149,9 @@ func WithDisableHeartbeatVerify(disableHeartbeatVerify bool) RunOpt {
func WithGuardianOptions(
nodeName string,
gk *ecdsa.PrivateKey,
obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation],
signedInC chan<- *gossipv1.SignedVAAWithQuorum,
obsvReqC chan<- *gossipv1.ObservationRequest,
obsvRecvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation],
signedIncomingVaaRecvC chan<- *gossipv1.SignedVAAWithQuorum,
obsvReqRecvC chan<- *gossipv1.ObservationRequest,
gossipControlSendC chan []byte,
gossipAttestationSendC chan []byte,
gossipVaaSendC chan []byte,
Expand All @@ -172,9 +172,9 @@ func WithGuardianOptions(
return func(p *RunParams) error {
p.nodeName = nodeName
p.gk = gk
p.obsvC = obsvC
p.signedInC = signedInC
p.obsvReqC = obsvReqC
p.obsvRecvC = obsvRecvC
p.signedIncomingVaaRecvC = signedIncomingVaaRecvC
p.obsvReqRecvC = obsvReqRecvC
p.gossipControlSendC = gossipControlSendC
p.gossipAttestationSendC = gossipAttestationSendC
p.gossipVaaSendC = gossipVaaSendC
Expand Down
6 changes: 3 additions & 3 deletions node/pkg/p2p/run_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,9 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, params)
assert.Equal(t, nodeName, params.nodeName)
assert.Equal(t, obsvC, params.obsvC)
assert.Equal(t, signedInC, params.signedInC)
assert.Equal(t, obsvReqC, params.obsvReqC)
assert.Equal(t, obsvC, params.obsvRecvC)
assert.Equal(t, signedInC, params.signedIncomingVaaRecvC)
assert.Equal(t, obsvReqC, params.obsvReqRecvC)
assert.Equal(t, gossipControlSendC, params.gossipControlSendC)
assert.Equal(t, gossipAttestationSendC, params.gossipAttestationSendC)
assert.Equal(t, gossipVaaSendC, params.gossipVaaSendC)
Expand Down

0 comments on commit 6a0974b

Please sign in to comment.