Skip to content

Commit

Permalink
Merge pull request ipfs#166 from libp2p/fix/161
Browse files Browse the repository at this point in the history
fix dht client bugs
  • Loading branch information
Stebalien authored Jun 26, 2018
2 parents cb81b8d + 2093a71 commit 63b47bb
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 14 deletions.
10 changes: 6 additions & 4 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
ci "github.com/libp2p/go-libp2p-crypto"
host "github.com/libp2p/go-libp2p-host"
kb "github.com/libp2p/go-libp2p-kbucket"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
protocol "github.com/libp2p/go-libp2p-protocol"
Expand Down Expand Up @@ -272,11 +273,12 @@ func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *IpfsDHT) FindLocal(id peer.ID) pstore.PeerInfo {
p := dht.routingTable.Find(id)
if p != "" {
return dht.peerstore.PeerInfo(p)
switch dht.host.Network().Connectedness(id) {
case inet.Connected, inet.CanConnect:
return dht.peerstore.PeerInfo(id)
default:
return pstore.PeerInfo{}
}
return pstore.PeerInfo{}
}

// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
Expand Down
6 changes: 5 additions & 1 deletion dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
}

func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
dht.Update(ctx, p)
// Make sure that this node is actually a DHT server, not just a client.
protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...)
if err == nil && len(protos) > 0 {
dht.Update(ctx, p)
}
return nil
}

Expand Down
58 changes: 51 additions & 7 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer
}

func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
t.Helper()

idB := b.self
addrB := b.peerstore.Addrs(idB)
if len(addrB) == 0 {
Expand All @@ -127,18 +129,25 @@ func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
}
}

func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
connectNoSync(t, ctx, a, b)
func wait(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
t.Helper()

// loop until connection notification has been received.
// under high load, this may not happen as immediately as we would like.
for a.routingTable.Find(b.self) == "" {
time.Sleep(time.Millisecond * 5)
select {
case <-ctx.Done():
t.Fatal(ctx.Err())
case <-time.After(time.Millisecond * 5):
}
}
}

for b.routingTable.Find(a.self) == "" {
time.Sleep(time.Millisecond * 5)
}
func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
t.Helper()
connectNoSync(t, ctx, a, b)
wait(t, ctx, a, b)
wait(t, ctx, b, a)
}

func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
Expand Down Expand Up @@ -1025,6 +1034,41 @@ func TestClientModeConnect(t *testing.T) {
if provs[0].ID != p {
t.Fatal("expected it to be our test peer")
}
if a.routingTable.Find(b.self) != "" {
t.Fatal("DHT clients should not be added to routing tables")
}
if b.routingTable.Find(a.self) == "" {
t.Fatal("DHT server should have been added to the dht client's routing table")
}
}

func TestClientModeFindPeer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

a := setupDHT(ctx, t, false)
b := setupDHT(ctx, t, true)
c := setupDHT(ctx, t, true)

connectNoSync(t, ctx, b, a)
connectNoSync(t, ctx, c, a)

// Can't use `connect` because b and c are only clients.
wait(t, ctx, b, a)
wait(t, ctx, c, a)

pi, err := c.FindPeer(ctx, b.self)
if err != nil {
t.Fatal(err)
}
if len(pi.Addrs) == 0 {
t.Fatal("should have found addresses for node b")
}

err = c.host.Connect(ctx, pi)
if err != nil {
t.Fatal(err)
}
}

func TestFindPeerQuery(t *testing.T) {
Expand Down Expand Up @@ -1098,7 +1142,7 @@ func TestFindPeerQuery(t *testing.T) {

sort.Sort(peer.IDSlice(allpeers[1:]))
sort.Sort(peer.IDSlice(outpeers))
fmt.Println("counts: ", count, notfromrtable)

actualclosest := kb.SortClosestPeers(allpeers[1:], rtval)
exp := actualclosest[:20]
got := kb.SortClosestPeers(outpeers, rtval)
Expand Down
21 changes: 20 additions & 1 deletion handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
u "github.com/ipfs/go-ipfs-util"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
lgbl "github.com/libp2p/go-libp2p-loggables"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
recpb "github.com/libp2p/go-libp2p-record/pb"
Expand Down Expand Up @@ -266,10 +267,28 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess
var closest []peer.ID

// if looking for self... special case where we send it on CloserPeers.
if peer.ID(pmes.GetKey()) == dht.self {
targetPid := peer.ID(pmes.GetKey())
if targetPid == dht.self {
closest = []peer.ID{dht.self}
} else {
closest = dht.betterPeersToQuery(pmes, p, CloserPeerCount)

// Never tell a peer about itself.
if targetPid != p {
// If we're connected to the target peer, report their
// peer info. This makes FindPeer work even if the
// target peer isn't in our routing table.
//
// Alternatively, we could just check our peerstore.
// However, we don't want to return out of date
// information. We can change this in the future when we
// add a progressive, asynchronous `SearchPeer` function
// and improve peer routing in the host.
switch dht.host.Network().Connectedness(targetPid) {
case inet.Connected, inet.CanConnect:
closest = append(closest, targetPid)
}
}
}

if closest == nil {
Expand Down
2 changes: 1 addition & 1 deletion routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Opti
defer cancel()
err := dht.putValueToPeer(ctx, v.From, key, fixupRec)
if err != nil {
log.Error("Error correcting DHT entry: ", err)
log.Debug("Error correcting DHT entry: ", err)
}
}(v)
}
Expand Down

0 comments on commit 63b47bb

Please sign in to comment.