Skip to content

Commit

Permalink
Node metric and performance tweaks (#3967)
Browse files Browse the repository at this point in the history
* Node: Metric and performance tweaks

* Tweak observation metric

* pubsub StrictNoSign

* bigger observation metric buckets

* biggerer observation metric buckets

* Node: Add metric for libp2p drops

* Disable StrictNoSign for now

---------

Co-authored-by: Evan Gray <battledingo@gmail.com>
  • Loading branch information
bruce-riley and evan-gray authored Jun 6, 2024
1 parent d5fd72b commit ac7794b
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 44 deletions.
35 changes: 32 additions & 3 deletions node/pkg/common/guardianset.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/wormhole-foundation/wormhole/sdk/vaa"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -49,8 +50,30 @@ const MaxStateAge = 1 * time.Minute
type GuardianSet struct {
// Guardian's public key hashes truncated by the ETH standard hashing mechanism (20 bytes).
Keys []common.Address

// On-chain set index
Index uint32

// Quorum value for this set of keys
Quorum int

// A map from address to index. Testing showed that, on average, a map is almost three times faster than a sequential search of the key slice.
// Testing also showed that the map was twice as fast as using a sorted slice and `slices.BinarySearchFunc`. That being said, on a 4GHz CPU,
// the sequential search takes an average of 800 nanos and the map look up takes about 260 nanos. Is this worth doing?
keyMap map[common.Address]int
}

func NewGuardianSet(keys []common.Address, index uint32) *GuardianSet {
keyMap := map[common.Address]int{}
for idx, key := range keys {
keyMap[key] = idx
}
return &GuardianSet{
Keys: keys,
Index: index,
Quorum: vaa.CalculateQuorum(len(keys)),
keyMap: keyMap,
}
}

func (g *GuardianSet) KeysAsHexStrings() []string {
Expand All @@ -66,9 +89,15 @@ func (g *GuardianSet) KeysAsHexStrings() []string {
// KeyIndex returns a given address index from the guardian set. Returns (-1, false)
// if the address wasn't found and (addr, true) otherwise.
func (g *GuardianSet) KeyIndex(addr common.Address) (int, bool) {
for n, k := range g.Keys {
if k == addr {
return n, true
if g.keyMap != nil {
if idx, found := g.keyMap[addr]; found {
return idx, true
}
} else {
for n, k := range g.Keys {
if k == addr {
return n, true
}
}
}

Expand Down
38 changes: 34 additions & 4 deletions node/pkg/common/guardianset_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,42 @@
package common

import (
"reflect"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)

func TestNewGuardianSet(t *testing.T) {
keys := []common.Address{
common.HexToAddress("0xbeFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe"),
common.HexToAddress("0x88D7D8B32a9105d228100E72dFFe2Fae0705D31c"),
common.HexToAddress("0x58076F561CC62A47087B567C86f986426dFCD000"),
common.HexToAddress("0xBd6e9833490F8fA87c733A183CD076a6cBD29074"),
common.HexToAddress("0xb853FCF0a5C78C1b56D15fCE7a154e6ebe9ED7a2"),
common.HexToAddress("0xAF3503dBD2E37518ab04D7CE78b630F98b15b78a"),
common.HexToAddress("0x785632deA5609064803B1c8EA8bB2c77a6004Bd1"),
common.HexToAddress("0x09a281a698C0F5BA31f158585B41F4f33659e54D"),
common.HexToAddress("0x3178443AB76a60E21690DBfB17f7F59F09Ae3Ea1"),
common.HexToAddress("0x647ec26ae49b14060660504f4DA1c2059E1C5Ab6"),
common.HexToAddress("0x810AC3D8E1258Bd2F004a94Ca0cd4c68Fc1C0611"),
common.HexToAddress("0x80610e96d645b12f47ae5cf4546b18538739e90F"),
common.HexToAddress("0x2edb0D8530E31A218E72B9480202AcBaeB06178d"),
common.HexToAddress("0xa78858e5e5c4705CdD4B668FFe3Be5bae4867c9D"),
common.HexToAddress("0x5Efe3A05Efc62D60e1D19fAeB56A80223CDd3472"),
common.HexToAddress("0xD791b7D32C05aBB1cc00b6381FA0c4928f0c56fC"),
common.HexToAddress("0x14Bc029B8809069093D712A3fd4DfAb31963597e"),
common.HexToAddress("0x246Ab29FC6EBeDf2D392a51ab2Dc5C59d0902A03"),
common.HexToAddress("0x132A84dFD920b35a3D0BA5f7A0635dF298F9033e"),
}
gs := NewGuardianSet(keys, 1)
assert.True(t, reflect.DeepEqual(keys, gs.Keys))
assert.Equal(t, uint32(1), gs.Index)
assert.Equal(t, vaa.CalculateQuorum(len(keys)), gs.Quorum)
}

func TestKeyIndex(t *testing.T) {
type test struct {
guardianSet GuardianSet
Expand All @@ -15,13 +45,13 @@ func TestKeyIndex(t *testing.T) {
keyIndex int
}

guardianSet := GuardianSet{
Keys: []common.Address{
guardianSet := *NewGuardianSet(
[]common.Address{
common.HexToAddress("0x5aaeb6053f3e94c9b9a09f33669435e7ef1beaed"),
common.HexToAddress("0x5aaeb6053f3e94c9b9a09f33669435e7ef1beaee"),
},
Index: 1,
}
1,
)

tests := []test{
{guardianSet: guardianSet, address: "0x5aaeb6053f3e94c9b9a09f33669435e7ef1beaed", result: true, keyIndex: 0},
Expand Down
16 changes: 8 additions & 8 deletions node/pkg/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,10 +669,10 @@ func runConsensusTests(t *testing.T, testCases []testCase, numGuardians int) {
supervisor.Signal(ctx, supervisor.SignalHealthy)

// Inform them of the Guardian Set
commonGuardianSet := common.GuardianSet{
Keys: mockGuardianSetToGuardianAddrList(t, gs),
Index: guardianSetIndex,
}
commonGuardianSet := *common.NewGuardianSet(
mockGuardianSetToGuardianAddrList(t, gs),
guardianSetIndex,
)
for i, g := range gs {
logger.Info("Sending guardian set update", zap.Int("guardian_index", i))
g.MockSetC <- &commonGuardianSet
Expand Down Expand Up @@ -1183,10 +1183,10 @@ func runConsensusBenchmark(t *testing.B, name string, numGuardians int, numMessa
supervisor.Signal(ctx, supervisor.SignalHealthy)

// Inform them of the Guardian Set
commonGuardianSet := common.GuardianSet{
Keys: mockGuardianSetToGuardianAddrList(t, gs),
Index: guardianSetIndex,
}
commonGuardianSet := *common.NewGuardianSet(
mockGuardianSetToGuardianAddrList(t, gs),
guardianSetIndex,
)
for i, g := range gs {
logger.Info("Sending guardian set update", zap.Int("guardian_index", i))
g.MockSetC <- &commonGuardianSet
Expand Down
30 changes: 30 additions & 0 deletions node/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2ppb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/protocol"
Expand Down Expand Up @@ -71,6 +72,16 @@ var (
Name: "wormhole_p2p_receive_channel_overflow",
Help: "Total number of p2p received messages dropped due to channel overflow",
}, []string{"type"})
p2pDrop = promauto.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_p2p_drops",
Help: "Total number of messages that were dropped by libp2p",
})
p2pReject = promauto.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_p2p_rejects",
Help: "Total number of messages rejected by libp2p",
})
)

var heartbeatMessagePrefix = []byte("heartbeat|")
Expand Down Expand Up @@ -155,6 +166,21 @@ func DefaultConnectionManager() (*connmgr.BasicConnMgr, error) {
)
}

// traceHandler is used to intercept libp2p trace events so we can peg metrics.
type traceHandler struct {
}

// Trace is the interface to the libp2p trace handler. It pegs metrics as appropriate.
func (*traceHandler) Trace(evt *libp2ppb.TraceEvent) {
if evt.Type != nil {
if *evt.Type == libp2ppb.TraceEvent_DROP_RPC {
p2pDrop.Inc()
} else if *evt.Type == libp2ppb.TraceEvent_REJECT_MESSAGE {
p2pReject.Inc()
}
}
}

// BootstrapAddrs takes a comma-separated string of multi-address strings and returns an array of []peer.AddrInfo that does not include `self`.
// if `self` is part of `bootstrapPeers`, return isBootstrapNode=true
func BootstrapAddrs(logger *zap.Logger, bootstrapPeers string, self peer.ID) (bootstrappers []peer.AddrInfo, isBootstrapNode bool) {
Expand Down Expand Up @@ -342,9 +368,13 @@ func Run(
}

logger.Info("Subscribing pubsub topic", zap.String("topic", topic))
ourTracer := &traceHandler{}
ps, err := pubsub.NewGossipSub(ctx, h,
pubsub.WithValidateQueueSize(P2P_VALIDATE_QUEUE_SIZE),
pubsub.WithGossipSubParams(components.GossipParams),
pubsub.WithEventTracer(ourTracer),
// TODO: Investigate making this change. May need to use LaxSign until everyone has upgraded to that.
// pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
)
if err != nil {
panic(err)
Expand Down
23 changes: 19 additions & 4 deletions node/pkg/processor/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,23 @@ import (
)

var (
observationsBroadcastTotal = promauto.NewCounter(
observationsBroadcast = promauto.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_observations_broadcast_total",
Name: "wormhole_observations_queued_for_broadcast",
Help: "Total number of signed observations queued for broadcast",
})

observationsPostedInternally = promauto.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_observations_posted_internally",
Help: "Total number of our observations posted internally",
})

signedVAAsBroadcast = promauto.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_signed_vaas_queued_for_broadcast",
Help: "Total number of signed vaas queued for broadcast",
})
)

func (p *Processor) broadcastSignature(
Expand All @@ -45,9 +57,10 @@ func (p *Processor) broadcastSignature(
panic(err)
}

// Broadcast the observation.
p.gossipSendC <- msg
observationsBroadcast.Inc()

// Store our VAA in case we're going to submit it to Solana
hash := hex.EncodeToString(digest.Bytes())

if p.state.signatures[hash] == nil {
Expand Down Expand Up @@ -75,7 +88,7 @@ func (p *Processor) broadcastSignature(
go func() { p.obsvC <- om }()
}

observationsBroadcastTotal.Inc()
observationsPostedInternally.Inc()
}

func (p *Processor) broadcastSignedVAA(v *vaa.VAA) {
Expand All @@ -93,7 +106,9 @@ func (p *Processor) broadcastSignedVAA(v *vaa.VAA) {
panic(err)
}

// Broadcast the signed VAA.
p.gossipSendC <- msg
signedVAAsBroadcast.Inc()

if p.gatewayRelayer != nil {
p.gatewayRelayer.SubmitVAA(v)
Expand Down
11 changes: 5 additions & 6 deletions node/pkg/processor/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ func (p *Processor) handleCleanup(ctx context.Context) {
}

hasSigs := len(s.signatures)
wantSigs := vaa.CalculateQuorum(len(gs.Keys))
quorum := hasSigs >= wantSigs
quorum := hasSigs >= gs.Quorum

var chain vaa.ChainID
if s.ourObservation != nil {
Expand All @@ -129,7 +128,7 @@ func (p *Processor) handleCleanup(ctx context.Context) {
zap.String("digest", hash),
zap.Duration("delta", delta),
zap.Int("have_sigs", hasSigs),
zap.Int("required_sigs", wantSigs),
zap.Int("required_sigs", gs.Quorum),
zap.Bool("quorum", quorum),
zap.Stringer("emitter_chain", chain),
)
Expand Down Expand Up @@ -220,6 +219,7 @@ func (p *Processor) handleCleanup(ctx context.Context) {
zap.String("digest", hash),
zap.Duration("delta", delta),
zap.String("firstObserved", s.firstObserved.String()),
zap.Int("numSignatures", len(s.signatures)),
)
req := &gossipv1.ObservationRequest{
ChainId: uint32(s.ourObservation.GetEmitterChain()),
Expand All @@ -238,16 +238,15 @@ func (p *Processor) handleCleanup(ctx context.Context) {
// network reached consensus without us. We don't know the correct guardian
// set, so we simply use the most recent one.
hasSigs := len(s.signatures)
wantSigs := vaa.CalculateQuorum(len(p.gs.Keys))

if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("expiring unsubmitted nil observation",
zap.String("message_id", s.LoggingID()),
zap.String("digest", hash),
zap.Duration("delta", delta),
zap.Int("have_sigs", hasSigs),
zap.Int("required_sigs", wantSigs),
zap.Bool("quorum", hasSigs >= wantSigs),
zap.Int("required_sigs", p.gs.Quorum),
zap.Bool("quorum", hasSigs >= p.gs.Quorum),
)
}
delete(p.state.signatures, hash)
Expand Down
28 changes: 16 additions & 12 deletions node/pkg/processor/observation.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (

// signaturesToVaaFormat converts a map[common.Address][]byte (processor state format) to []*vaa.Signature (VAA format) given a set of keys gsKeys
// It also returns a bool array indicating which key in gsKeys had a signature
// The processor state format is used for effeciently storing signatures during aggregation while the VAA format is more efficient for on-chain verification.
// The processor state format is used for efficiently storing signatures during aggregation while the VAA format is more efficient for on-chain verification.
func signaturesToVaaFormat(signatures map[common.Address][]byte, gsKeys []common.Address) ([]*vaa.Signature, []bool) {
// Aggregate all valid signatures into a list of vaa.Signature and construct signed VAA.
agg := make([]bool, len(gsKeys))
Expand Down Expand Up @@ -80,6 +80,8 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
// Note that observations are never tied to the (verified) p2p identity key - the p2p network
// identity is completely decoupled from the guardian identity, p2p is just transport.

observationsReceivedTotal.Inc()

m := obs.Msg
hash := hex.EncodeToString(m.Hash)
s := p.state.signatures[hash]
Expand All @@ -99,8 +101,6 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
)
}

observationsReceivedTotal.Inc()

// Verify the Guardian's signature. This verifies that m.Signature matches m.Hash and recovers
// the public key that was used to sign the payload.
pk, err := crypto.Ecrecover(m.Hash, m.Signature)
Expand Down Expand Up @@ -213,17 +213,22 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW

s.signatures[their_addr] = m.Signature

if s.ourObservation != nil {
if s.submitted {
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("already submitted, doing nothing",
zap.String("messageId", m.MessageId),
zap.String("digest", hash),
)
}
} else if s.ourObservation != nil {
// We have made this observation on chain!

quorum := vaa.CalculateQuorum(len(gs.Keys))

// Check if we have more signatures than required for quorum.
// s.signatures may contain signatures from multiple guardian sets during guardian set updates
// Hence, if len(s.signatures) < quorum, then there is definitely no quorum and we can return early to save additional computation,
// but if len(s.signatures) >= quorum, there is not necessarily quorum for the active guardian set.
// We will later check for quorum again after assembling the VAA for a particular guardian set.
if len(s.signatures) < quorum {
if len(s.signatures) < gs.Quorum {
// no quorum yet, we're done here
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("quorum not yet met",
Expand All @@ -245,18 +250,18 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
zap.Any("set", gs.KeysAsHexStrings()),
zap.Uint32("index", gs.Index),
zap.Bools("aggregation", agg),
zap.Int("required_sigs", quorum),
zap.Int("required_sigs", gs.Quorum),
zap.Int("have_sigs", len(sigsVaaFormat)),
zap.Bool("quorum", len(sigsVaaFormat) >= quorum),
zap.Bool("quorum", len(sigsVaaFormat) >= gs.Quorum),
)
}

if len(sigsVaaFormat) >= quorum && !s.submitted {
if len(sigsVaaFormat) >= gs.Quorum {
// we have reached quorum *with the active guardian set*
s.ourObservation.HandleQuorum(sigsVaaFormat, hash, p)
} else {
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("quorum not met or already submitted, doing nothing", // 1.2M out of 3M info messages / hour / guardian
p.logger.Debug("quorum not met, doing nothing",
zap.String("messageId", m.MessageId),
zap.String("digest", hash),
)
Expand All @@ -269,7 +274,6 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
zap.String("digest", hash),
)
}

}

observationTotalDelay.Observe(float64(time.Since(obs.Timestamp).Microseconds()))
Expand Down
Loading

0 comments on commit ac7794b

Please sign in to comment.