diff --git a/share/p2p/discovery/discovery.go b/share/p2p/discovery/discovery.go index 01001376d5..96eda8bd78 100644 --- a/share/p2p/discovery/discovery.go +++ b/share/p2p/discovery/discovery.go @@ -28,16 +28,15 @@ const ( // (by default it is 16) eventbusBufSize = 64 - // findPeersStuckWarnDelay is the duration after which discover will log an error message to - // notify that it is stuck. - findPeersStuckWarnDelay = time.Minute + // findPeersTimeout limits the FindPeers operation in time + findPeersTimeout = time.Minute - // defaultRetryTimeout defines time interval between discovery attempts. - defaultRetryTimeout = time.Second + // retryTimeout defines time interval between discovery and advertise attempts. + retryTimeout = time.Second ) -// defaultRetryTimeout defines time interval between discovery attempts. -var discoveryRetryTimeout = defaultRetryTimeout +// discoveryRetryTimeout defines time interval between discovery attempts, needed for tests +var discoveryRetryTimeout = retryTimeout // Discovery combines advertise and discover services and allows to store discovered nodes. // TODO: The code here gets horribly hairy, so we should refactor this at some point @@ -166,7 +165,9 @@ func (d *Discovery) Advertise(ctx context.Context) { } log.Warnw("error advertising", "rendezvous", rendezvousPoint, "err", err) - errTimer := time.NewTimer(time.Minute) + // we don't want retry indefinitely in busy loop + // internal discovery mechanism may need some time before attempts + errTimer := time.NewTimer(retryTimeout) select { case <-errTimer.C: errTimer.Stop() @@ -257,8 +258,7 @@ func (d *Discovery) discover(ctx context.Context) bool { // limit to minimize chances of overreaching the limit wg.SetLimit(int(d.set.Limit())) - // stop discovery when we are done - findCtx, findCancel := context.WithCancel(ctx) + findCtx, findCancel := context.WithTimeout(ctx, findPeersTimeout) defer func() { // some workers could still be running, wait them to finish before canceling findCtx wg.Wait() //nolint:errcheck @@ -271,26 +271,11 @@ func (d *Discovery) discover(ctx context.Context) bool { return false } - ticker := time.NewTicker(findPeersStuckWarnDelay) - defer ticker.Stop() for { - ticker.Reset(findPeersStuckWarnDelay) - // drain all previous ticks from channel - drainChannel(ticker.C) select { - case <-findCtx.Done(): - d.metrics.observeFindPeers(ctx, true, true) - return true - case <-ticker.C: - d.metrics.observeDiscoveryStuck(ctx) - log.Warn("wasn't able to find new peers for long time") - continue case p, ok := <-peers: if !ok { - isEnoughPeers := d.set.Size() >= d.set.Limit() - d.metrics.observeFindPeers(ctx, ctx.Err() != nil, isEnoughPeers) - log.Debugw("discovery channel closed", "find_is_canceled", findCtx.Err() != nil) - return isEnoughPeers + break } peer := p @@ -313,10 +298,18 @@ func (d *Discovery) discover(ctx context.Context) bool { } log.Infow("discovered wanted peers", "amount", size) - findCancel() + findCancel() // stop discovery when we are done return nil }) + + continue + case <-findCtx.Done(): } + + isEnoughPeers := d.set.Size() >= d.set.Limit() + d.metrics.observeFindPeers(ctx, isEnoughPeers) + log.Debugw("discovery finished", "discovered_wanted", isEnoughPeers) + return isEnoughPeers } } diff --git a/share/p2p/discovery/metrics.go b/share/p2p/discovery/metrics.go index c147a2eeeb..b6adbb1984 100644 --- a/share/p2p/discovery/metrics.go +++ b/share/p2p/discovery/metrics.go @@ -13,8 +13,7 @@ import ( ) const ( - discoveryEnougPeersKey = "enough_peers" - discoveryFindCancledKey = "is_canceled" + discoveryEnoughPeersKey = "enough_peers" handlePeerResultKey = "result" handlePeerSkipSelf handlePeerResult = "skip_self" @@ -37,7 +36,6 @@ type handlePeerResult string type metrics struct { peersAmount asyncint64.Gauge discoveryResult syncint64.Counter // attributes: enough_peers[bool],is_canceled[bool] - discoveryStuck syncint64.Counter handlePeerResult syncint64.Counter // attributes: result[string] advertise syncint64.Counter // attributes: failed[bool] peerAdded syncint64.Counter @@ -68,12 +66,6 @@ func initMetrics(d *Discovery) (*metrics, error) { return nil, err } - discoveryStuck, err := meter.SyncInt64().Counter("discovery_lookup_is_stuck", - instrument.WithDescription("indicates discovery wasn't able to find peers for more than 1 min")) - if err != nil { - return nil, err - } - handlePeerResultCounter, err := meter.SyncInt64().Counter("discovery_handler_peer_result", instrument.WithDescription("result handling found peer")) if err != nil { @@ -107,7 +99,6 @@ func initMetrics(d *Discovery) (*metrics, error) { metrics := &metrics{ peersAmount: peersAmount, discoveryResult: discoveryResult, - discoveryStuck: discoveryStuck, handlePeerResult: handlePeerResultCounter, advertise: advertise, peerAdded: peerAdded, @@ -130,7 +121,7 @@ func initMetrics(d *Discovery) (*metrics, error) { return metrics, nil } -func (m *metrics) observeFindPeers(ctx context.Context, canceled, isEnoughPeers bool) { +func (m *metrics) observeFindPeers(ctx context.Context, isEnoughPeers bool) { if m == nil { return } @@ -139,8 +130,7 @@ func (m *metrics) observeFindPeers(ctx context.Context, canceled, isEnoughPeers } m.discoveryResult.Add(ctx, 1, - attribute.Bool(discoveryFindCancledKey, canceled), - attribute.Bool(discoveryEnougPeersKey, isEnoughPeers)) + attribute.Bool(discoveryEnoughPeersKey, isEnoughPeers)) } func (m *metrics) observeHandlePeer(ctx context.Context, result handlePeerResult) { @@ -179,14 +169,3 @@ func (m *metrics) observeOnPeersUpdate(_ peer.ID, isAdded bool) { } m.peerRemoved.Add(ctx, 1) } - -func (m *metrics) observeDiscoveryStuck(ctx context.Context) { - if m == nil { - return - } - if ctx.Err() != nil { - ctx = context.Background() - } - - m.discoveryStuck.Add(ctx, 1) -}