Skip to content
This repository has been archived by the owner on Apr 21, 2022. It is now read-only.

Commit

Permalink
Merge pull request #36 from libp2p/feat/protect
Browse files Browse the repository at this point in the history
Add peer protection capability (implementation)
  • Loading branch information
raulk authored Mar 29, 2019
2 parents bf03f82 + d82487f commit a3dceaf
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 17 deletions.
73 changes: 57 additions & 16 deletions connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
ma "github.com/multiformats/go-multiaddr"
)

const silencePeriod = 10 * time.Second
var SilencePeriod = 10 * time.Second

var log = logging.Logger("connmgr")

Expand All @@ -32,9 +32,13 @@ type BasicConnMgr struct {
gracePeriod time.Duration
peers map[peer.ID]*peerInfo

plk sync.RWMutex
protected map[peer.ID]map[string]struct{}

// channel-based semaphore that enforces only a single trim is in progress
trimRunningCh chan struct{}
lastTrim time.Time
silencePeriod time.Duration
}

var _ ifconnmgr.ConnManager = (*BasicConnMgr)(nil)
Expand All @@ -52,11 +56,41 @@ func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr {
gracePeriod: grace,
peers: make(map[peer.ID]*peerInfo),
trimRunningCh: make(chan struct{}, 1),
protected: make(map[peer.ID]map[string]struct{}, 16),
silencePeriod: SilencePeriod,
}
}

func (cm *BasicConnMgr) Protect(id peer.ID, tag string) {
cm.plk.Lock()
defer cm.plk.Unlock()

tags, ok := cm.protected[id]
if !ok {
tags = make(map[string]struct{}, 2)
cm.protected[id] = tags
}
tags[tag] = struct{}{}
}

func (cm *BasicConnMgr) Unprotect(id peer.ID, tag string) (protected bool) {
cm.plk.Lock()
defer cm.plk.Unlock()

tags, ok := cm.protected[id]
if !ok {
return false
}
if delete(tags, tag); len(tags) == 0 {
delete(cm.protected, id)
return false
}
return true
}

