diff --git a/connmgr.go b/connmgr.go index 626fe76..2629dff 100644 --- a/connmgr.go +++ b/connmgr.go @@ -13,7 +13,7 @@ import ( ma "github.com/multiformats/go-multiaddr" ) -const silencePeriod = 10 * time.Second +var SilencePeriod = 10 * time.Second var log = logging.Logger("connmgr") @@ -32,9 +32,13 @@ type BasicConnMgr struct { gracePeriod time.Duration peers map[peer.ID]*peerInfo + plk sync.RWMutex + protected map[peer.ID]map[string]struct{} + // channel-based semaphore that enforces only a single trim is in progress trimRunningCh chan struct{} lastTrim time.Time + silencePeriod time.Duration } var _ ifconnmgr.ConnManager = (*BasicConnMgr)(nil) @@ -52,11 +56,41 @@ func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr { gracePeriod: grace, peers: make(map[peer.ID]*peerInfo), trimRunningCh: make(chan struct{}, 1), + protected: make(map[peer.ID]map[string]struct{}, 16), + silencePeriod: SilencePeriod, } } +func (cm *BasicConnMgr) Protect(id peer.ID, tag string) { + cm.plk.Lock() + defer cm.plk.Unlock() + + tags, ok := cm.protected[id] + if !ok { + tags = make(map[string]struct{}, 2) + cm.protected[id] = tags + } + tags[tag] = struct{}{} +} + +func (cm *BasicConnMgr) Unprotect(id peer.ID, tag string) (protected bool) { + cm.plk.Lock() + defer cm.plk.Unlock() + + tags, ok := cm.protected[id] + if !ok { + return false + } + if delete(tags, tag); len(tags) == 0 { + delete(cm.protected, id) + return false + } + return true +} + // peerInfo stores metadata for a given peer. type peerInfo struct { + id peer.ID tags map[string]int // value for each tag value int // cached sum of all tag values @@ -79,7 +113,7 @@ func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) { return } defer func() { <-cm.trimRunningCh }() - if time.Since(cm.lastTrim) < silencePeriod { + if time.Since(cm.lastTrim) < cm.silencePeriod { // skip this attempt to trim as the last one just took place. return } @@ -108,39 +142,46 @@ func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []inet.Conn { return nil } - var infos []*peerInfo - - for _, inf := range cm.peers { - infos = append(infos, inf) + var candidates []*peerInfo + cm.plk.RLock() + for id, inf := range cm.peers { + if _, ok := cm.protected[id]; ok { + // skip over protected peer. + continue + } + candidates = append(candidates, inf) } + cm.plk.RUnlock() // Sort peers according to their value. - sort.Slice(infos, func(i, j int) bool { - return infos[i].value < infos[j].value + sort.Slice(candidates, func(i, j int) bool { + return candidates[i].value < candidates[j].value }) - closeCount := len(infos) - cm.lowWater - toclose := infos[:closeCount] + target := len(cm.peers) - cm.lowWater // 2x number of peers we're disconnecting from because we may have more // than one connection per peer. Slightly over allocating isn't an issue // as this is a very short-lived array. - closed := make([]inet.Conn, 0, len(toclose)*2) + selected := make([]inet.Conn, 0, target*2) - for _, inf := range toclose { + for _, inf := range candidates { // TODO: should we be using firstSeen or the time associated with the connection itself? if inf.firstSeen.Add(cm.gracePeriod).After(now) { continue } - // TODO: if a peer has more than one connection, maybe only close one? for c := range inf.conns { - // TODO: probably don't want to always do this in a goroutine - closed = append(closed, c) + selected = append(selected, c) + } + + target-- + if target == 0 { + break } } - return closed + return selected } // GetTagInfo is called to fetch the tag information associated with a given diff --git a/connmgr_test.go b/connmgr_test.go index 90285c5..a505761 100644 --- a/connmgr_test.go +++ b/connmgr_test.go @@ -317,6 +317,9 @@ func TestQuickBurstRespectsSilencePeriod(t *testing.T) { // wait for a few seconds time.Sleep(time.Second * 3) + cm.lk.Lock() // pacify the race detector + defer cm.lk.Unlock() + // only the first trim is allowed in; make sure we close at most 20 connections, not all of them. var closed int for _, c := range conns { @@ -331,3 +334,190 @@ func TestQuickBurstRespectsSilencePeriod(t *testing.T) { t.Fatalf("expected closed connections + open conn count to equal 30, value: %d", total) } } + +func TestPeerProtectionSingleTag(t *testing.T) { + SilencePeriod = 0 + cm := NewConnManager(19, 20, 0) + SilencePeriod = 10 * time.Second + + not := cm.Notifee() + + // produce 20 connections with unique peers. + var conns []inet.Conn + for i := 0; i < 20; i++ { + rc := randConn(t, not.Disconnected) + conns = append(conns, rc) + not.Connected(nil, rc) + cm.TagPeer(rc.RemotePeer(), "test", 20) + } + + // protect the first 5 peers. + var protected []inet.Conn + for _, c := range conns[0:5] { + cm.Protect(c.RemotePeer(), "global") + protected = append(protected, c) + // tag them negatively to make them preferred for pruning. + cm.TagPeer(c.RemotePeer(), "test", -100) + } + + // add one more connection, sending the connection manager overboard. + not.Connected(nil, randConn(t, not.Disconnected)) + + // the pruning happens in the background -- this timing condition is not good. + time.Sleep(1 * time.Second) + + for _, c := range protected { + if c.(*tconn).closed { + t.Error("protected connection was closed by connection manager") + } + } + + // unprotect the first peer. + cm.Unprotect(protected[0].RemotePeer(), "global") + + // add 2 more connections, sending the connection manager overboard again. + for i := 0; i < 2; i++ { + rc := randConn(t, not.Disconnected) + conns = append(conns, rc) + not.Connected(nil, rc) + cm.TagPeer(rc.RemotePeer(), "test", 20) + } + + // the pruning happens in the background -- this timing condition is not good. + time.Sleep(1 * time.Second) + + cm.lk.Lock() // pacify the race detector + defer cm.lk.Unlock() + + if !protected[0].(*tconn).closed { + t.Error("unprotected connection was kept open by connection manager") + } + for _, c := range protected[1:] { + if c.(*tconn).closed { + t.Error("protected connection was closed by connection manager") + } + } +} + +func TestPeerProtectionMultipleTags(t *testing.T) { + SilencePeriod = 0 + cm := NewConnManager(19, 20, 0) + SilencePeriod = 10 * time.Second + + not := cm.Notifee() + + // produce 20 connections with unique peers. + var conns []inet.Conn + for i := 0; i < 20; i++ { + rc := randConn(t, not.Disconnected) + conns = append(conns, rc) + not.Connected(nil, rc) + cm.TagPeer(rc.RemotePeer(), "test", 20) + } + + // protect the first 5 peers under two tags. + var protected []inet.Conn + for _, c := range conns[0:5] { + cm.Protect(c.RemotePeer(), "tag1") + cm.Protect(c.RemotePeer(), "tag2") + protected = append(protected, c) + // tag them negatively to make them preferred for pruning. + cm.TagPeer(c.RemotePeer(), "test", -100) + } + + // add one more connection, sending the connection manager overboard. + not.Connected(nil, randConn(t, not.Disconnected)) + + // the pruning happens in the background -- this timing condition is not good. + time.Sleep(1 * time.Second) + + for _, c := range protected { + if c.(*tconn).closed { + t.Error("protected connection was closed by connection manager") + } + } + + // remove the protection from one tag. + for _, c := range protected { + if !cm.Unprotect(c.RemotePeer(), "tag1") { + t.Error("peer should still be protected") + } + } + + // add 2 more connections, sending the connection manager overboard again. + for i := 0; i < 2; i++ { + rc := randConn(t, not.Disconnected) + conns = append(conns, rc) + not.Connected(nil, rc) + cm.TagPeer(rc.RemotePeer(), "test", 20) + } + + // the pruning happens in the background -- this timing condition is not good. + time.Sleep(1 * time.Second) + + // connections should still remain open, as they were protected. + for _, c := range protected[0:] { + if c.(*tconn).closed { + t.Error("protected connection was closed by connection manager") + } + } + + // unprotect the first peer entirely. + cm.Unprotect(protected[0].RemotePeer(), "tag2") + + // add 2 more connections, sending the connection manager overboard again. + for i := 0; i < 2; i++ { + rc := randConn(t, not.Disconnected) + conns = append(conns, rc) + not.Connected(nil, rc) + cm.TagPeer(rc.RemotePeer(), "test", 20) + } + + // the pruning happens in the background -- this timing condition is not good. + time.Sleep(1 * time.Second) + + cm.lk.Lock() // pacify the race detector + defer cm.lk.Unlock() + + if !protected[0].(*tconn).closed { + t.Error("unprotected connection was kept open by connection manager") + } + for _, c := range protected[1:] { + if c.(*tconn).closed { + t.Error("protected connection was closed by connection manager") + } + } + +} + +func TestPeerProtectionIdempotent(t *testing.T) { + SilencePeriod = 0 + cm := NewConnManager(10, 20, 0) + SilencePeriod = 10 * time.Second + + id, _ := tu.RandPeerID() + cm.Protect(id, "global") + cm.Protect(id, "global") + cm.Protect(id, "global") + cm.Protect(id, "global") + + if len(cm.protected[id]) > 1 { + t.Error("expected peer to be protected only once") + } + + if !cm.Unprotect(id, "unused") { + t.Error("expected peer to continue to be protected") + } + + if !cm.Unprotect(id, "unused2") { + t.Error("expected peer to continue to be protected") + } + + if cm.Unprotect(id, "global") { + t.Error("expected peer to be unprotected") + } + + if len(cm.protected) > 0 { + t.Error("expected no protections") + } +} diff --git a/go.mod b/go.mod index e7caf7d..dc3a5d4 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module github.com/libp2p/go-libp2p-connmgr require ( github.com/ipfs/go-log v0.0.1 - github.com/libp2p/go-libp2p-interface-connmgr v0.0.1 + github.com/libp2p/go-libp2p-interface-connmgr v0.0.2 github.com/libp2p/go-libp2p-net v0.0.1 github.com/libp2p/go-libp2p-peer v0.0.1 github.com/libp2p/go-testutil v0.0.1 diff --git a/go.sum b/go.sum index bad5e64..43d17fa 100644 --- a/go.sum +++ b/go.sum @@ -49,6 +49,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ= github.com/libp2p/go-libp2p-crypto v0.0.1 h1:JNQd8CmoGTohO/akqrH16ewsqZpci2CbgYH/LmYl8gw= github.com/libp2p/go-libp2p-crypto v0.0.1/go.mod h1:yJkNyDmO341d5wwXxDUGO0LykUVT72ImHNUqh5D/dBE= +github.com/libp2p/go-libp2p-interface-connmgr v0.0.0-20190329140505-d342b88bdd5f h1:aU+VoDSOlDvlD+6kXNN/oo0Ww4ORZh0fiZspWTppbms= +github.com/libp2p/go-libp2p-interface-connmgr v0.0.0-20190329140505-d342b88bdd5f/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k= github.com/libp2p/go-libp2p-interface-connmgr v0.0.1 h1:Q9EkNSLAOF+u90L88qmE9z/fTdjLh8OsJwGw74mkwk4= github.com/libp2p/go-libp2p-interface-connmgr v0.0.1/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k= github.com/libp2p/go-libp2p-net v0.0.1 h1:xJ4Vh4yKF/XKb8fd1Ev0ebAGzVjMxXzrxG2kjtU+F5Q=