Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor to use multiple feeds #2

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 82 additions & 23 deletions floodsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,17 @@ type PubSub struct {
// addSub is a control channel for us to add and remove subscriptions
addSub chan *addSub

//
addFeedHook chan *addFeedReq

// a notification channel for incoming streams from other peers
newPeers chan inet.Stream

// a notification channel for when our peers die
peerDead chan peer.ID

// The set of topics we are subscribed to
myTopics map[string]chan *Message
myTopics map[string][]*clientFeed

// topics tracks which topics each of our peers are subscribed to
topics map[string]map[peer.ID]struct{}
Expand Down Expand Up @@ -74,8 +77,9 @@ func NewFloodSub(ctx context.Context, h host.Host) *PubSub {
publish: make(chan *Message),
newPeers: make(chan inet.Stream),
peerDead: make(chan peer.ID),
addFeedHook: make(chan *addFeedReq, 32),
addSub: make(chan *addSub),
myTopics: make(map[string]chan *Message),
myTopics: make(map[string][]*clientFeed),
topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]chan *RPC),
seenMessages: timecache.NewTimeCache(time.Second * 30),
Expand Down Expand Up @@ -107,6 +111,21 @@ func (p *PubSub) processLoop(ctx context.Context) {

p.peers[pid] = messages

case req := <-p.addFeedHook:
feeds, ok := p.myTopics[req.topic]

var out chan *Message
if ok {
out = make(chan *Message, 32)
nfeed := &clientFeed{
out: out,
ctx: req.ctx,
}

p.myTopics[req.topic] = append(feeds, nfeed)
}

req.resp <- out
case pid := <-p.peerDead:
delete(p.peers, pid)
for _, t := range p.topics {
Expand Down Expand Up @@ -135,23 +154,21 @@ func (p *PubSub) handleSubscriptionChange(sub *addSub) {
Subscribe: &sub.sub,
}

ch, ok := p.myTopics[sub.topic]
feeds, ok := p.myTopics[sub.topic]
if sub.sub {
if ok {
// we don't allow multiple subs per topic at this point
sub.resp <- nil
return
}

resp := make(chan *Message, 16)
p.myTopics[sub.topic] = resp
sub.resp <- resp
p.myTopics[sub.topic] = nil
} else {
if !ok {
return
}

close(ch)
for _, f := range feeds {
close(f.out)
}
delete(p.myTopics, sub.topic)
}

Expand All @@ -163,9 +180,26 @@ func (p *PubSub) handleSubscriptionChange(sub *addSub) {

func (p *PubSub) notifySubs(msg *pb.Message) {
for _, topic := range msg.GetTopicIDs() {
subch, ok := p.myTopics[topic]
if ok {
subch <- &Message{msg}
var cleanup bool
feeds := p.myTopics[topic]
for _, f := range feeds {
select {
case f.out <- &Message{msg}:
case <-f.ctx.Done():
close(f.out)
f.out = nil
cleanup = true
}
}

if cleanup {
out := make([]*clientFeed, 0, len(feeds))
for _, f := range feeds {
if f.out != nil {
out = append(out, f)
}
}
p.myTopics[topic] = out
}
}
}
Expand Down Expand Up @@ -270,34 +304,59 @@ type addSub struct {
resp chan chan *Message
}

func (p *PubSub) Subscribe(topic string) (<-chan *Message, error) {
return p.SubscribeComplicated(&pb.TopicDescriptor{
func (p *PubSub) Subscribe(ctx context.Context, topic string) (<-chan *Message, error) {
err := p.AddTopicSubscription(&pb.TopicDescriptor{
Name: proto.String(topic),
})

if err != nil {
return nil, err
}

return p.GetFeed(ctx, topic)
}

func (p *PubSub) SubscribeComplicated(td *pb.TopicDescriptor) (<-chan *Message, error) {
func (p *PubSub) AddTopicSubscription(td *pb.TopicDescriptor) error {
if td.GetAuth().GetMode() != pb.TopicDescriptor_AuthOpts_NONE {
return nil, fmt.Errorf("Auth method not yet supported")
return fmt.Errorf("Auth method not yet supported")
}

if td.GetEnc().GetMode() != pb.TopicDescriptor_EncOpts_NONE {
return nil, fmt.Errorf("Encryption method not yet supported")
return fmt.Errorf("Encryption method not yet supported")
}

resp := make(chan chan *Message)
p.addSub <- &addSub{
topic: td.GetName(),
resp: resp,
sub: true,
}

outch := <-resp
if outch == nil {
return nil, fmt.Errorf("error, duplicate subscription")
return nil
}

type addFeedReq struct {
ctx context.Context
topic string
resp chan chan *Message
}

type clientFeed struct {
out chan *Message
ctx context.Context
}

func (p *PubSub) GetFeed(ctx context.Context, topic string) (<-chan *Message, error) {
out := make(chan chan *Message, 1)
p.addFeedHook <- &addFeedReq{
ctx: ctx,
topic: topic,
resp: out,
}

return outch, nil
resp := <-out
if resp == nil {
return nil, fmt.Errorf("not subscribed to topic %s", topic)
}
return resp, nil
}

func (p *PubSub) Unsub(topic string) {
Expand Down