Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(CommonService): add channel and use commonService in discv5 #735

Merged
merged 3 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 32 additions & 102 deletions waku/v2/discv5/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"

"github.com/libp2p/go-libp2p/core/host"
Expand All @@ -34,23 +32,20 @@ type PeerConnector interface {
}

type DiscoveryV5 struct {
params *discV5Parameters
host host.Host
config discover.Config
udpAddr *net.UDPAddr
listener *discover.UDPv5
localnode *enode.LocalNode
metrics Metrics
peerChannel *peerChannel
params *discV5Parameters
host host.Host
config discover.Config
udpAddr *net.UDPAddr
listener *discover.UDPv5
localnode *enode.LocalNode
metrics Metrics

peerConnector PeerConnector
NAT nat.Interface

log *zap.Logger

started atomic.Bool
cancel context.CancelFunc
wg *sync.WaitGroup
*peermanager.CommonDiscoveryService
}

type discV5Parameters struct {
Expand Down Expand Up @@ -132,13 +127,12 @@ func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConn
}

return &DiscoveryV5{
params: params,
peerConnector: peerConnector,
NAT: NAT,
wg: &sync.WaitGroup{},
peerChannel: &peerChannel{},
localnode: localnode,
metrics: newMetrics(reg),
params: params,
peerConnector: peerConnector,
NAT: NAT,
CommonDiscoveryService: peermanager.NewCommonDiscoveryService(),
localnode: localnode,
metrics: newMetrics(reg),
config: discover.Config{
PrivateKey: priv,
Bootnodes: params.bootnodes,
Expand Down Expand Up @@ -167,9 +161,9 @@ func (d *DiscoveryV5) listen(ctx context.Context) error {
d.udpAddr = conn.LocalAddr().(*net.UDPAddr)

if d.NAT != nil && !d.udpAddr.IP.IsLoopback() {
d.wg.Add(1)
d.WaitGroup().Add(1)
go func() {
defer d.wg.Done()
defer d.WaitGroup().Done()
nat.Map(d.NAT, ctx.Done(), "udp", d.udpAddr.Port, d.udpAddr.Port, "go-waku discv5 discovery")
}()

Expand Down Expand Up @@ -197,74 +191,24 @@ func (d *DiscoveryV5) SetHost(h host.Host) {
d.host = h
}

type peerChannel struct {
harsh-98 marked this conversation as resolved.
Show resolved Hide resolved
mutex sync.Mutex
channel chan peermanager.PeerData
started bool
ctx context.Context
}

func (p *peerChannel) Start(ctx context.Context) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.started = true
p.ctx = ctx
p.channel = make(chan peermanager.PeerData)
}

func (p *peerChannel) Stop() {
p.mutex.Lock()
defer p.mutex.Unlock()
if !p.started {
return
}
p.started = false
close(p.channel)
}

func (p *peerChannel) Subscribe() chan peermanager.PeerData {
return p.channel
}

func (p *peerChannel) Publish(peer peermanager.PeerData) bool {
p.mutex.Lock()
defer p.mutex.Unlock()
if !p.started {
return false
}
select {
case p.channel <- peer:
case <-p.ctx.Done():
return false

}
return true
}

// only works if the discovery v5 hasn't been started yet.
func (d *DiscoveryV5) Start(ctx context.Context) error {
// compare and swap sets the discovery v5 to `started` state
// and prevents multiple calls to the start method by being atomic.
if !d.started.CompareAndSwap(false, true) {
return nil
}

ctx, cancel := context.WithCancel(ctx)
d.cancel = cancel
return d.CommonDiscoveryService.Start(ctx, d.start)
}

d.peerChannel.Start(ctx)
d.peerConnector.Subscribe(ctx, d.peerChannel.Subscribe())
func (d *DiscoveryV5) start() error {
d.peerConnector.Subscribe(d.Context(), d.GetListeningChan())

err := d.listen(ctx)
err := d.listen(d.Context())
if err != nil {
return err
}

if d.params.autoFindPeers {
d.wg.Add(1)
d.WaitGroup().Add(1)
go func() {
defer d.wg.Done()
d.runDiscoveryV5Loop(ctx)
defer d.WaitGroup().Done()
d.runDiscoveryV5Loop(d.Context())
}()
}

Expand All @@ -284,27 +228,18 @@ func (d *DiscoveryV5) SetBootnodes(nodes []*enode.Node) error {
// only works if the discovery v5 is in running state
// so we can assume that cancel method is set
func (d *DiscoveryV5) Stop() {
if !d.started.CompareAndSwap(true, false) { // if Discoveryv5 is running, set started to false
return
}

d.cancel()

if d.listener != nil {
d.listener.Close()
d.listener = nil
d.log.Info("stopped Discovery V5")
}

d.wg.Wait()

defer func() {
if r := recover(); r != nil {
d.log.Info("recovering from panic and quitting")
}
}()

d.peerChannel.Stop()
d.CommonDiscoveryService.Stop(func() {
if d.listener != nil {
d.listener.Close()
d.listener = nil
d.log.Info("stopped Discovery V5")
}
})
}

/*
Expand Down Expand Up @@ -495,7 +430,7 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error {
ENR: n,
}

if d.peerChannel.Publish(peer) {
if d.PushToChan(peer) {
d.log.Debug("published peer into peer channel", logging.HostID("peerID", peer.AddrInfo.ID))
} else {
d.log.Debug("could not publish peer into peer channel", logging.HostID("peerID", peer.AddrInfo.ID))
Expand Down Expand Up @@ -527,8 +462,3 @@ restartLoop:
}
d.log.Warn("Discv5 loop stopped")
}

// IsStarted determines whether discoveryV5 started or not
func (d *DiscoveryV5) IsStarted() bool {
return d.started.Load()
}
81 changes: 81 additions & 0 deletions waku/v2/peermanager/common_discovery_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package peermanager

import (
"context"
"sync"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/peer"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
)

// PeerData contains information about a peer useful in establishing connections with it.
type PeerData struct {
Origin wps.Origin
AddrInfo peer.AddrInfo
ENR *enode.Node
PubSubTopics []string
}

type CommonDiscoveryService struct {
commonService *protocol.CommonService
channel chan PeerData
}

func NewCommonDiscoveryService() *CommonDiscoveryService {
return &CommonDiscoveryService{
commonService: protocol.NewCommonService(),
}
}

func (sp *CommonDiscoveryService) Start(ctx context.Context, fn func() error) error {
return sp.commonService.Start(ctx, func() error {
// currently is used in discv5,peerConnector,rendevzous for returning new discovered Peers to peerConnector for connecting with them
// mutex protection for this operation
sp.channel = make(chan PeerData)
return fn()
})
}

func (sp *CommonDiscoveryService) Stop(stopFn func()) {
sp.commonService.Stop(func() {
stopFn()
sp.WaitGroup().Wait() // waitgroup is waited here so that channel can be closed after all the go rountines have stopped in service.
// there is a wait in the CommonService too
close(sp.channel)
})
}
func (sp *CommonDiscoveryService) GetListeningChan() <-chan PeerData {
return sp.channel
}
func (sp *CommonDiscoveryService) PushToChan(data PeerData) bool {
sp.RLock()
defer sp.RUnlock()
if err := sp.ErrOnNotRunning(); err != nil {
return false
}
select {
case sp.channel <- data:
return true
case <-sp.Context().Done():
return false
}
}

func (sp *CommonDiscoveryService) RLock() {
sp.commonService.RLock()
}
func (sp *CommonDiscoveryService) RUnlock() {
sp.commonService.RUnlock()
}

func (sp *CommonDiscoveryService) Context() context.Context {
return sp.commonService.Context()
}
func (sp *CommonDiscoveryService) ErrOnNotRunning() error {
return sp.commonService.ErrOnNotRunning()
}
func (sp *CommonDiscoveryService) WaitGroup() *sync.WaitGroup {
return sp.commonService.WaitGroup()
}
9 changes: 0 additions & 9 deletions waku/v2/peermanager/peer_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sync"
"time"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -25,14 +24,6 @@ import (
lru "github.com/hashicorp/golang-lru"
)

// PeerData contains information about a peer useful in establishing connections with it.
type PeerData struct {
Origin wps.Origin
AddrInfo peer.AddrInfo
PubSubTopics []string
ENR *enode.Node
}

// PeerConnectionStrategy is a utility to connect to peers,
// but only if we have not recently tried connecting to them already
type PeerConnectionStrategy struct {
Expand Down
1 change: 1 addition & 0 deletions waku/v2/peerstore/waku_peer_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
PeerExchange
DNSDiscovery
Rendezvous
PeerManager
harsh-98 marked this conversation as resolved.
Show resolved Hide resolved
)

const peerOrigin = "origin"
Expand Down
32 changes: 16 additions & 16 deletions waku/v2/rendezvous/rendezvous.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Rendezvous struct {
peerConnector PeerConnector

log *zap.Logger
*protocol.CommonService
*peermanager.CommonDiscoveryService
}

// PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol
Expand All @@ -43,10 +43,10 @@ type PeerConnector interface {
func NewRendezvous(db *DB, peerConnector PeerConnector, log *zap.Logger) *Rendezvous {
logger := log.Named("rendezvous")
return &Rendezvous{
db: db,
peerConnector: peerConnector,
log: logger,
CommonService: protocol.NewCommonService(),
db: db,
peerConnector: peerConnector,
log: logger,
CommonDiscoveryService: peermanager.NewCommonDiscoveryService(),
}
}

Expand All @@ -56,14 +56,19 @@ func (r *Rendezvous) SetHost(h host.Host) {
}

func (r *Rendezvous) Start(ctx context.Context) error {
return r.CommonService.Start(ctx, r.start)
return r.CommonDiscoveryService.Start(ctx, r.start)
}

func (r *Rendezvous) start() error {
err := r.db.Start(r.Context())
if err != nil {
return err
if r.db != nil {
if err := r.db.Start(r.Context()); err != nil {
return err
}
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
}
if r.peerConnector != nil {
r.peerConnector.Subscribe(r.Context(), r.GetListeningChan())
}

r.rendezvousSvc = rvs.NewRendezvousService(r.host, r.db)

r.log.Info("rendezvous protocol started")
Expand Down Expand Up @@ -98,19 +103,14 @@ func (r *Rendezvous) DiscoverWithNamespace(ctx context.Context, namespace string
if len(addrInfo) != 0 {
rp.SetSuccess(cookie)

peerCh := make(chan peermanager.PeerData)
defer close(peerCh)
r.peerConnector.Subscribe(ctx, peerCh)
for _, p := range addrInfo {
peer := peermanager.PeerData{
Origin: peerstore.Rendezvous,
AddrInfo: p,
PubSubTopics: []string{namespace},
}
select {
case <-ctx.Done():
if !r.PushToChan(peer) {
return
case peerCh <- peer:
}
}
} else {
Expand Down Expand Up @@ -180,7 +180,7 @@ func (r *Rendezvous) RegisterWithNamespace(ctx context.Context, namespace string
}

func (r *Rendezvous) Stop() {
r.CommonService.Stop(func() {
r.CommonDiscoveryService.Stop(func() {
r.host.RemoveStreamHandler(rvs.RendezvousProto)
r.rendezvousSvc = nil
})
Expand Down
Loading