diff --git a/dht.go b/dht.go index 9d733fa1672..1a9488e00f1 100644 --- a/dht.go +++ b/dht.go @@ -23,6 +23,7 @@ import ( ci "github.com/libp2p/go-libp2p-crypto" host "github.com/libp2p/go-libp2p-host" kb "github.com/libp2p/go-libp2p-kbucket" + inet "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" protocol "github.com/libp2p/go-libp2p-protocol" @@ -272,11 +273,12 @@ func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) { // FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in. func (dht *IpfsDHT) FindLocal(id peer.ID) pstore.PeerInfo { - p := dht.routingTable.Find(id) - if p != "" { - return dht.peerstore.PeerInfo(p) + switch dht.host.Network().Connectedness(id) { + case inet.Connected, inet.CanConnect: + return dht.peerstore.PeerInfo(id) + default: + return pstore.PeerInfo{} } - return pstore.PeerInfo{} } // findPeerSingle asks peer 'p' if they know where the peer with id 'id' is diff --git a/dht_net.go b/dht_net.go index bd7bc76125e..e4dcf004df1 100644 --- a/dht_net.go +++ b/dht_net.go @@ -117,7 +117,11 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message } func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error { - dht.Update(ctx, p) + // Make sure that this node is actually a DHT server, not just a client. + protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...) + if err == nil && len(protos) > 0 { + dht.Update(ctx, p) + } return nil } diff --git a/dht_test.go b/dht_test.go index 4ba89a6e404..22eb952d275 100644 --- a/dht_test.go +++ b/dht_test.go @@ -114,6 +114,8 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer } func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) { + t.Helper() + idB := b.self addrB := b.peerstore.Addrs(idB) if len(addrB) == 0 { @@ -127,18 +129,25 @@ func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) { } } -func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) { - connectNoSync(t, ctx, a, b) +func wait(t *testing.T, ctx context.Context, a, b *IpfsDHT) { + t.Helper() // loop until connection notification has been received. // under high load, this may not happen as immediately as we would like. for a.routingTable.Find(b.self) == "" { - time.Sleep(time.Millisecond * 5) + select { + case <-ctx.Done(): + t.Fatal(ctx.Err()) + case <-time.After(time.Millisecond * 5): + } } +} - for b.routingTable.Find(a.self) == "" { - time.Sleep(time.Millisecond * 5) - } +func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) { + t.Helper() + connectNoSync(t, ctx, a, b) + wait(t, ctx, a, b) + wait(t, ctx, b, a) } func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) { @@ -1025,6 +1034,41 @@ func TestClientModeConnect(t *testing.T) { if provs[0].ID != p { t.Fatal("expected it to be our test peer") } + if a.routingTable.Find(b.self) != "" { + t.Fatal("DHT clients should not be added to routing tables") + } + if b.routingTable.Find(a.self) == "" { + t.Fatal("DHT server should have been added to the dht client's routing table") + } +} + +func TestClientModeFindPeer(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + a := setupDHT(ctx, t, false) + b := setupDHT(ctx, t, true) + c := setupDHT(ctx, t, true) + + connectNoSync(t, ctx, b, a) + connectNoSync(t, ctx, c, a) + + // Can't use `connect` because b and c are only clients. + wait(t, ctx, b, a) + wait(t, ctx, c, a) + + pi, err := c.FindPeer(ctx, b.self) + if err != nil { + t.Fatal(err) + } + if len(pi.Addrs) == 0 { + t.Fatal("should have found addresses for node b") + } + + err = c.host.Connect(ctx, pi) + if err != nil { + t.Fatal(err) + } } func TestFindPeerQuery(t *testing.T) { @@ -1098,7 +1142,7 @@ func TestFindPeerQuery(t *testing.T) { sort.Sort(peer.IDSlice(allpeers[1:])) sort.Sort(peer.IDSlice(outpeers)) - fmt.Println("counts: ", count, notfromrtable) + actualclosest := kb.SortClosestPeers(allpeers[1:], rtval) exp := actualclosest[:20] got := kb.SortClosestPeers(outpeers, rtval) diff --git a/handlers.go b/handlers.go index c7c98acff08..633aeda27fe 100644 --- a/handlers.go +++ b/handlers.go @@ -12,6 +12,7 @@ import ( u "github.com/ipfs/go-ipfs-util" pb "github.com/libp2p/go-libp2p-kad-dht/pb" lgbl "github.com/libp2p/go-libp2p-loggables" + inet "github.com/libp2p/go-libp2p-net" peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" recpb "github.com/libp2p/go-libp2p-record/pb" @@ -266,10 +267,28 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess var closest []peer.ID // if looking for self... special case where we send it on CloserPeers. - if peer.ID(pmes.GetKey()) == dht.self { + targetPid := peer.ID(pmes.GetKey()) + if targetPid == dht.self { closest = []peer.ID{dht.self} } else { closest = dht.betterPeersToQuery(pmes, p, CloserPeerCount) + + // Never tell a peer about itself. + if targetPid != p { + // If we're connected to the target peer, report their + // peer info. This makes FindPeer work even if the + // target peer isn't in our routing table. + // + // Alternatively, we could just check our peerstore. + // However, we don't want to return out of date + // information. We can change this in the future when we + // add a progressive, asynchronous `SearchPeer` function + // and improve peer routing in the host. + switch dht.host.Network().Connectedness(targetPid) { + case inet.Connected, inet.CanConnect: + closest = append(closest, targetPid) + } + } } if closest == nil { diff --git a/routing.go b/routing.go index d75c0e0077b..e7afb180a23 100644 --- a/routing.go +++ b/routing.go @@ -176,7 +176,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Opti defer cancel() err := dht.putValueToPeer(ctx, v.From, key, fixupRec) if err != nil { - log.Error("Error correcting DHT entry: ", err) + log.Debug("Error correcting DHT entry: ", err) } }(v) }