Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get Peer Infos #69

Merged
merged 2 commits into from
Apr 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
)

// peerInfo holds all related information for a peer in the K-Bucket.
type peerInfo struct {
// PeerInfo holds all related information for a peer in the K-Bucket.
type PeerInfo struct {
Id peer.ID
// lastSuccessfulOutboundQuery is the time instant when we last made a successful
// LastSuccessfulOutboundQuery is the time instant when we last made a successful
// outbound query to this peer
lastSuccessfulOutboundQuery time.Time
LastSuccessfulOutboundQuery time.Time

// Id of the peer in the DHT XOR keyspace
dhtId ID
Expand All @@ -37,10 +37,10 @@ func newBucket() *bucket {

// returns all peers in the bucket
// it is safe for the caller to modify the returned objects as it is a defensive copy
func (b *bucket) peers() []peerInfo {
var ps []peerInfo
func (b *bucket) peers() []PeerInfo {
var ps []PeerInfo
for e := b.list.Front(); e != nil; e = e.Next() {
p := e.Value.(*peerInfo)
p := e.Value.(*PeerInfo)
ps = append(ps, *p)
}
return ps
Expand All @@ -50,18 +50,18 @@ func (b *bucket) peers() []peerInfo {
func (b *bucket) peerIds() []peer.ID {
ps := make([]peer.ID, 0, b.list.Len())
for e := b.list.Front(); e != nil; e = e.Next() {
p := e.Value.(*peerInfo)
p := e.Value.(*PeerInfo)
ps = append(ps, p.Id)
}
return ps
}

// returns the peer with the given Id if it exists
// returns nil if the peerId does not exist
func (b *bucket) getPeer(p peer.ID) *peerInfo {
func (b *bucket) getPeer(p peer.ID) *PeerInfo {
for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(*peerInfo).Id == p {
return e.Value.(*peerInfo)
if e.Value.(*PeerInfo).Id == p {
return e.Value.(*PeerInfo)
}
}
return nil
Expand All @@ -71,7 +71,7 @@ func (b *bucket) getPeer(p peer.ID) *peerInfo {
// returns true if successful, false otherwise.
func (b *bucket) remove(id peer.ID) bool {
for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(*peerInfo).Id == id {
if e.Value.(*PeerInfo).Id == id {
b.list.Remove(e)
return true
}
Expand All @@ -82,13 +82,13 @@ func (b *bucket) remove(id peer.ID) bool {
func (b *bucket) moveToFront(id peer.ID) {

for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(*peerInfo).Id == id {
if e.Value.(*PeerInfo).Id == id {
b.list.MoveToFront(e)
}
}
}

func (b *bucket) pushFront(p *peerInfo) {
func (b *bucket) pushFront(p *PeerInfo) {
b.list.PushFront(p)
}

Expand All @@ -105,7 +105,7 @@ func (b *bucket) split(cpl int, target ID) *bucket {
newbuck.list = out
e := b.list.Front()
for e != nil {
pDhtId := e.Value.(*peerInfo).dhtId
pDhtId := e.Value.(*PeerInfo).dhtId
peerCPL := CommonPrefixLen(pDhtId, target)
if peerCPL > cpl {
cur := e
Expand Down
2 changes: 1 addition & 1 deletion sorting.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (pds *peerDistanceSorter) appendPeer(p peer.ID, pDhtId ID) {
// Append the peer.ID values in the list to the sorter's slice. It may no longer be sorted.
func (pds *peerDistanceSorter) appendPeersFromList(l *list.List) {
for e := l.Front(); e != nil; e = e.Next() {
pds.appendPeer(e.Value.(*peerInfo).Id, e.Value.(*peerInfo).dhtId)
pds.appendPeer(e.Value.(*PeerInfo).Id, e.Value.(*PeerInfo).dhtId)
}
}

Expand Down
38 changes: 26 additions & 12 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type RoutingTable struct {
PeerRemoved func(peer.ID)
PeerAdded func(peer.ID)

// maxLastSuccessfulOutboundThreshold is the max threshold/upper limit for the value of "lastSuccessfulOutboundQuery"
// maxLastSuccessfulOutboundThreshold is the max threshold/upper limit for the value of "LastSuccessfulOutboundQuery"
// of the peer in the bucket above which we will evict it to make place for a new peer if the bucket
// is full
maxLastSuccessfulOutboundThreshold float64
Expand Down Expand Up @@ -87,12 +87,12 @@ func (rt *RoutingTable) Close() error {
}

// TryAddPeer tries to add a peer to the Routing table. If the peer ALREADY exists in the Routing Table, this call is a no-op.
// If the peer is a queryPeer i.e. we queried it or it queried us, we set the lastSuccessfulOutboundQuery to the current time.
// If the peer is a queryPeer i.e. we queried it or it queried us, we set the LastSuccessfulOutboundQuery to the current time.
// If the peer is just a peer that we connect to/it connected to us without any DHT query, we consider it as having
// no lastSuccessfulOutboundQuery.
// no LastSuccessfulOutboundQuery.
//
// If the logical bucket to which the peer belongs is full and it's not the last bucket, we try to replace an existing peer
// whose lastSuccessfulOutboundQuery is above the maximum allowed threshold in that bucket with the new peer.
// whose LastSuccessfulOutboundQuery is above the maximum allowed threshold in that bucket with the new peer.
// If no such peer exists in that bucket, we do NOT add the peer to the Routing Table and return error "ErrPeerRejectedNoCapacity".

// It returns a boolean value set to true if the peer was newly added to the Routing Table, false otherwise.
Expand Down Expand Up @@ -129,7 +129,7 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {

// We have enough space in the bucket (whether spawned or grouped).
if bucket.len() < rt.bucketsize {
bucket.pushFront(&peerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
rt.PeerAdded(p)
return true, nil
}
Expand All @@ -143,20 +143,20 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {

// push the peer only if the bucket isn't overflowing after slitting
if bucket.len() < rt.bucketsize {
bucket.pushFront(&peerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
rt.PeerAdded(p)
return true, nil
}
}

// the bucket to which the peer belongs is full. Let's try to find a peer
// in that bucket with a lastSuccessfulOutboundQuery value above the maximum threshold and replace it.
// in that bucket with a LastSuccessfulOutboundQuery value above the maximum threshold and replace it.
allPeers := bucket.peers()
for _, pc := range allPeers {
if float64(time.Since(pc.lastSuccessfulOutboundQuery)) > rt.maxLastSuccessfulOutboundThreshold {
if float64(time.Since(pc.LastSuccessfulOutboundQuery)) > rt.maxLastSuccessfulOutboundThreshold {
// let's evict it and add the new peer
if bucket.remove(pc.Id) {
bucket.pushFront(&peerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)})
rt.PeerAdded(p)
return true, nil
}
Expand All @@ -166,7 +166,21 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) {
return false, ErrPeerRejectedNoCapacity
}

// UpdateLastSuccessfulOutboundQuery updates the lastSuccessfulOutboundQuery time of the peer
// GetPeerInfos returns the peer information that we've stored in the buckets
func (rt *RoutingTable) GetPeerInfos() []PeerInfo {
rt.tabLock.RLock()
defer rt.tabLock.RUnlock()

var pis []PeerInfo
for _, b := range rt.buckets {
for _, p := range b.peers() {
pis = append(pis, p)
}
}
return pis
}

// UpdateLastSuccessfulOutboundQuery updates the LastSuccessfulOutboundQuery time of the peer
// Returns true if the update was successful, false otherwise.
func (rt *RoutingTable) UpdateLastSuccessfulOutboundQuery(p peer.ID, t time.Time) bool {
rt.tabLock.Lock()
Expand All @@ -176,7 +190,7 @@ func (rt *RoutingTable) UpdateLastSuccessfulOutboundQuery(p peer.ID, t time.Time
bucket := rt.buckets[bucketID]

if pc := bucket.getPeer(p); pc != nil {
pc.lastSuccessfulOutboundQuery = t
pc.LastSuccessfulOutboundQuery = t
return true
}
return false
Expand Down Expand Up @@ -334,7 +348,7 @@ func (rt *RoutingTable) Print() {
fmt.Printf("\tbucket: %d\n", i)

for e := b.list.Front(); e != nil; e = e.Next() {
p := e.Value.(*peerInfo).Id
p := e.Value.(*PeerInfo).Id
fmt.Printf("\t\t- %s %s\n", p.Pretty(), rt.metrics.LatencyEWMA(p).String())
}
}
Expand Down
49 changes: 40 additions & 9 deletions table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestBucket(t *testing.T) {
peers := make([]peer.ID, 100)
for i := 0; i < 100; i++ {
peers[i] = test.RandPeerIDFatal(t)
b.pushFront(&peerInfo{peers[i], testTime1, ConvertPeerID(peers[i])})
b.pushFront(&PeerInfo{peers[i], testTime1, ConvertPeerID(peers[i])})
}

local := test.RandPeerIDFatal(t)
Expand All @@ -47,19 +47,19 @@ func TestBucket(t *testing.T) {
require.NotNil(t, p)
require.Equal(t, peers[i], p.Id)
require.Equal(t, ConvertPeerID(peers[i]), p.dhtId)
require.EqualValues(t, testTime1, p.lastSuccessfulOutboundQuery)
require.EqualValues(t, testTime1, p.LastSuccessfulOutboundQuery)

// mark as missing
t2 := time.Now().Add(1 * time.Hour)
p.lastSuccessfulOutboundQuery = t2
p.LastSuccessfulOutboundQuery = t2
p = b.getPeer(peers[i])
require.NotNil(t, p)
require.EqualValues(t, t2, p.lastSuccessfulOutboundQuery)
require.EqualValues(t, t2, p.LastSuccessfulOutboundQuery)

spl := b.split(0, ConvertPeerID(local))
llist := b.list
for e := llist.Front(); e != nil; e = e.Next() {
p := ConvertPeerID(e.Value.(*peerInfo).Id)
p := ConvertPeerID(e.Value.(*PeerInfo).Id)
cpl := CommonPrefixLen(p, localID)
if cpl > 0 {
t.Fatalf("split failed. found id with cpl > 0 in 0 bucket")
Expand All @@ -68,7 +68,7 @@ func TestBucket(t *testing.T) {

rlist := spl.list
for e := rlist.Front(); e != nil; e = e.Next() {
p := ConvertPeerID(e.Value.(*peerInfo).Id)
p := ConvertPeerID(e.Value.(*PeerInfo).Id)
cpl := CommonPrefixLen(p, localID)
if cpl == 0 {
t.Fatalf("split failed. found id with cpl == 0 in non 0 bucket")
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestUpdateLastSuccessfulOutboundQuery(t *testing.T) {
rt.tabLock.Lock()
pi := rt.buckets[0].getPeer(p)
require.NotNil(t, pi)
require.EqualValues(t, t2, pi.lastSuccessfulOutboundQuery)
require.EqualValues(t, t2, pi.LastSuccessfulOutboundQuery)
rt.tabLock.Unlock()
}

Expand Down Expand Up @@ -257,7 +257,7 @@ func TestTryAddPeer(t *testing.T) {
require.True(t, b)
require.Equal(t, p4, rt.Find(p4))

// adding a peer with cpl 0 works if an existing peer has lastSuccessfulOutboundQuery above the max threshold
// adding a peer with cpl 0 works if an existing peer has LastSuccessfulOutboundQuery above the max threshold
// because that existing peer will get replaced
require.True(t, rt.UpdateLastSuccessfulOutboundQuery(p2, time.Now().AddDate(0, 0, -1)))
b, err = rt.TryAddPeer(p3, true)
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestTryAddPeer(t *testing.T) {
rt.tabLock.Lock()
pi := rt.buckets[rt.bucketIdForPeer(p6)].getPeer(p6)
require.NotNil(t, p6)
require.True(t, pi.lastSuccessfulOutboundQuery.IsZero())
require.True(t, pi.LastSuccessfulOutboundQuery.IsZero())
rt.tabLock.Unlock()

}
Expand Down Expand Up @@ -480,6 +480,37 @@ func TestTableMultithreaded(t *testing.T) {
<-done
}

func TestGetPeerInfos(t *testing.T) {
local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics()
rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m, NoOpThreshold)
require.NoError(t, err)

require.Empty(t, rt.GetPeerInfos())

p1 := test.RandPeerIDFatal(t)
p2 := test.RandPeerIDFatal(t)

b, err := rt.TryAddPeer(p1, false)
require.True(t, b)
require.NoError(t, err)
b, err = rt.TryAddPeer(p2, true)
require.True(t, b)
require.NoError(t, err)

ps := rt.GetPeerInfos()
require.Len(t, ps, 2)
ms := make(map[peer.ID]PeerInfo)
for _, p := range ps {
ms[p.Id] = p
}

require.Equal(t, p1, ms[p1].Id)
require.True(t, ms[p1].LastSuccessfulOutboundQuery.IsZero())
require.Equal(t, p2, ms[p2].Id)
require.False(t, ms[p2].LastSuccessfulOutboundQuery.IsZero())
}

func BenchmarkAddPeer(b *testing.B) {
b.StopTimer()
local := ConvertKey("localKey")
Expand Down