From 3277dbf98c071f7d340db08da3d96a32b1afaea9 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Fri, 5 Feb 2021 11:43:38 +0100 Subject: [PATCH 1/3] eth: don't wait for snap registration if we're not running snap --- eth/backend.go | 1 + eth/handler.go | 15 +++++++++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index a6390facb30b..622906767ab6 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -529,6 +529,7 @@ func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer } func (s *Ethereum) Protocols() []p2p.Protocol { protos := eth.MakeProtocols((*ethHandler)(s.handler), s.networkID, s.ethDialCandidates) if s.config.SnapshotCache > 0 { + s.handler.snapEnabled = true protos = append(protos, snap.MakeProtocols((*snapHandler)(s.handler), s.snapDialCandidates)...) } return protos diff --git a/eth/handler.go b/eth/handler.go index a5a62b894dba..69d38197ae91 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -95,6 +95,10 @@ type handler struct { snapSync uint32 // Flag whether fast sync should operate on top of the snap protocol acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing) + // Flag whether we're running the snap protocol handler. This determines if we have to + // wait for snap extension or not + snapEnabled bool + checkpointNumber uint64 // Block number for the sync progress validator to cross reference checkpointHash common.Hash // Block hash for the sync progress validator to cross reference @@ -234,10 +238,13 @@ func newHandler(config *handlerConfig) (*handler, error) { func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { // If the peer has a `snap` extension, wait for it to connect so we can have // a uniform initialization/teardown mechanism - snap, err := h.peers.waitSnapExtension(peer) - if err != nil { - peer.Log().Error("Snapshot extension barrier failed", "err", err) - return err + var snap *snap.Peer + if h.snapEnabled { + var err error + if snap, err = h.peers.waitSnapExtension(peer); err != nil { + peer.Log().Error("Snapshot extension barrier failed", "err", err) + return err + } } // TODO(karalabe): Not sure why this is needed if !h.chainSync.handlePeerEvent(peer) { From ecada570938290175429a5eae5bcad5f5ed8cb6b Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Fri, 5 Feb 2021 12:16:48 +0100 Subject: [PATCH 2/3] eth, p2p: better logic surrounding protocol capabilities --- eth/backend.go | 1 - eth/handler.go | 18 +++++++----------- eth/peerset.go | 4 ++-- p2p/peer.go | 17 ++++++++--------- 4 files changed, 17 insertions(+), 23 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index 622906767ab6..a6390facb30b 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -529,7 +529,6 @@ func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer } func (s *Ethereum) Protocols() []p2p.Protocol { protos := eth.MakeProtocols((*ethHandler)(s.handler), s.networkID, s.ethDialCandidates) if s.config.SnapshotCache > 0 { - s.handler.snapEnabled = true protos = append(protos, snap.MakeProtocols((*snapHandler)(s.handler), s.snapDialCandidates)...) } return protos diff --git a/eth/handler.go b/eth/handler.go index 69d38197ae91..f957bf73466d 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -95,10 +95,6 @@ type handler struct { snapSync uint32 // Flag whether fast sync should operate on top of the snap protocol acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing) - // Flag whether we're running the snap protocol handler. This determines if we have to - // wait for snap extension or not - snapEnabled bool - checkpointNumber uint64 // Block number for the sync progress validator to cross reference checkpointHash common.Hash // Block hash for the sync progress validator to cross reference @@ -238,10 +234,10 @@ func newHandler(config *handlerConfig) (*handler, error) { func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { // If the peer has a `snap` extension, wait for it to connect so we can have // a uniform initialization/teardown mechanism - var snap *snap.Peer - if h.snapEnabled { + var snapPeer *snap.Peer + if peer.RunningCap(snap.ProtocolName, snap.ProtocolVersions) { var err error - if snap, err = h.peers.waitSnapExtension(peer); err != nil { + if snapPeer, err = h.peers.waitSnapExtension(peer); err != nil { peer.Log().Error("Snapshot extension barrier failed", "err", err) return err } @@ -268,7 +264,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { } reject := false // reserved peer slots if atomic.LoadUint32(&h.snapSync) == 1 { - if snap == nil { + if snapPeer == nil { // If we are running snap-sync, we want to reserve roughly half the peer // slots for peers supporting the snap protocol. // The logic here is; we only allow up to 5 more non-snap peers than snap-peers. @@ -286,7 +282,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { peer.Log().Debug("Ethereum peer connected", "name", peer.Name()) // Register the peer locally - if err := h.peers.registerPeer(peer, snap); err != nil { + if err := h.peers.registerPeer(peer, snapPeer); err != nil { peer.Log().Error("Ethereum peer registration failed", "err", err) return err } @@ -301,8 +297,8 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { peer.Log().Error("Failed to register peer in eth syncer", "err", err) return err } - if snap != nil { - if err := h.downloader.SnapSyncer.Register(snap); err != nil { + if snapPeer != nil { + if err := h.downloader.SnapSyncer.Register(snapPeer); err != nil { peer.Log().Error("Failed to register peer in snap syncer", "err", err) return err } diff --git a/eth/peerset.go b/eth/peerset.go index f0657e140b31..1e864a8e46f2 100644 --- a/eth/peerset.go +++ b/eth/peerset.go @@ -73,7 +73,7 @@ func newPeerSet() *peerSet { func (ps *peerSet) registerSnapExtension(peer *snap.Peer) error { // Reject the peer if it advertises `snap` without `eth` as `snap` is only a // satellite protocol meaningful with the chain selection of `eth` - if !peer.SupportsCap(eth.ProtocolName, eth.ProtocolVersions) { + if !peer.RunningCap(eth.ProtocolName, eth.ProtocolVersions) { return errSnapWithoutEth } // Ensure nobody can double connect @@ -101,7 +101,7 @@ func (ps *peerSet) registerSnapExtension(peer *snap.Peer) error { // by the peerset. func (ps *peerSet) waitSnapExtension(peer *eth.Peer) (*snap.Peer, error) { // If the peer does not support a compatible `snap`, don't wait - if !peer.SupportsCap(snap.ProtocolName, snap.ProtocolVersions) { + if !peer.RunningCap(snap.ProtocolName, snap.ProtocolVersions) { return nil, nil } // Ensure nobody can double connect diff --git a/p2p/peer.go b/p2p/peer.go index 08881e258389..8ebc858392b5 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -158,15 +158,14 @@ func (p *Peer) Caps() []Cap { return p.rw.caps } -// SupportsCap returns true if the peer supports any of the enumerated versions -// of a specific protocol. -func (p *Peer) SupportsCap(protocol string, versions []uint) bool { - for _, cap := range p.rw.caps { - if cap.Name == protocol { - for _, ver := range versions { - if cap.Version == ver { - return true - } +// RunningCap returns true if the peer is actively connected using any of the +// enumerated versions of a specific protocol, meaning that at least one of the +// versions is supported by both this node and the peer p. +func (p *Peer) RunningCap(protocol string, versions []uint) bool { + if proto, ok := p.running[protocol]; ok { + for _, ver := range versions { + if proto.Version == ver { + return true } } } From 34241b5729835041d7864def69b706448d6e4761 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Fri, 5 Feb 2021 12:47:11 +0100 Subject: [PATCH 3/3] eth: simplify protocol logic --- eth/handler.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index f957bf73466d..a5a62b894dba 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -234,13 +234,10 @@ func newHandler(config *handlerConfig) (*handler, error) { func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { // If the peer has a `snap` extension, wait for it to connect so we can have // a uniform initialization/teardown mechanism - var snapPeer *snap.Peer - if peer.RunningCap(snap.ProtocolName, snap.ProtocolVersions) { - var err error - if snapPeer, err = h.peers.waitSnapExtension(peer); err != nil { - peer.Log().Error("Snapshot extension barrier failed", "err", err) - return err - } + snap, err := h.peers.waitSnapExtension(peer) + if err != nil { + peer.Log().Error("Snapshot extension barrier failed", "err", err) + return err } // TODO(karalabe): Not sure why this is needed if !h.chainSync.handlePeerEvent(peer) { @@ -264,7 +261,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { } reject := false // reserved peer slots if atomic.LoadUint32(&h.snapSync) == 1 { - if snapPeer == nil { + if snap == nil { // If we are running snap-sync, we want to reserve roughly half the peer // slots for peers supporting the snap protocol. // The logic here is; we only allow up to 5 more non-snap peers than snap-peers. @@ -282,7 +279,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { peer.Log().Debug("Ethereum peer connected", "name", peer.Name()) // Register the peer locally - if err := h.peers.registerPeer(peer, snapPeer); err != nil { + if err := h.peers.registerPeer(peer, snap); err != nil { peer.Log().Error("Ethereum peer registration failed", "err", err) return err } @@ -297,8 +294,8 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { peer.Log().Error("Failed to register peer in eth syncer", "err", err) return err } - if snapPeer != nil { - if err := h.downloader.SnapSyncer.Register(snapPeer); err != nil { + if snap != nil { + if err := h.downloader.SnapSyncer.Register(snap); err != nil { peer.Log().Error("Failed to register peer in snap syncer", "err", err) return err }