Skip to content

Commit

Permalink
Merge pull request #3 from libp2p/feat/get-topics
Browse files Browse the repository at this point in the history
add way to query subscribed topics
  • Loading branch information
whyrusleeping authored Sep 14, 2016
2 parents d733293 + 28f2c2f commit ccede23
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 1 deletion.
22 changes: 21 additions & 1 deletion floodsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type PubSub struct {
// addSub is a control channel for us to add and remove subscriptions
addSub chan *addSub

//
getTopics chan *topicReq

// a notification channel for incoming streams from other peers
newPeers chan inet.Stream

Expand Down Expand Up @@ -75,6 +78,7 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub {
newPeers: make(chan inet.Stream),
peerDead: make(chan peer.ID),
addSub: make(chan *addSub),
getTopics: make(chan *topicReq),
myTopics: make(map[string]chan *Message),
topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]chan *RPC),
Expand Down Expand Up @@ -112,6 +116,12 @@ func (p *PubSub) processLoop(ctx context.Context) {
for _, t := range p.topics {
delete(t, pid)
}
case treq := <-p.getTopics:
var out []string
for t := range p.myTopics {
out = append(out, t)
}
treq.resp <- out
case sub := <-p.addSub:
p.handleSubscriptionChange(sub)
case rpc := <-p.incoming:
Expand Down Expand Up @@ -270,12 +280,22 @@ type addSub struct {
resp chan chan *Message
}

func (p *PubSub) Subscribe(topic string) (<-chan *Message, error) {
func (p *PubSub) Subscribe(ctx context.Context, topic string) (<-chan *Message, error) {
return p.SubscribeComplicated(&pb.TopicDescriptor{
Name: proto.String(topic),
})
}

type topicReq struct {
resp chan []string
}

func (p *PubSub) GetTopics() []string {
out := make(chan []string, 1)
p.getTopics <- &topicReq{resp: out}
return <-out
}

func (p *PubSub) SubscribeComplicated(td *pb.TopicDescriptor) (<-chan *Message, error) {
if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE {
return nil, fmt.Errorf("Auth method not yet supported")
Expand Down
56 changes: 56 additions & 0 deletions floodsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"math/rand"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -360,3 +361,58 @@ func TestTreeTopology(t *testing.T) {

checkMessageRouting(t, "fizzbuzz", []*PubSub{psubs[9], psubs[3]}, chs)
}

func assertHasTopics(t *testing.T, ps *PubSub, exptopics ...string) {
topics := ps.GetTopics()
sort.Strings(topics)
sort.Strings(exptopics)

if len(topics) != len(exptopics) {
t.Fatalf("expected to have %v, but got %v", exptopics, topics)
}

for i, v := range exptopics {
if topics[i] != v {
t.Fatalf("expected %s but have %s", v, topics[i])
}
}
}

func TestSubReporting(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

host := getNetHosts(t, ctx, 1)[0]
psub := NewFloodSub(ctx, host)

_, err := psub.Subscribe(ctx, "foo")
if err != nil {
t.Fatal(err)
}

_, err = psub.Subscribe(ctx, "bar")
if err != nil {
t.Fatal(err)
}

assertHasTopics(t, psub, "foo", "bar")

_, err = psub.Subscribe(ctx, "baz")
if err != nil {
t.Fatal(err)
}

assertHasTopics(t, psub, "foo", "bar", "baz")

psub.Unsub("bar")
assertHasTopics(t, psub, "foo", "baz")
psub.Unsub("foo")
assertHasTopics(t, psub, "baz")

_, err = psub.Subscribe(ctx, "fish")
if err != nil {
t.Fatal(err)
}

assertHasTopics(t, psub, "baz", "fish")
}

0 comments on commit ccede23

Please sign in to comment.