Skip to content

Commit

Permalink
Node: Observation batching
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Aug 7, 2024
1 parent c2496cd commit 601ffc8
Show file tree
Hide file tree
Showing 14 changed files with 745 additions and 268 deletions.
8 changes: 8 additions & 0 deletions node/pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ const (
// One observation takes roughly 0.1ms to process on one core, so the whole queue could be processed in 1s
inboundObservationBufferSize = 10000

// inboundBatchObservationBufferSize configures the size of the batchObsvC channel that contains batches of observations from other Guardians.
// Since a batch contains many observations, the guardians should not be publishing too many of these, so we can keep the channel small. With
// 19 guardians, we would expect 19 messages per second during normal operations. This gives us plenty of extra room.
inboundBatchObservationBufferSize = 100

// inboundSignedVaaBufferSize configures the size of the signedInC channel that contains VAAs from other Guardians.
// One VAA takes roughly 0.01ms to process if we already have one in the database and 2ms if we don't.
// So in the worst case the entire queue can be processed in 2s.
Expand Down Expand Up @@ -73,6 +78,8 @@ type G struct {
gossipSendC chan []byte
// Inbound observations. This is read/write because the processor also writes to it as a fast-path when handling locally made observations.
obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation]
// Inbound observation batches.
batchObsvC channelPair[*common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]]
// Finalized guardian observations aggregated across all chains
msgC channelPair[*common.MessagePublication]
// Ethereum incoming guardian set updates
Expand Down Expand Up @@ -111,6 +118,7 @@ func (g *G) initializeBasic(rootCtxCancel context.CancelFunc) {
// Setup various channels...
g.gossipSendC = make(chan []byte, gossipSendBufferSize)
g.obsvC = make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], inboundObservationBufferSize)
g.batchObsvC = makeChannelPair[*common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]](inboundBatchObservationBufferSize)
g.msgC = makeChannelPair[*common.MessagePublication](0)
g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup.
g.signedInC = makeChannelPair[*gossipv1.SignedVAAWithQuorum](inboundSignedVaaBufferSize)
Expand Down
2 changes: 2 additions & 0 deletions node/pkg/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func GuardianOptionP2P(p2pKey libp2p_crypto.PrivKey, networkId, bootstrapPeers,
nodeName,
g.gk,
g.obsvC,
g.batchObsvC.writeC,
g.signedInC.writeC,
g.obsvReqC.writeC,
g.gossipSendC,
Expand Down Expand Up @@ -566,6 +567,7 @@ func GuardianOptionProcessor() *GuardianOption {
g.setC.readC,
g.gossipSendC,
g.obsvC,
g.batchObsvC.readC,
g.obsvReqSendC.writeC,
g.signedInC.readC,
g.gk,
Expand Down
25 changes: 18 additions & 7 deletions node/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ const P2P_SUBSCRIPTION_BUFFER_SIZE = 1024
// TESTNET_BOOTSTRAP_DHI configures how many nodes may connect to the testnet bootstrap node. This number should not exceed HighWaterMark.
const TESTNET_BOOTSTRAP_DHI = 350

// Limits that govern how observations are grouped into `SignedObservationBatch` gossip messages.
const (
	// MaxObservationBatchSize caps how many observations are placed in one `SignedObservationBatch` message.
	MaxObservationBatchSize = 4000

	// MaxObservationBatchDelay bounds how long queued observations may wait before they are published.
	MaxObservationBatchDelay = time.Second
)

var (
p2pHeartbeatsSent = promauto.NewCounter(
prometheus.CounterOpts{
Expand All @@ -74,11 +80,6 @@ var (
Name: "wormhole_p2p_drops",
Help: "Total number of messages that were dropped by libp2p",
})
p2pReject = promauto.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_p2p_rejects",
Help: "Total number of messages rejected by libp2p",
})
)

var heartbeatMessagePrefix = []byte("heartbeat|")
Expand Down Expand Up @@ -172,8 +173,6 @@ func (*traceHandler) Trace(evt *libp2ppb.TraceEvent) {
if evt.Type != nil {
if *evt.Type == libp2ppb.TraceEvent_DROP_RPC {
p2pDrop.Inc()
} else if *evt.Type == libp2ppb.TraceEvent_REJECT_MESSAGE {
p2pReject.Inc()
}
}
}
Expand Down Expand Up @@ -302,6 +301,7 @@ func Run(params *RunParams) func(ctx context.Context) error {

return func(ctx context.Context) error {
p2pReceiveChannelOverflow.WithLabelValues("observation").Add(0)
p2pReceiveChannelOverflow.WithLabelValues("batch_observation").Add(0)
p2pReceiveChannelOverflow.WithLabelValues("signed_vaa_with_quorum").Add(0)
p2pReceiveChannelOverflow.WithLabelValues("signed_observation_request").Add(0)

Expand Down Expand Up @@ -690,6 +690,17 @@ func Run(params *RunParams) func(ctx context.Context) error {
p2pReceiveChannelOverflow.WithLabelValues("observation").Inc()
}
}
case *gossipv1.GossipMessage_SignedObservationBatch:
if params.batchObsvC != nil {
if err := common.PostMsgWithTimestamp[gossipv1.SignedObservationBatch](m.SignedObservationBatch, params.batchObsvC); err == nil {
p2pMessagesReceived.WithLabelValues("batch_observation").Inc()
} else {
if params.components.WarnChannelOverflow {
logger.Warn("Ignoring SignedObservationBatch because batchObsvC is full", zap.String("addr", hex.EncodeToString(m.SignedObservationBatch.Addr)))
}
p2pReceiveChannelOverflow.WithLabelValues("batch_observation").Inc()
}
}
case *gossipv1.GossipMessage_SignedVaaWithQuorum:
if params.signedInC != nil {
select {
Expand Down
23 changes: 18 additions & 5 deletions node/pkg/p2p/run_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type (
// obsvC is optional and can be set with `WithSignedObservationListener`.
obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]

// batchObsvC is optional and can be set with `WithSignedObservationBatchListener`.
batchObsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]

// obsvReqC is optional and can be set with `WithObservationRequestListener`.
obsvReqC chan<- *gossipv1.ObservationRequest

Expand Down Expand Up @@ -95,39 +98,47 @@ func NewRunParams(
return p, nil
}

// WithSignedObservationListener is used to set the channel to receive `SignedObservation messages.
// WithSignedObservationListener is used to set the channel to receive `SignedObservation` messages.
func WithSignedObservationListener(obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation]) RunOpt {
	// The option only records the listener channel on the run parameters; it never fails.
	return func(params *RunParams) error {
		params.obsvC = obsvC
		return nil
	}
}

// WithSignedVAAListener is used to set the channel to receive `SignedVAAWithQuorum messages.
// WithSignedObservationBatchListener is used to set the channel to receive `SignedObservationBatch` messages.
func WithSignedObservationBatchListener(batchObsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]) RunOpt {
	// The option only records the batch listener channel on the run parameters; it never fails.
	return func(params *RunParams) error {
		params.batchObsvC = batchObsvC
		return nil
	}
}

// WithSignedVAAListener is used to set the channel to receive `SignedVAAWithQuorum` messages.
func WithSignedVAAListener(signedInC chan<- *gossipv1.SignedVAAWithQuorum) RunOpt {
	// The option only records the signed VAA listener channel on the run parameters; it never fails.
	return func(params *RunParams) error {
		params.signedInC = signedInC
		return nil
	}
}

// WithObservationRequestListener is used to set the channel to receive `ObservationRequest messages.
// WithObservationRequestListener is used to set the channel to receive `ObservationRequest` messages.
func WithObservationRequestListener(obsvReqC chan<- *gossipv1.ObservationRequest) RunOpt {
	// The option only records the observation request listener channel on the run parameters; it never fails.
	return func(params *RunParams) error {
		params.obsvReqC = obsvReqC
		return nil
	}
}

// WithChainGovernorConfigListener is used to set the channel to receive `SignedChainGovernorConfig messages.
// WithChainGovernorConfigListener is used to set the channel to receive `SignedChainGovernorConfig` messages.
func WithChainGovernorConfigListener(signedGovCfg chan *gossipv1.SignedChainGovernorConfig) RunOpt {
	// The option only records the governor config listener channel on the run parameters; it never fails.
	return func(params *RunParams) error {
		params.signedGovCfg = signedGovCfg
		return nil
	}
}

// WithChainGovernorStatusListener is used to set the channel to receive `SignedChainGovernorStatus messages.
// WithChainGovernorStatusListener is used to set the channel to receive `SignedChainGovernorStatus` messages.
func WithChainGovernorStatusListener(signedGovSt chan *gossipv1.SignedChainGovernorStatus) RunOpt {
return func(p *RunParams) error {
p.signedGovSt = signedGovSt
Expand All @@ -148,6 +159,7 @@ func WithGuardianOptions(
nodeName string,
gk *ecdsa.PrivateKey,
obsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation],
batchObsvC chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch],
signedInC chan<- *gossipv1.SignedVAAWithQuorum,
obsvReqC chan<- *gossipv1.ObservationRequest,
gossipSendC chan []byte,
Expand All @@ -169,6 +181,7 @@ func WithGuardianOptions(
p.nodeName = nodeName
p.gk = gk
p.obsvC = obsvC
p.batchObsvC = batchObsvC
p.signedInC = signedInC
p.obsvReqC = obsvReqC
p.gossipSendC = gossipSendC
Expand Down
2 changes: 2 additions & 0 deletions node/pkg/p2p/run_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
require.NotNil(t, gk)

obsvC := make(chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservation], 42)
batchObsvC := make(chan<- *common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], 42)
signedInC := make(chan<- *gossipv1.SignedVAAWithQuorum, 42)
obsvReqC := make(chan<- *gossipv1.ObservationRequest, 42)
gossipSendC := make(chan []byte, 42)
Expand Down Expand Up @@ -170,6 +171,7 @@ func TestRunParamsWithGuardianOptions(t *testing.T) {
nodeName,
gk,
obsvC,
batchObsvC,
signedInC,
obsvReqC,
gossipSendC,
Expand Down
3 changes: 3 additions & 0 deletions node/pkg/p2p/watermark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const LOCAL_P2P_PORTRANGE_START = 11000
type G struct {
// arguments passed to p2p.New
obsvC chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]
batchObsvC chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch]
obsvReqC chan *gossipv1.ObservationRequest
obsvReqSendC chan *gossipv1.ObservationRequest
sendC chan []byte
Expand Down Expand Up @@ -65,6 +66,7 @@ func NewG(t *testing.T, nodeName string) *G {

g := &G{
obsvC: make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservation], cs),
batchObsvC: make(chan *node_common.MsgWithTimeStamp[gossipv1.SignedObservationBatch], cs),
obsvReqC: make(chan *gossipv1.ObservationRequest, cs),
obsvReqSendC: make(chan *gossipv1.ObservationRequest, cs),
sendC: make(chan []byte, cs),
Expand Down Expand Up @@ -176,6 +178,7 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) {
g.nodeName,
g.gk,
g.obsvC,
g.batchObsvC,
g.signedInC,
g.obsvReqC,
g.sendC,
Expand Down
71 changes: 71 additions & 0 deletions node/pkg/processor/batch_obs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package processor

import (
"context"
"errors"
"fmt"

"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"google.golang.org/protobuf/proto"
)

// postObservationToBatch posts an individual observation to the batch processor.
// It is currently a no-op: the send is commented out below, so `obs` is ignored until that code is enabled.
func (p *Processor) postObservationToBatch(obs *gossipv1.Observation) {
	// TODO: This will be enabled as part of the gossip split PR.
	// The disabled code below is a non-blocking send to `batchObsvPubC`: when the channel cannot accept the
	// observation, the observation is dropped and the "batchObsvPub" overflow counter is incremented.
	// select {
	// case p.batchObsvPubC <- obs:
	// default:
	// batchObservationChannelOverflow.WithLabelValues("batchObsvPub").Inc()
	// }
}

// batchProcessor is the long-running loop of the batch processor. It repeatedly calls handleBatch, which
// gathers individual observations and publishes them as a batch, until the context is done.
// It returns nil when the context is done and otherwise returns the first error reported by handleBatch.
func (p *Processor) batchProcessor(ctx context.Context) error {
	for {
		// Non-blocking check for shutdown before starting the next batch.
		select {
		case <-ctx.Done():
			return nil
		default:
		}

		if err := p.handleBatch(ctx); err != nil {
			return err
		}
	}
}

// handleBatch reads observations from the internal batch channel, either until the read times out after
// `p2p.MaxObservationBatchDelay` or until `p2p.MaxObservationBatchSize` observations have been collected.
// If any observations were read, it builds a `SignedObservationBatch` gossip message and posts it to p2p
// via `gossipSendC`. The post is non-blocking: if the channel cannot accept the message, the batch is dropped
// and the "gossipSend" overflow counter is incremented.
//
// It returns an error only if the read fails with something other than `context.DeadlineExceeded`.
// A failure to marshal the gossip message panics.
func (p *Processor) handleBatch(ctx context.Context) error {
	// Bound the read so that a partially filled batch still gets published after the maximum delay.
	readCtx, cancel := context.WithTimeout(ctx, p2p.MaxObservationBatchDelay)
	defer cancel()

	observations, err := common.ReadFromChannelWithTimeout(readCtx, p.batchObsvPubC, p2p.MaxObservationBatchSize)
	if err != nil && !errors.Is(err, context.DeadlineExceeded) {
		return fmt.Errorf("failed to read observations from the internal observation batch channel: %w", err)
	}

	// Nothing arrived within the window, so there is nothing to publish.
	if len(observations) == 0 {
		return nil
	}

	gossipMsg := &gossipv1.GossipMessage{
		Message: &gossipv1.GossipMessage_SignedObservationBatch{
			SignedObservationBatch: &gossipv1.SignedObservationBatch{
				Addr:         p.ourAddr.Bytes(),
				Observations: observations,
			},
		},
	}

	msg, err := proto.Marshal(gossipMsg)
	if err != nil {
		panic(err)
	}

	// Never block the batch loop on a full send channel; count the dropped batch instead.
	select {
	case p.gossipSendC <- msg:
	default:
		batchObservationChannelOverflow.WithLabelValues("gossipSend").Inc()
	}
	return nil
}
85 changes: 85 additions & 0 deletions node/pkg/processor/batch_obs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package processor

import (
"bytes"
"testing"
"time"

"github.com/certusone/wormhole/node/pkg/devnet"
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"google.golang.org/protobuf/proto"
)

// getUniqueVAA returns a test VAA whose fields are all fixed except for the sequence number, which is
// set to seqNo so that each call with a different seqNo produces a distinct VAA.
func getUniqueVAA(seqNo uint64) vaa.VAA {
	return vaa.VAA{
		Version:          uint8(1),
		GuardianSetIndex: uint32(1),
		Signatures:       nil,
		Timestamp:        time.Unix(0, 0),
		Nonce:            uint32(1),
		Sequence:         seqNo,
		ConsistencyLevel: uint8(32),
		EmitterChain:     vaa.ChainIDSolana,
		EmitterAddress:   vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4},
		Payload:          []byte{97, 97, 97, 97, 97, 97},
	}
}