// peerInfo stores metadata for a given peer.
type peerInfo struct {
id peer.ID
tags map[string]int // value for each tag
value int // cached sum of all tag values

Expand All @@ -79,7 +113,7 @@ func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) {
return
}
defer func() { <-cm.trimRunningCh }()
if time.Since(cm.lastTrim) < silencePeriod {
if time.Since(cm.lastTrim) < cm.silencePeriod {
// skip this attempt to trim as the last one just took place.
return
}
Expand Down Expand Up @@ -108,39 +142,46 @@ func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []inet.Conn {
return nil
}

var infos []*peerInfo

for _, inf := range cm.peers {
infos = append(infos, inf)
var candidates []*peerInfo
cm.plk.RLock()
for id, inf := range cm.peers {
if _, ok := cm.protected[id]; ok {
// skip over protected peer.
continue
}
candidates = append(candidates, inf)
}
cm.plk.RUnlock()

// Sort peers according to their value.
sort.Slice(infos, func(i, j int) bool {
return infos[i].value < infos[j].value
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].value < candidates[j].value
})

closeCount := len(infos) - cm.lowWater
toclose := infos[:closeCount]
target := len(cm.peers) - cm.lowWater

// 2x number of peers we're disconnecting from because we may have more
// than one connection per peer. Slightly over allocating isn't an issue
// as this is a very short-lived array.
closed := make([]inet.Conn, 0, len(toclose)*2)
selected := make([]inet.Conn, 0, target*2)

for _, inf := range toclose {
for _, inf := range candidates {
// TODO: should we be using firstSeen or the time associated with the connection itself?
if inf.firstSeen.Add(cm.gracePeriod).After(now) {
continue
}

// TODO: if a peer has more than one connection, maybe only close one?
for c := range inf.conns {
// TODO: probably don't want to always do this in a goroutine
closed = append(closed, c)
selected = append(selected, c)
}

target--
if target == 0 {
break
}
}

return closed
return selected
}

// GetTagInfo is called to fetch the tag information associated with a given
Expand Down
190 changes: 190 additions & 0 deletions connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ func TestQuickBurstRespectsSilencePeriod(t *testing.T) {
// wait for a few seconds
time.Sleep(time.Second * 3)

cm.lk.Lock() // pacify the race detector
defer cm.lk.Unlock()

// only the first trim is allowed in; make sure we close at most 20 connections, not all of them.
var closed int
for _, c := range conns {
Expand All @@ -331,3 +334,190 @@ func TestQuickBurstRespectsSilencePeriod(t *testing.T) {
t.Fatalf("expected closed connections + open conn count to equal 30, value: %d", total)
}
}

func TestPeerProtectionSingleTag(t *testing.T) {
SilencePeriod = 0
cm := NewConnManager(19, 20, 0)
SilencePeriod = 10 * time.Second

not := cm.Notifee()

// produce 20 connections with unique peers.
var conns []inet.Conn
for i := 0; i < 20; i++ {
rc := randConn(t, not.Disconnected)
conns = append(conns, rc)
not.Connected(nil, rc)
cm.TagPeer(rc.RemotePeer(), "test", 20)
}

// protect the first 5 peers.
var protected []inet.Conn
for _, c := range conns[0:5] {
cm.Protect(c.RemotePeer(), "global")
protected = append(protected, c)
// tag them negatively to make them preferred for pruning.
cm.TagPeer(c.RemotePeer(), "test", -100)
}

// add one more connection, sending the connection manager overboard.
not.Connected(nil, randConn(t, not.Disconnected))

// the pruning happens in the background -- this timing condition is not good.
time.Sleep(1 * time.Second)

for _, c := range protected {
if c.(*tconn).closed {
t.Error("protected connection was closed by connection manager")
}
}

// unprotect the first peer.
cm.Unprotect(protected[0].RemotePeer(), "global")

// add 2 more connections, sending the connection manager overboard again.
for i := 0; i < 2; i++ {
rc := randConn(t, not.Disconnected)
conns = append(conns, rc)
not.Connected(nil, rc)
cm.TagPeer(rc.RemotePeer(), "test", 20)
}

// the pruning happens in the background -- this timing condition is not good.
time.Sleep(1 * time.Second)

cm.lk.Lock() // pacify the race detector
defer cm.lk.Unlock()

if !protected[0].(*tconn).closed {
t.Error("unprotected connection was kept open by connection manager")
}
for _, c := range protected[1:] {
if c.(*tconn).closed {
t.Error("protected connection was closed by connection manager")
}
}
}

func TestPeerProtectionMultipleTags(t *testing.T) {
SilencePeriod = 0
cm := NewConnManager(19, 20, 0)
SilencePeriod = 10 * time.Second

not := cm.Notifee()

// produce 20 connections with unique peers.
var conns []inet.Conn
for i := 0; i < 20; i++ {
rc := randConn(t, not.Disconnected)
conns = append(conns, rc)
not.Connected(nil, rc)
cm.TagPeer(rc.RemotePeer(), "test", 20)
}

// protect the first 5 peers under two tags.
var protected []inet.Conn
for _, c := range conns[0:5] {
cm.Protect(c.RemotePeer(), "tag1")
cm.Protect(c.RemotePeer(), "tag2")
protected = append(protected, c)
// tag them negatively to make them preferred for pruning.
cm.TagPeer(c.RemotePeer(), "test", -100)
}

// add one more connection, sending the connection manager overboard.
not.Connected(nil, randConn(t, not.Disconnected))

// the pruning happens in the background -- this timing condition is not good.
time.Sleep(1 * time.Second)

for _, c := range protected {
if c.(*tconn).closed {
t.Error("protected connection was closed by connection manager")
}
}

// remove the protection from one tag.
for _, c := range protected {
if !cm.Unprotect(c.RemotePeer(), "tag1") {
t.Error("peer should still be protected")
}
}

// add 2 more connections, sending the connection manager overboard again.
for i := 0; i < 2; i++ {
rc := randConn(t, not.Disconnected)
conns = append(conns, rc)
not.Connected(nil, rc)
cm.TagPeer(rc.RemotePeer(), "test", 20)
}

// the pruning happens in the background -- this timing condition is not good.
time.Sleep(1 * time.Second)

// connections should still remain open, as they were protected.
for _, c := range protected[0:] {
if c.(*tconn).closed {
t.Error("protected connection was closed by connection manager")
}
}

// unprotect the first peer entirely.
cm.Unprotect(protected[0].RemotePeer(), "tag2")

// add 2 more connections, sending the connection manager overboard again.
for i := 0; i < 2; i++ {
rc := randConn(t, not.Disconnected)
conns = append(conns, rc)
not.Connected(nil, rc)
cm.TagPeer(rc.RemotePeer(), "test", 20)
}

// the pruning happens in the background -- this timing condition is not good.
time.Sleep(1 * time.Second)

cm.lk.Lock() // pacify the race detector
defer cm.lk.Unlock()

if !protected[0].(*tconn).closed {
t.Error("unprotected connection was kept open by connection manager")
}
for _, c := range protected[1:] {
if c.(*tconn).closed {
t.Error("protected connection was closed by connection manager")
}
}

}

func TestPeerProtectionIdempotent(t *testing.T) {
SilencePeriod = 0
cm := NewConnManager(10, 20, 0)
SilencePeriod = 10 * time.Second

id, _ := tu.RandPeerID()
cm.Protect(id, "global")
cm.Protect(id, "global")
cm.Protect(id, "global")
cm.Protect(id, "global")

if len(cm.protected[id]) > 1 {
t.Error("expected peer to be protected only once")
}

if !cm.Unprotect(id, "unused") {
t.Error("expected peer to continue to be protected")
}

if !cm.Unprotect(id, "unused2") {
t.Error("expected peer to continue to be protected")
}

if cm.Unprotect(id, "global") {
t.Error("expected peer to be unprotected")
}

if len(cm.protected) > 0 {
t.Error("expected no protections")
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/libp2p/go-libp2p-connmgr

require (
github.com/ipfs/go-log v0.0.1
github.com/libp2p/go-libp2p-interface-connmgr v0.0.1
github.com/libp2p/go-libp2p-interface-connmgr v0.0.2
github.com/libp2p/go-libp2p-net v0.0.1
github.com/libp2p/go-libp2p-peer v0.0.1
github.com/libp2p/go-testutil v0.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ=
github.com/libp2p/go-libp2p-crypto v0.0.1 h1:JNQd8CmoGTohO/akqrH16ewsqZpci2CbgYH/LmYl8gw=
github.com/libp2p/go-libp2p-crypto v0.0.1/go.mod h1:yJkNyDmO341d5wwXxDUGO0LykUVT72ImHNUqh5D/dBE=
github.com/libp2p/go-libp2p-interface-connmgr v0.0.0-20190329140505-d342b88bdd5f h1:aU+VoDSOlDvlD+6kXNN/oo0Ww4ORZh0fiZspWTppbms=
github.com/libp2p/go-libp2p-interface-connmgr v0.0.0-20190329140505-d342b88bdd5f/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k=
github.com/libp2p/go-libp2p-interface-connmgr v0.0.1 h1:Q9EkNSLAOF+u90L88qmE9z/fTdjLh8OsJwGw74mkwk4=
github.com/libp2p/go-libp2p-interface-connmgr v0.0.1/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k=
github.com/libp2p/go-libp2p-net v0.0.1 h1:xJ4Vh4yKF/XKb8fd1Ev0ebAGzVjMxXzrxG2kjtU+F5Q=
Expand Down

0 comments on commit a3dceaf

Please sign in to comment.