Skip to content

Commit

Permalink
fix(share/discovery): decouple peer discovery from event processing (#…
Browse files Browse the repository at this point in the history
…1639)

## Overview
Starting to listen to subscriptions async will help us to avoid any
blocking in the case when we will not have the needed amount of FNs and
will be blocked in `FindPeers`.
## Checklist

- [x] New and updated code has appropriate documentation
- [x] New and updated code has new and/or updated testing
- [x] Required CI checks are passing
- [x] Visual proof for any user facing features like CLI or
documentation updates
- [x] Linked issues closed with keywords

---------

Co-authored-by: rene <41963722+renaynay@users.noreply.github.com>
  • Loading branch information
vgonkivs and renaynay committed Jan 31, 2023
1 parent 183e2b8 commit 5da1b7e
Showing 1 changed file with 41 additions and 15 deletions.
56 changes: 41 additions & 15 deletions share/availability/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
)

var log = logging.Logger("share/discovery")
Expand All @@ -21,6 +22,10 @@ const (
// so ConnManager will not break a connection with them.
peerWeight = 1000
topic = "full"

// eventbusBufSize is the size of the buffered channel to handle
// events in libp2p
eventbusBufSize = 32
)

// waitF calculates time to restart announcing.
Expand Down Expand Up @@ -116,8 +121,11 @@ func (d *Discovery) ensurePeers(ctx context.Context) {
log.Warn("peers limit is set to 0. Skipping discovery...")
return
}
// subscribe on Event Bus in order to catch disconnected peers and restart the discovery
sub, err := d.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
// subscribe on EventBus in order to catch disconnected peers and restart
// the discovery. We specify a larger buffer size for the channel where
// EvtPeerConnectednessChanged events are sent (by default it is 16, we
// specify 32) to avoid any blocks on writing to the full channel.
sub, err := d.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(eventbusBufSize))
if err != nil {
log.Error(err)
return
Expand All @@ -131,9 +139,40 @@ func (d *Discovery) ensurePeers(ctx context.Context) {
log.Error(err)
}
}()

// starting to listen to subscriptions async will help us to avoid any blocking
// in the case when we will not have the needed amount of FNs and will be blocked in `FindPeers`.
go func() {
select {
case <-ctx.Done():
log.Debug("Context canceled. Finish listening for connectedness events.")
return
case e, ok := <-sub.Out():
if !ok {
log.Debug("Subscription for connectedness events is closed.")
return
}
// listen to disconnect event to remove peer from set and reset backoff time
// reset timer in order to restart the discovery, once stored peer is disconnected
connStatus := e.(event.EvtPeerConnectednessChanged)
if connStatus.Connectedness == network.NotConnected {
if d.set.Contains(connStatus.Peer) {
log.Debugw("removing the peer from the peer set",
"peer", connStatus.Peer, "status", connStatus.Connectedness.String())
d.connector.RestartBackoff(connStatus.Peer)
d.set.Remove(connStatus.Peer)
d.onUpdatedPeers(connStatus.Peer, false)
d.host.ConnManager().UntagPeer(connStatus.Peer, topic)
t.Reset(d.discoveryInterval)
}
}
}
}()

for {
select {
case <-ctx.Done():
log.Info("Context canceled. Finishing peer discovery")
return
case <-t.C:
if uint(d.set.Size()) == d.peersLimit {
Expand All @@ -149,19 +188,6 @@ func (d *Discovery) ensurePeers(ctx context.Context) {
for p := range peers {
go d.handlePeerFound(ctx, topic, p)
}
case e := <-sub.Out():
// listen to disconnect event to remove peer from set and reset backoff time
// reset timer in order to restart the discovery, once stored peer is disconnected
connStatus := e.(event.EvtPeerConnectednessChanged)
if connStatus.Connectedness == network.NotConnected {
if d.set.Contains(connStatus.Peer) {
d.connector.RestartBackoff(connStatus.Peer)
d.set.Remove(connStatus.Peer)
d.onUpdatedPeers(connStatus.Peer, false)
d.host.ConnManager().UntagPeer(connStatus.Peer, topic)
t.Reset(d.discoveryInterval)
}
}
}
}
}
Expand Down

0 comments on commit 5da1b7e

Please sign in to comment.