Skip to content

Commit

Permalink
update relay REST API's to remove duplicate message cache, fix relay …
Browse files Browse the repository at this point in the history
…tests and admin test
  • Loading branch information
chaitanyaprem committed Nov 6, 2023
1 parent 25eb4d6 commit 1d6ac0f
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 116 deletions.
106 changes: 28 additions & 78 deletions cmd/waku/server/rest/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@ package rest
import (
"context"
"encoding/json"
"errors"
"net/http"
"net/url"
"strings"
"sync"

"github.com/go-chi/chi/v5"
"github.com/waku-org/go-waku/cmd/waku/server"
Expand All @@ -31,11 +28,8 @@ type RelayService struct {

log *zap.Logger

messages map[string][]*pb.WakuMessage
cacheCapacity int
messagesMutex sync.RWMutex

runner *runnerService
ctx context.Context
}

// NewRelayService returns an instance of RelayService
Expand All @@ -44,10 +38,9 @@ func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *za
node: node,
log: log.Named("relay"),
cacheCapacity: cacheCapacity,
messages: make(map[string][]*pb.WakuMessage),
}

s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope)
//s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope)

m.Post(routeRelayV1Subscriptions, s.postV1Subscriptions)
m.Delete(routeRelayV1Subscriptions, s.deleteV1Subscriptions)
Expand All @@ -65,36 +58,11 @@ func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *za
return s
}

func (r *RelayService) addEnvelope(envelope *protocol.Envelope) {
r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()

if _, ok := r.messages[envelope.PubsubTopic()]; !ok {
return
}

// Keep a specific max number of messages per topic
if len(r.messages[envelope.PubsubTopic()]) >= r.cacheCapacity {
r.messages[envelope.PubsubTopic()] = r.messages[envelope.PubsubTopic()][1:]
}

r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message())
}

// Start starts the RelayService
func (r *RelayService) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel

r.messagesMutex.Lock()
// Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these
for _, topic := range r.node.Relay().Topics() {
r.log.Info("adding topic handler for existing subscription", zap.String("topic", topic))
r.messages[topic] = []*pb.WakuMessage{}
}
r.messagesMutex.Unlock()

r.runner.Start(ctx)
r.ctx = ctx
}

// Stop stops the RelayService
Expand All @@ -114,16 +82,11 @@ func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Re
}
defer req.Body.Close()

r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()

var err error
for _, topic := range topics {
err = r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("unsubscribing from topic", zap.String("topic", strings.Replace(strings.Replace(topic, "\n", "", -1), "\r", "", -1)), zap.Error(err))
} else {
delete(r.messages, topic)
}
}

Expand All @@ -140,26 +103,29 @@ func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Requ
defer req.Body.Close()

var err error
var sub *relay.Subscription
var subs []*relay.Subscription
var successCnt int
var topicToSubscribe string
for _, topic := range topics {
if topic == "" {
subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(relay.DefaultWakuTopic))
topicToSubscribe = relay.DefaultWakuTopic
} else {
subs, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topic))
topicToSubscribe = topic
}
_, err = r.node.Relay().Subscribe(req.Context(), protocol.NewContentFilter(topicToSubscribe))

if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err))
} else {
sub = subs[0]
sub.Unsubscribe()
r.messagesMutex.Lock()
r.messages[topic] = []*pb.WakuMessage{}
r.messagesMutex.Unlock()
continue
}
successCnt++
}

// on partial subscribe failure
if successCnt > 0 && err != nil {
r.log.Error("partial subscribe failed", zap.Error(err))
// on partial failure
writeResponse(w, err, http.StatusOK)
return
}

writeErrOrResponse(w, err, true)
Expand All @@ -170,20 +136,22 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) {
if topic == "" {
return
}

r.messagesMutex.Lock()
defer r.messagesMutex.Unlock()

if _, ok := r.messages[topic]; !ok {
//TODO: Update the API to also take a contentTopic since relay now supports filtering based on contentTopic as well.
sub, err := r.node.Relay().GetSubscriptionWithPubsubTopic(topic, "")
if err != nil {
w.WriteHeader(http.StatusNotFound)
_, err := w.Write([]byte("not subscribed to topic"))
_, err = w.Write([]byte("not subscribed to topic"))
r.log.Error("writing response", zap.Error(err))
return
}
var response []*pb.WakuMessage
select {
case msg := <-sub.Ch:
response = append(response, msg.Message())
default:
break
}

response := r.messages[topic]

r.messages[topic] = []*pb.WakuMessage{}
writeErrOrResponse(w, nil, response)
}

Expand All @@ -205,11 +173,6 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) {
topic = relay.DefaultWakuTopic
}

if !r.node.Relay().IsSubscribed(topic) {
writeErrOrResponse(w, errors.New("not subscribed to pubsubTopic"), nil)
return
}

if err := server.AppendRLNProof(r.node, message); err != nil {
writeErrOrResponse(w, err, nil)
return
Expand Down Expand Up @@ -266,21 +229,8 @@ func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http.
}

func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Request) {
cTopic := chi.URLParam(req, "contentTopic")
if cTopic == "" {
w.WriteHeader(http.StatusBadRequest)
_, err := w.Write([]byte("contentTopic is required"))
r.log.Error("writing response", zap.Error(err))
return
}
cTopic, err := url.QueryUnescape(cTopic)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, err = w.Write([]byte("invalid contentTopic format"))
r.log.Error("writing response", zap.Error(err))
return
}

cTopic := topicFromPath(w, req, "contentTopic", r.log)
sub, err := r.node.Relay().GetSubscription(cTopic)
if err != nil {
w.WriteHeader(http.StatusNotFound)
Expand Down
45 changes: 20 additions & 25 deletions cmd/waku/server/rest/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
)

