Skip to content

Commit

Permalink
chore: update relay REST and RPC API's and fix unit tests (#866)
Browse files Browse the repository at this point in the history
* update relay REST API's to remove duplicate message cache, fix relay tests and admin test

* chore: enable REST and RPC unit tests

* update lightpush rest api to match yaml

* fix: filter rest unit test failures

* skipping legacy filter tests

* chore: add unit tests for autosharding relay REST API, fix success response (#868)
  • Loading branch information
chaitanyaprem authored Nov 7, 2023
1 parent 9315de8 commit 2616d43
Show file tree
Hide file tree
Showing 15 changed files with 270 additions and 217 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ lint-full:
@golangci-lint run ./... --config=./.golangci.full.yaml --deadline=5m

test-with-race:
${GOBIN} test -race -timeout 300s ./waku/...
${GOBIN} test -race -timeout 300s ./waku/... ./cmd/waku/server/...

test:
${GOBIN} test -timeout 300s ./waku/... -coverprofile=${GO_TEST_OUTFILE}.tmp -coverpkg ./...
${GOBIN} test -timeout 300s ./waku/... ./cmd/waku/server/... -coverprofile=${GO_TEST_OUTFILE}.tmp -coverpkg ./...
cat ${GO_TEST_OUTFILE}.tmp | grep -v ".pb.go" > ${GO_TEST_OUTFILE}
${GOBIN} tool cover -html=${GO_TEST_OUTFILE} -o ${GO_HTML_COV}

Expand Down
9 changes: 8 additions & 1 deletion cmd/waku/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,14 @@ func Execute(options NodeOptions) error {
var restServer *rest.WakuRest
if options.RESTServer.Enable {
wg.Add(1)
restServer = rest.NewWakuRest(wakuNode, options.RESTServer.Address, options.RESTServer.Port, options.PProf, options.RESTServer.Admin, options.RESTServer.RelayCacheCapacity, options.RESTServer.FilterCacheCapacity, logger)
restConfig := rest.RestConfig{Address: options.RESTServer.Address,
Port: uint(options.RESTServer.Port),
EnablePProf: options.PProf,
EnableAdmin: options.RESTServer.Admin,
RelayCacheCapacity: uint(options.RESTServer.RelayCacheCapacity),
FilterCacheCapacity: uint(options.RESTServer.FilterCacheCapacity)}

restServer = rest.NewWakuRest(wakuNode, restConfig, logger)
restServer.Start(ctx, &wg)
}

Expand Down
10 changes: 6 additions & 4 deletions cmd/waku/server/rest/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,13 @@ func (s *FilterService) unsubscribeGetMessage(result *filter.WakuFilterPushResul
var peerIds string
ind := 0
for _, entry := range result.Errors() {
s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err))
if ind != 0 {
peerIds += ", "
if entry.Err != nil {
s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err))
if ind != 0 {
peerIds += ", "
}
peerIds += entry.PeerID.String()
}
peerIds += entry.PeerID.String()
ind++
}
if peerIds != "" {
Expand Down
2 changes: 1 addition & 1 deletion cmd/waku/server/rest/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func TestFilterGetMessages(t *testing.T) {
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusNotFound, rr.Code)
require.Equal(t,
fmt.Sprintf("Not subscribed to pubsubTopic:%s contentTopic: %s", notSubscibredPubsubTopic, contentTopic),
fmt.Sprintf("not subscribed to pubsubTopic:%s contentTopic: %s", notSubscibredPubsubTopic, contentTopic),
rr.Body.String(),
)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/waku/server/rest/lightpush_rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,6 @@ func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Req
_, err = w.Write([]byte(err.Error()))
serv.log.Error("writing response", zap.Error(err))
} else {
w.WriteHeader(http.StatusOK)
writeErrOrResponse(w, err, true)
}
}
130 changes: 30 additions & 100 deletions cmd/waku/server/rest/relay.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package rest

import (
"context"
"encoding/json"
"errors"
"net/http"
"net/url"
"strings"
"sync"

"github.com/go-chi/chi/v5"
"github.com/waku-org/go-waku/cmd/waku/server"
Expand All @@ -26,29 +22,21 @@ const routeRelayV1AutoMessages = "/relay/v1/auto/messages"

// RelayService represents the REST service for WakuRelay
type RelayService struct {
node *node.WakuNode
cancel context.CancelFunc
node *node.WakuNode

log *zap.Logger

messages map[string][]*pb.WakuMessage
cacheCapacity int
messagesMutex sync.RWMutex

runner *runnerService
cacheCapacity uint
}

// NewRelayService returns an instance of RelayService
func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *RelayService {
func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity uint, log *zap.Logger) *RelayService {
s := &RelayService{
node: node,
log: log.Named("relay"),
cacheCapacity: cacheCapacity,
messages: make(map[string][]*pb.WakuMessage),
}

s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope)

m.Post(routeRelayV1Subscriptions, s.postV1Subscriptions)
m.Delete(routeRelayV1Subscriptions, s.deleteV1Subscriptions)
m.Get(routeRelayV1Messages, s.getV1Messages)
Expand All @@ -65,46 +53,6 @@ func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *za
return s
}

