Skip to content

Commit

Permalink
feat(CommonService): add channel and use commonService in discv5
Browse files Browse the repository at this point in the history
  • Loading branch information
harsh-98 committed Sep 14, 2023
1 parent 4b1c188 commit c5c61a2
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 125 deletions.
125 changes: 28 additions & 97 deletions waku/v2/discv5/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"

"github.com/libp2p/go-libp2p/core/host"
Expand All @@ -18,6 +16,7 @@ import (
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
Expand All @@ -34,23 +33,20 @@ type PeerConnector interface {
}

type DiscoveryV5 struct {
params *discV5Parameters
host host.Host
config discover.Config
udpAddr *net.UDPAddr
listener *discover.UDPv5
localnode *enode.LocalNode
metrics Metrics
peerChannel *peerChannel
params *discV5Parameters
host host.Host
config discover.Config
udpAddr *net.UDPAddr
listener *discover.UDPv5
localnode *enode.LocalNode
metrics Metrics

peerConnector PeerConnector
NAT nat.Interface

log *zap.Logger

started atomic.Bool
cancel context.CancelFunc
wg *sync.WaitGroup
*protocol.CommonService[peermanager.PeerData]
}

type discV5Parameters struct {
Expand Down Expand Up @@ -135,8 +131,7 @@ func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConn
params: params,
peerConnector: peerConnector,
NAT: NAT,
wg: &sync.WaitGroup{},
peerChannel: &peerChannel{},
CommonService: protocol.NewCommonService[peermanager.PeerData](),
localnode: localnode,
metrics: newMetrics(reg),
config: discover.Config{
Expand Down Expand Up @@ -167,9 +162,9 @@ func (d *DiscoveryV5) listen(ctx context.Context) error {
d.udpAddr = conn.LocalAddr().(*net.UDPAddr)

if d.NAT != nil && !d.udpAddr.IP.IsLoopback() {
d.wg.Add(1)
d.WaitGroup().Add(1)
go func() {
defer d.wg.Done()
defer d.WaitGroup().Done()
nat.Map(d.NAT, ctx.Done(), "udp", d.udpAddr.Port, d.udpAddr.Port, "go-waku discv5 discovery")
}()

Expand Down Expand Up @@ -197,74 +192,24 @@ func (d *DiscoveryV5) SetHost(h host.Host) {
d.host = h
}

type peerChannel struct {
mutex sync.Mutex
channel chan peermanager.PeerData
started bool
ctx context.Context
}

func (p *peerChannel) Start(ctx context.Context) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.started = true
p.ctx = ctx
p.channel = make(chan peermanager.PeerData)
}

func (p *peerChannel) Stop() {
p.mutex.Lock()
defer p.mutex.Unlock()
if !p.started {
return
}
p.started = false
close(p.channel)
}

func (p *peerChannel) Subscribe() chan peermanager.PeerData {
return p.channel
}

func (p *peerChannel) Publish(peer peermanager.PeerData) bool {
p.mutex.Lock()
defer p.mutex.Unlock()
if !p.started {
return false
}
select {
case p.channel <- peer:
case <-p.ctx.Done():
return false

}
return true
}

// only works if the discovery v5 hasn't been started yet.
func (d *DiscoveryV5) Start(ctx context.Context) error {
// compare and swap sets the discovery v5 to `started` state
// and prevents multiple calls to the start method by being atomic.
if !d.started.CompareAndSwap(false, true) {
return nil
}

ctx, cancel := context.WithCancel(ctx)
d.cancel = cancel
return d.CommonService.Start(ctx, d.start)
}

d.peerChannel.Start(ctx)
d.peerConnector.Subscribe(ctx, d.peerChannel.Subscribe())
func (d *DiscoveryV5) start() error {
d.peerConnector.Subscribe(d.Context(), d.GetListeningChan())

err := d.listen(ctx)
err := d.listen(d.Context())
if err != nil {
return err
}

if d.params.autoFindPeers {
d.wg.Add(1)
d.WaitGroup().Add(1)
go func() {
defer d.wg.Done()
d.runDiscoveryV5Loop(ctx)
defer d.WaitGroup().Done()
d.runDiscoveryV5Loop(d.Context())
}()
}

Expand All @@ -284,27 +229,18 @@ func (d *DiscoveryV5) SetBootnodes(nodes []*enode.Node) error {
// only works if the discovery v5 is in running state
// so we can assume that cancel method is set
func (d *DiscoveryV5) Stop() {
if !d.started.CompareAndSwap(true, false) { // if Discoveryv5 is running, set started to false
return
}

d.cancel()

if d.listener != nil {
d.listener.Close()
d.listener = nil
d.log.Info("stopped Discovery V5")
}

d.wg.Wait()

defer func() {
if r := recover(); r != nil {
d.log.Info("recovering from panic and quitting")
}
}()

d.peerChannel.Stop()
d.CommonService.Stop(func() {
if d.listener != nil {
d.listener.Close()
d.listener = nil
d.log.Info("stopped Discovery V5")
}
})
}

/*
Expand Down Expand Up @@ -495,7 +431,7 @@ func (d *DiscoveryV5) peerLoop(ctx context.Context) error {
ENR: n,
}

if d.peerChannel.Publish(peer) {
if d.PushToChan(peer) {
d.log.Debug("published peer into peer channel", logging.HostID("peerID", peer.AddrInfo.ID))
} else {
d.log.Debug("could not publish peer into peer channel", logging.HostID("peerID", peer.AddrInfo.ID))
Expand Down Expand Up @@ -527,8 +463,3 @@ restartLoop:
}
d.log.Warn("Discv5 loop stopped")
}

// IsStarted determines whether discoveryV5 started or not
func (d *DiscoveryV5) IsStarted() bool {
return d.started.Load()
}
49 changes: 34 additions & 15 deletions waku/v2/protocol/common_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,21 @@ import (
"context"
"errors"
"sync"
"sync/atomic"
)

// this is common layout for all the services that require mutex protection and a guarantee that all running goroutines will be finished before stop finishes execution. This guarantee comes from waitGroup all one has to use CommonService.WaitGroup() in the goroutines that should finish by the end of stop function.
type CommonService struct {
type CommonService[T any] struct {
sync.RWMutex
cancel context.CancelFunc
ctx context.Context
wg sync.WaitGroup
started bool
started atomic.Bool
channel chan T
}

func NewCommonService() *CommonService {
return &CommonService{
func NewCommonService[T any]() *CommonService[T] {
return &CommonService[T]{
wg: sync.WaitGroup{},
RWMutex: sync.RWMutex{},
}
Expand All @@ -25,16 +27,18 @@ func NewCommonService() *CommonService {
// mutex protected start function
// creates internal context over provided context and runs fn safely
// fn is excerpt to be executed to start the protocol
func (sp *CommonService) Start(ctx context.Context, fn func() error) error {
func (sp *CommonService[T]) Start(ctx context.Context, fn func() error) error {
sp.Lock()
defer sp.Unlock()
if sp.started {
if sp.started.Load() {
return ErrAlreadyStarted
}
sp.started = true
sp.started.Store(true)
sp.ctx, sp.cancel = context.WithCancel(ctx)
// currently is used in discv5 for returning new discovered Peers to peerConnector for connecting with them
sp.channel = make(chan T)
if err := fn(); err != nil {
sp.started = false
sp.started.Store(false)
sp.cancel()
return err
}
Expand All @@ -45,29 +49,44 @@ var ErrAlreadyStarted = errors.New("already started")
var ErrNotStarted = errors.New("not started")

// mutex protected stop function
func (sp *CommonService) Stop(fn func()) {
func (sp *CommonService[T]) Stop(fn func()) {
sp.Lock()
defer sp.Unlock()
if !sp.started {
if !sp.started.Load() {
return
}
sp.cancel()
fn()
sp.wg.Wait()
sp.started = false
close(sp.channel)
sp.started.Store(false)
}

// This is not a mutex protected function, it is up to the caller to use it in a mutex protected context
func (sp *CommonService) ErrOnNotRunning() error {
if !sp.started {
func (sp *CommonService[T]) ErrOnNotRunning() error {
if !sp.started.Load() {
return ErrNotStarted
}
return nil
}

func (sp *CommonService) Context() context.Context {
func (sp *CommonService[T]) Context() context.Context {
return sp.ctx
}
func (sp *CommonService) WaitGroup() *sync.WaitGroup {
func (sp *CommonService[T]) WaitGroup() *sync.WaitGroup {
return &sp.wg
}
func (sp *CommonService[T]) GetListeningChan() <-chan T {
return sp.channel
}
func (sp *CommonService[T]) PushToChan(data T) bool {
if !sp.started.Load() {
return false
}
select {
case sp.channel <- data:
return true
case <-sp.ctx.Done():
return false
}
}
2 changes: 1 addition & 1 deletion waku/v2/protocol/common_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

// check if start and stop on common service works in random order
func TestCommonService(t *testing.T) {
s := NewCommonService()
s := NewCommonService[struct{}]()
wg := &sync.WaitGroup{}
for i := 0; i < 1000; i++ {
wg.Add(1)
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var (
)

type WakuFilterLightNode struct {
*protocol.CommonService
*protocol.CommonService[struct{}]
h host.Host
broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.s
timesource timesource.Timesource
Expand Down Expand Up @@ -64,7 +64,7 @@ func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerM
wf.broadcaster = broadcaster
wf.timesource = timesource
wf.pm = pm
wf.CommonService = protocol.NewCommonService()
wf.CommonService = protocol.NewCommonService[struct{}]()
wf.metrics = newMetrics(reg)

return wf
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/protocol/filter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type (
msgSub relay.Subscription
metrics Metrics
log *zap.Logger
*protocol.CommonService
*protocol.CommonService[struct{}]
subscriptions *SubscribersMap

maxSubscriptions int
Expand All @@ -53,7 +53,7 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi
opt(params)
}

wf.CommonService = protocol.NewCommonService()
wf.CommonService = protocol.NewCommonService[struct{}]()
wf.metrics = newMetrics(reg)
wf.subscriptions = NewSubscribersMap(params.Timeout)
wf.maxSubscriptions = params.MaxSubscribers
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/protocol/legacy_filter/waku_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type (
}

WakuFilter struct {
*protocol.CommonService
*protocol.CommonService[struct{}]
h host.Host
isFullNode bool
msgSub relay.Subscription
Expand Down Expand Up @@ -74,7 +74,7 @@ func NewWakuFilter(broadcaster relay.Broadcaster, isFullNode bool, timesource ti
}

wf.isFullNode = isFullNode
wf.CommonService = protocol.NewCommonService()
wf.CommonService = protocol.NewCommonService[struct{}]()
wf.filters = NewFilterMap(broadcaster, timesource)
wf.subscribers = NewSubscribers(params.Timeout)
wf.metrics = newMetrics(reg)
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/protocol/peer_exchange/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type WakuPeerExchange struct {
metrics Metrics
log *zap.Logger

*protocol.CommonService
*protocol.CommonService[struct{}]

peerConnector PeerConnector
enrCache *enrCache
Expand All @@ -63,7 +63,7 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector,
wakuPX.enrCache = newEnrCache
wakuPX.peerConnector = peerConnector
wakuPX.pm = pm
wakuPX.CommonService = protocol.NewCommonService()
wakuPX.CommonService = protocol.NewCommonService[struct{}]()

return wakuPX, nil
}
Expand Down
Loading

0 comments on commit c5c61a2

Please sign in to comment.