Skip to content

Commit

Permalink
Merge branch 'master' into update_enrtrees
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem authored Sep 27, 2023
2 parents dbfb7e3 + 388f56b commit 610912e
Show file tree
Hide file tree
Showing 14 changed files with 339 additions and 120 deletions.
6 changes: 6 additions & 0 deletions cmd/waku/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ var (
Destination: &options.MaxPeerConnections,
EnvVars: []string{"WAKUNODE2_MAX_CONNECTIONS"},
})
PeerStoreCapacity = altsrc.NewIntFlag(&cli.IntFlag{
Name: "peer-store-capacity",
Usage: "Maximum stored peers in the peerstore.",
Destination: &options.PeerStoreCapacity,
EnvVars: []string{"WAKUNODE2_PEERSTORE_CAPACITY"},
})
WebsocketSupport = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "websocket-support",
Aliases: []string{"ws"},
Expand Down
1 change: 1 addition & 0 deletions cmd/waku/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func main() {
TcpPort,
Address,
MaxPeerConnections,
PeerStoreCapacity,
WebsocketSupport,
WebsocketPort,
WebsocketSecurePort,
Expand Down
3 changes: 2 additions & 1 deletion cmd/waku/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func Execute(options NodeOptions) {
node.WithKeepAlive(options.KeepAlive),
node.WithMaxPeerConnections(options.MaxPeerConnections),
node.WithPrometheusRegisterer(prometheus.DefaultRegisterer),
node.WithPeerStoreCapacity(options.PeerStoreCapacity),
}
if len(options.AdvertiseAddresses) != 0 {
nodeOpts = append(nodeOpts, node.WithAdvertiseAddresses(options.AdvertiseAddresses...))
Expand Down Expand Up @@ -322,7 +323,7 @@ func Execute(options NodeOptions) {
}

for _, d := range discoveredNodes {
wakuNode.AddDiscoveredPeer(d.PeerID, d.PeerInfo.Addrs, wakupeerstore.DNSDiscovery, nil)
wakuNode.AddDiscoveredPeer(d.PeerID, d.PeerInfo.Addrs, wakupeerstore.DNSDiscovery, nil, true)
}

//For now assuming that static peers added support/listen on all topics specified via commandLine.
Expand Down
1 change: 1 addition & 0 deletions cmd/waku/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ type NodeOptions struct {
UserAgent string
PProf bool
MaxPeerConnections int
PeerStoreCapacity int

PeerExchange PeerExchangeOptions
Websocket WSOptions
Expand Down
6 changes: 3 additions & 3 deletions waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
}

//Initialize peer manager.
w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.log)
w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, w.log)

w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, discoveryConnectTimeout, w.log)
if err != nil {
Expand Down Expand Up @@ -701,7 +701,7 @@ func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics
}

// AddDiscoveredPeer to add a discovered peer to the node peerStore
func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubsubTopics []string) {
func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubsubTopics []string, connectNow bool) {
p := peermanager.PeerData{
Origin: origin,
AddrInfo: peer.AddrInfo{
Expand All @@ -710,7 +710,7 @@ func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wp
},
PubSubTopics: pubsubTopics,
}
w.peermanager.AddDiscoveredPeer(p)
w.peermanager.AddDiscoveredPeer(p, connectNow)
}

// DialPeerWithMultiAddress is used to connect to a peer using a multiaddress
Expand Down
8 changes: 8 additions & 0 deletions waku/v2/node/wakuoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type WakuNodeParameters struct {
rendezvousDB *rendezvous.DB

maxPeerConnections int
peerStoreCapacity int

enableDiscV5 bool
udpPort uint
Expand Down Expand Up @@ -356,6 +357,13 @@ func WithMaxPeerConnections(maxPeers int) WakuNodeOption {
}
}

func WithPeerStoreCapacity(capacity int) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.peerStoreCapacity = capacity
return nil
}
}

// WithDiscoveryV5 is a WakuOption used to enable DiscV5 peer discovery
func WithDiscoveryV5(udpPort uint, bootnodes []*enode.Node, autoUpdate bool) WakuNodeOption {
return func(params *WakuNodeParameters) error {
Expand Down
31 changes: 11 additions & 20 deletions waku/v2/peermanager/peer_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
"github.com/waku-org/go-waku/logging"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"

"go.uber.org/zap"

Expand Down Expand Up @@ -113,8 +114,14 @@ func (c *PeerConnectionStrategy) consumeSubscription(ch <-chan PeerData) {
if !ok {
return
}
c.pm.AddDiscoveredPeer(p)
c.PushToChan(p)
triggerImmediateConnection := false
//Not connecting to peer as soon as it is discovered,
// rather expecting this to be pushed from PeerManager based on the need.
if len(c.host.Network().Peers()) < waku_proto.GossipSubOptimalFullMeshSize {
triggerImmediateConnection = true
}
c.pm.AddDiscoveredPeer(p, triggerImmediateConnection)

case <-time.After(1 * time.Second):
// This timeout is to not lock the goroutine
break
Expand All @@ -137,8 +144,8 @@ func (c *PeerConnectionStrategy) Start(ctx context.Context) error {

}
func (c *PeerConnectionStrategy) start() error {
c.WaitGroup().Add(2)
go c.shouldDialPeers()
c.WaitGroup().Add(1)

go c.dialPeers()

c.consumeSubscriptions()
Expand All @@ -155,22 +162,6 @@ func (c *PeerConnectionStrategy) isPaused() bool {
return c.paused.Load()
}

func (c *PeerConnectionStrategy) shouldDialPeers() {
defer c.WaitGroup().Done()

ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-c.Context().Done():
return
case <-ticker.C:
_, outRelayPeers := c.pm.getRelayPeers()
c.paused.Store(outRelayPeers.Len() >= c.pm.OutRelayPeersTarget) // pause if no of OutPeers more than or eq to target
}
}
}

// it might happen Subscribe is called before peerConnector has started so store these subscriptions in subscriptions array and custom after c.cancel is set.
func (c *PeerConnectionStrategy) consumeSubscriptions() {
for _, subs := range c.subscriptions {
Expand Down
Loading

0 comments on commit 610912e

Please sign in to comment.