Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update relay REST and RPC API's and fix unit tests #866

Merged
merged 11 commits into from
Nov 7, 2023
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ lint-full:
@golangci-lint run ./... --config=./.golangci.full.yaml --deadline=5m

test-with-race:
${GOBIN} test -race -timeout 300s ./waku/...
${GOBIN} test -race -timeout 300s ./waku/... ./cmd/waku/server/...

test:
${GOBIN} test -timeout 300s ./waku/... -coverprofile=${GO_TEST_OUTFILE}.tmp -coverpkg ./...
${GOBIN} test -timeout 300s ./waku/... ./cmd/waku/server/... -coverprofile=${GO_TEST_OUTFILE}.tmp -coverpkg ./...
cat ${GO_TEST_OUTFILE}.tmp | grep -v ".pb.go" > ${GO_TEST_OUTFILE}
${GOBIN} tool cover -html=${GO_TEST_OUTFILE} -o ${GO_HTML_COV}

Expand Down
9 changes: 8 additions & 1 deletion cmd/waku/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,14 @@ func Execute(options NodeOptions) error {
var restServer *rest.WakuRest
if options.RESTServer.Enable {
wg.Add(1)
restServer = rest.NewWakuRest(wakuNode, options.RESTServer.Address, options.RESTServer.Port, options.PProf, options.RESTServer.Admin, options.RESTServer.RelayCacheCapacity, options.RESTServer.FilterCacheCapacity, logger)
restConfig := rest.RestConfig{Address: options.RESTServer.Address,
Port: uint(options.RESTServer.Port),
EnablePProf: options.PProf,
EnableAdmin: options.RESTServer.Admin,
RelayCacheCapacity: uint(options.RESTServer.RelayCacheCapacity),
FilterCacheCapacity: uint(options.RESTServer.FilterCacheCapacity)}

restServer = rest.NewWakuRest(wakuNode, restConfig, logger)
restServer.Start(ctx, &wg)
}

Expand Down
10 changes: 6 additions & 4 deletions cmd/waku/server/rest/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,13 @@ func (s *FilterService) unsubscribeGetMessage(result *filter.WakuFilterPushResul
var peerIds string
ind := 0
for _, entry := range result.Errors() {
s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err))
if ind != 0 {
peerIds += ", "
if entry.Err != nil {
s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err))
if ind != 0 {
peerIds += ", "
}
peerIds += entry.PeerID.String()
}
peerIds += entry.PeerID.String()
ind++
}
if peerIds != "" {
Expand Down
2 changes: 1 addition & 1 deletion cmd/waku/server/rest/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func TestFilterGetMessages(t *testing.T) {
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusNotFound, rr.Code)
require.Equal(t,
fmt.Sprintf("Not subscribed to pubsubTopic:%s contentTopic: %s", notSubscibredPubsubTopic, contentTopic),
fmt.Sprintf("not subscribed to pubsubTopic:%s contentTopic: %s", notSubscibredPubsubTopic, contentTopic),
rr.Body.String(),
)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/waku/server/rest/lightpush_rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,6 @@ func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Req
_, err = w.Write([]byte(err.Error()))
serv.log.Error("writing response", zap.Error(err))
} else {
w.WriteHeader(http.StatusOK)
writeErrOrResponse(w, err, true)
}
}
130 changes: 30 additions & 100 deletions cmd/waku/server/rest/relay.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package rest

import (
"context"
"encoding/json"
"errors"
"net/http"
"net/url"
"strings"
"sync"

"github.com/go-chi/chi/v5"
"github.com/waku-org/go-waku/cmd/waku/server"
Expand All @@ -26,29 +22,21 @@ const routeRelayV1AutoMessages = "/relay/v1/auto/messages"

// RelayService represents the REST service for WakuRelay
type RelayService struct {
node *node.WakuNode
cancel context.CancelFunc
node *node.WakuNode

log *zap.Logger

messages map[string][]*pb.WakuMessage
cacheCapacity int
messagesMutex sync.RWMutex

runner *runnerService
cacheCapacity uint
}

// NewRelayService returns an instance of RelayService
func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *RelayService {
func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity uint, log *zap.Logger) *RelayService {
s := &RelayService{
node: node,
log: log.Named("relay"),
cacheCapacity: cacheCapacity,
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
messages: make(map[string][]*pb.WakuMessage),
}

s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope)

m.Post(routeRelayV1Subscriptions, s.postV1Subscriptions)
m.Delete(routeRelayV1Subscriptions, s.deleteV1Subscriptions)
m.Get(routeRelayV1Messages, s.getV1Messages)
Expand All @@ -65,46 +53,6 @@ func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *za
return s
}

