From 5a3a3b2a97e6f9b3c1e9b2f7edee9ef6c5a4b875 Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Tue, 9 Jul 2024 12:12:19 -0500 Subject: [PATCH] Code review rework --- node/pkg/p2p/p2p.go | 159 ++++++++++++++++++++++---------------------- 1 file changed, 80 insertions(+), 79 deletions(-) diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index f305ffbd31..a17c5e203a 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -110,6 +110,7 @@ type Components struct { // is only accessed by a single routine at any given time in a running Guardian. ProtectedHostByGuardianKeyLock sync.Mutex // WarnChannelOverflow: If true, errors due to overflowing channels will produce logger.Warn + // WARNING: This should not be enabled in production. It is only used in node tests to watch for overflows. WarnChannelOverflow bool // SignedHeartbeatLogLevel is the log level at which SignedHeartbeatReceived events will be logged. SignedHeartbeatLogLevel zapcore.Level @@ -584,6 +585,9 @@ func Run(params *RunParams) func(ctx context.Context) error { }() if GossipCutoverComplete() { + if controlPubsubTopic == nil { + panic("controlPubsubTopic should not be nil when nodeName is set") + } err = controlPubsubTopic.Publish(ctx, b) p2pMessagesSent.WithLabelValues("control").Inc() if err != nil { @@ -612,99 +616,97 @@ func Run(params *RunParams) func(ctx context.Context) error { case <-ctx.Done(): return case msg := <-params.gossipControlSendC: - if controlPubsubTopic != nil { - if GossipCutoverComplete() { - err := controlPubsubTopic.Publish(ctx, msg) - p2pMessagesSent.WithLabelValues("control").Inc() - if err != nil { - logger.Error("failed to publish message from control queue", zap.Error(err)) - } - } else if vaaPubsubTopic != nil { - err := vaaPubsubTopic.Publish(ctx, msg) - p2pMessagesSent.WithLabelValues("old_control").Inc() - if err != nil { - logger.Error("failed to publish message from control queue to old topic", zap.Error(err)) - } + if GossipCutoverComplete() { + if controlPubsubTopic == nil { + panic("controlPubsubTopic should not be nil when gossipControlSendC is set") } - } else { - logger.Error("received a message on the control queue when we do not have a control topic") - } - case msg := <-params.gossipAttestationSendC: - if attestationPubsubTopic != nil { - if GossipCutoverComplete() { - err := attestationPubsubTopic.Publish(ctx, msg) - p2pMessagesSent.WithLabelValues("attestation").Inc() - if err != nil { - logger.Error("failed to publish message from attestation queue", zap.Error(err)) - } - } else if vaaPubsubTopic != nil { - err := vaaPubsubTopic.Publish(ctx, msg) - p2pMessagesSent.WithLabelValues("old_attestation").Inc() - if err != nil { - logger.Error("failed to publish message from attestation queue to old topic", zap.Error(err)) - } + err := controlPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("control").Inc() + if err != nil { + logger.Error("failed to publish message from control queue", zap.Error(err)) } - } else { - logger.Error("received a message on the attestation queue when we do not have an attestation topic") - } - case msg := <-params.gossipVaaSendC: - if vaaPubsubTopic != nil { + } else if vaaPubsubTopic != nil { err := vaaPubsubTopic.Publish(ctx, msg) - p2pMessagesSent.WithLabelValues("vaa").Inc() + p2pMessagesSent.WithLabelValues("old_control").Inc() if err != nil { - logger.Error("failed to publish message from vaa queue", zap.Error(err)) + logger.Error("failed to publish message from control queue to old topic", zap.Error(err)) } - } else { - logger.Error("received a message on the vaa queue when we do not have a vaa topic") } - case msg := <-params.obsvReqSendC: - if controlPubsubTopic != nil { - b, err := proto.Marshal(msg) + case msg := <-params.gossipAttestationSendC: + if GossipCutoverComplete() { + if attestationPubsubTopic == nil { + panic("attestationPubsubTopic should not be nil when gossipAttestationSendC is set") + } + err := attestationPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("attestation").Inc() if err != nil { - panic(err) + logger.Error("failed to publish message from attestation queue", zap.Error(err)) } - - // Sign the observation request using our node's guardian key. - digest := signedObservationRequestDigest(b) - sig, err := ethcrypto.Sign(digest.Bytes(), params.gk) + } else if vaaPubsubTopic != nil { + err := vaaPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("old_attestation").Inc() if err != nil { - panic(err) + logger.Error("failed to publish message from attestation queue to old topic", zap.Error(err)) } + } + case msg := <-params.gossipVaaSendC: + if vaaPubsubTopic == nil { + panic("vaaPubsubTopic should not be nil when gossipVaaSendC is set") + } + err := vaaPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("vaa").Inc() + if err != nil { + logger.Error("failed to publish message from vaa queue", zap.Error(err)) + } + case msg := <-params.obsvReqSendC: + b, err := proto.Marshal(msg) + if err != nil { + panic(err) + } - sReq := &gossipv1.SignedObservationRequest{ - ObservationRequest: b, - Signature: sig, - GuardianAddr: ethcrypto.PubkeyToAddress(params.gk.PublicKey).Bytes(), - } + // Sign the observation request using our node's guardian key. + digest := signedObservationRequestDigest(b) + sig, err := ethcrypto.Sign(digest.Bytes(), params.gk) + if err != nil { + panic(err) + } - envelope := &gossipv1.GossipMessage{ - Message: &gossipv1.GossipMessage_SignedObservationRequest{ - SignedObservationRequest: sReq}} + sReq := &gossipv1.SignedObservationRequest{ + ObservationRequest: b, + Signature: sig, + GuardianAddr: ethcrypto.PubkeyToAddress(params.gk.PublicKey).Bytes(), + } - b, err = proto.Marshal(envelope) - if err != nil { - panic(err) - } + envelope := &gossipv1.GossipMessage{ + Message: &gossipv1.GossipMessage_SignedObservationRequest{ + SignedObservationRequest: sReq}} - // Send to local observation request queue (the loopback message is ignored) - if params.obsvReqC != nil { - params.obsvReqC <- msg - } + b, err = proto.Marshal(envelope) + if err != nil { + panic(err) + } - if GossipCutoverComplete() { - err = controlPubsubTopic.Publish(ctx, b) - p2pMessagesSent.WithLabelValues("control").Inc() - if err != nil { - logger.Error("failed to publish observation request", zap.Error(err)) - } else { - logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq)) - } - } else if vaaPubsubTopic != nil { - err = vaaPubsubTopic.Publish(ctx, b) - p2pMessagesSent.WithLabelValues("old_control").Inc() - if err != nil { - logger.Error("failed to publish observation request to old topic", zap.Error(err)) - } + // Send to local observation request queue (the loopback message is ignored) + if params.obsvReqC != nil { + params.obsvReqC <- msg + } + + if GossipCutoverComplete() { + if controlPubsubTopic == nil { + panic("controlPubsubTopic should not be nil when obsvReqSendC is set") + } + err = controlPubsubTopic.Publish(ctx, b) + p2pMessagesSent.WithLabelValues("control").Inc() + if err != nil { + logger.Error("failed to publish observation request", zap.Error(err)) + } else { + logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq)) + } + } else if vaaPubsubTopic != nil { + err = vaaPubsubTopic.Publish(ctx, b) + p2pMessagesSent.WithLabelValues("old_control").Inc() + if err != nil { + logger.Error("failed to publish observation request to old topic", zap.Error(err)) } } } @@ -1040,7 +1042,6 @@ func Run(params *RunParams) func(ctx context.Context) error { p2pMessagesReceived.WithLabelValues("signed_vaa_with_quorum").Inc() default: if params.components.WarnChannelOverflow { - // TODO do not log this in production var hexStr string if vaa, err := vaa.Unmarshal(m.SignedVaaWithQuorum.Vaa); err == nil { hexStr = vaa.HexDigest()