Skip to content

Commit

Permalink
Merge branch 'master' into fix/postgres
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos authored Nov 14, 2023
2 parents d5fdafe + 8122cf4 commit f017f06
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 28 deletions.
2 changes: 1 addition & 1 deletion cmd/waku/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ var (
})
RESTRelayCacheCapacity = altsrc.NewIntFlag(&cli.IntFlag{
Name: "rest-relay-cache-capacity",
Value: 30,
Value: 1000,
Usage: "Capacity of the Relay REST API message cache",
Destination: &options.RESTServer.RelayCacheCapacity,
EnvVars: []string{"WAKUNODE2_REST_RELAY_CACHE_CAPACITY"},
Expand Down
69 changes: 42 additions & 27 deletions cmd/waku/server/rest/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,27 +124,34 @@ func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) {
return
}
var response []*RestWakuMessage
select {
case envelope, open := <-sub.Ch:
if !open {
r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", topic))
w.WriteHeader(http.StatusNotFound)
_, err = w.Write([]byte("consume channel is closed for subscription"))
if err != nil {
r.log.Error("writing response", zap.Error(err))
}
return
done := false
for {
if done || len(response) > int(r.cacheCapacity) {
break
}
select {
case envelope, open := <-sub.Ch:
if !open {
r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", topic))
w.WriteHeader(http.StatusNotFound)
_, err = w.Write([]byte("consume channel is closed for subscription"))
if err != nil {
r.log.Error("writing response", zap.Error(err))
}
return
}

message := &RestWakuMessage{}
if err := message.FromProto(envelope.Message()); err != nil {
r.log.Error("converting protobuffer msg into rest msg", zap.Error(err))
} else {
response = append(response, message)
message := &RestWakuMessage{}
if err := message.FromProto(envelope.Message()); err != nil {
r.log.Error("converting protobuffer msg into rest msg", zap.Error(err))
} else {
response = append(response, message)
}
case <-req.Context().Done():
done = true
default:
done = true
}

default:
break
}

writeErrOrResponse(w, nil, response)
Expand Down Expand Up @@ -240,16 +247,24 @@ func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Reques
return
}
var response []*RestWakuMessage
select {
case envelope := <-sub.Ch:
message := &RestWakuMessage{}
if err := message.FromProto(envelope.Message()); err != nil {
r.log.Error("converting protobuffer msg into rest msg", zap.Error(err))
} else {
response = append(response, message)
done := false
for {
if done || len(response) > int(r.cacheCapacity) {
break
}
select {
case envelope := <-sub.Ch:
message := &RestWakuMessage{}
if err := message.FromProto(envelope.Message()); err != nil {
r.log.Error("converting protobuffer msg into rest msg", zap.Error(err))
} else {
response = append(response, message)
}
case <-req.Context().Done():
done = true
default:
done = true
}
default:
break
}

writeErrOrResponse(w, nil, response)
Expand Down

0 comments on commit f017f06

Please sign in to comment.