func (r *RelayService) addEnvelope(envelope *protocol.Envelope) {
r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()

if _, ok := r.messages[envelope.PubsubTopic()]; !ok {
return
}

// Keep a specific max number of messages per topic
if len(r.messages[envelope.PubsubTopic()]) >= r.cacheCapacity {
r.messages[envelope.PubsubTopic()] = r.messages[envelope.PubsubTopic()][1:]
}

r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message())
}

// Start starts the RelayService
func (r *RelayService) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel

r.messagesMutex.Lock()
// Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these
for _, topic := range r.node.Relay().Topics() {
r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic))
r.messages[topic] = []*pb.WakuMessage{}
}
r.messagesMutex.Unlock()

r.runner.Start(ctx)
}

// Stop stops the RelayService
func (r *RelayService) Stop() {
if r.cancel == nil {
return
}
r.cancel()
}

func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Request) {
var topics []string
decoder := json.NewDecoder(req.Body)
Expand All @@ -114,16 +62,11 @@ func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Re
}
defer req.Body.Close()

r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()

var err error
for _, topic := range topics {
err = r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("unsubscribing from topic", zap.String("topic", strings.Replace(strings.Replace(topic, "\n", "", -1), "\r", "", -1)), zap.Error(err))
} else {
delete(r.messages, topic)
}
}

Expand All @@ -140,26 +83,29 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ
defer req.Body.Close()

var err error
var sub *relay.Subscription
var subs []*relay.Subscription
var successCnt int
var topicToSubscribe string
for _, topic := range topics {
if topic == "" {
subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(relay.DefaultWakuTopic))
topicToSubscribe = relay.DefaultWakuTopic
} else {
subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topic))
topicToSubscribe = topic
}
_, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topicToSubscribe), relay.WithCacheSize(r.cacheCapacity))

if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err))
} else {
sub = subs[0]
sub.Unsubscribe()
r.messagesMutex.Lock()
r.messages[topic] = []*pb.WakuMessage{}
r.messagesMutex.Unlock()
continue
}
successCnt++
}

// on partial subscribe failure
if successCnt > 0 && err != nil {
r.log.Error("partial subscribe failed", zap.Error(err))
// on partial failure
writeResponse(w, err, http.StatusOK)
return
}

writeErrOrResponse(w, err, true)
Expand All @@ -170,20 +116,22 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) {
if topic == "" {
return
}

r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()

if _, ok := r.messages[topic]; !ok {
//TODO: Update the API to also take a contentTopic since relay now supports filtering based on contentTopic as well.
sub, err := r.node.Relay().GetSubscriptionWithPubsubTopic(topic, "")
if err != nil {
w.WriteHeader(http.StatusNotFound)
_, err := w.Write([]byte("not subscribed to topic"))
_, err = w.Write([]byte("not subscribed to topic"))
r.log.Error("writing response", zap.Error(err))
return
}
var response []*pb.WakuMessage
select {
case msg := <-sub.Ch:
response = append(response, msg.Message())
default:
break
}

response := r.messages[topic]

r.messages[topic] = []*pb.WakuMessage{}
writeErrOrResponse(w, nil, response)
}

Expand All @@ -205,11 +153,6 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) {
topic = relay.DefaultWakuTopic
}

if !r.node.Relay().IsSubscribed(topic) {
writeErrOrResponse(w, errors.New("not subscribed to pubsubTopic"), nil)
return
}

if err := server.AppendRLNProof(r.node, message); err != nil {
writeErrOrResponse(w, err, nil)
return
Expand Down Expand Up @@ -250,7 +193,7 @@ func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http.
defer req.Body.Close()

var err error
_, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", cTopics...))
_, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", cTopics...), relay.WithCacheSize(r.cacheCapacity))
if err != nil {
r.log.Error("subscribing to topics", zap.Strings("contentTopics", cTopics), zap.Error(err))
}
Expand All @@ -260,27 +203,14 @@ func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http.
_, err := w.Write([]byte(err.Error()))
r.log.Error("writing response", zap.Error(err))
} else {
w.WriteHeader(http.StatusOK)
writeErrOrResponse(w, err, true)
}

}

func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Request) {
cTopic := chi.URLParam(req, "contentTopic")
if cTopic == "" {
w.WriteHeader(http.StatusBadRequest)
_, err := w.Write([]byte("contentTopic is required"))
r.log.Error("writing response", zap.Error(err))
return
}
cTopic, err := url.QueryUnescape(cTopic)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, err = w.Write([]byte("invalid contentTopic format"))
r.log.Error("writing response", zap.Error(err))
return
}

cTopic := topicFromPath(w, req, "contentTopic", r.log)
sub, err := r.node.Relay().GetSubscription(cTopic)
if err != nil {
w.WriteHeader(http.StatusNotFound)
Expand Down
Loading

0 comments on commit 2616d43

Please sign in to comment.