// TestMarshalSignedObservationBatch builds a batch holding `p2p.MaxObservationBatchSize` signed observations,
// checks that the marshaled batch is smaller than `pubsub.DefaultMaxMessageSize`, and verifies that the batch
// survives a marshal / unmarshal round trip unchanged.
func TestMarshalSignedObservationBatch(t *testing.T) {
	gk := devnet.InsecureDeterministicEcdsaKeyByIndex(crypto.S256(), uint64(0))
	require.NotNil(t, gk)

	// Fill a batch to the maximum size, using one unique VAA per sequence number.
	numObservations := uint64(p2p.MaxObservationBatchSize)
	observations := make([]*gossipv1.Observation, 0, numObservations)
	for seqNo := uint64(1); seqNo <= numObservations; seqNo++ {
		v := getUniqueVAA(seqNo)
		digest := v.SigningDigest()

		sig, err := crypto.Sign(digest.Bytes(), gk)
		require.NoError(t, err)

		obs := &gossipv1.Observation{
			Hash:      digest.Bytes(),
			Signature: sig,
			TxHash:    ethcommon.HexToHash("0x06f541f5ecfc43407c31587aa6ac3a689e8960f36dc23c332db5510dfc6a4063").Bytes(),
			MessageId: v.MessageID(),
		}
		observations = append(observations, obs)
	}

	// Pin the marshaled size of a single observation.
	obsBuf, err := proto.Marshal(observations[0])
	require.NoError(t, err)
	assert.Equal(t, 205, len(obsBuf))

	batch := gossipv1.SignedObservationBatch{
		Addr:         crypto.PubkeyToAddress(gk.PublicKey).Bytes(),
		Observations: observations,
	}

	// A full batch must marshal to fewer bytes than the pubsub default maximum message size.
	buf, err := proto.Marshal(&batch)
	require.NoError(t, err)
	assert.Greater(t, pubsub.DefaultMaxMessageSize, len(buf))

	// Round trip: the decoded batch must match the original field by field.
	var batch2 gossipv1.SignedObservationBatch
	err = proto.Unmarshal(buf, &batch2)
	require.NoError(t, err)

	assert.True(t, bytes.Equal(batch.Addr, batch2.Addr))
	assert.Equal(t, len(batch.Observations), len(batch2.Observations))
	for idx, got := range batch2.Observations {
		want := batch.Observations[idx]
		assert.True(t, bytes.Equal(want.Hash, got.Hash))
		assert.True(t, bytes.Equal(want.Signature, got.Signature))
		assert.True(t, bytes.Equal(want.TxHash, got.TxHash))
		assert.Equal(t, want.MessageId, got.MessageId)
	}
}
Loading

0 comments on commit 601ffc8

Please sign in to comment.