Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement peer blacklist #149

Merged
merged 7 commits into from
Jan 17, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions blacklist_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package pubsub

import (
"context"
"testing"
"time"
)

func TestBlacklist(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hosts := getNetHosts(t, ctx, 2)
psubs := getPubsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])

sub, err := psubs[1].Subscribe("test")
if err != nil {
t.Fatal(err)
}

time.Sleep(time.Millisecond * 100)
psubs[1].BlacklistPeer(hosts[0].ID())
time.Sleep(time.Millisecond * 100)

psubs[0].Publish("test", []byte("message"))

wctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
_, err = sub.Next(wctx)

if err == nil {
t.Fatal("got message from blacklisted peer")
}
}

func TestBlacklist2(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hosts := getNetHosts(t, ctx, 2)
psubs := getPubsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])

_, err := psubs[0].Subscribe("test")
if err != nil {
t.Fatal(err)
}

sub1, err := psubs[1].Subscribe("test")
if err != nil {
t.Fatal(err)
}

time.Sleep(time.Millisecond * 100)
psubs[1].BlacklistPeer(hosts[0].ID())
time.Sleep(time.Millisecond * 100)

psubs[0].Publish("test", []byte("message"))

wctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
_, err = sub1.Next(wctx)

if err == nil {
t.Fatal("got message from blacklisted peer")
}
}

func TestBlacklist3(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hosts := getNetHosts(t, ctx, 2)
psubs := getPubsubs(ctx, hosts)

psubs[1].BlacklistPeer(hosts[0].ID())
time.Sleep(time.Millisecond * 100)
connect(t, hosts[0], hosts[1])

sub, err := psubs[1].Subscribe("test")
if err != nil {
t.Fatal(err)
}

time.Sleep(time.Millisecond * 100)

psubs[0].Publish("test", []byte("message"))

wctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
_, err = sub.Next(wctx)

if err == nil {
t.Fatal("got message from blacklisted peer")
}
}
53 changes: 52 additions & 1 deletion pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ type PubSub struct {
// eval thunk in event loop
eval chan func()

// peer blacklist
blacklist map[peer.ID]struct{}
blacklistPeer chan peer.ID

peers map[peer.ID]chan *RPC
seenMessages *timecache.TimeCache

Expand Down Expand Up @@ -179,6 +183,8 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
topics: make(map[string]map[peer.ID]struct{}),
peers: make(map[peer.ID]chan *RPC),
topicVals: make(map[string]*topicVal),
blacklist: make(map[peer.ID]struct{}),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May want to use an LRU. This could turn into a DoS vector.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. As we discussed with why, let's make it an interface the user can pass with a default implementation using a map.

blacklistPeer: make(chan peer.ID),
seenMessages: timecache.NewTimeCache(TimeCacheDuration),
counter: uint64(time.Now().UnixNano()),
}
Expand Down Expand Up @@ -282,6 +288,12 @@ func (p *PubSub) processLoop(ctx context.Context) {
continue
}

_, ok = p.blacklist[pid]
if ok {
log.Warning("ignoring connection from blacklisted peer: ", pid)
continue
}

messages := make(chan *RPC, 32)
messages <- p.getHelloPacket()
go p.handleNewPeer(ctx, pid, messages)
Expand All @@ -290,13 +302,21 @@ func (p *PubSub) processLoop(ctx context.Context) {
case s := <-p.newPeerStream:
pid := s.Conn().RemotePeer()

_, ok := p.peers[pid]
ch, ok := p.peers[pid]
if !ok {
log.Warning("new stream for unknown peer: ", pid)
s.Reset()
continue
}

_, ok = p.blacklist[pid]
if ok {
log.Warning("closing stream for blacklisted peer: ", pid)
close(ch)
s.Reset()
continue
}

p.rt.AddPeer(pid, s.Protocol())

case pid := <-p.newPeerError:
Expand Down Expand Up @@ -374,6 +394,20 @@ func (p *PubSub) processLoop(ctx context.Context) {
case thunk := <-p.eval:
thunk()

case pid := <-p.blacklistPeer:
log.Infof("Blacklisting peer %s", pid)
p.blacklist[pid] = struct{}{}

ch, ok := p.peers[pid]
if ok {
close(ch)
delete(p.peers, pid)
for _, t := range p.topics {
delete(t, pid)
}
p.rt.RemovePeer(pid)
}

case <-ctx.Done():
log.Info("pubsub processloop shutting down")
return
Expand Down Expand Up @@ -567,6 +601,18 @@ func msgID(pmsg *pb.Message) string {

// pushMsg pushes a message performing validation as necessary
func (p *PubSub) pushMsg(vals []*topicVal, src peer.ID, msg *Message) {
// reject messages from blacklisted peers
if _, ok := p.blacklist[src]; ok {
log.Warningf("dropping message from blacklisted peer %s", src)
return
}

// even if they are forwarded by good peers
if _, ok := p.blacklist[msg.GetFrom()]; ok {
log.Warningf("dropping message from blacklisted source %s", src)
return
}

// reject unsigned messages when strict before we even process the id
if p.signStrict && msg.Signature == nil {
log.Debugf("dropping unsigned message from %s", src)
Expand Down Expand Up @@ -821,6 +867,11 @@ func (p *PubSub) ListPeers(topic string) []peer.ID {
return <-out
}

// BlacklistPeer blacklists a peer; all messages from this peer will be unconditionally dropped.
func (p *PubSub) BlacklistPeer(pid peer.ID) {
p.blacklistPeer <- pid
}

// per topic validators
type addValReq struct {
topic string
Expand Down