From 681cf0b9469eb6d6c6344e5eb65c208eb30b2cc8 Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Wed, 26 Jun 2024 15:41:54 -0500 Subject: [PATCH] Add cutover support --- node/pkg/node/options.go | 4 +- node/pkg/p2p/gossip_cutover.go | 101 ++++++++ node/pkg/p2p/gossip_cutover_test.go | 81 ++++++ node/pkg/p2p/p2p.go | 371 +++++++++++++++------------- node/pkg/p2p/run_params.go | 39 +-- node/pkg/p2p/run_params_test.go | 12 +- node/pkg/p2p/watermark_test.go | 2 + node/pkg/processor/cleanup.go | 6 +- node/pkg/processor/observation.go | 6 +- node/pkg/processor/processor.go | 27 +- 10 files changed, 420 insertions(+), 229 deletions(-) create mode 100644 node/pkg/p2p/gossip_cutover.go create mode 100644 node/pkg/p2p/gossip_cutover_test.go diff --git a/node/pkg/node/options.go b/node/pkg/node/options.go index 9926670202..c0bae8ffa9 100644 --- a/node/pkg/node/options.go +++ b/node/pkg/node/options.go @@ -67,7 +67,9 @@ func GuardianOptionP2P(p2pKey libp2p_crypto.PrivKey, networkId, bootstrapPeers, g.obsvC, nil, // g.signedInC.writeC, g.obsvReqC.writeC, - g.gossipSendC, + g.gossipControlSendC, + g.gossipAttestationSendC, + g.gossipVaaSendC, g.obsvReqSendC.readC, g.acct, g.gov, diff --git a/node/pkg/p2p/gossip_cutover.go b/node/pkg/p2p/gossip_cutover.go new file mode 100644 index 0000000000..f5f3ebac40 --- /dev/null +++ b/node/pkg/p2p/gossip_cutover.go @@ -0,0 +1,101 @@ +package p2p + +import ( + "fmt" + "strings" + "sync/atomic" + "time" + + "go.uber.org/zap" +) + +// The format of this time is very picky. Please use the exact format specified by cutOverFmtStr! +const mainnetCutOverTimeStr = "" +const testnetCutOverTimeStr = "" +const devnetCutOverTimeStr = "" +const cutOverFmtStr = "2006-01-02T15:04:05-0700" + +// gossipCutoverCompleteFlag indicates if the cutover time has passed, meaning we should publish only on the new topics. +var gossipCutoverCompleteFlag atomic.Bool + +// GossipCutoverComplete returns true if the cutover time has passed, meaning we should publish on the new topic. +func GossipCutoverComplete() bool { + return gossipCutoverCompleteFlag.Load() +} + +// evaluateCutOver determines if the gossip cutover time has passed yet and sets the global flag accordingly. If the time has +// not yet passed, it creates a go routine to wait for that time and then set the flag. +func evaluateGossipCutOver(logger *zap.Logger, networkID string) error { + cutOverTimeStr := getCutOverTimeStr(networkID) + + sco, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, time.Now()) + if err != nil { + return err + } + + gossipCutoverCompleteFlag.Store(sco) + logger.Info("evaluated cutover flag", zap.Bool("cutOverFlag", GossipCutoverComplete()), zap.String("cutOverTime", cutOverTimeStr), zap.String("component", "p2pco")) + + if delay != time.Duration(0) { + // Wait for the cut over time and then update the flag. + go func() { + time.Sleep(delay) + logger.Info("time to cut over to new gossip topics", zap.String("cutOverTime", cutOverTimeStr), zap.String("component", "p2pco")) + gossipCutoverCompleteFlag.Store(true) + }() + } + + return nil +} + +// evaluateGossipCutOverImpl performs the actual cut over check. It is a separate function for testing purposes. +func evaluateGossipCutOverImpl(logger *zap.Logger, cutOverTimeStr string, now time.Time) (bool, time.Duration, error) { + if cutOverTimeStr == "" { + return false, 0, nil + } + + cutOverTime, err := time.Parse(cutOverFmtStr, cutOverTimeStr) + if err != nil { + return false, 0, fmt.Errorf(`failed to parse cut over time: %w`, err) + } + + if cutOverTime.Before(now) { + logger.Info("cut over time has passed, should use new gossip topics", zap.String("cutOverTime", cutOverTime.Format(cutOverFmtStr)), zap.String("now", now.Format(cutOverFmtStr)), zap.String("component", "p2pco")) + return true, 0, nil + } + + // If we get here, we need to wait for the cutover and then force a restart. + delay := cutOverTime.Sub(now) + logger.Info("still waiting for cut over time", + zap.Stringer("cutOverTime", cutOverTime), + zap.String("now", now.Format(cutOverFmtStr)), + zap.Stringer("delay", delay), + zap.String("component", "p2pco")) + + return false, delay, nil +} + +// getCutOverTimeStr returns the cut over time string based on the network ID passed in. +func getCutOverTimeStr(networkID string) string { //nolint:unparam + if strings.Contains(networkID, "/mainnet/") { + return mainnetCutOverTimeStr + } + if strings.Contains(networkID, "/testnet/") { + return testnetCutOverTimeStr + } + return devnetCutOverTimeStr +} + +// GossipAttestationMsg is the payload of the `gossipAttestationSendC` channel. This will be used instead of just `[]byte` +// until after the cutover is complete and support for publishing `SignedObservations` is removed. Then this can be deleted. +type GossipAttestationMsg struct { + MsgType GossipAttestationMsgType + Msg []byte +} + +type GossipAttestationMsgType uint8 + +const ( + GossipAttestationSignedObservation GossipAttestationMsgType = iota + GossipAttestationSignedObservationBatch +) diff --git a/node/pkg/p2p/gossip_cutover_test.go b/node/pkg/p2p/gossip_cutover_test.go new file mode 100644 index 0000000000..f398985b08 --- /dev/null +++ b/node/pkg/p2p/gossip_cutover_test.go @@ -0,0 +1,81 @@ +package p2p + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestVerifyCutOverTime(t *testing.T) { + if mainnetCutOverTimeStr != "" { + _, err := time.Parse(cutOverFmtStr, mainnetCutOverTimeStr) + require.NoError(t, err) + } + if testnetCutOverTimeStr != "" { + _, err := time.Parse(cutOverFmtStr, testnetCutOverTimeStr) + require.NoError(t, err) + } + if devnetCutOverTimeStr != "" { + _, err := time.Parse(cutOverFmtStr, devnetCutOverTimeStr) + require.NoError(t, err) + } +} + +func TestGetCutOverTimeStr(t *testing.T) { + assert.Equal(t, mainnetCutOverTimeStr, getCutOverTimeStr("blah/blah/mainnet/blah")) + assert.Equal(t, testnetCutOverTimeStr, getCutOverTimeStr("blah/blah/testnet/blah")) + assert.Equal(t, devnetCutOverTimeStr, getCutOverTimeStr("blah/blah/devnet/blah")) +} + +func TestCutOverDisabled(t *testing.T) { + logger := zap.NewNop() + + cutOverTimeStr := "" + now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000") + require.NoError(t, err) + + cuttingOver, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, now) + require.NoError(t, err) + assert.False(t, cuttingOver) + assert.Equal(t, time.Duration(0), delay) +} + +func TestCutOverInvalidTime(t *testing.T) { + logger := zap.NewNop() + + cutOverTimeStr := "Hello World" + now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000") + require.NoError(t, err) + + _, _, err = evaluateGossipCutOverImpl(logger, cutOverTimeStr, now) + require.EqualError(t, err, `failed to parse cut over time: parsing time "Hello World" as "2006-01-02T15:04:05-0700": cannot parse "Hello World" as "2006"`) +} + +func TestCutOverAlreadyHappened(t *testing.T) { + logger := zap.NewNop() + + cutOverTimeStr := "2023-10-06T18:18:00-0000" + now, err := time.Parse(cutOverFmtStr, "2023-10-06T18:19:00-0000") + require.NoError(t, err) + + cuttingOver, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, now) + require.NoError(t, err) + assert.True(t, cuttingOver) + assert.Equal(t, time.Duration(0), delay) +} + +func TestCutOverDelayRequired(t *testing.T) { + logger := zap.NewNop() + + cutOverTimeStr := "2023-10-06T18:18:00-0000" + now, err := time.Parse(cutOverFmtStr, "2023-10-06T17:18:00-0000") + require.NoError(t, err) + + cuttingOver, delay, err := evaluateGossipCutOverImpl(logger, cutOverTimeStr, now) + require.NoError(t, err) + assert.False(t, cuttingOver) + assert.Equal(t, time.Duration(60*time.Minute), delay) +} diff --git a/node/pkg/p2p/p2p.go b/node/pkg/p2p/p2p.go index c61cb85ff9..f305ffbd31 100644 --- a/node/pkg/p2p/p2p.go +++ b/node/pkg/p2p/p2p.go @@ -310,6 +310,16 @@ func Run(params *RunParams) func(ctx context.Context) error { logger := supervisor.Logger(ctx) + // Evaluate the gossip cutover time. If it has passed, then the flag will be set to make us publish on the new topics. + // If not, a routine will be started to wait for that time before starting to publish on the new topics. + cutoverErr := evaluateGossipCutOver(logger, params.networkID) + if cutoverErr != nil { + panic(cutoverErr) + } + + // If the cutover has not happened yet, we need to join and subscribe to the VAA topic because it is also the old topic. + needOldTopic := !GossipCutoverComplete() + defer func() { // TODO: Right now we're canceling the root context because it used to be the case that libp2p cannot be cleanly restarted. // But that seems to no longer be the case. We may want to revisit this. See (https://github.com/libp2p/go-libp2p/issues/992) for background. @@ -358,7 +368,7 @@ func Run(params *RunParams) func(ctx context.Context) error { // These will only be non-nil if the application plans to listen for or publish to that topic. var controlPubsubTopic, attestationPubsubTopic, vaaPubsubTopic *pubsub.Topic - var controlSubscription, vaaSubscription, attestationSubscription *pubsub.Subscription + var controlSubscription, attestationSubscription, vaaSubscription *pubsub.Subscription // Set up the control channel. //////////////////////////////////////////////////////////////////// if params.nodeName != "" || params.gossipControlSendC != nil || params.obsvReqSendC != nil || params.obsvReqC != nil || params.signedGovCfg != nil || params.signedGovSt != nil { @@ -386,7 +396,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } // Set up the attestation channel. //////////////////////////////////////////////////////////////////// - if params.gossipAttestationSendC != nil || params.obsvC != nil || params.batchObsvC != nil { + if params.gossipAttestationSendC != nil || params.obsvC != nil { attestationTopic := fmt.Sprintf("%s/%s", params.networkID, "attestation") logger.Info("joining the attestation topic", zap.String("topic", attestationTopic)) attestationPubsubTopic, err = ps.Join(attestationTopic) @@ -400,7 +410,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } }() - if params.obsvC != nil || params.batchObsvC != nil { + if params.obsvC != nil { logger.Info("subscribing to the attestation topic", zap.String("topic", attestationTopic)) attestationSubscription, err = attestationPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE)) if err != nil { @@ -411,7 +421,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } // Set up the VAA channel. //////////////////////////////////////////////////////////////////// - if params.gossipVaaSendC != nil || params.signedInC != nil { + if params.gossipVaaSendC != nil || params.signedInC != nil || needOldTopic { vaaTopic := fmt.Sprintf("%s/%s", params.networkID, "broadcast") logger.Info("joining the vaa topic", zap.String("topic", vaaTopic)) vaaPubsubTopic, err = ps.Join(vaaTopic) @@ -425,7 +435,7 @@ func Run(params *RunParams) func(ctx context.Context) error { } }() - if params.signedInC != nil { + if params.signedInC != nil || needOldTopic { logger.Info("subscribing to the vaa topic", zap.String("topic", vaaTopic)) vaaSubscription, err = vaaPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE)) if err != nil { @@ -573,10 +583,18 @@ func Run(params *RunParams) func(ctx context.Context) error { return b }() - err = controlPubsubTopic.Publish(ctx, b) - p2pMessagesSent.WithLabelValues("control").Inc() - if err != nil { - logger.Warn("failed to publish heartbeat message", zap.Error(err)) + if GossipCutoverComplete() { + err = controlPubsubTopic.Publish(ctx, b) + p2pMessagesSent.WithLabelValues("control").Inc() + if err != nil { + logger.Warn("failed to publish heartbeat message", zap.Error(err)) + } + } else if vaaPubsubTopic != nil { + err = vaaPubsubTopic.Publish(ctx, b) + p2pMessagesSent.WithLabelValues("old_control").Inc() + if err != nil { + logger.Warn("failed to publish heartbeat message to old topic", zap.Error(err)) + } } p2pHeartbeatsSent.Inc() @@ -595,20 +613,36 @@ func Run(params *RunParams) func(ctx context.Context) error { return case msg := <-params.gossipControlSendC: if controlPubsubTopic != nil { - err := controlPubsubTopic.Publish(ctx, msg) - p2pMessagesSent.WithLabelValues("control").Inc() - if err != nil { - logger.Error("failed to publish message from control queue", zap.Error(err)) + if GossipCutoverComplete() { + err := controlPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("control").Inc() + if err != nil { + logger.Error("failed to publish message from control queue", zap.Error(err)) + } + } else if vaaPubsubTopic != nil { + err := vaaPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("old_control").Inc() + if err != nil { + logger.Error("failed to publish message from control queue to old topic", zap.Error(err)) + } } } else { logger.Error("received a message on the control queue when we do not have a control topic") } case msg := <-params.gossipAttestationSendC: if attestationPubsubTopic != nil { - err := attestationPubsubTopic.Publish(ctx, msg) - p2pMessagesSent.WithLabelValues("attestation").Inc() - if err != nil { - logger.Error("failed to publish message from attestation queue", zap.Error(err)) + if GossipCutoverComplete() { + err := attestationPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("attestation").Inc() + if err != nil { + logger.Error("failed to publish message from attestation queue", zap.Error(err)) + } + } else if vaaPubsubTopic != nil { + err := vaaPubsubTopic.Publish(ctx, msg) + p2pMessagesSent.WithLabelValues("old_attestation").Inc() + if err != nil { + logger.Error("failed to publish message from attestation queue to old topic", zap.Error(err)) + } } } else { logger.Error("received a message on the attestation queue when we do not have an attestation topic") @@ -657,12 +691,20 @@ func Run(params *RunParams) func(ctx context.Context) error { params.obsvReqC <- msg } - err = controlPubsubTopic.Publish(ctx, b) - p2pMessagesSent.WithLabelValues("control").Inc() - if err != nil { - logger.Error("failed to publish observation request", zap.Error(err)) - } else { - logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq)) + if GossipCutoverComplete() { + err = controlPubsubTopic.Publish(ctx, b) + p2pMessagesSent.WithLabelValues("control").Inc() + if err != nil { + logger.Error("failed to publish observation request", zap.Error(err)) + } else { + logger.Info("published signed observation request", zap.Any("signed_observation_request", sReq)) + } + } else if vaaPubsubTopic != nil { + err = vaaPubsubTopic.Publish(ctx, b) + p2pMessagesSent.WithLabelValues("old_control").Inc() + if err != nil { + logger.Error("failed to publish observation request to old topic", zap.Error(err)) + } } } } @@ -677,14 +719,14 @@ func Run(params *RunParams) func(ctx context.Context) error { for { envelope, err := controlSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled if err != nil { - errC <- fmt.Errorf("failed to receive pubsub message on control subscription: %w", err) + errC <- fmt.Errorf("failed to receive pubsub message on control topic: %w", err) return } var msg gossipv1.GossipMessage err = proto.Unmarshal(envelope.Data, &msg) if err != nil { - logger.Info("received invalid message", + logger.Info("received invalid message on control topic", zap.Binary("data", envelope.Data), zap.String("from", envelope.GetFrom().String())) p2pMessagesReceived.WithLabelValues("invalid").Inc() @@ -693,14 +735,14 @@ func Run(params *RunParams) func(ctx context.Context) error { if envelope.GetFrom() == h.ID() { if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("received message from ourselves, ignoring", zap.Any("payload", msg.Message)) + logger.Debug("received message from ourselves on control topic, ignoring", zap.Any("payload", msg.Message)) } p2pMessagesReceived.WithLabelValues("loopback").Inc() continue } if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("received message", + logger.Debug("received message on control topic", zap.Any("payload", msg.Message), zap.Binary("raw", envelope.Data), zap.String("from", envelope.GetFrom().String())) @@ -829,14 +871,14 @@ func Run(params *RunParams) func(ctx context.Context) error { for { envelope, err := attestationSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled if err != nil { - errC <- fmt.Errorf("failed to receive pubsub message on attestation subscription: %w", err) + errC <- fmt.Errorf("failed to receive pubsub message on attestation topic: %w", err) return } var msg gossipv1.GossipMessage err = proto.Unmarshal(envelope.Data, &msg) if err != nil { - logger.Info("received invalid message", + logger.Info("received invalid message on attestation topic", zap.Binary("data", envelope.Data), zap.String("from", envelope.GetFrom().String())) p2pMessagesReceived.WithLabelValues("invalid").Inc() @@ -845,14 +887,14 @@ func Run(params *RunParams) func(ctx context.Context) error { if envelope.GetFrom() == h.ID() { if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("received message from ourselves, ignoring", zap.Any("payload", msg.Message)) + logger.Debug("received message from ourselves on attestation topic, ignoring", zap.Any("payload", msg.Message)) } p2pMessagesReceived.WithLabelValues("loopback").Inc() continue } if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("received message", + logger.Debug("received message on attestation topic", zap.Any("payload", msg.Message), zap.Binary("raw", envelope.Data), zap.String("from", envelope.GetFrom().String())) @@ -861,26 +903,15 @@ func Run(params *RunParams) func(ctx context.Context) error { switch m := msg.Message.(type) { case *gossipv1.GossipMessage_SignedObservation: if params.obsvC != nil { - if err := common.PostMsgWithTimestamp[gossipv1.SignedObservation](m.SignedObservation, params.obsvC); err == nil { + if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvC); err == nil { p2pMessagesReceived.WithLabelValues("observation").Inc() } else { if params.components.WarnChannelOverflow { - logger.Warn("Ignoring SignedObservation because obsvC is full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash))) + logger.Warn("Ignoring SignedObservation because obsvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservation.Addr))) } p2pReceiveChannelOverflow.WithLabelValues("observation").Inc() } } - case *gossipv1.GossipMessage_SignedObservationBatch: - if params.batchObsvC != nil { - if err := common.PostMsgWithTimestamp[gossipv1.SignedObservationBatch](m.SignedObservationBatch, params.batchObsvC); err == nil { - p2pMessagesReceived.WithLabelValues("batch_observation").Inc() - } else { - if params.components.WarnChannelOverflow { - logger.Warn("Ignoring SignedObservationBatch because batchObsvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservationBatch.Addr))) - } - p2pReceiveChannelOverflow.WithLabelValues("batch_observation").Inc() - } - } default: p2pMessagesReceived.WithLabelValues("unknown").Inc() logger.Warn("received unknown message type on attestation topic (running outdated software?)", @@ -898,15 +929,14 @@ func Run(params *RunParams) func(ctx context.Context) error { for { envelope, err := vaaSubscription.Next(ctx) // Note: sub.Next(ctx) will return an error once ctx is canceled if err != nil { - errC <- fmt.Errorf("failed to receive pubsub message on vaa subscription: %w", err) - errC <- fmt.Errorf("failed to receive pubsub message on vaa subscription: %w", err) + errC <- fmt.Errorf("failed to receive pubsub message on vaa topic: %w", err) return } var msg gossipv1.GossipMessage err = proto.Unmarshal(envelope.Data, &msg) if err != nil { - logger.Info("received invalid message", + logger.Info("received invalid message on vaa topic", zap.Binary("data", envelope.Data), zap.String("from", envelope.GetFrom().String())) p2pMessagesReceived.WithLabelValues("invalid").Inc() @@ -915,105 +945,94 @@ func Run(params *RunParams) func(ctx context.Context) error { if envelope.GetFrom() == h.ID() { if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("received message from ourselves, ignoring", zap.Any("payload", msg.Message)) + logger.Debug("received message from ourselves on vaa topic, ignoring", zap.Any("payload", msg.Message)) } p2pMessagesReceived.WithLabelValues("loopback").Inc() continue } if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("received message", + logger.Debug("received message on vaa topic", zap.Any("payload", msg.Message), zap.Binary("raw", envelope.Data), zap.String("from", envelope.GetFrom().String())) } switch m := msg.Message.(type) { - // case *gossipv1.GossipMessage_SignedHeartbeat: - // s := m.SignedHeartbeat - // gs := gst.Get() - // if gs == nil { - // // No valid guardian set yet - dropping heartbeat - // logger.Log(components.SignedHeartbeatLogLevel, "skipping heartbeat - no guardian set", - // zap.Any("value", s), - // zap.String("from", envelope.GetFrom().String())) - // break - // } - // if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, gst, disableHeartbeatVerify); err != nil { - // p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc() - // logger.Log(components.SignedHeartbeatLogLevel, "invalid signed heartbeat received", - // zap.Error(err), - // zap.Any("payload", msg.Message), - // zap.Any("value", s), - // zap.Binary("raw", envelope.Data), - // zap.String("from", envelope.GetFrom().String())) - // } else { - // p2pMessagesReceived.WithLabelValues("valid_heartbeat").Inc() - // logger.Log(components.SignedHeartbeatLogLevel, "valid signed heartbeat received", - // zap.Any("value", heartbeat), - // zap.String("from", envelope.GetFrom().String())) - - // func() { - // if len(heartbeat.P2PNodeId) != 0 { - // components.ProtectedHostByGuardianKeyLock.Lock() - // defer components.ProtectedHostByGuardianKeyLock.Unlock() - // var peerId peer.ID - // if err = peerId.Unmarshal(heartbeat.P2PNodeId); err != nil { - // logger.Error("p2p_node_id_in_heartbeat_invalid", - // zap.Any("payload", msg.Message), - // zap.Any("value", s), - // zap.Binary("raw", envelope.Data), - // zap.String("from", envelope.GetFrom().String())) - // } else { - // guardianAddr := eth_common.BytesToAddress(s.GuardianAddr) - // if gk == nil || guardianAddr != ethcrypto.PubkeyToAddress(gk.PublicKey) { - // prevPeerId, ok := components.ProtectedHostByGuardianKey[guardianAddr] - // if ok { - // if prevPeerId != peerId { - // logger.Info("p2p_guardian_peer_changed", - // zap.String("guardian_addr", guardianAddr.String()), - // zap.String("prevPeerId", prevPeerId.String()), - // zap.String("newPeerId", peerId.String()), - // ) - // components.ConnMgr.Unprotect(prevPeerId, "heartbeat") - // components.ConnMgr.Protect(peerId, "heartbeat") - // components.ProtectedHostByGuardianKey[guardianAddr] = peerId - // } - // } else { - // components.ConnMgr.Protect(peerId, "heartbeat") - // components.ProtectedHostByGuardianKey[guardianAddr] = peerId - // } - // } - // } - // } else { - // if logger.Level().Enabled(zapcore.DebugLevel) { - // logger.Debug("p2p_node_id_not_in_heartbeat", zap.Error(err), zap.Any("payload", heartbeat.NodeName)) - // } - // } - // }() - // } - // case *gossipv1.GossipMessage_SignedObservation: - // if obsvC != nil { - // if err := common.PostMsgWithTimestamp[gossipv1.SignedObservation](m.SignedObservation, obsvC); err == nil { - // p2pMessagesReceived.WithLabelValues("observation").Inc() - // } else { - // if components.WarnChannelOverflow { - // logger.Warn("Ignoring SignedObservation because obsvC is full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash))) - // } - // p2pReceiveChannelOverflow.WithLabelValues("observation").Inc() - // } - // } - // case *gossipv1.GossipMessage_SignedObservationBatch: - // if batchObsvC != nil { - // if err := common.PostMsgWithTimestamp[gossipv1.SignedObservationBatch](m.SignedObservationBatch, batchObsvC); err == nil { - // p2pMessagesReceived.WithLabelValues("batch_observation").Inc() - // } else { - // if components.WarnChannelOverflow { - // logger.Warn("Ignoring SignedObservationBatch because batchObsvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservationBatch.Addr))) - // } - // p2pReceiveChannelOverflow.WithLabelValues("batch_observation").Inc() - // } - // } + case *gossipv1.GossipMessage_SignedHeartbeat: // TODO: Get rid of this after the cutover. + s := m.SignedHeartbeat + gs := params.gst.Get() + if gs == nil { + // No valid guardian set yet - dropping heartbeat + logger.Log(params.components.SignedHeartbeatLogLevel, "skipping heartbeat - no guardian set", + zap.Any("value", s), + zap.String("from", envelope.GetFrom().String())) + break + } + if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, params.gst, params.disableHeartbeatVerify); err != nil { + p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc() + logger.Log(params.components.SignedHeartbeatLogLevel, "invalid signed heartbeat received", + zap.Error(err), + zap.Any("payload", msg.Message), + zap.Any("value", s), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + } else { + p2pMessagesReceived.WithLabelValues("valid_heartbeat").Inc() + logger.Log(params.components.SignedHeartbeatLogLevel, "valid signed heartbeat received", + zap.Any("value", heartbeat), + zap.String("from", envelope.GetFrom().String())) + + func() { + if len(heartbeat.P2PNodeId) != 0 { + params.components.ProtectedHostByGuardianKeyLock.Lock() + defer params.components.ProtectedHostByGuardianKeyLock.Unlock() + var peerId peer.ID + if err = peerId.Unmarshal(heartbeat.P2PNodeId); err != nil { + logger.Error("p2p_node_id_in_heartbeat_invalid", + zap.Any("payload", msg.Message), + zap.Any("value", s), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + } else { + guardianAddr := eth_common.BytesToAddress(s.GuardianAddr) + if params.gk == nil || guardianAddr != ethcrypto.PubkeyToAddress(params.gk.PublicKey) { + prevPeerId, ok := params.components.ProtectedHostByGuardianKey[guardianAddr] + if ok { + if prevPeerId != peerId { + logger.Info("p2p_guardian_peer_changed", + zap.String("guardian_addr", guardianAddr.String()), + zap.String("prevPeerId", prevPeerId.String()), + zap.String("newPeerId", peerId.String()), + ) + params.components.ConnMgr.Unprotect(prevPeerId, "heartbeat") + params.components.ConnMgr.Protect(peerId, "heartbeat") + params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId + } + } else { + params.components.ConnMgr.Protect(peerId, "heartbeat") + params.components.ProtectedHostByGuardianKey[guardianAddr] = peerId + } + } + } + } else { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("p2p_node_id_not_in_heartbeat", zap.Error(err), zap.Any("payload", heartbeat.NodeName)) + } + } + }() + } + case *gossipv1.GossipMessage_SignedObservation: // TODO: Get rid of this after the cutover. + if params.obsvC != nil { + if err := common.PostMsgWithTimestamp(m.SignedObservation, params.obsvC); err == nil { + p2pMessagesReceived.WithLabelValues("observation").Inc() + } else { + if params.components.WarnChannelOverflow { + logger.Warn("Ignoring SignedObservation because obsvC is full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash))) + } + p2pReceiveChannelOverflow.WithLabelValues("observation").Inc() + } + } case *gossipv1.GossipMessage_SignedVaaWithQuorum: if params.signedInC != nil { select { @@ -1031,48 +1050,48 @@ func Run(params *RunParams) func(ctx context.Context) error { p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Inc() } } - // case *gossipv1.GossipMessage_SignedObservationRequest: - // if obsvReqC != nil { - // s := m.SignedObservationRequest - // gs := gst.Get() - // if gs == nil { - // if logger.Level().Enabled(zapcore.DebugLevel) { - // logger.Debug("dropping SignedObservationRequest - no guardian set", zap.Any("value", s), zap.String("from", envelope.GetFrom().String())) - // } - // break - // } - // r, err := processSignedObservationRequest(s, gs) - // if err != nil { - // p2pMessagesReceived.WithLabelValues("invalid_signed_observation_request").Inc() - // if logger.Level().Enabled(zapcore.DebugLevel) { - // logger.Debug("invalid signed observation request received", - // zap.Error(err), - // zap.Any("payload", msg.Message), - // zap.Any("value", s), - // zap.Binary("raw", envelope.Data), - // zap.String("from", envelope.GetFrom().String())) - // } - // } else { - // if logger.Level().Enabled(zapcore.DebugLevel) { - // logger.Debug("valid signed observation request received", zap.Any("value", r), zap.String("from", envelope.GetFrom().String())) - // } - - // select { - // case obsvReqC <- r: - // p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc() - // default: - // p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc() - // } - // } - // } - // case *gossipv1.GossipMessage_SignedChainGovernorConfig: - // if signedGovCfg != nil { - // signedGovCfg <- m.SignedChainGovernorConfig - // } - // case *gossipv1.GossipMessage_SignedChainGovernorStatus: - // if signedGovSt != nil { - // signedGovSt <- m.SignedChainGovernorStatus - // } + case *gossipv1.GossipMessage_SignedObservationRequest: // TODO: Get rid of this after the cutover. + if params.obsvReqC != nil { + s := m.SignedObservationRequest + gs := params.gst.Get() + if gs == nil { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("dropping SignedObservationRequest - no guardian set", zap.Any("value", s), zap.String("from", envelope.GetFrom().String())) + } + break + } + r, err := processSignedObservationRequest(s, gs) + if err != nil { + p2pMessagesReceived.WithLabelValues("invalid_signed_observation_request").Inc() + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("invalid signed observation request received", + zap.Error(err), + zap.Any("payload", msg.Message), + zap.Any("value", s), + zap.Binary("raw", envelope.Data), + zap.String("from", envelope.GetFrom().String())) + } + } else { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("valid signed observation request received", zap.Any("value", r), zap.String("from", envelope.GetFrom().String())) + } + + select { + case params.obsvReqC <- r: + p2pMessagesReceived.WithLabelValues("signed_observation_request").Inc() + default: + p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Inc() + } + } + } + case *gossipv1.GossipMessage_SignedChainGovernorConfig: // TODO: Get rid of this after the cutover. + if params.signedGovCfg != nil { + params.signedGovCfg <- m.SignedChainGovernorConfig + } + case *gossipv1.GossipMessage_SignedChainGovernorStatus: // TODO: Get rid of this after the cutover. + if params.signedGovSt != nil { + params.signedGovSt <- m.SignedChainGovernorStatus + } default: p2pMessagesReceived.WithLabelValues("unknown").Inc() logger.Warn("received unknown message type on vaa topic (running outdated software?)", diff --git a/node/pkg/p2p/run_params.go b/node/pkg/p2p/run_params.go index f5a1ac310b..bb35992c4b 100644 --- a/node/pkg/p2p/run_params.go +++ b/node/pkg/p2p/run_params.go @@ -42,26 +42,23 @@ type ( disableHeartbeatVerify bool // The following options are guardian specific. Set with `WithGuardianOptions`. - nodeName string - gk *ecdsa.PrivateKey - gossipSendC chan []byte - obsvReqSendC <-chan *gossipv1.ObservationRequest - acct *accountant.Accountant - gov *governor.ChainGovernor - components *Components - ibcFeaturesFunc func() string - gatewayRelayerEnabled bool - ccqEnabled bool - signedQueryReqC chan<- *gossipv1.SignedQueryRequest - queryResponseReadC <-chan *query.QueryResponsePublication - ccqBootstrapPeers string - ccqPort uint - ccqAllowedPeers string - - // This is junk: + nodeName string + gk *ecdsa.PrivateKey gossipControlSendC chan []byte gossipAttestationSendC chan []byte gossipVaaSendC chan []byte + obsvReqSendC <-chan *gossipv1.ObservationRequest + acct *accountant.Accountant + gov *governor.ChainGovernor + components *Components + ibcFeaturesFunc func() string + gatewayRelayerEnabled bool + ccqEnabled bool + signedQueryReqC chan<- *gossipv1.SignedQueryRequest + queryResponseReadC <-chan *query.QueryResponsePublication + ccqBootstrapPeers string + ccqPort uint + ccqAllowedPeers string } // RunOpt is used to specify optional parameters. @@ -155,7 +152,9 @@ func WithGuardianOptions( obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], signedInC chan<- *gossipv1.SignedVAAWithQuorum, obsvReqC chan<- *gossipv1.ObservationRequest, - gossipSendC chan []byte, + gossipControlSendC chan []byte, + gossipAttestationSendC chan []byte, + gossipVaaSendC chan []byte, obsvReqSendC <-chan *gossipv1.ObservationRequest, acct *accountant.Accountant, gov *governor.ChainGovernor, @@ -176,7 +175,9 @@ func WithGuardianOptions( p.obsvC = obsvC p.signedInC = signedInC p.obsvReqC = obsvReqC - p.gossipSendC = gossipSendC + p.gossipControlSendC = gossipControlSendC + p.gossipAttestationSendC = gossipAttestationSendC + p.gossipVaaSendC = gossipVaaSendC p.obsvReqSendC = obsvReqSendC p.acct = acct p.gov = gov diff --git a/node/pkg/p2p/run_params_test.go b/node/pkg/p2p/run_params_test.go index aebebd952c..e9432e8e2d 100644 --- a/node/pkg/p2p/run_params_test.go +++ b/node/pkg/p2p/run_params_test.go @@ -143,7 +143,9 @@ func TestRunParamsWithGuardianOptions(t *testing.T) { obsvC := make(chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], 42) signedInC := make(chan<- *gossipv1.SignedVAAWithQuorum, 42) obsvReqC := make(chan<- *gossipv1.ObservationRequest, 42) - gossipSendC := make(chan []byte, 42) + gossipControlSendC := make(chan []byte, 42) + gossipAttestationSendC := make(chan []byte, 42) + gossipVaaSendC := make(chan []byte, 42) obsvReqSendC := make(<-chan *gossipv1.ObservationRequest, 42) acct := &accountant.Accountant{} @@ -172,7 +174,9 @@ func TestRunParamsWithGuardianOptions(t *testing.T) { obsvC, signedInC, obsvReqC, - gossipSendC, + gossipControlSendC, + gossipAttestationSendC, + gossipVaaSendC, obsvReqSendC, acct, gov, @@ -194,7 +198,9 @@ func TestRunParamsWithGuardianOptions(t *testing.T) { assert.Equal(t, obsvC, params.obsvC) assert.Equal(t, signedInC, params.signedInC) assert.Equal(t, obsvReqC, params.obsvReqC) - assert.Equal(t, gossipSendC, params.gossipSendC) + assert.Equal(t, gossipControlSendC, params.gossipControlSendC) + assert.Equal(t, gossipAttestationSendC, params.gossipAttestationSendC) + assert.Equal(t, gossipVaaSendC, params.gossipVaaSendC) assert.Equal(t, obsvReqSendC, params.obsvReqSendC) assert.Equal(t, acct, params.acct) assert.Equal(t, gov, params.gov) diff --git a/node/pkg/p2p/watermark_test.go b/node/pkg/p2p/watermark_test.go index 62cc28aa63..c8e8c98286 100644 --- a/node/pkg/p2p/watermark_test.go +++ b/node/pkg/p2p/watermark_test.go @@ -184,7 +184,9 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) { g.obsvC, g.signedInC, g.obsvReqC, + g.controlSendC, g.attestationSendC, + g.vaaSendC, g.obsvReqSendC, g.acct, g.gov, diff --git a/node/pkg/processor/cleanup.go b/node/pkg/processor/cleanup.go index a36017750e..014b76344b 100644 --- a/node/pkg/processor/cleanup.go +++ b/node/pkg/processor/cleanup.go @@ -228,10 +228,8 @@ func (p *Processor) handleCleanup(ctx context.Context) { if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil { p.logger.Warn("failed to broadcast re-observation request", zap.String("message_id", s.LoggingID()), zap.Error(err)) } - p.postObservationToBatch(s.ourObs) - if s.ourMsg != nil { - p.gossipAttestationSendC <- s.ourMsg // TODO: Get rid of this - } + + p.gossipAttestationSendC <- s.ourMsg s.retryCtr++ s.nextRetry = time.Now().Add(nextRetryDuration(s.retryCtr)) aggregationStateRetries.Inc() diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index ac94d046bc..c45fae1d57 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -74,12 +74,13 @@ func signaturesToVaaFormat(signatures map[common.Address][]byte, gsKeys []common // handleObservation processes a remote VAA observation, verifies it, checks whether the VAA has met quorum, // and assembles and submits a valid VAA if possible. -func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]) { +func (p *Processor) handleObservation(obs *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]) { // SECURITY: at this point, observations received from the p2p network are fully untrusted (all fields!) // // Note that observations are never tied to the (verified) p2p identity key - the p2p network // identity is completely decoupled from the guardian identity, p2p is just transport. + start := time.Now() observationsReceivedTotal.Inc() m := obs.Msg @@ -87,6 +88,7 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW s := p.state.signatures[hash] if s != nil && s.submitted { // already submitted; ignoring additional signatures for it. + timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) return } @@ -236,6 +238,7 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW zap.String("digest", hash), ) } + timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) return } @@ -279,6 +282,7 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW } observationTotalDelay.Observe(float64(time.Since(obs.Timestamp).Microseconds())) + timeToHandleObservation.Observe(float64(time.Since(start).Microseconds())) } func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gossipv1.SignedVAAWithQuorum) { diff --git a/node/pkg/processor/processor.go b/node/pkg/processor/processor.go index 1c76b67d6b..3479b4c676 100644 --- a/node/pkg/processor/processor.go +++ b/node/pkg/processor/processor.go @@ -165,7 +165,7 @@ var ( prometheus.HistogramOpts{ Name: "wormhole_time_to_handle_observation_us", Help: "Latency histogram for total time to handle observation on an observation", - Buckets: []float64{100.0, 200.0, 300.0, 400.0, 500.0, 750.0, 1000.0, 5000.0, 10_000.0}, + Buckets: []float64{25.0, 50.0, 75.0, 100.0, 200.0, 300.0, 400.0, 500.0, 750.0, 1000.0, 5000.0, 10_000.0}, }) timeToHandleQuorum = promauto.NewHistogram( @@ -174,31 +174,8 @@ var ( Help: "Latency histogram for total time to handle quorum on an observation", Buckets: []float64{25.0, 50.0, 75.0, 100.0, 200.0, 300.0, 400.0, 500.0, 750.0, 1000.0, 5000.0, 10_000.0}, }) - - batchObservationChanDelay = promauto.NewHistogram( - prometheus.HistogramOpts{ - Name: "wormhole_batch_observation_channel_delay_us", - Help: "Latency histogram for delay of batched observations in channel", - Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0}, - }) - - batchObservationTotalDelay = promauto.NewHistogram( - prometheus.HistogramOpts{ - Name: "wormhole_batch_observation_total_delay_us", - Help: "Latency histogram for total time to process batched observations", - Buckets: []float64{10.0, 20.0, 50.0, 100.0, 1000.0, 5000.0, 10_000.0, 100_000.0, 1_000_000.0, 10_000_000.0, 100_000_000.0, 1_000_000_000.0}, - }) - - batchObservationChannelOverflow = promauto.NewCounterVec( - prometheus.CounterOpts{ - Name: "wormhole_batch_observation_channel_overflow", - Help: "Total number of times a write to the batch observation publish channel failed", - }, []string{"channel"}) ) -// batchObsvPubChanSize specifies the size of the channel used to publish observation batches. Allow five seconds worth. -const batchObsvPubChanSize = p2p.MaxObservationBatchSize * 5 - func NewProcessor( ctx context.Context, db *db.Database, @@ -298,7 +275,7 @@ func (p *Processor) Run(ctx context.Context) error { p.handleMessage(k) case m := <-p.obsvC: observationChanDelay.Observe(float64(time.Since(m.Timestamp).Microseconds())) - p.handleObservation(ctx, m) + p.handleObservation(m) case m := <-p.signedInC: p.handleInboundSignedVAAWithQuorum(ctx, m) case <-cleanup.C: