Skip to content

Commit

Permalink
chore: refactor rest and provide cacheSize as config option for relay…
Browse files Browse the repository at this point in the history
… subscribe (#869)
  • Loading branch information
chaitanyaprem authored Nov 7, 2023
1 parent 9463591 commit b154233
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 13 deletions.
9 changes: 8 additions & 1 deletion cmd/waku/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,14 @@ func Execute(options NodeOptions) error {
var restServer *rest.WakuRest
if options.RESTServer.Enable {
wg.Add(1)
restServer = rest.NewWakuRest(wakuNode, options.RESTServer.Address, options.RESTServer.Port, options.PProf, options.RESTServer.Admin, options.RESTServer.RelayCacheCapacity, options.RESTServer.FilterCacheCapacity, logger)
restConfig := rest.RestConfig{Address: options.RESTServer.Address,
Port: uint(options.RESTServer.Port),
EnablePProf: options.PProf,
EnableAdmin: options.RESTServer.Admin,
RelayCacheCapacity: uint(options.RESTServer.RelayCacheCapacity),
FilterCacheCapacity: uint(options.RESTServer.FilterCacheCapacity)}

restServer = rest.NewWakuRest(wakuNode, restConfig, logger)
restServer.Start(ctx, &wg)
}

Expand Down
8 changes: 4 additions & 4 deletions cmd/waku/server/rest/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ type RelayService struct {

log *zap.Logger

cacheCapacity int
cacheCapacity uint
}

// NewRelayService returns an instance of RelayService
func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *RelayService {
func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity uint, log *zap.Logger) *RelayService {
s := &RelayService{
node: node,
log: log.Named("relay"),
Expand Down Expand Up @@ -91,7 +91,7 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ
} else {
topicToSubscribe = topic
}
_, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topicToSubscribe))
_, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topicToSubscribe), relay.WithCacheSize(r.cacheCapacity))

if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err))
Expand Down Expand Up @@ -193,7 +193,7 @@ func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http.
defer req.Body.Close()

var err error
_, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", cTopics...))
_, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", cTopics...), relay.WithCacheSize(r.cacheCapacity))
if err != nil {
r.log.Error("subscribing to topics", zap.Strings("contentTopics", cTopics), zap.Error(err))
}
Expand Down
21 changes: 15 additions & 6 deletions cmd/waku/server/rest/waku_rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,24 @@ type WakuRest struct {
filterService *FilterService
}

func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool, enableAdmin bool, relayCacheCapacity, filterCacheCapacity int, log *zap.Logger) *WakuRest {
type RestConfig struct {
Address string
Port uint
EnablePProf bool
EnableAdmin bool
RelayCacheCapacity uint
FilterCacheCapacity uint
}

func NewWakuRest(node *node.WakuNode, config RestConfig, log *zap.Logger) *WakuRest {
wrpc := new(WakuRest)
wrpc.log = log.Named("rest")

mux := chi.NewRouter()
mux.Use(middleware.Logger)
mux.Use(middleware.NoCache)

if enablePProf {
if config.EnablePProf {
mux.Mount("/debug", middleware.Profiler())
}

Expand All @@ -39,7 +48,7 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool
_ = NewStoreService(node, mux)
_ = NewLightpushService(node, mux, log)

listenAddr := fmt.Sprintf("%s:%d", address, port)
listenAddr := fmt.Sprintf("%s:%d", config.Address, config.Port)

server := &http.Server{
Addr: listenAddr,
Expand All @@ -50,16 +59,16 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool
wrpc.server = server

if node.Relay() != nil {
relayService := NewRelayService(node, mux, relayCacheCapacity, log)
relayService := NewRelayService(node, mux, config.RelayCacheCapacity, log)
wrpc.relayService = relayService
}

if enableAdmin {
if config.EnableAdmin {
_ = NewAdminService(node, mux, wrpc.log)
}

if node.FilterLightnode() != nil {
filterService := NewFilterService(node, mux, filterCacheCapacity, log)
filterService := NewFilterService(node, mux, int(config.FilterCacheCapacity), log)
server.RegisterOnShutdown(func() {
filterService.Stop()
})
Expand Down
2 changes: 1 addition & 1 deletion cmd/waku/server/rest/waku_rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestWakuRest(t *testing.T) {
n, err := node.New(options)
require.NoError(t, err)

rpc := NewWakuRest(n, "127.0.0.1", 8080, false, false, 10, 0, utils.Logger())
rpc := NewWakuRest(n, RestConfig{Address: "127.0.0.1", Port: 8080, EnablePProf: false, EnableAdmin: false, RelayCacheCapacity: 10}, utils.Logger())
require.NotNil(t, rpc.server)
require.Equal(t, rpc.server.Addr, "127.0.0.1:8080")
}
8 changes: 8 additions & 0 deletions waku/v2/protocol/relay/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var DefaultRelaySubscriptionBufferSize int = 1024

type RelaySubscribeParameters struct {
dontConsume bool
cacheSize uint
}

type RelaySubscribeOption func(*RelaySubscribeParameters) error
Expand All @@ -28,6 +29,13 @@ func WithoutConsumer() RelaySubscribeOption {
}
}

func WithCacheSize(size uint) RelaySubscribeOption {
return func(params *RelaySubscribeParameters) error {
params.cacheSize = size
return nil
}
}

func msgIDFn(pmsg *pubsub_pb.Message) string {
return string(hash.SHA256(pmsg.Data))
}
Expand Down
5 changes: 4 additions & 1 deletion waku/v2/protocol/relay/waku_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
return nil, err
}
}
if params.cacheSize <= 0 {
params.cacheSize = uint(DefaultRelaySubscriptionBufferSize)
}

for pubSubTopic, cTopics := range pubSubTopicMap {
w.log.Info("subscribing to", zap.String("pubsubTopic", pubSubTopic), zap.Strings("contenTopics", cTopics))
Expand All @@ -365,7 +368,7 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
}
}

subscription := w.bcaster.Register(cFilter, WithBufferSize(DefaultRelaySubscriptionBufferSize),
subscription := w.bcaster.Register(cFilter, WithBufferSize(int(params.cacheSize)),
WithConsumerOption(params.dontConsume))

// Create Content subscription
Expand Down

0 comments on commit b154233

Please sign in to comment.