Skip to content

Commit

Permalink
fix: remove generic functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
harsh-98 committed Sep 18, 2023
1 parent da5ffde commit cafb0f0
Show file tree
Hide file tree
Showing 12 changed files with 117 additions and 65 deletions.
19 changes: 9 additions & 10 deletions waku/v2/discv5/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
Expand Down Expand Up @@ -46,7 +45,7 @@ type DiscoveryV5 struct {

log *zap.Logger

*protocol.CommonService[peermanager.PeerData]
*peermanager.CommonDiscoveryService
}

type discV5Parameters struct {
Expand Down Expand Up @@ -128,12 +127,12 @@ func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConn
}

return &DiscoveryV5{
params: params,
peerConnector: peerConnector,
NAT: NAT,
CommonService: protocol.NewCommonService[peermanager.PeerData](),
localnode: localnode,
metrics: newMetrics(reg),
params: params,
peerConnector: peerConnector,
NAT: NAT,
CommonDiscoveryService: peermanager.NewCommonDiscoveryService(),
localnode: localnode,
metrics: newMetrics(reg),
config: discover.Config{
PrivateKey: priv,
Bootnodes: params.bootnodes,
Expand Down Expand Up @@ -194,7 +193,7 @@ func (d *DiscoveryV5) SetHost(h host.Host) {

// only works if the discovery v5 hasn't been started yet.
func (d *DiscoveryV5) Start(ctx context.Context) error {
return d.CommonService.Start(ctx, d.start)
return d.CommonDiscoveryService.Start(ctx, d.start)
}

func (d *DiscoveryV5) start() error {
Expand Down Expand Up @@ -234,7 +233,7 @@ func (d *DiscoveryV5) Stop() {
d.log.Info("recovering from panic and quitting")
}
}()
d.CommonService.Stop(func() {
d.CommonDiscoveryService.Stop(func() {
if d.listener != nil {
d.listener.Close()
d.listener = nil
Expand Down
81 changes: 81 additions & 0 deletions waku/v2/peermanager/common_discovery_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package peermanager

import (
"context"
"sync"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/peer"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
)

// PeerData contains information about a peer useful in establishing connections with it.
type PeerData struct {
Origin wps.Origin
AddrInfo peer.AddrInfo
ENR *enode.Node
PubSubTopics []string
}

type CommonDiscoveryService struct {
commonService *protocol.CommonService
channel chan PeerData
}

func NewCommonDiscoveryService() *CommonDiscoveryService {
return &CommonDiscoveryService{
commonService: protocol.NewCommonService(),
}
}

func (sp *CommonDiscoveryService) Start(ctx context.Context, fn func() error) error {
return sp.commonService.Start(ctx, func() error {
// currently is used in discv5,peerConnector,rendevzous for returning new discovered Peers to peerConnector for connecting with them
// mutex protection for this operation
sp.channel = make(chan PeerData)
return fn()
})
}

func (sp *CommonDiscoveryService) Stop(stopFn func()) {
sp.commonService.Stop(func() {
stopFn()
sp.WaitGroup().Wait() // waitgroup is waited here so that channel can be closed after all the go rountines have stopped in service.
// there is a wait in the CommonService too
close(sp.channel)
})
}
func (sp *CommonDiscoveryService) GetListeningChan() <-chan PeerData {
return sp.channel
}
func (sp *CommonDiscoveryService) PushToChan(data PeerData) bool {
sp.RLock()
defer sp.RUnlock()
if err := sp.ErrOnNotRunning(); err != nil {
return false
}
select {
case sp.channel <- data:
return true
case <-sp.Context().Done():
return false
}
}

func (sp *CommonDiscoveryService) RLock() {
sp.commonService.RLock()
}
func (sp *CommonDiscoveryService) RUnlock() {
sp.commonService.RUnlock()
}

func (sp *CommonDiscoveryService) Context() context.Context {
return sp.commonService.Context()
}
func (sp *CommonDiscoveryService) ErrOnNotRunning() error {
return sp.commonService.ErrOnNotRunning()
}
func (sp *CommonDiscoveryService) WaitGroup() *sync.WaitGroup {
return sp.commonService.WaitGroup()
}
9 changes: 0 additions & 9 deletions waku/v2/peermanager/peer_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"sync"
"time"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -25,14 +24,6 @@ import (
lru "github.com/hashicorp/golang-lru"
)

// PeerData contains information about a peer useful in establishing connections with it.
type PeerData struct {
Origin wps.Origin
AddrInfo peer.AddrInfo
PubSubTopics []string
ENR *enode.Node
}

// PeerConnectionStrategy is a utility to connect to peers,
// but only if we have not recently tried connecting to them already
type PeerConnectionStrategy struct {
Expand Down
1 change: 1 addition & 0 deletions waku/v2/peerstore/waku_peer_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
PeerExchange
DNSDiscovery
Rendezvous
PeerManager
)

const peerOrigin = "origin"
Expand Down
36 changes: 8 additions & 28 deletions waku/v2/protocol/common_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,16 @@ import (
)

// this is common layout for all the services that require mutex protection and a guarantee that all running goroutines will be finished before stop finishes execution. This guarantee comes from waitGroup all one has to use CommonService.WaitGroup() in the goroutines that should finish by the end of stop function.
type CommonService[T any] struct {
type CommonService struct {
sync.RWMutex
cancel context.CancelFunc
ctx context.Context
wg sync.WaitGroup
started bool
channel chan T
}

func NewCommonService[T any]() *CommonService[T] {
return &CommonService[T]{
func NewCommonService() *CommonService {
return &CommonService{
wg: sync.WaitGroup{},
RWMutex: sync.RWMutex{},
}
Expand All @@ -26,16 +25,14 @@ func NewCommonService[T any]() *CommonService[T] {
// mutex protected start function
// creates internal context over provided context and runs fn safely
// fn is excerpt to be executed to start the protocol
func (sp *CommonService[T]) Start(ctx context.Context, fn func() error) error {
func (sp *CommonService) Start(ctx context.Context, fn func() error) error {
sp.Lock()
defer sp.Unlock()
if sp.started {
return ErrAlreadyStarted
}
sp.started = true
sp.ctx, sp.cancel = context.WithCancel(ctx)
// currently is used in discv5 for returning new discovered Peers to peerConnector for connecting with them
sp.channel = make(chan T)
if err := fn(); err != nil {
sp.started = false
sp.cancel()
Expand All @@ -48,7 +45,7 @@ var ErrAlreadyStarted = errors.New("already started")
var ErrNotStarted = errors.New("not started")

// mutex protected stop function
func (sp *CommonService[T]) Stop(fn func()) {
func (sp *CommonService) Stop(fn func()) {
sp.Lock()
defer sp.Unlock()
if !sp.started {
Expand All @@ -57,37 +54,20 @@ func (sp *CommonService[T]) Stop(fn func()) {
sp.cancel()
fn()
sp.wg.Wait()
close(sp.channel)
sp.started = false
}

// This is not a mutex protected function, it is up to the caller to use it in a mutex protected context
func (sp *CommonService[T]) ErrOnNotRunning() error {
func (sp *CommonService) ErrOnNotRunning() error {
if !sp.started {
return ErrNotStarted
}
return nil
}

func (sp *CommonService[T]) Context() context.Context {
func (sp *CommonService) Context() context.Context {
return sp.ctx
}
func (sp *CommonService[T]) WaitGroup() *sync.WaitGroup {
func (sp *CommonService) WaitGroup() *sync.WaitGroup {
return &sp.wg
}
func (sp *CommonService[T]) GetListeningChan() <-chan T {
return sp.channel
}
func (sp *CommonService[T]) PushToChan(data T) bool {
sp.RLock()
defer sp.RUnlock()
if !sp.started {
return false
}
select {
case sp.channel <- data:
return true
case <-sp.ctx.Done():
return false
}
}
2 changes: 1 addition & 1 deletion waku/v2/protocol/common_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

// check if start and stop on common service works in random order
func TestCommonService(t *testing.T) {
s := NewCommonService[struct{}]()
s := NewCommonService()
wg := &sync.WaitGroup{}
for i := 0; i < 1000; i++ {
wg.Add(1)
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var (
)

type WakuFilterLightNode struct {
*protocol.CommonService[struct{}]
*protocol.CommonService
h host.Host
broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.s
timesource timesource.Timesource
Expand Down Expand Up @@ -64,7 +64,7 @@ func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerM
wf.broadcaster = broadcaster
wf.timesource = timesource
wf.pm = pm
wf.CommonService = protocol.NewCommonService[struct{}]()
wf.CommonService = protocol.NewCommonService()
wf.metrics = newMetrics(reg)

return wf
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/protocol/filter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type (
msgSub relay.Subscription
metrics Metrics
log *zap.Logger
*protocol.CommonService[struct{}]
*protocol.CommonService
subscriptions *SubscribersMap

maxSubscriptions int
Expand All @@ -53,7 +53,7 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi
opt(params)
}

wf.CommonService = protocol.NewCommonService[struct{}]()
wf.CommonService = protocol.NewCommonService()
wf.metrics = newMetrics(reg)
wf.subscriptions = NewSubscribersMap(params.Timeout)
wf.maxSubscriptions = params.MaxSubscribers
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/protocol/legacy_filter/waku_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type (
}

WakuFilter struct {
*protocol.CommonService[struct{}]
*protocol.CommonService
h host.Host
isFullNode bool
msgSub relay.Subscription
Expand Down Expand Up @@ -74,7 +74,7 @@ func NewWakuFilter(broadcaster relay.Broadcaster, isFullNode bool, timesource ti
}

wf.isFullNode = isFullNode
wf.CommonService = protocol.NewCommonService[struct{}]()
wf.CommonService = protocol.NewCommonService()
wf.filters = NewFilterMap(broadcaster, timesource)
wf.subscribers = NewSubscribers(params.Timeout)
wf.metrics = newMetrics(reg)
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/protocol/peer_exchange/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type WakuPeerExchange struct {
metrics Metrics
log *zap.Logger

*protocol.CommonService[struct{}]
*protocol.CommonService

peerConnector PeerConnector
enrCache *enrCache
Expand All @@ -63,7 +63,7 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector,
wakuPX.enrCache = newEnrCache
wakuPX.peerConnector = peerConnector
wakuPX.pm = pm
wakuPX.CommonService = protocol.NewCommonService[struct{}]()
wakuPX.CommonService = protocol.NewCommonService()

return wakuPX, nil
}
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/protocol/relay/waku_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type WakuRelay struct {
EvtRelayUnsubscribed event.Emitter
}

*waku_proto.CommonService[struct{}]
*waku_proto.CommonService
}

// EvtRelaySubscribed is an event emitted when a new subscription to a pubsub topic is created
Expand All @@ -90,7 +90,7 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou
w.topicValidators = make(map[string][]validatorFn)
w.bcaster = bcaster
w.minPeersToPublish = minPeersToPublish
w.CommonService = waku_proto.NewCommonService[struct{}]()
w.CommonService = waku_proto.NewCommonService()
w.log = log.Named("relay")
w.events = eventbus.NewBus()
w.metrics = newMetrics(reg, w.log)
Expand Down
14 changes: 7 additions & 7 deletions waku/v2/rendezvous/rendezvous.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Rendezvous struct {
peerConnector PeerConnector

log *zap.Logger
*protocol.CommonService[peermanager.PeerData]
*peermanager.CommonDiscoveryService
}

// PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol
Expand All @@ -43,10 +43,10 @@ type PeerConnector interface {
func NewRendezvous(db *DB, peerConnector PeerConnector, log *zap.Logger) *Rendezvous {
logger := log.Named("rendezvous")
return &Rendezvous{
db: db,
peerConnector: peerConnector,
log: logger,
CommonService: protocol.NewCommonService[peermanager.PeerData](),
db: db,
peerConnector: peerConnector,
log: logger,
CommonDiscoveryService: peermanager.NewCommonDiscoveryService(),
}
}

Expand All @@ -56,7 +56,7 @@ func (r *Rendezvous) SetHost(h host.Host) {
}

func (r *Rendezvous) Start(ctx context.Context) error {
return r.CommonService.Start(ctx, r.start)
return r.CommonDiscoveryService.Start(ctx, r.start)
}

func (r *Rendezvous) start() error {
Expand Down Expand Up @@ -180,7 +180,7 @@ func (r *Rendezvous) RegisterWithNamespace(ctx context.Context, namespace string
}

func (r *Rendezvous) Stop() {
r.CommonService.Stop(func() {
r.CommonDiscoveryService.Stop(func() {
r.host.RemoveStreamHandler(rvs.RendezvousProto)
r.rendezvousSvc = nil
})
Expand Down

0 comments on commit cafb0f0

Please sign in to comment.