From d324234c8153119f25b0f432183396ff2c12da03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Tue, 26 Sep 2023 12:27:29 -0400 Subject: [PATCH 1/2] fix(filter2): add requestID to pings and remove unneeded log (#776) --- waku/v2/protocol/filter/client.go | 2 +- waku/v2/utils/peer.go | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 503f186ad..916dea5a2 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -359,7 +359,7 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID) error { return wf.request( ctx, - &FilterSubscribeParameters{selectedPeer: peerID}, + &FilterSubscribeParameters{selectedPeer: peerID, requestID: protocol.GenerateRequestID()}, pb.FilterSubscribeRequest_SUBSCRIBER_PING, ContentFilter{}) } diff --git a/waku/v2/utils/peer.go b/waku/v2/utils/peer.go index 4b9d176fd..e2173b307 100644 --- a/waku/v2/utils/peer.go +++ b/waku/v2/utils/peer.go @@ -12,7 +12,6 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/multiformats/go-multiaddr" - "github.com/waku-org/go-waku/logging" "go.uber.org/zap" ) @@ -62,7 +61,6 @@ func SelectRandomPeer(peers peer.IDSlice, log *zap.Logger) (peer.ID, error) { if len(peers) >= 1 { peerID := peers[rand.Intn(len(peers))] // TODO: proper heuristic here that compares peer scores and selects "best" one. For now a random peer for the given protocol is returned - log.Info("Got random peer from peerstore", logging.HostID("peer", peerID)) return peerID, nil // nolint: gosec } From 388f56b43f35e6a8560eb077852710d06979a358 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 27 Sep 2023 12:16:37 +0530 Subject: [PATCH 2/2] feat: Sharded peer management - Relay (#764) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: connect/disconnect to peers based on node topic sub/unsub * feat: maintain healty relay connections per pubSubTopic Co-authored-by: richΛrd * chore: add config to limit peerstore capacity (#770) --- cmd/waku/flags.go | 6 + cmd/waku/main.go | 1 + cmd/waku/node.go | 3 +- cmd/waku/options.go | 1 + waku/v2/node/wakunode2.go | 6 +- waku/v2/node/wakuoptions.go | 8 + waku/v2/peermanager/peer_connector.go | 31 ++- waku/v2/peermanager/peer_manager.go | 210 +++++++++++++-------- waku/v2/peermanager/peer_manager_test.go | 2 +- waku/v2/peermanager/topic_event_handler.go | 166 ++++++++++++++++ waku/v2/protocol/relay/waku_relay.go | 19 +- waku/v2/protocol/utils.go | 2 + 12 files changed, 338 insertions(+), 117 deletions(-) create mode 100644 waku/v2/peermanager/topic_event_handler.go diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index 99a4ad744..e60a498d6 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -32,6 +32,12 @@ var ( Destination: &options.MaxPeerConnections, EnvVars: []string{"WAKUNODE2_MAX_CONNECTIONS"}, }) + PeerStoreCapacity = altsrc.NewIntFlag(&cli.IntFlag{ + Name: "peer-store-capacity", + Usage: "Maximum stored peers in the peerstore.", + Destination: &options.PeerStoreCapacity, + EnvVars: []string{"WAKUNODE2_PEERSTORE_CAPACITY"}, + }) WebsocketSupport = altsrc.NewBoolFlag(&cli.BoolFlag{ Name: "websocket-support", Aliases: []string{"ws"}, diff --git a/cmd/waku/main.go b/cmd/waku/main.go index 9c6097b52..c260fb9bf 100644 --- a/cmd/waku/main.go +++ b/cmd/waku/main.go @@ -22,6 +22,7 @@ func main() { TcpPort, Address, MaxPeerConnections, + PeerStoreCapacity, 
WebsocketSupport, WebsocketPort, WebsocketSecurePort, diff --git a/cmd/waku/node.go b/cmd/waku/node.go index d77a29b2b..e0d0fbf63 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -129,6 +129,7 @@ func Execute(options NodeOptions) { node.WithKeepAlive(options.KeepAlive), node.WithMaxPeerConnections(options.MaxPeerConnections), node.WithPrometheusRegisterer(prometheus.DefaultRegisterer), + node.WithPeerStoreCapacity(options.PeerStoreCapacity), } if len(options.AdvertiseAddresses) != 0 { nodeOpts = append(nodeOpts, node.WithAdvertiseAddresses(options.AdvertiseAddresses...)) @@ -322,7 +323,7 @@ func Execute(options NodeOptions) { } for _, d := range discoveredNodes { - wakuNode.AddDiscoveredPeer(d.PeerID, d.PeerInfo.Addrs, wakupeerstore.DNSDiscovery, nil) + wakuNode.AddDiscoveredPeer(d.PeerID, d.PeerInfo.Addrs, wakupeerstore.DNSDiscovery, nil, true) } //For now assuming that static peers added support/listen on all topics specified via commandLine. diff --git a/cmd/waku/options.go b/cmd/waku/options.go index f27fd4ab5..e1bd026f3 100644 --- a/cmd/waku/options.go +++ b/cmd/waku/options.go @@ -169,6 +169,7 @@ type NodeOptions struct { UserAgent string PProf bool MaxPeerConnections int + PeerStoreCapacity int PeerExchange PeerExchangeOptions Websocket WSOptions diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 280720be1..1a3b18c21 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -253,7 +253,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { } //Initialize peer manager. - w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.log) + w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, w.log) w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, discoveryConnectTimeout, w.log) if err != nil { @@ -701,7 +701,7 @@ func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics } // AddDiscoveredPeer to add a discovered peer to the node peerStore -func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubsubTopics []string) { +func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubsubTopics []string, connectNow bool) { p := peermanager.PeerData{ Origin: origin, AddrInfo: peer.AddrInfo{ @@ -710,7 +710,7 @@ func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wp }, PubSubTopics: pubsubTopics, } - w.peermanager.AddDiscoveredPeer(p) + w.peermanager.AddDiscoveredPeer(p, connectNow) } // DialPeerWithMultiAddress is used to connect to a peer using a multiaddress diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 559fd858e..2f85d9c10 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -88,6 +88,7 @@ type WakuNodeParameters struct { rendezvousDB *rendezvous.DB maxPeerConnections int + peerStoreCapacity int enableDiscV5 bool udpPort uint @@ -356,6 +357,13 @@ func WithMaxPeerConnections(maxPeers int) WakuNodeOption { } } +func WithPeerStoreCapacity(capacity int) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.peerStoreCapacity = capacity + return nil + } +} + // WithDiscoveryV5 is a WakuOption used to enable DiscV5 peer discovery func WithDiscoveryV5(udpPort uint, bootnodes []*enode.Node, autoUpdate bool) WakuNodeOption { return func(params *WakuNodeParameters) error { diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index 
33c4c8ae8..120a112f6 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -17,6 +17,7 @@ import ( "github.com/libp2p/go-libp2p/p2p/discovery/backoff" "github.com/waku-org/go-waku/logging" wps "github.com/waku-org/go-waku/waku/v2/peerstore" + waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" "go.uber.org/zap" @@ -113,8 +114,14 @@ func (c *PeerConnectionStrategy) consumeSubscription(ch <-chan PeerData) { if !ok { return } - c.pm.AddDiscoveredPeer(p) - c.PushToChan(p) + triggerImmediateConnection := false + //Not connecting to peer as soon as it is discovered, + // rather expecting this to be pushed from PeerManager based on the need. + if len(c.host.Network().Peers()) < waku_proto.GossipSubOptimalFullMeshSize { + triggerImmediateConnection = true + } + c.pm.AddDiscoveredPeer(p, triggerImmediateConnection) + case <-time.After(1 * time.Second): // This timeout is to not lock the goroutine break @@ -137,8 +144,8 @@ func (c *PeerConnectionStrategy) Start(ctx context.Context) error { } func (c *PeerConnectionStrategy) start() error { - c.WaitGroup().Add(2) - go c.shouldDialPeers() + c.WaitGroup().Add(1) + go c.dialPeers() c.consumeSubscriptions() @@ -155,22 +162,6 @@ func (c *PeerConnectionStrategy) isPaused() bool { return c.paused.Load() } -func (c *PeerConnectionStrategy) shouldDialPeers() { - defer c.WaitGroup().Done() - - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - for { - select { - case <-c.Context().Done(): - return - case <-ticker.C: - _, outRelayPeers := c.pm.getRelayPeers() - c.paused.Store(outRelayPeers.Len() >= c.pm.OutRelayPeersTarget) // pause if no of OutPeers more than or eq to target - } - } -} - // it might happen Subscribe is called before peerConnector has started so store these subscriptions in subscriptions array and custom after c.cancel is set. func (c *PeerConnectionStrategy) consumeSubscriptions() { for _, subs := range c.subscriptions { diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index be5b39203..042d3de99 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -2,8 +2,11 @@ package peermanager import ( "context" + "errors" + "sync" "time" + pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -13,6 +16,7 @@ import ( ma "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/logging" wps "github.com/waku-org/go-waku/waku/v2/peerstore" + waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" @@ -20,9 +24,15 @@ import ( "go.uber.org/zap" ) +// NodeTopicDetails stores pubSubTopic related data like topicHandle for the node. +type NodeTopicDetails struct { + topic *pubsub.Topic +} + // PeerManager applies various controls and manage connections towards peers. 
type PeerManager struct { peerConnector *PeerConnectionStrategy + maxPeers int maxRelayPeers int logger *zap.Logger InRelayPeersTarget int @@ -31,9 +41,12 @@ type PeerManager struct { serviceSlots *ServiceSlots ctx context.Context sub event.Subscription + topicMutex sync.RWMutex + subRelayTopics map[string]*NodeTopicDetails } const peerConnectivityLoopSecs = 15 +const maxConnsToPeerRatio = 5 // 80% relay peers 20% service peers func relayAndServicePeers(maxConnections int) (int, int) { @@ -52,22 +65,29 @@ func inAndOutRelayPeers(relayPeers int) (int, int) { } // NewPeerManager creates a new peerManager instance. -func NewPeerManager(maxConnections int, logger *zap.Logger) *PeerManager { +func NewPeerManager(maxConnections int, maxPeers int, logger *zap.Logger) *PeerManager { maxRelayPeers, _ := relayAndServicePeers(maxConnections) inRelayPeersTarget, outRelayPeersTarget := inAndOutRelayPeers(maxRelayPeers) + if maxPeers == 0 || maxConnections > maxPeers { + maxPeers = maxConnsToPeerRatio * maxConnections + } + pm := &PeerManager{ logger: logger.Named("peer-manager"), maxRelayPeers: maxRelayPeers, InRelayPeersTarget: inRelayPeersTarget, OutRelayPeersTarget: outRelayPeersTarget, serviceSlots: NewServiceSlot(), + subRelayTopics: make(map[string]*NodeTopicDetails), + maxPeers: maxPeers, } logger.Info("PeerManager init values", zap.Int("maxConnections", maxConnections), zap.Int("maxRelayPeers", maxRelayPeers), zap.Int("outRelayPeersTarget", outRelayPeersTarget), - zap.Int("inRelayPeersTarget", pm.InRelayPeersTarget)) + zap.Int("inRelayPeersTarget", pm.InRelayPeersTarget), + zap.Int("maxPeers", maxPeers)) return pm } @@ -82,44 +102,6 @@ func (pm *PeerManager) SetPeerConnector(pc *PeerConnectionStrategy) { pm.peerConnector = pc } -func (pm *PeerManager) SubscribeToRelayEvtBus(bus event.Bus) error { - var err error - pm.sub, err = bus.Subscribe(new(relay.EvtPeerTopic)) - if err != nil { - return err - } - return nil -} - -func (pm *PeerManager) peerEventLoop(ctx context.Context) { - defer pm.sub.Close() - for { - select { - case e := <-pm.sub.Out(): - peerEvt := e.(relay.EvtPeerTopic) - wps := pm.host.Peerstore().(*wps.WakuPeerstoreImpl) - peerID := peerEvt.PeerID - if peerEvt.State == relay.PEER_JOINED { - err := wps.AddPubSubTopic(peerID, peerEvt.Topic) - if err != nil { - pm.logger.Error("failed to add pubSubTopic for peer", - logging.HostID("peerID", peerID), zap.String("topic", peerEvt.Topic), zap.Error(err)) - } - } else if peerEvt.State == relay.PEER_LEFT { - err := wps.RemovePubSubTopic(peerID, peerEvt.Topic) - if err != nil { - pm.logger.Error("failed to remove pubSubTopic for peer", - logging.HostID("peerID", peerID), zap.Error(err)) - } - } else { - pm.logger.Error("unknown peer event received", zap.Int("eventState", int(peerEvt.State))) - } - case <-ctx.Done(): - return - } - } -} - // Start starts the processing to be done by peer manager. func (pm *PeerManager) Start(ctx context.Context) { pm.ctx = ctx @@ -131,6 +113,7 @@ func (pm *PeerManager) Start(ctx context.Context) { // This is a connectivity loop, which currently checks and prunes inbound connections. 
func (pm *PeerManager) connectivityLoop(ctx context.Context) { + pm.connectToRelayPeers() t := time.NewTicker(peerConnectivityLoopSecs * time.Second) defer t.Stop() for { @@ -144,10 +127,12 @@ func (pm *PeerManager) connectivityLoop(ctx context.Context) { } // GroupPeersByDirection returns all the connected peers in peer store grouped by Inbound or outBound direction -func (pm *PeerManager) GroupPeersByDirection() (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) { - peers := pm.host.Network().Peers() +func (pm *PeerManager) GroupPeersByDirection(specificPeers []peer.ID) (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) { + if len(specificPeers) == 0 { + specificPeers = pm.host.Network().Peers() + } - for _, p := range peers { + for _, p := range specificPeers { direction, err := pm.host.Peerstore().(wps.WakuPeerstore).Direction(p) if err == nil { if direction == network.DirInbound { @@ -163,9 +148,11 @@ func (pm *PeerManager) GroupPeersByDirection() (inPeers peer.IDSlice, outPeers p return inPeers, outPeers, nil } -func (pm *PeerManager) getRelayPeers() (inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) { +// getRelayPeers - Returns list of in and out peers supporting WakuRelayProtocol within specifiedPeers. +// If specifiedPeers is empty, it checks within all peers in peerStore. +func (pm *PeerManager) getRelayPeers(specificPeers []peer.ID) (inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) { //Group peers by their connected direction inbound or outbound. - inPeers, outPeers, err := pm.GroupPeersByDirection() + inPeers, outPeers, err := pm.GroupPeersByDirection(specificPeers) if err != nil { return } @@ -182,57 +169,91 @@ func (pm *PeerManager) getRelayPeers() (inRelayPeers peer.IDSlice, outRelayPeers return } -func (pm *PeerManager) connectToRelayPeers() { +// ensureMinRelayConnsPerTopic makes sure there are min of D conns per pubsubTopic. +// If not it will look into peerStore to initiate more connections. +// If peerStore doesn't have enough peers, will wait for discv5 to find more and try in next cycle +func (pm *PeerManager) ensureMinRelayConnsPerTopic() { + pm.topicMutex.RLock() + defer pm.topicMutex.RUnlock() + for topicStr, topicInst := range pm.subRelayTopics { + curPeers := topicInst.topic.ListPeers() + curPeerLen := len(curPeers) + if curPeerLen < waku_proto.GossipSubOptimalFullMeshSize { + pm.logger.Info("Subscribed topic is unhealthy, initiating more connections to maintain health", + zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curPeerLen), + zap.Int("optimumPeers", waku_proto.GossipSubOptimalFullMeshSize)) + //Find not connected peers. + notConnectedPeers := pm.getNotConnectedPers(topicStr) + if notConnectedPeers.Len() == 0 { + //TODO: Trigger on-demand discovery for this topic. + continue + } + //Connect to eligible peers. + numPeersToConnect := waku_proto.GossipSubOptimalFullMeshSize - curPeerLen + if numPeersToConnect > notConnectedPeers.Len() { + numPeersToConnect = notConnectedPeers.Len() + } + pm.connectToPeers(notConnectedPeers[0:numPeersToConnect]) + } + } +} + +// connectToRelayPeers ensures minimum D connections are there for each pubSubTopic. +// If not, initiates connections to additional peers. +// It also checks for incoming relay connections and prunes once they cross inRelayTarget +func (pm *PeerManager) connectToRelayPeers() { //Check for out peer connections and connect to more peers. 
- inRelayPeers, outRelayPeers := pm.getRelayPeers() - pm.logger.Info("Number of Relay peers connected", zap.Int("inRelayPeers", inRelayPeers.Len()), - zap.Int("outRelayPeers", outRelayPeers.Len())) + pm.ensureMinRelayConnsPerTopic() + + inRelayPeers, outRelayPeers := pm.getRelayPeers(nil) + pm.logger.Info("number of relay peers connected", + zap.Int("in", inRelayPeers.Len()), + zap.Int("out", outRelayPeers.Len())) if inRelayPeers.Len() > 0 && inRelayPeers.Len() > pm.InRelayPeersTarget { pm.pruneInRelayConns(inRelayPeers) } - - if outRelayPeers.Len() > pm.OutRelayPeersTarget { - return - } - totalRelayPeers := inRelayPeers.Len() + outRelayPeers.Len() - // Establish additional connections connected peers are lesser than target. - //What if the not connected peers in peerstore are not relay peers??? - if totalRelayPeers < pm.maxRelayPeers { - //Find not connected peers. - notConnectedPeers := pm.getNotConnectedPers() - if notConnectedPeers.Len() == 0 { - return - } - //Connect to eligible peers. - numPeersToConnect := pm.maxRelayPeers - totalRelayPeers - - if numPeersToConnect > notConnectedPeers.Len() { - numPeersToConnect = notConnectedPeers.Len() - } - pm.connectToPeers(notConnectedPeers[0:numPeersToConnect]) - } //Else: Should we raise some sort of unhealthy event?? } -func addrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host) PeerData { - return PeerData{ +// addrInfoToPeerData returns addressinfo for a peer +// If addresses are expired, it removes the peer from host peerStore and returns nil. +func addrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host) *PeerData { + addrs := host.Peerstore().Addrs(peerID) + if len(addrs) == 0 { + //Addresses expired, remove peer from peerStore + host.Peerstore().RemovePeer(peerID) + return nil + } + return &PeerData{ Origin: origin, AddrInfo: peer.AddrInfo{ ID: peerID, - Addrs: host.Peerstore().Addrs(peerID), + Addrs: addrs, }, } } + +// connectToPeers connects to peers provided in the list if the addresses have not expired. func (pm *PeerManager) connectToPeers(peers peer.IDSlice) { for _, peerID := range peers { peerData := addrInfoToPeerData(wps.PeerManager, peerID, pm.host) - pm.peerConnector.PushToChan(peerData) + if peerData == nil { + continue + } + pm.peerConnector.PushToChan(*peerData) } } -func (pm *PeerManager) getNotConnectedPers() (notConnectedPeers peer.IDSlice) { - for _, peerID := range pm.host.Peerstore().Peers() { +// getNotConnectedPers returns peers for a pubSubTopic that are not connected. +func (pm *PeerManager) getNotConnectedPers(pubsubTopic string) (notConnectedPeers peer.IDSlice) { + var peerList peer.IDSlice + if pubsubTopic == "" { + peerList = pm.host.Peerstore().Peers() + } else { + peerList = pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubsubTopic) + } + for _, peerID := range peerList { if pm.host.Network().Connectedness(peerID) != network.Connected { notConnectedPeers = append(notConnectedPeers, peerID) } @@ -240,13 +261,15 @@ func (pm *PeerManager) getNotConnectedPers() (notConnectedPeers peer.IDSlice) { return } +// pruneInRelayConns prune any incoming relay connections crossing derived inrelayPeerTarget func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) { //Start disconnecting peers, based on what? - //For now, just disconnect most recently connected peers + //For now no preference is used //TODO: Need to have more intelligent way of doing this, maybe peer scores. 
- pm.logger.Info("Number of in peer connections exceed targer relay peers, hence pruning", - zap.Int("inRelayPeers", inRelayPeers.Len()), zap.Int("inRelayPeersTarget", pm.InRelayPeersTarget)) + //TODO: Keep optimalPeersRequired for a pubSubTopic in mind while pruning connections to peers. + pm.logger.Info("peer connections exceed target relay peers, hence pruning", + zap.Int("cnt", inRelayPeers.Len()), zap.Int("target", pm.InRelayPeersTarget)) for pruningStartIndex := pm.InRelayPeersTarget; pruningStartIndex < inRelayPeers.Len(); pruningStartIndex++ { p := inRelayPeers[pruningStartIndex] err := pm.host.Network().ClosePeer(p) @@ -262,7 +285,17 @@ func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) { // AddDiscoveredPeer to add dynamically discovered peers. // Note that these peers will not be set in service-slots. // TODO: It maybe good to set in service-slots based on services supported in the ENR -func (pm *PeerManager) AddDiscoveredPeer(p PeerData) { +func (pm *PeerManager) AddDiscoveredPeer(p PeerData, connectNow bool) { + //Doing this check again inside addPeer, in order to avoid additional complexity of rollingBack other changes. + if pm.maxPeers <= pm.host.Peerstore().Peers().Len() { + return + } + //Check if the peer is already present, if so skip adding + _, err := pm.host.Peerstore().(wps.WakuPeerstore).Origin(p.AddrInfo.ID) + if err == nil { + pm.logger.Debug("Found discovered peer already in peerStore", logging.HostID("peer", p.AddrInfo.ID)) + return + } // Try to fetch shard info from ENR to arrive at pubSub topics. if len(p.PubSubTopics) == 0 && p.ENR != nil { shards, err := wenr.RelaySharding(p.ENR.Record()) @@ -278,7 +311,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p PeerData) { p.PubSubTopics = append(p.PubSubTopics, topicStr) } } else { - pm.logger.Info("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID)) + pm.logger.Debug("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID)) } } } @@ -292,14 +325,25 @@ func (pm *PeerManager) AddDiscoveredPeer(p PeerData) { logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String())) } } - + if connectNow { + pm.peerConnector.PushToChan(p) + } } // addPeer adds peer to only the peerStore. // It also sets additional metadata such as origin, ENR and supported protocols func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) error { + if pm.maxPeers <= pm.host.Peerstore().Peers().Len() { + return errors.New("peer store capacity reached") + } pm.logger.Info("adding peer to peerstore", logging.HostID("peer", ID)) - pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.AddressTTL) + if origin == wps.Static { + pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.PermanentAddrTTL) + } else { + //Need to re-evaluate the address expiry + // For now expiring them with default addressTTL which is an hour. 
+ pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.AddressTTL) + } err := pm.host.Peerstore().(wps.WakuPeerstore).SetOrigin(ID, origin) if err != nil { pm.logger.Error("could not set origin", zap.Error(err), logging.HostID("peer", ID)) diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index cef6598fa..259b92f46 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -31,7 +31,7 @@ func initTest(t *testing.T) (context.Context, *PeerManager, func()) { defer h1.Close() // host 1 is used by peer manager - pm := NewPeerManager(10, utils.Logger()) + pm := NewPeerManager(10, 20, utils.Logger()) pm.SetHost(h1) return ctx, pm, func() { diff --git a/waku/v2/peermanager/topic_event_handler.go b/waku/v2/peermanager/topic_event_handler.go new file mode 100644 index 000000000..9e66ec0fc --- /dev/null +++ b/waku/v2/peermanager/topic_event_handler.go @@ -0,0 +1,166 @@ +package peermanager + +import ( + "context" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/logging" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" + waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "go.uber.org/zap" +) + +func (pm *PeerManager) SubscribeToRelayEvtBus(bus event.Bus) error { + var err error + pm.sub, err = bus.Subscribe([]interface{}{new(relay.EvtPeerTopic), new(relay.EvtRelaySubscribed), new(relay.EvtRelayUnsubscribed)}) + return err +} + +func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topicInst *pubsub.Topic) { + pm.logger.Info("handleNewRelayTopicSubscription", zap.String("pubSubTopic", pubsubTopic)) + pm.topicMutex.Lock() + defer pm.topicMutex.Unlock() + + _, ok := pm.subRelayTopics[pubsubTopic] + if ok { + //Nothing to be done, as we are already subscribed to this topic. + return + } + pm.subRelayTopics[pubsubTopic] = &NodeTopicDetails{topicInst} + //Check how many relay peers we are connected to that subscribe to this topic, if less than D find peers in peerstore and connect. + //If no peers in peerStore, trigger discovery for this topic? + relevantPeersForPubSubTopic := pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubsubTopic) + var notConnectedPeers peer.IDSlice + connectedPeers := 0 + for _, peer := range relevantPeersForPubSubTopic { + if pm.host.Network().Connectedness(peer) == network.Connected { + connectedPeers++ + } else { + notConnectedPeers = append(notConnectedPeers, peer) + } + } + + if connectedPeers >= waku_proto.GossipSubOptimalFullMeshSize { //TODO: Use a config rather than hard-coding. + // Should we use optimal number or define some sort of a config for the node to choose from? + // A desktop node may choose this to be 4-6, whereas a service node may choose this to be 8-12 based on resources it has + // or bandwidth it can support. + // Should we link this to bandwidth management somehow or just depend on some sort of config profile? 
+ pm.logger.Info("Optimal required relay peers for new pubSubTopic are already connected ", zap.String("pubSubTopic", pubsubTopic), + zap.Int("connectedPeerCount", connectedPeers)) + return + } + triggerDiscovery := false + if notConnectedPeers.Len() > 0 { + numPeersToConnect := notConnectedPeers.Len() - connectedPeers + if numPeersToConnect < 0 { + numPeersToConnect = notConnectedPeers.Len() + } else if numPeersToConnect-connectedPeers > waku_proto.GossipSubOptimalFullMeshSize { + numPeersToConnect = waku_proto.GossipSubOptimalFullMeshSize - connectedPeers + } + if numPeersToConnect+connectedPeers < waku_proto.GossipSubOptimalFullMeshSize { + triggerDiscovery = true + } + //For now all peers are being given same priority, + // Later we may want to choose peers that have more shards in common over others. + pm.connectToPeers(notConnectedPeers[0:numPeersToConnect]) + } else { + triggerDiscovery = true + } + + if triggerDiscovery { + //TODO: Initiate on-demand discovery for this pubSubTopic. + // Use peer-exchange and rendevouz? + //Should we query discoverycache to find out if there are any more peers before triggering discovery? + return + } +} + +func (pm *PeerManager) handleNewRelayTopicUnSubscription(pubsubTopic string) { + pm.logger.Info("handleNewRelayTopicUnSubscription", zap.String("pubSubTopic", pubsubTopic)) + pm.topicMutex.Lock() + defer pm.topicMutex.Unlock() + _, ok := pm.subRelayTopics[pubsubTopic] + if !ok { + //Nothing to be done, as we are already unsubscribed from this topic. + return + } + delete(pm.subRelayTopics, pubsubTopic) + + //If there are peers only subscribed to this topic, disconnect them. + relevantPeersForPubSubTopic := pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubsubTopic) + for _, peer := range relevantPeersForPubSubTopic { + if pm.host.Network().Connectedness(peer) == network.Connected { + peerTopics, err := pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PubSubTopics(peer) + if err != nil { + pm.logger.Error("Could not retrieve pubsub topics for peer", zap.Error(err), + logging.HostID("peerID", peer)) + continue + } + if len(peerTopics) == 1 && peerTopics[0] == pubsubTopic { + err := pm.host.Network().ClosePeer(peer) + if err != nil { + pm.logger.Warn("Failed to disconnect connection towards peer", + logging.HostID("peerID", peer)) + continue + } + pm.logger.Debug("Successfully disconnected connection towards peer", + logging.HostID("peerID", peer)) + } + } + } +} + +func (pm *PeerManager) handlerPeerTopicEvent(peerEvt relay.EvtPeerTopic) { + wps := pm.host.Peerstore().(*wps.WakuPeerstoreImpl) + peerID := peerEvt.PeerID + if peerEvt.State == relay.PEER_JOINED { + err := wps.AddPubSubTopic(peerID, peerEvt.PubsubTopic) + if err != nil { + pm.logger.Error("failed to add pubSubTopic for peer", + logging.HostID("peerID", peerID), zap.String("topic", peerEvt.PubsubTopic), zap.Error(err)) + } + } else if peerEvt.State == relay.PEER_LEFT { + err := wps.RemovePubSubTopic(peerID, peerEvt.PubsubTopic) + if err != nil { + pm.logger.Error("failed to remove pubSubTopic for peer", + logging.HostID("peerID", peerID), zap.Error(err)) + } + } else { + pm.logger.Error("unknown peer event received", zap.Int("eventState", int(peerEvt.State))) + } +} + +func (pm *PeerManager) peerEventLoop(ctx context.Context) { + defer pm.sub.Close() + for { + select { + case e := <-pm.sub.Out(): + switch e := e.(type) { + case relay.EvtPeerTopic: + { + peerEvt := (relay.EvtPeerTopic)(e) + pm.handlerPeerTopicEvent(peerEvt) + } + case relay.EvtRelaySubscribed: + { + 
eventDetails := (relay.EvtRelaySubscribed)(e) + pm.handleNewRelayTopicSubscription(eventDetails.Topic, eventDetails.TopicInst) + } + case relay.EvtRelayUnsubscribed: + { + eventDetails := (relay.EvtRelayUnsubscribed)(e) + pm.handleNewRelayTopicUnSubscription(eventDetails.Topic) + } + default: + pm.logger.Error("unsupported event type", zap.Any("eventType", e)) + } + + case <-ctx.Done(): + return + } + } +} diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 15ac5a95e..6d19afc74 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -71,7 +71,8 @@ type WakuRelay struct { // EvtRelaySubscribed is an event emitted when a new subscription to a pubsub topic is created type EvtRelaySubscribed struct { - Topic string + Topic string + TopicInst *pubsub.Topic } // EvtRelayUnsubscribed is an event emitted when a subscription to a pubsub topic is closed @@ -87,9 +88,9 @@ const ( ) type EvtPeerTopic struct { - Topic string - PeerID peer.ID - State PeerTopicState + PubsubTopic string + PeerID peer.ID + State PeerTopicState } func msgIDFn(pmsg *pubsub_pb.Message) string { @@ -115,11 +116,11 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou cfg.PruneBackoff = time.Minute cfg.UnsubscribeBackoff = 5 * time.Second cfg.GossipFactor = 0.25 - cfg.D = 6 + cfg.D = waku_proto.GossipSubOptimalFullMeshSize cfg.Dlo = 4 cfg.Dhi = 12 cfg.Dout = 3 - cfg.Dlazy = 6 + cfg.Dlazy = waku_proto.GossipSubOptimalFullMeshSize cfg.HeartbeatInterval = time.Second cfg.HistoryLength = 6 cfg.HistoryGossip = 3 @@ -331,7 +332,7 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro w.topicEvtHanders[topic] = evtHandler w.relaySubs[topic] = sub - err = w.emitters.EvtRelaySubscribed.Emit(EvtRelaySubscribed{topic}) + err = w.emitters.EvtRelaySubscribed.Emit(EvtRelaySubscribed{topic, pubSubTopic}) if err != nil { return nil, err } @@ -554,13 +555,13 @@ func (w *WakuRelay) topicEventPoll(topic string, handler *pubsub.TopicEventHandl } if evt.Type == pubsub.PeerJoin { w.log.Debug("received a PeerJoin event", zap.String("topic", topic), logging.HostID("peerID", evt.Peer)) - err = w.emitters.EvtPeerTopic.Emit(EvtPeerTopic{Topic: topic, PeerID: evt.Peer, State: PEER_JOINED}) + err = w.emitters.EvtPeerTopic.Emit(EvtPeerTopic{PubsubTopic: topic, PeerID: evt.Peer, State: PEER_JOINED}) if err != nil { w.log.Error("failed to emit PeerJoin", zap.String("topic", topic), zap.Error(err)) } } else if evt.Type == pubsub.PeerLeave { w.log.Debug("received a PeerLeave event", zap.String("topic", topic), logging.HostID("peerID", evt.Peer)) - err = w.emitters.EvtPeerTopic.Emit(EvtPeerTopic{Topic: topic, PeerID: evt.Peer, State: PEER_LEFT}) + err = w.emitters.EvtPeerTopic.Emit(EvtPeerTopic{PubsubTopic: topic, PeerID: evt.Peer, State: PEER_LEFT}) if err != nil { w.log.Error("failed to emit PeerLeave", zap.String("topic", topic), zap.Error(err)) } diff --git a/waku/v2/protocol/utils.go b/waku/v2/protocol/utils.go index aa65c9aef..eaf820936 100644 --- a/waku/v2/protocol/utils.go +++ b/waku/v2/protocol/utils.go @@ -6,6 +6,8 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" ) +const GossipSubOptimalFullMeshSize = 6 + // FulltextMatch is the default matching function used for checking if a peer // supports a protocol or not func FulltextMatch(expectedProtocol string) func(string) bool {
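
Editor's note (not part of the patch content above): as a usage illustration only, the Go sketch below shows how a node consumer might combine the existing connection cap with the peer-store capacity knob introduced in PATCH 2/2. The option names, the node.New constructor and the maxConnsToPeerRatio (5x) fallback are taken from the diffs; the package main scaffolding, the numeric values and the Start/Stop calls are assumptions made for the sake of a self-contained example.

package main

import (
	"context"

	"github.com/waku-org/go-waku/waku/v2/node"
)

func main() {
	// Illustrative values: allow up to 50 simultaneous connections and cap the
	// peerstore at 300 entries. If WithPeerStoreCapacity is omitted (left at zero)
	// or set below the connection limit, NewPeerManager falls back to
	// maxConnsToPeerRatio (5) * maxConnections, i.e. 250 stored peers here.
	wakuNode, err := node.New(
		node.WithMaxPeerConnections(50),
		node.WithPeerStoreCapacity(300), // option added in this series
	)
	if err != nil {
		panic(err)
	}

	// Start/Stop signatures assumed from go-waku's public node package.
	if err := wakuNode.Start(context.Background()); err != nil {
		panic(err)
	}
	defer wakuNode.Stop()
}

With the capacity in place, AddDiscoveredPeer now also takes a connectNow flag: discovery sources such as the DNS discovery loop in cmd/waku/node.go pass true to dial immediately, while the peer connector only requests an immediate dial when the node has fewer than GossipSubOptimalFullMeshSize (6) connected peers, leaving further dialing to the per-topic connectivity loop in the peer manager.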