From 51478358b01a3c21e92de9b6a4233ebdc84be1e4 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Fri, 3 Apr 2020 04:25:00 +0530 Subject: [PATCH 1/2] get peer infos --- bucket.go | 26 +++++++++++++------------- sorting.go | 2 +- table.go | 22 ++++++++++++++++++---- table_test.go | 37 ++++++++++++++++++++++++++++++++++--- 4 files changed, 66 insertions(+), 21 deletions(-) diff --git a/bucket.go b/bucket.go index 6a5dd56..d277e96 100644 --- a/bucket.go +++ b/bucket.go @@ -9,8 +9,8 @@ import ( "github.com/libp2p/go-libp2p-core/peer" ) -// peerInfo holds all related information for a peer in the K-Bucket. -type peerInfo struct { +// PeerInfo holds all related information for a peer in the K-Bucket. +type PeerInfo struct { Id peer.ID // lastSuccessfulOutboundQuery is the time instant when we last made a successful // outbound query to this peer @@ -37,10 +37,10 @@ func newBucket() *bucket { // returns all peers in the bucket // it is safe for the caller to modify the returned objects as it is a defensive copy -func (b *bucket) peers() []peerInfo { - var ps []peerInfo +func (b *bucket) peers() []PeerInfo { + var ps []PeerInfo for e := b.list.Front(); e != nil; e = e.Next() { - p := e.Value.(*peerInfo) + p := e.Value.(*PeerInfo) ps = append(ps, *p) } return ps @@ -50,7 +50,7 @@ func (b *bucket) peers() []peerInfo { func (b *bucket) peerIds() []peer.ID { ps := make([]peer.ID, 0, b.list.Len()) for e := b.list.Front(); e != nil; e = e.Next() { - p := e.Value.(*peerInfo) + p := e.Value.(*PeerInfo) ps = append(ps, p.Id) } return ps @@ -58,10 +58,10 @@ func (b *bucket) peerIds() []peer.ID { // returns the peer with the given Id if it exists // returns nil if the peerId does not exist -func (b *bucket) getPeer(p peer.ID) *peerInfo { +func (b *bucket) getPeer(p peer.ID) *PeerInfo { for e := b.list.Front(); e != nil; e = e.Next() { - if e.Value.(*peerInfo).Id == p { - return e.Value.(*peerInfo) + if e.Value.(*PeerInfo).Id == p { + return e.Value.(*PeerInfo) } } return nil @@ -71,7 +71,7 @@ func (b *bucket) getPeer(p peer.ID) *peerInfo { // returns true if successful, false otherwise. func (b *bucket) remove(id peer.ID) bool { for e := b.list.Front(); e != nil; e = e.Next() { - if e.Value.(*peerInfo).Id == id { + if e.Value.(*PeerInfo).Id == id { b.list.Remove(e) return true } @@ -82,13 +82,13 @@ func (b *bucket) remove(id peer.ID) bool { func (b *bucket) moveToFront(id peer.ID) { for e := b.list.Front(); e != nil; e = e.Next() { - if e.Value.(*peerInfo).Id == id { + if e.Value.(*PeerInfo).Id == id { b.list.MoveToFront(e) } } } -func (b *bucket) pushFront(p *peerInfo) { +func (b *bucket) pushFront(p *PeerInfo) { b.list.PushFront(p) } @@ -105,7 +105,7 @@ func (b *bucket) split(cpl int, target ID) *bucket { newbuck.list = out e := b.list.Front() for e != nil { - pDhtId := e.Value.(*peerInfo).dhtId + pDhtId := e.Value.(*PeerInfo).dhtId peerCPL := CommonPrefixLen(pDhtId, target) if peerCPL > cpl { cur := e diff --git a/sorting.go b/sorting.go index d9dd793..1999099 100644 --- a/sorting.go +++ b/sorting.go @@ -38,7 +38,7 @@ func (pds *peerDistanceSorter) appendPeer(p peer.ID, pDhtId ID) { // Append the peer.ID values in the list to the sorter's slice. It may no longer be sorted. func (pds *peerDistanceSorter) appendPeersFromList(l *list.List) { for e := l.Front(); e != nil; e = e.Next() { - pds.appendPeer(e.Value.(*peerInfo).Id, e.Value.(*peerInfo).dhtId) + pds.appendPeer(e.Value.(*PeerInfo).Id, e.Value.(*PeerInfo).dhtId) } } diff --git a/table.go b/table.go index 0cfdfa1..269d383 100644 --- a/table.go +++ b/table.go @@ -129,7 +129,7 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { // We have enough space in the bucket (whether spawned or grouped). if bucket.len() < rt.bucketsize { - bucket.pushFront(&peerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)}) + bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)}) rt.PeerAdded(p) return true, nil } @@ -143,7 +143,7 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { // push the peer only if the bucket isn't overflowing after slitting if bucket.len() < rt.bucketsize { - bucket.pushFront(&peerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)}) + bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)}) rt.PeerAdded(p) return true, nil } @@ -156,7 +156,7 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { if float64(time.Since(pc.lastSuccessfulOutboundQuery)) > rt.maxLastSuccessfulOutboundThreshold { // let's evict it and add the new peer if bucket.remove(pc.Id) { - bucket.pushFront(&peerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)}) + bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)}) rt.PeerAdded(p) return true, nil } @@ -166,6 +166,20 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { return false, ErrPeerRejectedNoCapacity } +// GetPeerInfos returns the peer information that we've stored in the buckets +func (rt *RoutingTable) GetPeerInfos() []PeerInfo { + rt.tabLock.RLock() + defer rt.tabLock.RUnlock() + + var pis []PeerInfo + for _, b := range rt.buckets { + for _, p := range b.peers() { + pis = append(pis, p) + } + } + return pis +} + // UpdateLastSuccessfulOutboundQuery updates the lastSuccessfulOutboundQuery time of the peer // Returns true if the update was successful, false otherwise. func (rt *RoutingTable) UpdateLastSuccessfulOutboundQuery(p peer.ID, t time.Time) bool { @@ -334,7 +348,7 @@ func (rt *RoutingTable) Print() { fmt.Printf("\tbucket: %d\n", i) for e := b.list.Front(); e != nil; e = e.Next() { - p := e.Value.(*peerInfo).Id + p := e.Value.(*PeerInfo).Id fmt.Printf("\t\t- %s %s\n", p.Pretty(), rt.metrics.LatencyEWMA(p).String()) } } diff --git a/table_test.go b/table_test.go index 215e82f..79c53e0 100644 --- a/table_test.go +++ b/table_test.go @@ -33,7 +33,7 @@ func TestBucket(t *testing.T) { peers := make([]peer.ID, 100) for i := 0; i < 100; i++ { peers[i] = test.RandPeerIDFatal(t) - b.pushFront(&peerInfo{peers[i], testTime1, ConvertPeerID(peers[i])}) + b.pushFront(&PeerInfo{peers[i], testTime1, ConvertPeerID(peers[i])}) } local := test.RandPeerIDFatal(t) @@ -59,7 +59,7 @@ func TestBucket(t *testing.T) { spl := b.split(0, ConvertPeerID(local)) llist := b.list for e := llist.Front(); e != nil; e = e.Next() { - p := ConvertPeerID(e.Value.(*peerInfo).Id) + p := ConvertPeerID(e.Value.(*PeerInfo).Id) cpl := CommonPrefixLen(p, localID) if cpl > 0 { t.Fatalf("split failed. found id with cpl > 0 in 0 bucket") @@ -68,7 +68,7 @@ func TestBucket(t *testing.T) { rlist := spl.list for e := rlist.Front(); e != nil; e = e.Next() { - p := ConvertPeerID(e.Value.(*peerInfo).Id) + p := ConvertPeerID(e.Value.(*PeerInfo).Id) cpl := CommonPrefixLen(p, localID) if cpl == 0 { t.Fatalf("split failed. found id with cpl == 0 in non 0 bucket") @@ -480,6 +480,37 @@ func TestTableMultithreaded(t *testing.T) { <-done } +func TestGetPeerInfos(t *testing.T) { + local := test.RandPeerIDFatal(t) + m := pstore.NewMetrics() + rt, err := NewRoutingTable(10, ConvertPeerID(local), time.Hour, m, NoOpThreshold) + require.NoError(t, err) + + require.Empty(t, rt.GetPeerInfos()) + + p1 := test.RandPeerIDFatal(t) + p2 := test.RandPeerIDFatal(t) + + b, err := rt.TryAddPeer(p1, false) + require.True(t, b) + require.NoError(t, err) + b, err = rt.TryAddPeer(p2, true) + require.True(t, b) + require.NoError(t, err) + + ps := rt.GetPeerInfos() + require.Len(t, ps, 2) + ms := make(map[peer.ID]PeerInfo) + for _, p := range ps { + ms[p.Id] = p + } + + require.Equal(t, p1, ms[p1].Id) + require.True(t, ms[p1].lastSuccessfulOutboundQuery.IsZero()) + require.Equal(t, p2, ms[p2].Id) + require.False(t, ms[p2].lastSuccessfulOutboundQuery.IsZero()) +} + func BenchmarkAddPeer(b *testing.B) { b.StopTimer() local := ConvertKey("localKey") From 0779168bdae656167ddb6c5de5c319272c948512 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Fri, 3 Apr 2020 04:32:33 +0530 Subject: [PATCH 2/2] export field --- bucket.go | 4 ++-- table.go | 16 ++++++++-------- table_test.go | 16 ++++++++-------- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/bucket.go b/bucket.go index d277e96..2dbb307 100644 --- a/bucket.go +++ b/bucket.go @@ -12,9 +12,9 @@ import ( // PeerInfo holds all related information for a peer in the K-Bucket. type PeerInfo struct { Id peer.ID - // lastSuccessfulOutboundQuery is the time instant when we last made a successful + // LastSuccessfulOutboundQuery is the time instant when we last made a successful // outbound query to this peer - lastSuccessfulOutboundQuery time.Time + LastSuccessfulOutboundQuery time.Time // Id of the peer in the DHT XOR keyspace dhtId ID diff --git a/table.go b/table.go index 269d383..62f206c 100644 --- a/table.go +++ b/table.go @@ -49,7 +49,7 @@ type RoutingTable struct { PeerRemoved func(peer.ID) PeerAdded func(peer.ID) - // maxLastSuccessfulOutboundThreshold is the max threshold/upper limit for the value of "lastSuccessfulOutboundQuery" + // maxLastSuccessfulOutboundThreshold is the max threshold/upper limit for the value of "LastSuccessfulOutboundQuery" // of the peer in the bucket above which we will evict it to make place for a new peer if the bucket // is full maxLastSuccessfulOutboundThreshold float64 @@ -87,12 +87,12 @@ func (rt *RoutingTable) Close() error { } // TryAddPeer tries to add a peer to the Routing table. If the peer ALREADY exists in the Routing Table, this call is a no-op. -// If the peer is a queryPeer i.e. we queried it or it queried us, we set the lastSuccessfulOutboundQuery to the current time. +// If the peer is a queryPeer i.e. we queried it or it queried us, we set the LastSuccessfulOutboundQuery to the current time. // If the peer is just a peer that we connect to/it connected to us without any DHT query, we consider it as having -// no lastSuccessfulOutboundQuery. +// no LastSuccessfulOutboundQuery. // // If the logical bucket to which the peer belongs is full and it's not the last bucket, we try to replace an existing peer -// whose lastSuccessfulOutboundQuery is above the maximum allowed threshold in that bucket with the new peer. +// whose LastSuccessfulOutboundQuery is above the maximum allowed threshold in that bucket with the new peer. // If no such peer exists in that bucket, we do NOT add the peer to the Routing Table and return error "ErrPeerRejectedNoCapacity". // It returns a boolean value set to true if the peer was newly added to the Routing Table, false otherwise. @@ -150,10 +150,10 @@ func (rt *RoutingTable) addPeer(p peer.ID, queryPeer bool) (bool, error) { } // the bucket to which the peer belongs is full. Let's try to find a peer - // in that bucket with a lastSuccessfulOutboundQuery value above the maximum threshold and replace it. + // in that bucket with a LastSuccessfulOutboundQuery value above the maximum threshold and replace it. allPeers := bucket.peers() for _, pc := range allPeers { - if float64(time.Since(pc.lastSuccessfulOutboundQuery)) > rt.maxLastSuccessfulOutboundThreshold { + if float64(time.Since(pc.LastSuccessfulOutboundQuery)) > rt.maxLastSuccessfulOutboundThreshold { // let's evict it and add the new peer if bucket.remove(pc.Id) { bucket.pushFront(&PeerInfo{p, lastSuccessfulOutboundQuery, ConvertPeerID(p)}) @@ -180,7 +180,7 @@ func (rt *RoutingTable) GetPeerInfos() []PeerInfo { return pis } -// UpdateLastSuccessfulOutboundQuery updates the lastSuccessfulOutboundQuery time of the peer +// UpdateLastSuccessfulOutboundQuery updates the LastSuccessfulOutboundQuery time of the peer // Returns true if the update was successful, false otherwise. func (rt *RoutingTable) UpdateLastSuccessfulOutboundQuery(p peer.ID, t time.Time) bool { rt.tabLock.Lock() @@ -190,7 +190,7 @@ func (rt *RoutingTable) UpdateLastSuccessfulOutboundQuery(p peer.ID, t time.Time bucket := rt.buckets[bucketID] if pc := bucket.getPeer(p); pc != nil { - pc.lastSuccessfulOutboundQuery = t + pc.LastSuccessfulOutboundQuery = t return true } return false diff --git a/table_test.go b/table_test.go index 79c53e0..4e2e82d 100644 --- a/table_test.go +++ b/table_test.go @@ -47,14 +47,14 @@ func TestBucket(t *testing.T) { require.NotNil(t, p) require.Equal(t, peers[i], p.Id) require.Equal(t, ConvertPeerID(peers[i]), p.dhtId) - require.EqualValues(t, testTime1, p.lastSuccessfulOutboundQuery) + require.EqualValues(t, testTime1, p.LastSuccessfulOutboundQuery) // mark as missing t2 := time.Now().Add(1 * time.Hour) - p.lastSuccessfulOutboundQuery = t2 + p.LastSuccessfulOutboundQuery = t2 p = b.getPeer(peers[i]) require.NotNil(t, p) - require.EqualValues(t, t2, p.lastSuccessfulOutboundQuery) + require.EqualValues(t, t2, p.LastSuccessfulOutboundQuery) spl := b.split(0, ConvertPeerID(local)) llist := b.list @@ -218,7 +218,7 @@ func TestUpdateLastSuccessfulOutboundQuery(t *testing.T) { rt.tabLock.Lock() pi := rt.buckets[0].getPeer(p) require.NotNil(t, pi) - require.EqualValues(t, t2, pi.lastSuccessfulOutboundQuery) + require.EqualValues(t, t2, pi.LastSuccessfulOutboundQuery) rt.tabLock.Unlock() } @@ -257,7 +257,7 @@ func TestTryAddPeer(t *testing.T) { require.True(t, b) require.Equal(t, p4, rt.Find(p4)) - // adding a peer with cpl 0 works if an existing peer has lastSuccessfulOutboundQuery above the max threshold + // adding a peer with cpl 0 works if an existing peer has LastSuccessfulOutboundQuery above the max threshold // because that existing peer will get replaced require.True(t, rt.UpdateLastSuccessfulOutboundQuery(p2, time.Now().AddDate(0, 0, -1))) b, err = rt.TryAddPeer(p3, true) @@ -285,7 +285,7 @@ func TestTryAddPeer(t *testing.T) { rt.tabLock.Lock() pi := rt.buckets[rt.bucketIdForPeer(p6)].getPeer(p6) require.NotNil(t, p6) - require.True(t, pi.lastSuccessfulOutboundQuery.IsZero()) + require.True(t, pi.LastSuccessfulOutboundQuery.IsZero()) rt.tabLock.Unlock() } @@ -506,9 +506,9 @@ func TestGetPeerInfos(t *testing.T) { } require.Equal(t, p1, ms[p1].Id) - require.True(t, ms[p1].lastSuccessfulOutboundQuery.IsZero()) + require.True(t, ms[p1].LastSuccessfulOutboundQuery.IsZero()) require.Equal(t, p2, ms[p2].Id) - require.False(t, ms[p2].lastSuccessfulOutboundQuery.IsZero()) + require.False(t, ms[p2].LastSuccessfulOutboundQuery.IsZero()) } func BenchmarkAddPeer(b *testing.B) {