This repository has been archived by the owner on Apr 21, 2022. It is now read-only.

Add peer protection capability (implementation) #36

Merged
3 commits merged on Mar 29, 2019
73 changes: 57 additions & 16 deletions connmgr.go
@@ -13,7 +13,7 @@ import (
 	ma "github.com/multiformats/go-multiaddr"
 )
 
-const silencePeriod = 10 * time.Second
+var SilencePeriod = 10 * time.Second
 
 var log = logging.Logger("connmgr")

@@ -32,9 +32,13 @@ type BasicConnMgr struct {
 	gracePeriod time.Duration
 	peers       map[peer.ID]*peerInfo
 
+	plk       sync.RWMutex
+	protected map[peer.ID]map[string]struct{}
+
 	// channel-based semaphore that enforces only a single trim is in progress
 	trimRunningCh chan struct{}
 	lastTrim      time.Time
+	silencePeriod time.Duration
 }
 
 var _ ifconnmgr.ConnManager = (*BasicConnMgr)(nil)
@@ -52,11 +56,41 @@ func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr {
 		gracePeriod:   grace,
 		peers:         make(map[peer.ID]*peerInfo),
 		trimRunningCh: make(chan struct{}, 1),
+		protected:     make(map[peer.ID]map[string]struct{}, 16),
+		silencePeriod: SilencePeriod,
 	}
 }
 
+func (cm *BasicConnMgr) Protect(id peer.ID, tag string) {
+	cm.plk.Lock()
+	defer cm.plk.Unlock()
+
+	tags, ok := cm.protected[id]
+	if !ok {
+		tags = make(map[string]struct{}, 2)
+		cm.protected[id] = tags
+	}
+	tags[tag] = struct{}{}
+}
+
+func (cm *BasicConnMgr) Unprotect(id peer.ID, tag string) (protected bool) {
+	cm.plk.Lock()
+	defer cm.plk.Unlock()
+
+	tags, ok := cm.protected[id]
+	if !ok {
+		return false
+	}
+	if delete(tags, tag); len(tags) == 0 {
+		delete(cm.protected, id)
+		return false
+	}
+	return true
+}
+
 // peerInfo stores metadata for a given peer.
 type peerInfo struct {
+	id    peer.ID
 	tags  map[string]int // value for each tag
 	value int            // cached sum of all tag values
 
@@ -79,7 +113,7 @@ func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) {
 		return
 	}
 	defer func() { <-cm.trimRunningCh }()
-	if time.Since(cm.lastTrim) < silencePeriod {
+	if time.Since(cm.lastTrim) < cm.silencePeriod {
 		// skip this attempt to trim as the last one just took place.
 		return
 	}
@@ -108,39 +142,46 @@ func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []inet.Conn {
 		return nil
 	}
 
-	var infos []*peerInfo
-
-	for _, inf := range cm.peers {
-		infos = append(infos, inf)
+	var candidates []*peerInfo
+	cm.plk.RLock()
+	for id, inf := range cm.peers {
+		if _, ok := cm.protected[id]; ok {
+			// skip over protected peer.
+			continue
+		}
+		candidates = append(candidates, inf)
 	}
+	cm.plk.RUnlock()
 
 	// Sort peers according to their value.
-	sort.Slice(infos, func(i, j int) bool {
-		return infos[i].value < infos[j].value
+	sort.Slice(candidates, func(i, j int) bool {
+		return candidates[i].value < candidates[j].value
 	})
 
-	closeCount := len(infos) - cm.lowWater
-	toclose := infos[:closeCount]
+	target := len(cm.peers) - cm.lowWater
 
 	// 2x number of peers we're disconnecting from because we may have more
 	// than one connection per peer. Slightly over allocating isn't an issue
 	// as this is a very short-lived array.
-	closed := make([]inet.Conn, 0, len(toclose)*2)
+	selected := make([]inet.Conn, 0, target*2)
 
-	for _, inf := range toclose {
+	for _, inf := range candidates {
 		// TODO: should we be using firstSeen or the time associated with the connection itself?
 		if inf.firstSeen.Add(cm.gracePeriod).After(now) {
 			continue
 		}
 
 		// TODO: if a peer has more than one connection, maybe only close one?
 		for c := range inf.conns {
 			// TODO: probably don't want to always do this in a goroutine
-			closed = append(closed, c)
+			selected = append(selected, c)
 		}
 
+		target--
+		if target == 0 {
+			break
+		}
 	}
 
-	return closed
+	return selected
 }
 
 // GetTagInfo is called to fetch the tag information associated with a given
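
For quick reference, here is a minimal sketch of how a consumer might use the new protection API. This example is not part of the PR: the tag names and the hard-coded peer ID are illustrative only (a real caller would obtain the peer.ID from its host or discovery layer).

package main

import (
	"fmt"
	"time"

	connmgr "github.com/libp2p/go-libp2p-connmgr"
	peer "github.com/libp2p/go-libp2p-peer"
)

func main() {
	// Keep between 100 and 300 connections, with a one-minute grace
	// period for newly seen peers.
	cm := connmgr.NewConnManager(100, 300, time.Minute)

	// Illustrative peer ID only: peer.ID is a string type, but real IDs
	// are derived from public keys.
	id := peer.ID("QmExamplePeer")

	// Protect the peer under two independent tags. Protected peers are
	// skipped by getConnsToClose, so trims never touch them.
	cm.Protect(id, "relay")
	cm.Protect(id, "bootstrap")

	// Unprotect reports whether the peer remains protected by other tags.
	fmt.Println(cm.Unprotect(id, "relay"))     // true: "bootstrap" still holds
	fmt.Println(cm.Unprotect(id, "bootstrap")) // false: peer is prunable again
}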
190 changes: 190 additions & 0 deletions connmgr_test.go
@@ -317,6 +317,9 @@ func TestQuickBurstRespectsSilencePeriod(t *testing.T) {
 	// wait for a few seconds
 	time.Sleep(time.Second * 3)
 
+	cm.lk.Lock() // pacify the race detector
+	defer cm.lk.Unlock()
+
 	// only the first trim is allowed in; make sure we close at most 20 connections, not all of them.
 	var closed int
 	for _, c := range conns {
@@ -331,3 +334,190 @@ func TestQuickBurstRespectsSilencePeriod(t *testing.T) {
 		t.Fatalf("expected closed connections + open conn count to equal 30, value: %d", total)
 	}
 }
+
+func TestPeerProtectionSingleTag(t *testing.T) {
+	SilencePeriod = 0
+	cm := NewConnManager(19, 20, 0)
+	SilencePeriod = 10 * time.Second
+
+	not := cm.Notifee()
+
+	// produce 20 connections with unique peers.
+	var conns []inet.Conn
+	for i := 0; i < 20; i++ {
+		rc := randConn(t, not.Disconnected)
+		conns = append(conns, rc)
+		not.Connected(nil, rc)
+		cm.TagPeer(rc.RemotePeer(), "test", 20)
+	}
+
+	// protect the first 5 peers.
+	var protected []inet.Conn
+	for _, c := range conns[0:5] {
+		cm.Protect(c.RemotePeer(), "global")
+		protected = append(protected, c)
+		// tag them negatively to make them preferred for pruning.
+		cm.TagPeer(c.RemotePeer(), "test", -100)
+	}
+
+	// add one more connection, sending the connection manager overboard.
+	not.Connected(nil, randConn(t, not.Disconnected))
+
+	// the pruning happens in the background -- this timing condition is not good.
+	time.Sleep(1 * time.Second)
+
+	for _, c := range protected {
+		if c.(*tconn).closed {
+			t.Error("protected connection was closed by connection manager")
+		}
+	}
+
+	// unprotect the first peer.
+	cm.Unprotect(protected[0].RemotePeer(), "global")
+
+	// add 2 more connections, sending the connection manager overboard again.
+	for i := 0; i < 2; i++ {
+		rc := randConn(t, not.Disconnected)
+		conns = append(conns, rc)
+		not.Connected(nil, rc)
+		cm.TagPeer(rc.RemotePeer(), "test", 20)
+	}
+
+	// the pruning happens in the background -- this timing condition is not good.
+	time.Sleep(1 * time.Second)
+
+	cm.lk.Lock() // pacify the race detector
+	defer cm.lk.Unlock()
+
+	if !protected[0].(*tconn).closed {
+		t.Error("unprotected connection was kept open by connection manager")
+	}
+	for _, c := range protected[1:] {
+		if c.(*tconn).closed {
+			t.Error("protected connection was closed by connection manager")
+		}
+	}
+}
+
+func TestPeerProtectionMultipleTags(t *testing.T) {
+	SilencePeriod = 0
+	cm := NewConnManager(19, 20, 0)
+	SilencePeriod = 10 * time.Second
+
+	not := cm.Notifee()
+
+	// produce 20 connections with unique peers.
+	var conns []inet.Conn
+	for i := 0; i < 20; i++ {
+		rc := randConn(t, not.Disconnected)
+		conns = append(conns, rc)
+		not.Connected(nil, rc)
+		cm.TagPeer(rc.RemotePeer(), "test", 20)
+	}
+
+	// protect the first 5 peers under two tags.
+	var protected []inet.Conn
+	for _, c := range conns[0:5] {
+		cm.Protect(c.RemotePeer(), "tag1")
+		cm.Protect(c.RemotePeer(), "tag2")
+		protected = append(protected, c)
+		// tag them negatively to make them preferred for pruning.
+		cm.TagPeer(c.RemotePeer(), "test", -100)
+	}
+
+	// add one more connection, sending the connection manager overboard.
+	not.Connected(nil, randConn(t, not.Disconnected))
+
+	// the pruning happens in the background -- this timing condition is not good.
+	time.Sleep(1 * time.Second)
+
+	for _, c := range protected {
+		if c.(*tconn).closed {
+			t.Error("protected connection was closed by connection manager")
+		}
+	}
+
+	// remove the protection from one tag.
+	for _, c := range protected {
+		if !cm.Unprotect(c.RemotePeer(), "tag1") {
+			t.Error("peer should still be protected")
+		}
+	}
+
+	// add 2 more connections, sending the connection manager overboard again.
+	for i := 0; i < 2; i++ {
+		rc := randConn(t, not.Disconnected)
+		conns = append(conns, rc)
+		not.Connected(nil, rc)
+		cm.TagPeer(rc.RemotePeer(), "test", 20)
+	}
+
+	// the pruning happens in the background -- this timing condition is not good.
+	time.Sleep(1 * time.Second)
+
+	// connections should still remain open, as they were protected.
+	for _, c := range protected[0:] {
+		if c.(*tconn).closed {
+			t.Error("protected connection was closed by connection manager")
+		}
+	}
+
+	// unprotect the first peer entirely.
+	cm.Unprotect(protected[0].RemotePeer(), "tag2")
+
+	// add 2 more connections, sending the connection manager overboard again.
+	for i := 0; i < 2; i++ {
+		rc := randConn(t, not.Disconnected)
+		conns = append(conns, rc)
+		not.Connected(nil, rc)
+		cm.TagPeer(rc.RemotePeer(), "test", 20)
+	}
+
+	// the pruning happens in the background -- this timing condition is not good.
+	time.Sleep(1 * time.Second)
+
+	cm.lk.Lock() // pacify the race detector
+	defer cm.lk.Unlock()
+
+	if !protected[0].(*tconn).closed {
+		t.Error("unprotected connection was kept open by connection manager")
+	}
+	for _, c := range protected[1:] {
+		if c.(*tconn).closed {
+			t.Error("protected connection was closed by connection manager")
+		}
+	}
+
+}
+
+func TestPeerProtectionIdempotent(t *testing.T) {
+	SilencePeriod = 0
+	cm := NewConnManager(10, 20, 0)
+	SilencePeriod = 10 * time.Second
+
+	id, _ := tu.RandPeerID()
+	cm.Protect(id, "global")
+	cm.Protect(id, "global")
+	cm.Protect(id, "global")
+	cm.Protect(id, "global")
+
+	if len(cm.protected[id]) > 1 {
+		t.Error("expected peer to be protected only once")
+	}
+
+	if !cm.Unprotect(id, "unused") {
+		t.Error("expected peer to continue to be protected")
+	}
+
+	if !cm.Unprotect(id, "unused2") {
+		t.Error("expected peer to continue to be protected")
+	}
+
+	if cm.Unprotect(id, "global") {
+		t.Error("expected peer to be unprotected")
+	}
+
+	if len(cm.protected) > 0 {
+		t.Error("expected no protections")
+	}
+}
2 changes: 1 addition & 1 deletion go.mod
@@ -2,7 +2,7 @@ module github.com/libp2p/go-libp2p-connmgr
 
 require (
 	github.com/ipfs/go-log v0.0.1
-	github.com/libp2p/go-libp2p-interface-connmgr v0.0.1
+	github.com/libp2p/go-libp2p-interface-connmgr v0.0.2
 	github.com/libp2p/go-libp2p-net v0.0.1
 	github.com/libp2p/go-libp2p-peer v0.0.1
 	github.com/libp2p/go-testutil v0.0.1
2 changes: 2 additions & 0 deletions go.sum
@@ -49,6 +49,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ=
 github.com/libp2p/go-libp2p-crypto v0.0.1 h1:JNQd8CmoGTohO/akqrH16ewsqZpci2CbgYH/LmYl8gw=
 github.com/libp2p/go-libp2p-crypto v0.0.1/go.mod h1:yJkNyDmO341d5wwXxDUGO0LykUVT72ImHNUqh5D/dBE=
+github.com/libp2p/go-libp2p-interface-connmgr v0.0.0-20190329140505-d342b88bdd5f h1:aU+VoDSOlDvlD+6kXNN/oo0Ww4ORZh0fiZspWTppbms=
+github.com/libp2p/go-libp2p-interface-connmgr v0.0.0-20190329140505-d342b88bdd5f/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k=
 github.com/libp2p/go-libp2p-interface-connmgr v0.0.1 h1:Q9EkNSLAOF+u90L88qmE9z/fTdjLh8OsJwGw74mkwk4=
 github.com/libp2p/go-libp2p-interface-connmgr v0.0.1/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k=
 github.com/libp2p/go-libp2p-net v0.0.1 h1:xJ4Vh4yKF/XKb8fd1Ev0ebAGzVjMxXzrxG2kjtU+F5Q=