Skip to content

Commit

Permalink
Merge pull request #69 from libp2p/feat/expose-peerinfo
Browse files Browse the repository at this point in the history
Get Peer Infos
  • Loading branch information
Stebalien authored Apr 2, 2020
2 parents 3d3bf8c + 0779168 commit ee7b926
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 37 deletions.
30 changes: 15 additions & 15 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
)

// peerInfo holds all related information for a peer in the K-Bucket.
type peerInfo struct {
// PeerInfo holds all related information for a peer in the K-Bucket.
type PeerInfo struct {
Id peer.ID
// lastSuccessfulOutboundQuery is the time instant when we last made a successful
// LastSuccessfulOutboundQuery is the time instant when we last made a successful
// outbound query to this peer
lastSuccessfulOutboundQuery time.Time
LastSuccessfulOutboundQuery time.Time

// Id of the peer in the DHT XOR keyspace
dhtId ID
Expand All @@ -37,10 +37,10 @@ func newBucket() *bucket {

// returns all peers in the bucket
// it is safe for the caller to modify the returned objects as it is a defensive copy
func (b *bucket) peers() []peerInfo {
var ps []peerInfo
func (b *bucket) peers() []PeerInfo {
var ps []PeerInfo
for e := b.list.Front(); e != nil; e = e.Next() {
p := e.Value.(*peerInfo)
p := e.Value.(*PeerInfo)
ps = append(ps, *p)
}
return ps
Expand All @@ -50,18 +50,18 @@ func (b *bucket) peers() []peerInfo {
func (b *bucket) peerIds() []peer.ID {
ps := make([]peer.ID, 0, b.list.Len())
for e := b.list.Front(); e != nil; e = e.Next() {
p := e.Value.(*peerInfo)
p := e.Value.(*PeerInfo)
ps = append(ps, p.Id)
}
return ps
}

// returns the peer with the given Id if it exists
// returns nil if the peerId does not exist
func (b *bucket) getPeer(p peer.ID) *peerInfo {
func (b *bucket) getPeer(p peer.ID) *PeerInfo {
for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(*peerInfo).Id == p {
return e.Value.(*peerInfo)
if e.Value.(*PeerInfo).Id == p {
return e.Value.(*PeerInfo)
}
}
return nil
Expand All @@ -71,7 +71,7 @@ func (b *bucket) getPeer(p peer.ID) *peerInfo {
// returns true if successful, false otherwise.
func (b *bucket) remove(id peer.ID) bool {
for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(*peerInfo).Id == id {
if e.Value.(*PeerInfo).Id == id {
b.list.Remove(e)
return true
}
Expand All @@ -82,13 +82,13 @@ func (b *bucket) remove(id peer.ID) bool {
func (b *bucket) moveToFront(id peer.ID) {

for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(*peerInfo).Id == id {
if e.Value.(*PeerInfo).Id == id {
b.list.MoveToFront(e)
}
}
}

func (b *bucket) pushFront(p *peerInfo) {
func (b *bucket) pushFront(p *PeerInfo) {
b.list.PushFront(p)
}

Expand All @@ -105,7 +105,7 @@ func (b *bucket) split(cpl int, target ID) *bucket {
newbuck.list = out
e := b.list.Front()
for e != nil {
pDhtId := e.Value.(*peerInfo).dhtId
pDhtId := e.Value.(*PeerInfo).dhtId
peerCPL := CommonPrefixLen(pDhtId, target)
if peerCPL > cpl {
cur := e
Expand Down
2 changes: 1 addition & 1 deletion sorting.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (pds *peerDistanceSorter) appendPeer(p peer.ID, pDhtId ID) {
// Append the peer.ID values in the list to the sorter's slice. It may no longer be sorted.
func (pds *peerDistanceSorter) appendPeersFromList(l *list.List) {
for e := l.Front(); e != nil; e = e.Next() {
pds.appendPeer(e.Value.(*peerInfo).Id, e.Value.(*peerInfo).dhtId)
pds.appendPeer(e.Value.(*PeerInfo).Id, e.Value.(*PeerInfo).dhtId)
}
}

Expand Down
38 changes: 26 additions & 12 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type RoutingTable struct {
PeerRemoved func(peer.ID)
PeerAdded func(peer.ID)

// maxLastSuccessfulOutboundThreshold is the max threshold/upper limit for the value of "lastSuccessfulOutboundQuery"
// maxLastSuccessfulOutboundThreshold is the max threshold/upper limit for the value of "LastSuccessfulOutboundQuery"
// of the peer in the bucket above which we will evict it to make place for a new peer if the bucket
// is full
maxLastSuccessfulOutboundThreshold float64
Expand Down Expand Up @@ -87,12 +87,12 @@ func (rt *RoutingTable) Close() error {
}

// TryAddPeer tries to add a peer to the Routing table. If the peer ALREADY exists in the Routing Table, this call is a no-op.
// If the peer is a queryPeer i.e. we queried it or it queried us, we set the lastSuccessfulOutboundQuery to the current time.
// If the peer is a queryPeer i.e. we queried it or it queried us, we set the LastSuccessfulOutboundQuery to the current time.
// If the peer is just a peer that we connect to/it connected to us without any DHT query, we consider it as having
// no lastSuccessfulOutboundQuery.
// no LastSuccessfulOutboundQuery.
//
// If the logical bucket to which the peer belongs is full and it's not the last bucket, we try to replace an existing peer
// whose lastSuccessfulOutboundQuery is above the maximum allowed threshold in that bucket with the new peer.
// whose LastSuccessfulOutboundQuery is above the maximum allowed threshold in that bucket with the new peer.
// If no such peer exists in that bucket, we do NOT add the peer to the Routing Table and return error "ErrPeerRejectedNoCapacity".

// It returns a boolean value set to true if the peer was newly added to the Routing Table, false otherwise.
Expand Down Expand Up @@ -129,7 +129,7 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {

// We have enough space in the bucket (whether spawned or grouped).
if bucket.len() < rt.bucketsize {
bucket.pushFront(&peerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
rt.PeerAdded(p)
return true, nil
}
Expand All @@ -143,20 +143,20 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {

// push the peer only if the bucket isn't overflowing after slitting
if bucket.len() < rt.bucketsize {
bucket.pushFront(&peerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
rt.PeerAdded(p)
return true, nil
}
}

// the bucket to which the peer belongs is full. Let's try to find a peer
// in that bucket with a lastSuccessfulOutboundQuery value above the maximum threshold and replace it.
// in that bucket with a LastSuccessfulOutboundQuery value above the maximum threshold and replace it.
allPeers := bucket.peers()
for _, pc := range allPeers {
if float64(time.Since(pc.lastSuccessfulOutboundQuery)) > rt.maxLastSuccessfulOutboundThreshold {
if float64(time.Since(pc.LastSuccessfulOutboundQuery)) > rt.maxLastSuccessfulOutboundThreshold {
// let's evict it and add the new peer
if bucket.remove(pc.Id) {
bucket.pushFront(&peerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
rt.PeerAdded(p)
return true, nil
}
Expand All @@ -166,7 +166,21 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
return false, ErrPeerRejectedNoCapacity
}

// UpdateLastSuccessfulOutboundQuery updates the lastSuccessfulOutboundQuery time of the peer
// GetPeerInfos returns the peer information that we've stored in the buckets
func (rt *RoutingTable) GetPeerInfos() []PeerInfo {
rt.tabLock.RLock()
defer rt.tabLock.RUnlock()

var pis []PeerInfo
for _, b := range rt.buckets {
for _, p := range b.peers() {
pis = append(pis, p)
}
}
return pis
}

// UpdateLastSuccessfulOutboundQuery updates the LastSuccessfulOutboundQuery time of the peer
// Returns true if the update was successful, false otherwise.
func (rt *RoutingTable) UpdateLastSuccessfulOutboundQuery(p peer.ID, t time.Time) bool {
rt.tabLock.Lock()
Expand All @@ -176,7 +190,7 @@ func (rt *RoutingTable) UpdateLastSuccessfulOutboundQuery(p peer.ID, t time.Time
bucket := rt.buckets[bucketID]

if pc := bucket.getPeer(p); pc != nil {
pc.lastSuccessfulOutboundQuery = t
pc.LastSuccessfulOutboundQuery = t
return true
}
return false
Expand Down Expand Up @@ -334,7 +348,7 @@ func (rt *RoutingTable) Print() {
fmt.Printf("\tbucket: %d\n", i)

for e := b.list.Front(); e != nil; e = e.Next() {
p := e.Value.(*peerInfo).Id
p := e.Value.(*PeerInfo).Id
fmt.Printf("\t\t- %s %s\n", p.Pretty(), rt.metrics.LatencyEWMA(p).String())
}
}
Expand Down
49 changes: 40 additions & 9 deletions table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestBucket(t *testing.T) {
peers := make([]peer.ID, 100)
for i := 0; i < 100; i++ {
peers[i] = test.RandPeerIDFatal(t)
b.pushFront(&peerInfo{peers[i], testTime1, ConvertPeerID(peers[i])})
b.pushFront(&PeerInfo{peers[i], testTime1, ConvertPeerID(peers[i])})
}

local := test.RandPeerIDFatal(t)
Expand All @@ -47,19 +47,19 @@ func TestBucket(t *testing.T) {
require.NotNil(t, p)
require.Equal(t, peers[i], p.Id)
require.Equal(t, ConvertPeerID(peers[i]), p.dhtId)
require.EqualValues(t, testTime1, p.lastSuccessfulOutboundQuery)
require.EqualValues(t, testTime1, p.LastSuccessfulOutboundQuery)

// mark as missing
t2 := time.Now().Add(1 * time.Hour)
p.lastSuccessfulOutboundQuery = t2
p.LastSuccessfulOutboundQuery = t2
p = b.getPeer(peers[i])
require.NotNil(t, p)
require.EqualValues(t, t2, p.lastSuccessfulOutboundQuery)
require.EqualValues(t, t2, p.LastSuccessfulOutboundQuery)

spl := b.split(0, ConvertPeerID(local))
llist := b.list
for e := llist.Front(); e != nil; e = e.Next() {
p := ConvertPeerID(e.Value.(*peerInfo).Id)
p := ConvertPeerID(e.Value.(*PeerInfo).Id)
cpl := CommonPrefixLen(p, localID)
if cpl > 0 {
t.Fatalf("split failed. found id with cpl > 0 in 0 bucket")
Expand All @@ -68,7 +68,7 @@ func TestBucket(t *testing.T) {

rlist := spl.list
for e := rlist.Front(); e != nil; e = e.Next() {
p := ConvertPeerID(e.Value.(*peerInfo).Id)
p := ConvertPeerID(e.Value.(*PeerInfo).Id)
cpl := CommonPrefixLen(p, localID)
if cpl == 0 {
t.Fatalf("split failed. found id with cpl == 0 in non 0 bucket")
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestUpdateLastSuccessfulOutboundQuery(t *testing.T) {
rt.tabLock.Lock()
pi := rt.buckets[0].getPeer(p)
require.NotNil(t, pi)
require.EqualValues(t, t2, pi.lastSuccessfulOutboundQuery)
require.EqualValues(t, t2, pi.LastSuccessfulOutboundQuery)
rt.tabLock.Unlock()
}

Expand Down Expand Up @@ -257,7 +257,7 @@ func TestTryAddPeer(t *testing.T) {
require.True(t, b)
require.Equal(t, p4, rt.Find(p4))

// adding a peer with cpl 0 works if an existing peer has lastSuccessfulOutboundQuery above the max threshold
// adding a peer with cpl 0 works if an existing peer has LastSuccessfulOutboundQuery above the max threshold
// because that existing peer will get replaced
require.True(t, rt.UpdateLastSuccessfulOutboundQuery(p2, time.Now().AddDate(0, 0, -1)))
b, err = rt.TryAddPeer(p3, true)
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestTryAddPeer(t *testing.T) {
rt.tabLock.Lock()
pi := rt.buckets[rt.bucketIdForPeer(p6)].getPeer(p6)
require.NotNil(t, p6)
require.True(t, pi.lastSuccessfulOutboundQuery.IsZero())
require.True(t, pi.LastSuccessfulOutboundQuery.IsZero())
rt.tabLock.Unlock()

}
Expand Down Expand Up @@ -399,6 +399,37 @@ func TestTableMultithreaded(t *testing.T) {
<-done
}

func TestGetPeerInfos(t *testing.T) {
local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics()
rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m, NoOpThreshold)
require.NoError(t, err)

require.Empty(t, rt.GetPeerInfos())

p1 := test.RandPeerIDFatal(t)
p2 := test.RandPeerIDFatal(t)

b, err := rt.TryAddPeer(p1, false)
require.True(t, b)
require.NoError(t, err)
b, err = rt.TryAddPeer(p2, true)
require.True(t, b)
require.NoError(t, err)

ps := rt.GetPeerInfos()
require.Len(t, ps, 2)
ms := make(map[peer.ID]PeerInfo)
for _, p := range ps {
ms[p.Id] = p
}

require.Equal(t, p1, ms[p1].Id)
require.True(t, ms[p1].LastSuccessfulOutboundQuery.IsZero())
require.Equal(t, p2, ms[p2].Id)
require.False(t, ms[p2].LastSuccessfulOutboundQuery.IsZero())
}

func BenchmarkAddPeer(b *testing.B) {
b.StopTimer()
local := ConvertKey("localKey")
Expand Down

0 comments on commit ee7b926

Please sign in to comment.