Expand All @@ -34,7 +34,6 @@ func TestPostV1Message(t *testing.T) {
router := chi.NewRouter()

_ = makeRelayService(t, router)

msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "abc",
Expand All @@ -54,10 +53,10 @@ func TestPostV1Message(t *testing.T) {
func TestRelaySubscription(t *testing.T) {
router := chi.NewRouter()

d := makeRelayService(t, router)
r := makeRelayService(t, router)

go d.Start(context.Background())
defer d.Stop()
go r.Start(context.Background())
defer r.Stop()

// Wait for node to start
time.Sleep(500 * time.Millisecond)
Expand All @@ -74,39 +73,37 @@ func TestRelaySubscription(t *testing.T) {

// Test max messages in subscription
now := utils.GetUnixEpoch()
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+1), now, "test"))
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+2), now, "test"))
d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+3), now, "test"))

// Wait for the messages to be processed
time.Sleep(500 * time.Millisecond)

require.Len(t, d.messages["test"], 3)

d.runner.broadcaster.Submit(protocol.NewEnvelope(tests.CreateWakuMessage("test", now+4), now+4, "test"))
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage("test", now+1), relay.WithPubSubTopic("test"))
require.NoError(t, err)
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage("test", now+2), relay.WithPubSubTopic("test"))
require.NoError(t, err)

time.Sleep(500 * time.Millisecond)
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage("test", now+3), relay.WithPubSubTopic("test"))
require.NoError(t, err)

// Should only have 3 messages
require.Len(t, d.messages["test"], 3)
// Wait for the messages to be processed
time.Sleep(5 * time.Millisecond)

// Test deletion
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodDelete, "/relay/v1/subscriptions", bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())
require.Len(t, d.messages["test"], 0)

}

func TestRelayGetV1Messages(t *testing.T) {
router := chi.NewRouter()
router1 := chi.NewRouter()

serviceA := makeRelayService(t, router)
go serviceA.Start(context.Background())
defer serviceA.Stop()
serviceB := makeRelayService(t, router)

serviceB := makeRelayService(t, router1)
go serviceB.Start(context.Background())
defer serviceB.Stop()

Expand Down Expand Up @@ -165,9 +162,7 @@ func TestRelayGetV1Messages(t *testing.T) {

rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet, "/relay/v1/messages/test", bytes.NewReader([]byte{}))
router.ServeHTTP(rr, req)
router1.ServeHTTP(rr, req)
require.Equal(t, http.StatusNotFound, rr.Code)

err = json.Unmarshal(rr.Body.Bytes(), &messages)
require.NoError(t, err)
require.Len(t, messages, 0)
}
3 changes: 0 additions & 3 deletions cmd/waku/server/rest/waku_rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enablePProf bool
func (r *WakuRest) Start(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()

if r.node.Relay() != nil {
go r.relayService.Start(ctx)
}
if r.node.FilterLightnode() != nil {
go r.filterService.Start(ctx)
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/waku/server/rpc/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ func TestV1Peers(t *testing.T) {

host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay := relay.NewWakuRelay(nil, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
bcast := relay.NewBroadcaster(10)
relay := relay.NewWakuRelay(bcast, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay.SetHost(host)
err = relay.Start(context.Background())
require.NoError(t, err)
Expand Down
5 changes: 0 additions & 5 deletions cmd/waku/server/rpc/relay.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package rpc

import (
"errors"
"fmt"
"net/http"
"sync"
Expand Down Expand Up @@ -105,10 +104,6 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs,
topic = args.Topic
}

if !r.node.Relay().IsSubscribed(topic) {
return errors.New("not subscribed to pubsubTopic")
}

msg := args.Message.toProto()

if err = server.AppendRLNProof(r.node, msg); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion cmd/waku/server/rpc/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ func TestRelayGetV1Messages(t *testing.T) {
&RelayMessageArgs{
Topic: "test",
Message: ProtoToRPC(&pb.WakuMessage{
Payload: []byte("test"),
Payload: []byte("test"),
ContentTopic: "testContentTopic",
}),
},
&reply,
Expand Down
24 changes: 21 additions & 3 deletions waku/v2/protocol/relay/waku_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,31 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts .
return hash, nil
}

// GetSubscriptionWithPubsubTopic fetches subscription matching pubsub and contentTopic
func (w *WakuRelay) GetSubscriptionWithPubsubTopic(pubsubTopic string, contentTopic string) (*Subscription, error) {
var contentFilter waku_proto.ContentFilter
if contentTopic != "" {
contentFilter = waku_proto.NewContentFilter(pubsubTopic, contentTopic)
} else {
contentFilter = waku_proto.NewContentFilter(pubsubTopic)
}
cSubs := w.contentSubs[pubsubTopic]
for _, sub := range cSubs {
if sub.contentFilter.Equals(contentFilter) {
return sub, nil
}
}
return nil, errors.New("no subscription found for content topic")
}

// GetSubscription fetches subscription matching a contentTopic(via autosharding)
func (w *WakuRelay) GetSubscription(contentTopic string) (*Subscription, error) {
pubSubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic)
pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic)
if err != nil {
return nil, err
}
contentFilter := waku_proto.NewContentFilter(pubSubTopic, contentTopic)
cSubs := w.contentSubs[pubSubTopic]
contentFilter := waku_proto.NewContentFilter(pubsubTopic, contentTopic)
cSubs := w.contentSubs[pubsubTopic]
for _, sub := range cSubs {
if sub.contentFilter.Equals(contentFilter) {
return sub, nil
Expand Down

0 comments on commit 1d6ac0f

Please sign in to comment.