diff --git a/share/availability/discovery/discovery.go b/share/availability/discovery/discovery.go
index b27123a6f7..540e1c9efb 100644
--- a/share/availability/discovery/discovery.go
+++ b/share/availability/discovery/discovery.go
@@ -11,6 +11,7 @@ import (
 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/network"
 	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/libp2p/go-libp2p/p2p/host/eventbus"
 )
 
 var log = logging.Logger("share/discovery")
@@ -21,6 +22,10 @@ const (
 	// so ConnManager will not break a connection with them.
 	peerWeight = 1000
 	topic      = "full"
+
+	// eventbusBufSize is the size of the buffered channel used to handle
+	// events from libp2p's EventBus.
+	eventbusBufSize = 32
 )
 
 // waitF calculates time to restart announcing.
@@ -116,8 +121,11 @@ func (d *Discovery) ensurePeers(ctx context.Context) {
 		log.Warn("peers limit is set to 0. Skipping discovery...")
 		return
 	}
-	// subscribe on Event Bus in order to catch disconnected peers and restart the discovery
-	sub, err := d.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
+	// Subscribe on the EventBus to catch disconnected peers and restart
+	// discovery. We specify a larger buffer size (32 instead of the
+	// default 16) for the channel where EvtPeerConnectednessChanged events
+	// are delivered, so that writes to a full channel do not block.
+	sub, err := d.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(eventbusBufSize))
 	if err != nil {
 		log.Error(err)
 		return
@@ -131,9 +139,42 @@ func (d *Discovery) ensurePeers(ctx context.Context) {
 			log.Error(err)
 		}
 	}()
+
+	// Listening for events in a separate goroutine keeps event delivery from
+	// blocking while the loop below is stuck in `FindPeers` waiting for the
+	// needed amount of full nodes (FNs).
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				log.Debug("Context canceled. Finish listening for connectedness events.")
+				return
+			case e, ok := <-sub.Out():
+				if !ok {
+					log.Debug("Subscription for connectedness events is closed.")
+					return
+				}
+				// On a disconnect event, remove the peer from the set, restart its
+				// backoff, and reset the timer so that discovery restarts.
+				connStatus := e.(event.EvtPeerConnectednessChanged)
+				if connStatus.Connectedness == network.NotConnected {
+					if d.set.Contains(connStatus.Peer) {
+						log.Debugw("removing the peer from the peer set",
+							"peer", connStatus.Peer, "status", connStatus.Connectedness.String())
+						d.connector.RestartBackoff(connStatus.Peer)
+						d.set.Remove(connStatus.Peer)
+						d.onUpdatedPeers(connStatus.Peer, false)
+						d.host.ConnManager().UntagPeer(connStatus.Peer, topic)
+						t.Reset(d.discoveryInterval)
+					}
+				}
+			}
+		}
+	}()
+
 	for {
 		select {
 		case <-ctx.Done():
+			log.Info("Context canceled. Finishing peer discovery")
 			return
 		case <-t.C:
 			if uint(d.set.Size()) == d.peersLimit {
@@ -149,19 +190,6 @@ func (d *Discovery) ensurePeers(ctx context.Context) {
 			for p := range peers {
 				go d.handlePeerFound(ctx, topic, p)
 			}
-		case e := <-sub.Out():
-			// listen to disconnect event to remove peer from set and reset backoff time
-			// reset timer in order to restart the discovery, once stored peer is disconnected
-			connStatus := e.(event.EvtPeerConnectednessChanged)
-			if connStatus.Connectedness == network.NotConnected {
-				if d.set.Contains(connStatus.Peer) {
-					d.connector.RestartBackoff(connStatus.Peer)
-					d.set.Remove(connStatus.Peer)
-					d.onUpdatedPeers(connStatus.Peer, false)
-					d.host.ConnManager().UntagPeer(connStatus.Peer, topic)
-					t.Reset(d.discoveryInterval)
-				}
-			}
 		}
 	}
 }
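
For reference, the pattern the patch introduces can be exercised in isolation. Below is a minimal, self-contained sketch (not part of the patch): it uses the same go-libp2p APIs as the diff, subscribing to EvtPeerConnectednessChanged with an enlarged 32-slot buffer via eventbus.BufSize and draining the subscription in a dedicated goroutine so a slow consumer cannot back up the bus. The host setup, signal handling, and printed output are illustrative only.

// Standalone sketch of the subscription pattern used in the patch above.
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/p2p/host/eventbus"
)

func main() {
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer h.Close()

	// Subscribe with a 32-slot buffer, mirroring eventbusBufSize in the
	// patch; per the patch comment, the default buffer is 16.
	sub, err := h.EventBus().Subscribe(
		&event.EvtPeerConnectednessChanged{},
		eventbus.BufSize(32),
	)
	if err != nil {
		panic(err)
	}
	defer sub.Close()

	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	// Drain events in their own goroutine, like the patched ensurePeers:
	// the rest of the program stays free to do other work.
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case e, ok := <-sub.Out():
				if !ok {
					return
				}
				evt := e.(event.EvtPeerConnectednessChanged)
				if evt.Connectedness == network.NotConnected {
					fmt.Println("peer disconnected:", evt.Peer)
				}
			}
		}
	}()

	<-ctx.Done()
}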