Skip to content

Commit

Permalink
fix: limit number of subscribers and criteria
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Mar 4, 2023
1 parent 2d2fbc7 commit 30d4829
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 7 deletions.
4 changes: 4 additions & 0 deletions waku/v2/protocol/filterv2/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package filterv2

const DefaultMaxSubscriptions = 1000
const MaxCriteriaPerSubscription = 1000
27 changes: 27 additions & 0 deletions waku/v2/protocol/filterv2/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package filterv2

import (
"context"
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -25,10 +26,23 @@ type (
log *zap.Logger
}

FilterParameters struct {
Timeout time.Duration
MaxSubscribers int
}

Option func(*FilterParameters)

FilterSubscribeOption func(*FilterSubscribeParameters)
FilterUnsubscribeOption func(*FilterUnsubscribeParameters)
)

func WithTimeout(timeout time.Duration) Option {
return func(params *FilterParameters) {
params.Timeout = timeout
}
}

func WithPeer(p peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
params.selectedPeer = p
Expand Down Expand Up @@ -112,3 +126,16 @@ func DefaultUnsubscribeOptions() []FilterUnsubscribeOption {
AutomaticRequestId(),
}
}

func WithMaxSubscribers(maxSubscribers int) Option {
return func(params *FilterParameters) {
params.MaxSubscribers = maxSubscribers
}
}

func DefaultOptions() []Option {
return []Option{
WithTimeout(24 * time.Hour),
WithMaxSubscribers(DefaultMaxSubscriptions),
}
}
34 changes: 27 additions & 7 deletions waku/v2/protocol/filterv2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/filterv2/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.opencensus.io/tag"
Expand All @@ -27,6 +26,8 @@ import (
// allow filter clients to subscribe, modify, refresh and unsubscribe a desired set of filter criteria
const FilterSubscribeID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/2.0.0-beta1")

const peerHasNoSubscription = "peer has no subscriptions"

type (
WakuFilterFull struct {
cancel context.CancelFunc
Expand All @@ -36,16 +37,18 @@ type (
log *zap.Logger

subscriptions *SubscribersMap

maxSubscriptions int
}
)

// NewWakuFilterFullnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options
func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...filter.Option) *WakuFilterFull {
func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFull {
wf := new(WakuFilterFull)
wf.log = log.Named("filterv2-fullnode")

params := new(filter.FilterParameters)
optList := filter.DefaultOptions()
params := new(FilterParameters)
optList := DefaultOptions()
optList = append(optList, opts...)
for _, opt := range optList {
opt(params)
Expand Down Expand Up @@ -138,7 +141,7 @@ func (wf *WakuFilterFull) ping(s network.Stream, logger *zap.Logger, request *pb
if exists {
reply(s, logger, request, http.StatusOK)
} else {
reply(s, logger, request, http.StatusNotFound)
reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription)
}
}

Expand All @@ -153,8 +156,25 @@ func (wf *WakuFilterFull) subscribe(s network.Stream, logger *zap.Logger, reques
return
}

if wf.subscriptions.Count() >= wf.maxSubscriptions {
reply(s, logger, request, http.StatusServiceUnavailable, "node has reached maximum number of subscriptions")
return
}

peerID := s.Conn().RemotePeer()

if totalSubs, exists := wf.subscriptions.Get(peerID); exists {
ctTotal := 0
for _, contentTopicSet := range totalSubs {
ctTotal += len(contentTopicSet)
}

if ctTotal+len(request.ContentTopics) > MaxCriteriaPerSubscription {
reply(s, logger, request, http.StatusServiceUnavailable, "peer has reached maximum number of filter criteria")
return
}
}

wf.subscriptions.Set(peerID, request.PubsubTopic, request.ContentTopics)

reply(s, logger, request, http.StatusOK)
Expand All @@ -173,7 +193,7 @@ func (wf *WakuFilterFull) unsubscribe(s network.Stream, logger *zap.Logger, requ

err := wf.subscriptions.Delete(s.Conn().RemotePeer(), request.PubsubTopic, request.ContentTopics)
if err != nil {
reply(s, logger, request, http.StatusNotFound)
reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription)
} else {
reply(s, logger, request, http.StatusOK)
}
Expand All @@ -182,7 +202,7 @@ func (wf *WakuFilterFull) unsubscribe(s network.Stream, logger *zap.Logger, requ
func (wf *WakuFilterFull) unsubscribeAll(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
err := wf.subscriptions.DeleteAll(s.Conn().RemotePeer())
if err != nil {
reply(s, logger, request, http.StatusNotFound)
reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription)
} else {
reply(s, logger, request, http.StatusOK)
}
Expand Down
7 changes: 7 additions & 0 deletions waku/v2/protocol/filterv2/subscribers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,13 @@ func (sub *SubscribersMap) RemoveAll() {
sub.items = make(map[peer.ID]PubsubTopics)
}

func (sub *SubscribersMap) Count() int {
sub.RLock()
defer sub.RUnlock()

return len(sub.items)
}

func (sub *SubscribersMap) Items(pubsubTopic string, contentTopic string) <-chan peer.ID {
c := make(chan peer.ID)

Expand Down

0 comments on commit 30d4829

Please sign in to comment.