Skip to content

Commit

Permalink
feat: update store client Query API for autosharding (#885)
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem authored Nov 13, 2023
1 parent 73bcb2e commit a5ce5df
Show file tree
Hide file tree
Showing 19 changed files with 258 additions and 135 deletions.
52 changes: 20 additions & 32 deletions cmd/waku/server/rest/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ import (
"github.com/go-chi/chi/v5"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
)

type StoreService struct {
Expand Down Expand Up @@ -57,24 +55,20 @@ func NewStoreService(node *node.WakuNode, m *chi.Mux) *StoreService {
return s
}

func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store.HistoryRequestOption, error) {
func getStoreParams(r *http.Request) (*store.Query, []store.HistoryRequestOption, error) {
query := &store.Query{}
var options []store.HistoryRequestOption

var err error
peerAddrStr := r.URL.Query().Get("peerAddr")
m, err := multiaddr.NewMultiaddr(peerAddrStr)
if err != nil {
return nil, nil, nil, err
}

peerID, err := utils.GetPeerID(m)
if err != nil {
return nil, nil, nil, err
var m multiaddr.Multiaddr
if peerAddrStr != "" {
m, err = multiaddr.NewMultiaddr(peerAddrStr)
if err != nil {
return nil, nil, err
}
options = append(options, store.WithPeerAddr(m))
}

options = append(options, store.WithPeer(peerID))

query.Topic = r.URL.Query().Get("pubsubTopic")
query.PubsubTopic = r.URL.Query().Get("pubsubTopic")

contentTopics := r.URL.Query().Get("contentTopics")
if contentTopics != "" {
Expand All @@ -85,7 +79,7 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store
if startTimeStr != "" {
startTime, err := strconv.ParseInt(startTimeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
query.StartTime = &startTime
}
Expand All @@ -94,7 +88,7 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store
if endTimeStr != "" {
endTime, err := strconv.ParseInt(endTimeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
query.EndTime = &endTime
}
Expand All @@ -111,25 +105,25 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store
if senderTimeStr != "" {
cursor.SenderTime, err = strconv.ParseInt(senderTimeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
}

if storeTimeStr != "" {
cursor.ReceiverTime, err = strconv.ParseInt(storeTimeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
}

if digestStr != "" {
cursor.Digest, err = base64.URLEncoding.DecodeString(digestStr)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
}

cursor.PubsubTopic = query.Topic
cursor.PubsubTopic = query.PubsubTopic

options = append(options, store.WithCursor(cursor))
}
Expand All @@ -142,21 +136,21 @@ func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store
if ascendingStr != "" {
ascending, err = strconv.ParseBool(ascendingStr)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
}

if pageSizeStr != "" {
pageSize, err = strconv.ParseUint(pageSizeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
}

options = append(options, store.WithPaging(ascending, pageSize))
}

return m, query, options, nil
return query, options, nil
}

func writeStoreError(w http.ResponseWriter, code int, err error) {
Expand Down Expand Up @@ -190,7 +184,7 @@ func toStoreResponse(result *store.Result) StoreResponse {
}

func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) {
peerAddr, query, options, err := getStoreParams(r)
query, options, err := getStoreParams(r)
if err != nil {
writeStoreError(w, http.StatusBadRequest, err)
return
Expand All @@ -199,12 +193,6 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()

_, err = d.node.AddPeer(peerAddr, peerstore.Static, d.node.Relay().Topics())
if err != nil {
writeStoreError(w, http.StatusInternalServerError, err)
return
}

result, err := d.node.Store().Query(ctx, *query, options...)
if err != nil {
writeStoreError(w, http.StatusInternalServerError, err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/waku/server/rpc/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *StoreService) GetV1Messages(req *http.Request, args *StoreMessagesArgs,
res, err := s.node.Store().Query(
req.Context(),
store.Query{
Topic: args.Topic,
PubsubTopic: args.Topic,
ContentTopics: args.ContentFilters,
StartTime: args.StartTime,
EndTime: args.EndTime,
Expand Down
2 changes: 1 addition & 1 deletion library/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func queryResponse(ctx context.Context, args storeMessagesArgs, options []store.
res, err := wakuState.node.Store().Query(
ctx,
store.Query{
Topic: args.Topic,
PubsubTopic: args.Topic,
ContentTopics: args.ContentTopics,
StartTime: args.StartTime,
EndTime: args.EndTime,
Expand Down
8 changes: 6 additions & 2 deletions waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,11 @@ func (w *WakuNode) startStore(ctx context.Context, sub *relay.Subscription) erro
// AddPeer is used to add a peer and the protocols it support to the node peerstore
// TODO: Need to update this for autosharding, to only take contentTopics and optional pubSubTopics or provide an alternate API only for contentTopics.
func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) {
return w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...)
pData, err := w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...)
if err != nil {
return "", err
}
return pData.AddrInfo.ID, nil
}

// AddDiscoveredPeer to add a discovered peer to the node peerStore
Expand All @@ -725,7 +729,7 @@ func (w *WakuNode) AddDiscoveredPeer(ID peer.ID, addrs []ma.Multiaddr, origin wp
ID: ID,
Addrs: addrs,
},
PubSubTopics: pubsubTopics,
PubsubTopics: pubsubTopics,
}
w.peermanager.AddDiscoveredPeer(p, connectNow)
}
Expand Down
18 changes: 10 additions & 8 deletions waku/v2/peermanager/peer_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,20 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16,
return peers, nil
}

func (pm *PeerManager) discoverPeersByPubsubTopic(pubsubTopic string, proto protocol.ID, ctx context.Context, maxCount int) {
shardInfo, err := waku_proto.TopicsToRelayShards(pubsubTopic)
func (pm *PeerManager) discoverPeersByPubsubTopics(pubsubTopics []string, proto protocol.ID, ctx context.Context, maxCount int) {
shardsInfo, err := waku_proto.TopicsToRelayShards(pubsubTopics...)
if err != nil {
pm.logger.Error("failed to convert pubsub topic to shard", zap.String("topic", pubsubTopic), zap.Error(err))
pm.logger.Error("failed to convert pubsub topic to shard", zap.Strings("topics", pubsubTopics), zap.Error(err))
return
}
if len(shardInfo) > 0 {
err = pm.DiscoverAndConnectToPeers(ctx, shardInfo[0].ClusterID, shardInfo[0].ShardIDs[0], proto, maxCount)
if err != nil {
pm.logger.Error("failed to discover and conenct to peers", zap.Error(err))
if len(shardsInfo) > 0 {
for _, shardInfo := range shardsInfo {
err = pm.DiscoverAndConnectToPeers(ctx, shardInfo.ClusterID, shardInfo.ShardIDs[0], proto, maxCount)
if err != nil {
pm.logger.Error("failed to discover and conenct to peers", zap.Error(err))
}
}
} else {
pm.logger.Debug("failed to convert pubsub topic to shard as topic is named pubsubTopic", zap.String("topic", pubsubTopic))
pm.logger.Debug("failed to convert pubsub topics to shards as one of the topics is named pubsubTopic", zap.Strings("topics", pubsubTopics))
}
}
51 changes: 41 additions & 10 deletions waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
//Find not connected peers.
notConnectedPeers := pm.getNotConnectedPers(topicStr)
if notConnectedPeers.Len() == 0 {
pm.discoverPeersByPubsubTopic(topicStr, relay.WakuRelayID_v200, pm.ctx, 2)
pm.discoverPeersByPubsubTopics([]string{topicStr}, relay.WakuRelayID_v200, pm.ctx, 2)
continue
}
//Connect to eligible peers.
Expand Down Expand Up @@ -321,11 +321,11 @@ func (pm *PeerManager) processPeerENR(p *service.PeerData) []protocol.ID {
logging.HostID("peer", p.AddrInfo.ID), zap.String("enr", p.ENR.String()))
} else {
if shards != nil {
p.PubSubTopics = make([]string, 0)
p.PubsubTopics = make([]string, 0)
topics := shards.Topics()
for _, topic := range topics {
topicStr := topic.String()
p.PubSubTopics = append(p.PubSubTopics, topicStr)
p.PubsubTopics = append(p.PubsubTopics, topicStr)
}
} else {
pm.logger.Debug("ENR doesn't have relay shards", logging.HostID("peer", p.AddrInfo.ID))
Expand Down Expand Up @@ -361,12 +361,12 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
return
}
supportedProtos := []protocol.ID{}
if len(p.PubSubTopics) == 0 && p.ENR != nil {
if len(p.PubsubTopics) == 0 && p.ENR != nil {
// Try to fetch shard info and supported protocols from ENR to arrive at pubSub topics.
supportedProtos = pm.processPeerENR(&p)
}

_ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubSubTopics, supportedProtos...)
_ = pm.addPeer(p.AddrInfo.ID, p.AddrInfo.Addrs, p.Origin, p.PubsubTopics, supportedProtos...)

if p.ENR != nil {
err := pm.host.Peerstore().(wps.WakuPeerstore).SetENR(p.AddrInfo.ID, p.ENR)
Expand Down Expand Up @@ -419,12 +419,29 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig
return nil
}

func AddrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host, pubsubTopics ...string) *service.PeerData {
addrs := host.Peerstore().Addrs(peerID)
if len(addrs) == 0 {
//Addresses expired, remove peer from peerStore
host.Peerstore().RemovePeer(peerID)
return nil
}
return &service.PeerData{
Origin: origin,
AddrInfo: peer.AddrInfo{
ID: peerID,
Addrs: addrs,
},
PubsubTopics: pubsubTopics,
}
}

// AddPeer adds peer to the peerStore and also to service slots
func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) {
func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) {
//Assuming all addresses have peerId
info, err := peer.AddrInfoFromP2pAddr(address)
if err != nil {
return "", err
return nil, err
}

//Add Service peers to serviceSlots.
Expand All @@ -433,12 +450,26 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTo
}

//Add to the peer-store
err = pm.addPeer(info.ID, info.Addrs, origin, pubSubTopics, protocols...)
err = pm.addPeer(info.ID, info.Addrs, origin, pubsubTopics, protocols...)
if err != nil {
return "", err
return nil, err
}

return info.ID, nil
pData := &service.PeerData{
Origin: origin,
AddrInfo: peer.AddrInfo{
ID: info.ID,
Addrs: info.Addrs,
},
PubsubTopics: pubsubTopics,
}

return pData, nil
}

// Connect establishes a connection to a peer.
func (pm *PeerManager) Connect(pData *service.PeerData) {
go pm.peerConnector.PushToChan(*pData)
}

// RemovePeer deletes peer from the peerStore after disconnecting it.
Expand Down
16 changes: 8 additions & 8 deletions waku/v2/peermanager/peer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestServiceSlots(t *testing.T) {
require.NoError(t, err)
require.Equal(t, peerID, h4.ID())

_, err = pm.SelectPeerByContentTopic(protocol1, "")
_, err = pm.SelectPeerByContentTopics(protocol1, []string{""})
require.Error(t, wakuproto.ErrInvalidFormat, err)

}
Expand All @@ -143,18 +143,18 @@ func TestPeerSelection(t *testing.T) {
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
require.NoError(t, err)

peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/2"})
peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/2"}})
require.NoError(t, err)
require.Equal(t, h2.ID(), peerID)

_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/3"})
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}})
require.Error(t, ErrNoPeersAvailable, err)

_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/2/rs/2/1"})
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
require.NoError(t, err)

//Test for selectWithLowestRTT
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopic: "/waku/2/rs/2/1"})
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
require.NoError(t, err)
}

Expand Down Expand Up @@ -287,7 +287,7 @@ func TestOnDemandPeerDiscovery(t *testing.T) {
require.NoError(t, err)

//Discovery should fail for non-waku protocol
_, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/test"})
_, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/test"})
require.Error(t, err)

_, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: "/test"})
Expand All @@ -299,15 +299,15 @@ func TestOnDemandPeerDiscovery(t *testing.T) {
var enrField uint8
enrField |= (1 << 1)
pm3.RegisterWakuProtocol("/vac/waku/store/2.0.0-beta4", enrField)
peerID, err := pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/vac/waku/store/2.0.0-beta4", Ctx: ctx})
peerID, err := pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/vac/waku/store/2.0.0-beta4", Ctx: ctx})
require.NoError(t, err)
require.Equal(t, peerID, host2.ID())

var enrField1 uint8

enrField1 |= (1 << 3)
pm3.RegisterWakuProtocol("/vac/waku/lightpush/2.0.0-beta1", enrField1)
peerID, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopic: topic, Proto: "/vac/waku/lightpush/2.0.0-beta1", Ctx: ctx})
peerID, err = pm3.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, PubsubTopics: []string{topic}, Proto: "/vac/waku/lightpush/2.0.0-beta1", Ctx: ctx})
require.NoError(t, err)
require.Equal(t, peerID, host1.ID())

Expand Down
Loading

0 comments on commit a5ce5df

Please sign in to comment.