Node: Observation batching
bruce-riley committed Jun 10, 2024
1 parent 0e2ba62 commit fd94f23
Showing 13 changed files with 714 additions and 244 deletions.
1 change: 1 addition & 0 deletions node/cmd/spy/spy.go
@@ -394,6 +394,7 @@ func runSpy(cmd *cobra.Command, args []string) {
if err := supervisor.Run(ctx,
"p2p",
p2p.Run(nil, // Ignore incoming observations.
nil, // Ignore batch observations.
nil, // Ignore observation requests.
nil,
sendC,
8 changes: 8 additions & 0 deletions node/pkg/node/node.go
@@ -27,6 +27,11 @@ const (
// One observation takes roughly 0.1ms to process on one core, so the whole queue could be processed in 1s
inboundObservationBufferSize = 10000

// inboundBatchObservationBufferSize configures the size of the batchObsvC channel that contains batches of observations from other Guardians.
// Since a batch contains many observations, the guardians should not be publishing too many of these, so we can keep the channel small. With
// 19 guardians, we would expect 19 messages per second during normal operations. This gives us plenty of extra room.
inboundBatchObservationBufferSize = 100

// inboundSignedVaaBufferSize configures the size of the signedInC channel that contains VAAs from other Guardians.
// One VAA takes roughly 0.01ms to process if we already have one in the database and 2ms if we don't.
// So in the worst case the entire queue can be processed in 2s.
@@ -73,6 +78,8 @@ type G struct {
gossipSendC chan []byte
// Inbound observations. This is read/write because the processor also writes to it as a fast-path when handling locally made observations.
obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation]
// Inbound observation batches.
batchObsvC channelPair[*common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]]
// Finalized guardian observations aggregated across all chains
msgC channelPair[*common.MessagePublication]
// Ethereum incoming guardian set updates
@@ -111,6 +118,7 @@ func (g *G) initializeBasic(rootCtxCancel context.CancelFunc) {
// Setup various channels...
g.gossipSendC = make(chan []byte, gossipSendBufferSize)
g.obsvC = make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], inboundObservationBufferSize)
g.batchObsvC = makeChannelPair[*common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]](inboundBatchObservationBufferSize)
g.msgC = makeChannelPair[*common.MessagePublication](0)
g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup.
g.signedInC = makeChannelPair[*gossipv1.SignedVAAWithQuorum](inboundSignedVaaBufferSize)
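Note: `channelPair` and `makeChannelPair` predate this commit and are not shown in this diff. A minimal sketch of the pattern, assuming a single buffered channel exposed as directionally typed read/write ends:

type channelPair[T any] struct {
	readC  <-chan T
	writeC chan<- T
}

// makeChannelPair wraps one buffered channel so that producers and consumers
// receive send-only and receive-only views of the same queue.
func makeChannelPair[T any](capacity int) channelPair[T] {
	out := make(chan T, capacity)
	return channelPair[T]{readC: out, writeC: out}
}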
2 changes: 2 additions & 0 deletions node/pkg/node/options.go
@@ -57,6 +57,7 @@ func GuardianOptionP2P(p2pKey libp2p_crypto.PrivKey, networkId, bootstrapPeers,

g.runnables["p2p"] = p2p.Run(
g.obsvC,
g.batchObsvC.writeC,
g.obsvReqC.writeC,
g.obsvReqSendC.readC,
g.gossipSendC,
@@ -556,6 +557,7 @@ func GuardianOptionProcessor() *GuardianOption {
g.setC.readC,
g.gossipSendC,
g.obsvC,
g.batchObsvC.readC,
g.obsvReqSendC.writeC,
g.signedInC.readC,
g.gk,
20 changes: 19 additions & 1 deletion node/pkg/p2p/p2p.go
@@ -51,6 +51,12 @@ const P2P_SUBSCRIPTION_BUFFER_SIZE = 1024
// TESTNET_BOOTSTRAP_DHI configures how many nodes may connect to the testnet bootstrap node. This number should not exceed HighWaterMark.
const TESTNET_BOOTSTRAP_DHI = 350

// MaxObservationBatchSize is the maximum number of observations that will fit in a single `SignedObservationBatch` message.
const MaxObservationBatchSize = 4000

// MaxObservationBatchDelay is the longest we will wait before publishing any queued up observations.
const MaxObservationBatchDelay = time.Second

var (
p2pHeartbeatsSent = promauto.NewCounter(
prometheus.CounterOpts{
@@ -295,6 +301,7 @@ func NewHost(logger *zap.Logger, ctx context.Context, networkID string, bootstra

func Run(
obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation],
batchObsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch],
obsvReqC chan<- *gossipv1.ObservationRequest,
obsvReqSendC <-chan *gossipv1.ObservationRequest,
gossipSendC chan []byte,
@@ -706,11 +713,22 @@ func Run(
p2pMessagesReceived.WithLabelValues("observation").Inc()
} else {
if components.WarnChannelOverflow {
logger.Warn("Ignoring SignedObservation because obsvC full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash)))
logger.Warn("Ignoring SignedObservation because obsvC is full", zap.String("hash", hex.EncodeToString(m.SignedObservation.Hash)))
}
p2pReceiveChannelOverflow.WithLabelValues("observation").Inc()
}
}
case *gossipv1.GossipMessage_SignedObservationBatch:
if batchObsvC != nil {
if err := common.PostMsgWithTimestamp[gossipv1.SignedObservationBatch](m.SignedObservationBatch, batchObsvC); err == nil {
p2pMessagesReceived.WithLabelValues("batch_observation").Inc()
} else {
if components.WarnChannelOverflow {
logger.Warn("Ignoring SignedObservationBatch because batchObsvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservationBatch.Addr)))
}
p2pReceiveChannelOverflow.WithLabelValues("batch_observation").Inc()
}
}
case *gossipv1.GossipMessage_SignedVaaWithQuorum:
if signedInC != nil {
select {
3 changes: 3 additions & 0 deletions node/pkg/p2p/watermark_test.go
@@ -28,6 +28,7 @@ const LOCAL_P2P_PORTRANGE_START = 11000
type G struct {
// arguments passed to p2p.New
obsvC chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]
batchObsvC chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]
obsvReqC chan *gossipv1.ObservationRequest
obsvReqSendC chan *gossipv1.ObservationRequest
sendC chan []byte
@@ -63,6 +64,7 @@ func NewG(t *testing.T, nodeName string) *G {

g := &G{
obsvC: make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], cs),
batchObsvC: make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], cs),
obsvReqC: make(chan *gossipv1.ObservationRequest, cs),
obsvReqSendC: make(chan *gossipv1.ObservationRequest, cs),
sendC: make(chan []byte, cs),
@@ -166,6 +168,7 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) {
t.Helper()
supervisor.New(ctx, zap.L(),
Run(g.obsvC,
g.batchObsvC,
g.obsvReqC,
g.obsvReqSendC,
g.sendC,
71 changes: 71 additions & 0 deletions node/pkg/processor/batch_obs.go
@@ -0,0 +1,71 @@
//nolint:unparam // this will be refactored in https://github.com/wormhole-foundation/wormhole/pull/1953
package processor

import (
"context"
"errors"
"fmt"

"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"google.golang.org/protobuf/proto"
)

// postObservationToBatch posts an individual observation to the batch processor.
func (p *Processor) postObservationToBatch(obs *gossipv1.Observation) {
select {
case p.batchObsvPubC <- obs:
default:
batchObservationChannelOverflow.WithLabelValues("batchObsvPub").Inc()
}
}
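`batchObservationChannelOverflow` is declared in one of the files not expanded in this view. A plausible definition, assuming the promauto counter-vec pattern used elsewhere in the node (the metric name and help text here are guesses):

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var batchObservationChannelOverflow = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "wormhole_batch_observation_channel_overflow",
		Help: "Total number of times a write to an internal batch observation channel was dropped because the channel was full",
	}, []string{"channel"})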

// batchProcessor is the entry point for the batch processor, which is responsible for taking individual
// observations and publishing them as batches. It limits the size of a batch and the delay before publishing.
func (p *Processor) batchProcessor(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
default:
if err := p.handleBatch(ctx); err != nil {
return err
}
}
}
}

// handleBatch reads observations from the channel, either until a timeout occurs or the batch is full.
// Then it builds a `SignedObservationBatch` gossip message and posts it to p2p.
func (p *Processor) handleBatch(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, p2p.MaxObservationBatchDelay)
defer cancel()

observations, err := common.ReadFromChannelWithTimeout(ctx, p.batchObsvPubC, p2p.MaxObservationBatchSize)
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return fmt.Errorf("failed to read observations from the internal observation batch channel: %w", err)
}

if len(observations) == 0 {
return nil
}

batchMsg := gossipv1.SignedObservationBatch{
Addr: p.ourAddr.Bytes(),
Observations: observations,
}

gossipMsg := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedObservationBatch{SignedObservationBatch: &batchMsg}}
msg, err := proto.Marshal(&gossipMsg)
if err != nil {
panic(err)
}

select {
case p.gossipSendC <- msg:
default:
batchObservationChannelOverflow.WithLabelValues("gossipSend").Inc()
}
return nil
}
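`common.ReadFromChannelWithTimeout` is also outside this diff. A minimal sketch of the semantics `handleBatch` depends on — collect up to `max` items, or stop early with the context error when the batch timer fires (the exact signature is an assumption based on the call site):

import "context"

func ReadFromChannelWithTimeout[T any](ctx context.Context, ch <-chan T, max int) ([]T, error) {
	out := make([]T, 0, max)
	for len(out) < max {
		select {
		case <-ctx.Done():
			// Usually context.DeadlineExceeded when MaxObservationBatchDelay elapses;
			// handleBatch treats that as a normal, possibly partial, batch.
			return out, ctx.Err()
		case item := <-ch:
			out = append(out, item)
		}
	}
	return out, nil
}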
85 changes: 85 additions & 0 deletions node/pkg/processor/batch_obs_test.go
@@ -0,0 +1,85 @@
package processor

import (
"bytes"
"testing"
"time"

"github.com/certusone/wormhole/node/pkg/devnet"
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"google.golang.org/protobuf/proto"
)

func getUniqueVAA(seqNo uint64) vaa.VAA {
var payload = []byte{97, 97, 97, 97, 97, 97}
var governanceEmitter = vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4}

vaa := vaa.VAA{
Version: uint8(1),
GuardianSetIndex: uint32(1),
Signatures: nil,
Timestamp: time.Unix(0, 0),
Nonce: uint32(1),
Sequence: seqNo,
ConsistencyLevel: uint8(32),
EmitterChain: vaa.ChainIDSolana,
EmitterAddress: governanceEmitter,
Payload: payload,
}

return vaa
}

func TestMarshalSignedObservationBatch(t *testing.T) {
gk := devnet.InsecureDeterministicEcdsaKeyByIndex(crypto.S256(), uint64(0))
require.NotNil(t, gk)

NumObservations := uint64(p2p.MaxObservationBatchSize)
observations := make([]*gossipv1.Observation, 0, NumObservations)
for seqNo := uint64(1); seqNo <= NumObservations; seqNo++ {
vaa := getUniqueVAA(seqNo)
digest := vaa.SigningDigest()
sig, err := crypto.Sign(digest.Bytes(), gk)
require.NoError(t, err)

observations = append(observations, &gossipv1.Observation{
Hash: digest.Bytes(),
Signature: sig,
TxHash: ethcommon.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
MessageId: vaa.MessageID(),
})
}

obsBuf, err := proto.Marshal(observations[0])
require.NoError(t, err)
assert.Equal(t, 205, len(obsBuf))

batch := gossipv1.SignedObservationBatch{
Addr: crypto.PubkeyToAddress(gk.PublicKey).Bytes(),
Observations: observations,
}

buf, err := proto.Marshal((&batch))
require.NoError(t, err)
assert.Greater(t, pubsub.DefaultMaxMessageSize, len(buf))

var batch2 gossipv1.SignedObservationBatch
err = proto.Unmarshal(buf, &batch2)
require.NoError(t, err)

assert.True(t, bytes.Equal(batch.Addr, batch2.Addr))
assert.Equal(t, len(batch.Observations), len(batch2.Observations))
for idx := range batch2.Observations {
assert.True(t, bytes.Equal(batch.Observations[idx].Hash, batch2.Observations[idx].Hash))
assert.True(t, bytes.Equal(batch.Observations[idx].Signature, batch2.Observations[idx].Signature))
assert.True(t, bytes.Equal(batch.Observations[idx].TxHash, batch2.Observations[idx].TxHash))
assert.Equal(t, batch.Observations[idx].MessageId, batch2.Observations[idx].MessageId)
}
}
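A back-of-the-envelope check of what these assertions encode, assuming go-libp2p-pubsub's `DefaultMaxMessageSize` of 1 MiB: 4000 observations × ~205 bytes ≈ 820,000 bytes, which even with per-element protobuf framing stays under the 1,048,576-byte limit — so a full-size batch fits in a single gossip message.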
34 changes: 20 additions & 14 deletions node/pkg/processor/broadcast.go
@@ -10,7 +10,6 @@ import (
ethcommon "github.com/ethereum/go-ethereum/common"
"google.golang.org/protobuf/proto"

node_common "github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
@@ -40,13 +39,27 @@ func (p *Processor) broadcastSignature(
signature []byte,
txhash []byte,
) {
addr := p.ourAddr.Bytes()
digest := o.SigningDigest()
msgId := o.MessageID()

// Post the observation to the batch publisher.
ourObs := &gossipv1.Observation{
Hash: digest.Bytes(),
Signature: signature,
TxHash: txhash,
MessageId: msgId,
}

p.postObservationToBatch(ourObs)

// Post the observation in its own gossip message. TODO: Remove this once everyone has migrated to batches.
obsv := gossipv1.SignedObservation{
Addr: p.ourAddr.Bytes(),
Addr: addr,
Hash: digest.Bytes(),
Signature: signature,
TxHash: txhash,
MessageId: o.MessageID(),
MessageId: msgId,
}

w := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedObservation{SignedObservation: &obsv}}
@@ -57,7 +70,7 @@
}

// Broadcast the observation.
p.gossipSendC <- msg
p.gossipSendC <- msg // TODO: Get rid of this
observationsBroadcast.Inc()

hash := hex.EncodeToString(digest.Bytes())
@@ -72,21 +85,14 @@
}

p.state.signatures[hash].ourObservation = o
p.state.signatures[hash].ourObs = ourObs
p.state.signatures[hash].ourMsg = msg
p.state.signatures[hash].txHash = txhash
p.state.signatures[hash].source = o.GetEmitterChain().String()
p.state.signatures[hash].gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs

// Fast path for our own signature
// send to obsvC directly if there is capacity, otherwise do it in a go routine.
// We can't block here because the same process would be responsible for reading from obsvC.
om := node_common.CreateMsgWithTimestamp[gossipv1.SignedObservation](&obsv)
select {
case p.obsvC <- om:
default:
go func() { p.obsvC <- om }()
}

// Post the signature to ourselves.
p.handleSingleObservation(addr, ourObs)
observationsPostedInternally.Inc()
}

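The consuming side of `batchObsvC` lives in processor code that is not expanded in this view. A hedged sketch of how a batch is likely fanned out through the same single-observation path used above (the method name and exact types are assumptions):

// Replays each observation in a received batch through the
// single-observation handler, using the batch's sender address.
func (p *Processor) handleBatchObservation(m *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]) {
	for _, obs := range m.Msg.Observations {
		p.handleSingleObservation(m.Msg.Addr, obs)
	}
}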
11 changes: 7 additions & 4 deletions node/pkg/processor/cleanup.go
@@ -155,13 +155,13 @@ func (p *Processor) handleCleanup(ctx context.Context) {
}
delete(p.state.signatures, hash)
aggregationStateExpiration.Inc()
case !s.submitted && ((s.ourMsg != nil && delta > retryLimitOurs) || (s.ourMsg == nil && delta > retryLimitNotOurs)):
case !s.submitted && ((s.ourObs != nil && delta > retryLimitOurs) || (s.ourObs == nil && delta > retryLimitNotOurs)):
// Clearly, this horse is dead and continued beatings won't bring it closer to quorum.
p.logger.Info("expiring unsubmitted observation after exhausting retries",
zap.String("message_id", s.LoggingID()),
zap.String("digest", hash),
zap.Duration("delta", delta),
zap.Bool("weObserved", s.ourMsg != nil),
zap.Bool("weObserved", s.ourObs != nil),
)
delete(p.state.signatures, hash)
aggregationStateTimeout.Inc()
@@ -172,7 +172,7 @@ func (p *Processor) handleCleanup(ctx context.Context) {
// sig. If we do not have an observation, it means we either never observed it, or it got
// revived by a malfunctioning guardian node, in which case, we can't do anything about it
// and just delete it to keep our state nice and lean.
if s.ourMsg != nil {
if s.ourObs != nil {
// Unreliable observations cannot be resubmitted and can be considered failed after 5 minutes
if !s.ourObservation.IsReliable() {
p.logger.Info("expiring unsubmitted unreliable observation",
@@ -228,7 +228,10 @@ func (p *Processor) handleCleanup(ctx context.Context) {
if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil {
p.logger.Warn("failed to broadcast re-observation request", zap.String("message_id", s.LoggingID()), zap.Error(err))
}
p.gossipSendC <- s.ourMsg
p.postObservationToBatch(s.ourObs)
if s.ourMsg != nil {
p.gossipSendC <- s.ourMsg // TODO: Get rid of this
}
s.retryCtr++
s.nextRetry = time.Now().Add(nextRetryDuration(s.retryCtr))
aggregationStateRetries.Inc()
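`nextRetryDuration` is defined elsewhere in the processor package. A sketch of the exponential backoff the retry counter implies (the constant name, base wait, and lack of jitter are all assumptions):

import "time"

const firstRetryMinWait = 5 * time.Minute // assumed base wait

// nextRetryDuration doubles the wait on each retry: 2x, 4x, 8x, ... the base.
func nextRetryDuration(ctr uint) time.Duration {
	return firstRetryMinWait * time.Duration(1<<ctr)
}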
