diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index b9e3baaa1..67bd4a2d6 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -16,7 +16,6 @@ import ( "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/peerstore" - "github.com/waku-org/go-waku/waku/v2/protocol" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -46,7 +45,7 @@ type DiscoveryV5 struct { log *zap.Logger - *protocol.CommonService[peermanager.PeerData] + *peermanager.CommonDiscoveryService } type discV5Parameters struct { @@ -128,12 +127,12 @@ func NewDiscoveryV5(priv *ecdsa.PrivateKey, localnode *enode.LocalNode, peerConn } return &DiscoveryV5{ - params: params, - peerConnector: peerConnector, - NAT: NAT, - CommonService: protocol.NewCommonService[peermanager.PeerData](), - localnode: localnode, - metrics: newMetrics(reg), + params: params, + peerConnector: peerConnector, + NAT: NAT, + CommonDiscoveryService: peermanager.NewCommonDiscoveryService(), + localnode: localnode, + metrics: newMetrics(reg), config: discover.Config{ PrivateKey: priv, Bootnodes: params.bootnodes, @@ -194,7 +193,7 @@ func (d *DiscoveryV5) SetHost(h host.Host) { // only works if the discovery v5 hasn't been started yet. func (d *DiscoveryV5) Start(ctx context.Context) error { - return d.CommonService.Start(ctx, d.start) + return d.CommonDiscoveryService.Start(ctx, d.start) } func (d *DiscoveryV5) start() error { @@ -234,7 +233,7 @@ func (d *DiscoveryV5) Stop() { d.log.Info("recovering from panic and quitting") } }() - d.CommonService.Stop(func() { + d.CommonDiscoveryService.Stop(func() { if d.listener != nil { d.listener.Close() d.listener = nil diff --git a/waku/v2/peermanager/common_discovery_service.go b/waku/v2/peermanager/common_discovery_service.go new file mode 100644 index 000000000..ba8d66830 --- /dev/null +++ b/waku/v2/peermanager/common_discovery_service.go @@ -0,0 +1,80 @@ +package peermanager + +import ( + "context" + "sync" + + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/libp2p/go-libp2p/core/peer" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/protocol" +) + +// PeerData contains information about a peer useful in establishing connections with it. +type PeerData struct { + Origin wps.Origin + AddrInfo peer.AddrInfo + ENR *enode.Node +} + +type CommonDiscoveryService struct { + commonService *protocol.CommonService + channel chan PeerData +} + +func NewCommonDiscoveryService() *CommonDiscoveryService { + return &CommonDiscoveryService{ + commonService: protocol.NewCommonService(), + } +} + +func (sp *CommonDiscoveryService) Start(ctx context.Context, fn func() error) error { + return sp.commonService.Start(ctx, func() error { + // currently is used in discv5,peerConnector,rendevzous for returning new discovered Peers to peerConnector for connecting with them + // mutex protection for this operation + sp.channel = make(chan PeerData) + return fn() + }) +} + +func (sp *CommonDiscoveryService) Stop(stopFn func()) { + sp.commonService.Stop(func() { + stopFn() + sp.WaitGroup().Wait() // waitgroup is waited here so that channel can be closed after all the go rountines have stopped in service. + // there is a wait in the CommonService too + close(sp.channel) + }) +} +func (sp *CommonDiscoveryService) GetListeningChan() <-chan PeerData { + return sp.channel +} +func (sp *CommonDiscoveryService) PushToChan(data PeerData) bool { + sp.RLock() + defer sp.RUnlock() + if err := sp.ErrOnNotRunning(); err != nil { + return false + } + select { + case sp.channel <- data: + return true + case <-sp.Context().Done(): + return false + } +} + +func (sp *CommonDiscoveryService) RLock() { + sp.commonService.RLock() +} +func (sp *CommonDiscoveryService) RUnlock() { + sp.commonService.RUnlock() +} + +func (sp *CommonDiscoveryService) Context() context.Context { + return sp.commonService.Context() +} +func (sp *CommonDiscoveryService) ErrOnNotRunning() error { + return sp.commonService.ErrOnNotRunning() +} +func (sp *CommonDiscoveryService) WaitGroup() *sync.WaitGroup { + return sp.commonService.WaitGroup() +} diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index e8c0c31e6..cf7414b9e 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -9,7 +9,6 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -25,14 +24,6 @@ import ( lru "github.com/hashicorp/golang-lru" ) -// PeerData contains information about a peer useful in establishing connections with it. -type PeerData struct { - Origin wps.Origin - AddrInfo peer.AddrInfo - PubSubTopics []string - ENR *enode.Node -} - // PeerConnectionStrategy is a utility to connect to peers, // but only if we have not recently tried connecting to them already type PeerConnectionStrategy struct { diff --git a/waku/v2/peerstore/waku_peer_store.go b/waku/v2/peerstore/waku_peer_store.go index 0d856001d..f8390fbd7 100644 --- a/waku/v2/peerstore/waku_peer_store.go +++ b/waku/v2/peerstore/waku_peer_store.go @@ -20,6 +20,7 @@ const ( PeerExchange DNSDiscovery Rendezvous + PeerManager ) const peerOrigin = "origin" diff --git a/waku/v2/protocol/common_service.go b/waku/v2/protocol/common_service.go index f33e12b48..657469612 100644 --- a/waku/v2/protocol/common_service.go +++ b/waku/v2/protocol/common_service.go @@ -7,17 +7,16 @@ import ( ) // this is common layout for all the services that require mutex protection and a guarantee that all running goroutines will be finished before stop finishes execution. This guarantee comes from waitGroup all one has to use CommonService.WaitGroup() in the goroutines that should finish by the end of stop function. -type CommonService[T any] struct { +type CommonService struct { sync.RWMutex cancel context.CancelFunc ctx context.Context wg sync.WaitGroup started bool - channel chan T } -func NewCommonService[T any]() *CommonService[T] { - return &CommonService[T]{ +func NewCommonService() *CommonService { + return &CommonService{ wg: sync.WaitGroup{}, RWMutex: sync.RWMutex{}, } @@ -26,7 +25,7 @@ func NewCommonService[T any]() *CommonService[T] { // mutex protected start function // creates internal context over provided context and runs fn safely // fn is excerpt to be executed to start the protocol -func (sp *CommonService[T]) Start(ctx context.Context, fn func() error) error { +func (sp *CommonService) Start(ctx context.Context, fn func() error) error { sp.Lock() defer sp.Unlock() if sp.started { @@ -34,8 +33,6 @@ func (sp *CommonService[T]) Start(ctx context.Context, fn func() error) error { } sp.started = true sp.ctx, sp.cancel = context.WithCancel(ctx) - // currently is used in discv5 for returning new discovered Peers to peerConnector for connecting with them - sp.channel = make(chan T) if err := fn(); err != nil { sp.started = false sp.cancel() @@ -48,7 +45,7 @@ var ErrAlreadyStarted = errors.New("already started") var ErrNotStarted = errors.New("not started") // mutex protected stop function -func (sp *CommonService[T]) Stop(fn func()) { +func (sp *CommonService) Stop(fn func()) { sp.Lock() defer sp.Unlock() if !sp.started { @@ -57,37 +54,20 @@ func (sp *CommonService[T]) Stop(fn func()) { sp.cancel() fn() sp.wg.Wait() - close(sp.channel) sp.started = false } // This is not a mutex protected function, it is up to the caller to use it in a mutex protected context -func (sp *CommonService[T]) ErrOnNotRunning() error { +func (sp *CommonService) ErrOnNotRunning() error { if !sp.started { return ErrNotStarted } return nil } -func (sp *CommonService[T]) Context() context.Context { +func (sp *CommonService) Context() context.Context { return sp.ctx } -func (sp *CommonService[T]) WaitGroup() *sync.WaitGroup { +func (sp *CommonService) WaitGroup() *sync.WaitGroup { return &sp.wg } -func (sp *CommonService[T]) GetListeningChan() <-chan T { - return sp.channel -} -func (sp *CommonService[T]) PushToChan(data T) bool { - sp.RLock() - defer sp.RUnlock() - if !sp.started { - return false - } - select { - case sp.channel <- data: - return true - case <-sp.ctx.Done(): - return false - } -} diff --git a/waku/v2/protocol/common_service_test.go b/waku/v2/protocol/common_service_test.go index 2c8c29832..cd707e11f 100644 --- a/waku/v2/protocol/common_service_test.go +++ b/waku/v2/protocol/common_service_test.go @@ -8,7 +8,7 @@ import ( // check if start and stop on common service works in random order func TestCommonService(t *testing.T) { - s := NewCommonService[struct{}]() + s := NewCommonService() wg := &sync.WaitGroup{} for i := 0; i < 1000; i++ { wg.Add(1) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index da2e8f5a6..200fa1fc1 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -33,7 +33,7 @@ var ( ) type WakuFilterLightNode struct { - *protocol.CommonService[struct{}] + *protocol.CommonService h host.Host broadcaster relay.Broadcaster //TODO: Move the broadcast functionality outside of relay client to a higher SDK layer.s timesource timesource.Timesource @@ -64,7 +64,7 @@ func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerM wf.broadcaster = broadcaster wf.timesource = timesource wf.pm = pm - wf.CommonService = protocol.NewCommonService[struct{}]() + wf.CommonService = protocol.NewCommonService() wf.metrics = newMetrics(reg) return wf diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index 7fa1b8d86..060ea3e36 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -34,7 +34,7 @@ type ( msgSub relay.Subscription metrics Metrics log *zap.Logger - *protocol.CommonService[struct{}] + *protocol.CommonService subscriptions *SubscribersMap maxSubscriptions int @@ -53,7 +53,7 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi opt(params) } - wf.CommonService = protocol.NewCommonService[struct{}]() + wf.CommonService = protocol.NewCommonService() wf.metrics = newMetrics(reg) wf.subscriptions = NewSubscribersMap(params.Timeout) wf.maxSubscriptions = params.MaxSubscribers diff --git a/waku/v2/protocol/legacy_filter/waku_filter.go b/waku/v2/protocol/legacy_filter/waku_filter.go index 32b13f8cf..e28a0729b 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter.go +++ b/waku/v2/protocol/legacy_filter/waku_filter.go @@ -46,7 +46,7 @@ type ( } WakuFilter struct { - *protocol.CommonService[struct{}] + *protocol.CommonService h host.Host isFullNode bool msgSub relay.Subscription @@ -74,7 +74,7 @@ func NewWakuFilter(broadcaster relay.Broadcaster, isFullNode bool, timesource ti } wf.isFullNode = isFullNode - wf.CommonService = protocol.NewCommonService[struct{}]() + wf.CommonService = protocol.NewCommonService() wf.filters = NewFilterMap(broadcaster, timesource) wf.subscribers = NewSubscribers(params.Timeout) wf.metrics = newMetrics(reg) diff --git a/waku/v2/protocol/peer_exchange/protocol.go b/waku/v2/protocol/peer_exchange/protocol.go index d9742ff8d..af9f47e85 100644 --- a/waku/v2/protocol/peer_exchange/protocol.go +++ b/waku/v2/protocol/peer_exchange/protocol.go @@ -42,7 +42,7 @@ type WakuPeerExchange struct { metrics Metrics log *zap.Logger - *protocol.CommonService[struct{}] + *protocol.CommonService peerConnector PeerConnector enrCache *enrCache @@ -63,7 +63,7 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, peerConnector PeerConnector, wakuPX.enrCache = newEnrCache wakuPX.peerConnector = peerConnector wakuPX.pm = pm - wakuPX.CommonService = protocol.NewCommonService[struct{}]() + wakuPX.CommonService = protocol.NewCommonService() return wakuPX, nil } diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 18cfc6b33..672732ace 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -64,7 +64,7 @@ type WakuRelay struct { EvtRelayUnsubscribed event.Emitter } - *waku_proto.CommonService[struct{}] + *waku_proto.CommonService } // EvtRelaySubscribed is an event emitted when a new subscription to a pubsub topic is created @@ -90,7 +90,7 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou w.topicValidators = make(map[string][]validatorFn) w.bcaster = bcaster w.minPeersToPublish = minPeersToPublish - w.CommonService = waku_proto.NewCommonService[struct{}]() + w.CommonService = waku_proto.NewCommonService() w.log = log.Named("relay") w.events = eventbus.NewBus() w.metrics = newMetrics(reg, w.log) diff --git a/waku/v2/rendezvous/rendezvous.go b/waku/v2/rendezvous/rendezvous.go index 02318e68b..e28d7b15f 100644 --- a/waku/v2/rendezvous/rendezvous.go +++ b/waku/v2/rendezvous/rendezvous.go @@ -31,7 +31,7 @@ type Rendezvous struct { peerConnector PeerConnector log *zap.Logger - *protocol.CommonService[peermanager.PeerData] + *peermanager.CommonDiscoveryService } // PeerConnector will subscribe to a channel containing the information for all peers found by this discovery protocol @@ -43,10 +43,10 @@ type PeerConnector interface { func NewRendezvous(db *DB, peerConnector PeerConnector, log *zap.Logger) *Rendezvous { logger := log.Named("rendezvous") return &Rendezvous{ - db: db, - peerConnector: peerConnector, - log: logger, - CommonService: protocol.NewCommonService[peermanager.PeerData](), + db: db, + peerConnector: peerConnector, + log: logger, + CommonDiscoveryService: peermanager.NewCommonDiscoveryService(), } } @@ -56,7 +56,7 @@ func (r *Rendezvous) SetHost(h host.Host) { } func (r *Rendezvous) Start(ctx context.Context) error { - return r.CommonService.Start(ctx, r.start) + return r.CommonDiscoveryService.Start(ctx, r.start) } func (r *Rendezvous) start() error { @@ -180,7 +180,7 @@ func (r *Rendezvous) RegisterWithNamespace(ctx context.Context, namespace string } func (r *Rendezvous) Stop() { - r.CommonService.Stop(func() { + r.CommonDiscoveryService.Stop(func() { r.host.RemoveStreamHandler(rvs.RendezvousProto) r.rendezvousSvc = nil })