func (r *RelayService) addEnvelope(envelope *protocol.Envelope) {
r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()

if _, ok := r.messages[envelope.PubsubTopic()]; !ok {
return
}

// Keep a specific max number of messages per topic
if len(r.messages[envelope.PubsubTopic()]) >= r.cacheCapacity {
r.messages[envelope.PubsubTopic()] = r.messages[envelope.PubsubTopic()][1:]
}

r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message())
}

// Start starts the RelayService
func (r *RelayService) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel

r.messagesMutex.Lock()
// Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these
for _, topic := range r.node.Relay().Topics() {
r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic))
r.messages[topic] = []*pb.WakuMessage{}
}
r.messagesMutex.Unlock()

r.runner.Start(ctx)
}

// Stop stops the RelayService
func (r *RelayService) Stop() {
if r.cancel == nil {
return
}
r.cancel()
}

func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Request) {
var topics []string
decoder := json.NewDecoder(req.Body)
Expand All @@ -114,16 +62,11 @@ func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Re
}
defer req.Body.Close()

r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()

var err error
for _, topic := range topics {
err = r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("unsubscribing from topic", zap.String("topic", strings.Replace(strings.Replace(topic, "\n", "", -1), "\r", "", -1)), zap.Error(err))
} else {
delete(r.messages, topic)
}
}

Expand All @@ -140,26 +83,29 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ
defer req.Body.Close()

var err error
var sub *relay.Subscription
var subs []*relay.Subscription
var successCnt int
var topicToSubscribe string
for _, topic := range topics {
if topic == "" {
subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(relay.DefaultWakuTopic))
topicToSubscribe = relay.DefaultWakuTopic
} else {
subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topic))
topicToSubscribe = topic
}
_, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topicToSubscribe), relay.WithCacheSize(r.cacheCapacity))

if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err))
} else {
sub = subs[0]
sub.Unsubscribe()
r.messagesMutex.Lock()
r.messages[topic] = []*pb.WakuMessage{}
r.messagesMutex.Unlock()
continue
}
successCnt++
}

// on partial subscribe failure
if successCnt > 0 && err != nil {
r.log.Error("partial subscribe failed", zap.Error(err))
// on partial failure
writeResponse(w, err, http.StatusOK)
return
}

writeErrOrResponse(w, err, true)
Expand All @@ -170,20 +116,22 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) {
if topic == "" {
return
}

r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()

if _, ok := r.messages[topic]; !ok {
//TODO: Update the API to also take a contentTopic since relay now supports filtering based on contentTopic as well.
sub, err := r.node.Relay().GetSubscriptionWithPubsubTopic(topic, "")
if err != nil {
w.WriteHeader(http.StatusNotFound)
_, err := w.Write([]byte("not subscribed to topic"))
_, err = w.Write([]byte("not subscribed to topic"))
r.log.Error("writing response", zap.Error(err))
return
}
var response []*pb.WakuMessage
select {
case msg := <-sub.Ch:
response = append(response, msg.Message())
default:
break
}

response := r.messages[topic]

r.messages[topic] = []*pb.WakuMessage{}
writeErrOrResponse(w, nil, response)
}

Expand All @@ -205,11 +153,6 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) {
topic = relay.DefaultWakuTopic
}

if !r.node.Relay().IsSubscribed(topic) {
writeErrOrResponse(w, errors.New("not subscribed to pubsubTopic"), nil)
return
}

if err := server.AppendRLNProof(r.node, message); err != nil {
writeErrOrResponse(w, err, nil)
return
Expand Down Expand Up @@ -250,7 +193,7 @@ func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http.
defer req.Body.Close()

var err error
_, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", cTopics...))
_, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", cTopics...), relay.WithCacheSize(r.cacheCapacity))
if err != nil {
r.log.Error("subscribing to topics", zap.Strings("contentTopics", cTopics), zap.Error(err))
}
Expand All @@ -260,27 +203,14 @@ func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http.
_, err := w.Write([]byte(err.Error()))
r.log.Error("writing response", zap.Error(err))
} else {
w.WriteHeader(http.StatusOK)
writeErrOrResponse(w, err, true)
}

}

func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Request) {
cTopic := chi.URLParam(req, "contentTopic")
if cTopic == "" {
w.WriteHeader(http.StatusBadRequest)
_, err := w.Write([]byte("contentTopic is required"))
r.log.Error("writing response", zap.Error(err))
return
}
cTopic, err := url.QueryUnescape(cTopic)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, err = w.Write([]byte("invalid contentTopic format"))
r.log.Error("writing response", zap.Error(err))
return
}

cTopic := topicFromPath(w, req, "contentTopic", r.log)
sub, err := r.node.Relay().GetSubscription(cTopic)
if err != nil {
w.WriteHeader(http.StatusNotFound)
Expand Down
Loading