Skip to content

Commit

Permalink
feat(Dht): process FindNode request
Browse files Browse the repository at this point in the history
  • Loading branch information
richardschneider committed Apr 23, 2019
1 parent fa41eaa commit 963affb
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 8 deletions.
97 changes: 89 additions & 8 deletions src/Routing/Dht1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ public class Dht1 : IPeerProtocol, IService, IPeerRouting, IContentRouting
/// </summary>
public RoutingTable RoutingTable;

/// <summary>
/// The number of closer peers to return.
/// </summary>
/// <value>
/// Defaults to 20.
/// </value>
public int CloserPeerCount { get; set; } = 20;

/// <inheritdoc />
public override string ToString()
{
Expand All @@ -52,12 +60,30 @@ public override string ToString()
{
var request = await ProtoBufHelper.ReadMessageAsync<DhtMessage>(stream, cancel);

log.Debug($"got message from {connection.RemotePeer}");
var response = new DhtMessage();
// TODO: process the request

ProtoBuf.Serializer.SerializeWithLengthPrefix(stream, response, PrefixStyle.Base128);
await stream.FlushAsync(cancel);
log.Debug($"got {request.Type} from {connection.RemotePeer}");
var response = new DhtMessage
{
Type = request.Type,
ClusterLevelRaw = request.ClusterLevelRaw
};
switch (request.Type)
{
case MessageType.Ping:
response = ProcessPing(request, response);
break;
case MessageType.FindNode:
response = ProcessFindNode(request, response);
break;
default:
log.Debug($"unknown {request.Type} from {connection.RemotePeer}");
// TODO: Should we close the stream?
continue;
}
if (response != null)
{
ProtoBuf.Serializer.SerializeWithLengthPrefix(stream, response, PrefixStyle.Base128);
await stream.FlushAsync(cancel);
}
}
}

Expand Down Expand Up @@ -132,8 +158,8 @@ void Swarm_PeerDiscovered(object sender, Peer e)

/// <inheritdoc />
public async Task<IEnumerable<Peer>> FindProvidersAsync(
Cid id,
int limit = 20,
Cid id,
int limit = 20,
Action<Peer> action = null,
CancellationToken cancel = default(CancellationToken))
{
Expand All @@ -152,5 +178,60 @@ public async Task<IEnumerable<Peer>> FindProvidersAsync(
return dquery.Answers.Take(limit);
}

/// <summary>
/// Process a ping request.
/// </summary>
/// <remarks>
/// Simply return the <paramref name="request"/>.
/// </remarks>
DhtMessage ProcessPing(DhtMessage request, DhtMessage response)
{
return request;
}

/// <summary>
/// Process a find node request.
/// </summary>
/// <remarks>
/// If the node is known
/// </remarks>
public DhtMessage ProcessFindNode(DhtMessage request, DhtMessage response)
{
var peerId = new MultiHash(request.Key);

// Do we know the peer?.
Peer found = null;
if (Swarm.LocalPeer.Id == peerId)
{
found = Swarm.LocalPeer;
}
else
{
found = Swarm.KnownPeers.FirstOrDefault(p => p.Id == peerId);
}

// Find the closer peers.
var closerPeers = new List<Peer>();
if (found != null)
{
closerPeers.Add(found);
}
else
{
closerPeers.AddRange(RoutingTable.NearestPeers(peerId).Take(CloserPeerCount));
}

// Build the response.
response.CloserPeers = closerPeers
.Select(peer => new DhtPeerMessage
{
Id = peer.Id.ToArray(),
Addresses = peer.Addresses.Select(a => a.WithoutPeerId().ToArray()).ToArray()
})
.ToArray();

return response;
}

}
}
143 changes: 143 additions & 0 deletions test/Routing/Dht1Test.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ public class Dht1Test
PublicKey = "CAASXjBcMA0GCSqGSIb3DQEBAQUAA0sAMEgCQQCC5r4nQBtnd9qgjnG8fBN5+gnqIeWEIcUFUdCG4su/vrbQ1py8XGKNUBuDjkyTv25Gd3hlrtNJV3eOKZVSL8ePAgMBAAE="
};

Peer other = new Peer
{
AgentVersion = "other",
Id = "QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h",
Addresses = new MultiAddress[]
{
new MultiAddress("/ip4/127.0.0.1/tcp/4001")
}
};

[TestMethod]
public async Task SeedsRoutingTableFromSwarm()
{
Expand Down Expand Up @@ -51,5 +61,138 @@ public async Task AddDiscoveredPeerToRoutingTable()
await dht.StopAsync();
}
}

[TestMethod]
public async Task ProcessFindNodeMessage_Self()
{
var swarm = new Swarm { LocalPeer = self };
var dht = new Dht1 { Swarm = swarm };
await dht.StartAsync();
try
{
var request = new DhtMessage
{
Type = MessageType.FindNode,
Key = self.Id.ToArray()
};
var response = dht.ProcessFindNode(request, new DhtMessage());
Assert.AreEqual(1, response.CloserPeers.Length);
var ok = response.CloserPeers[0].TryToPeer(out Peer found);
Assert.IsTrue(ok);
Assert.AreEqual(self, found);
}
finally
{
await dht.StopAsync();
}
}

[TestMethod]
public async Task ProcessFindNodeMessage_InRoutingTable()
{
var swarm = new Swarm { LocalPeer = self };
var dht = new Dht1 { Swarm = swarm };
await dht.StartAsync();
try
{
dht.RoutingTable.Add(other);
var request = new DhtMessage
{
Type = MessageType.FindNode,
Key = other.Id.ToArray()
};
var response = dht.ProcessFindNode(request, new DhtMessage());
Assert.AreEqual(1, response.CloserPeers.Length);
var ok = response.CloserPeers[0].TryToPeer(out Peer found);
Assert.IsTrue(ok);
Assert.AreEqual(other, found);
CollectionAssert.AreEqual(other.Addresses.ToArray(),
found.Addresses.Select(a => a.WithoutPeerId()).ToArray());
}
finally
{
await dht.StopAsync();
}
}

[TestMethod]
public async Task ProcessFindNodeMessage_InSwarm()
{
var swarm = new Swarm { LocalPeer = self };
var other = await swarm.RegisterPeerAsync("/ip4/127.0.0.1/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h");
var dht = new Dht1 { Swarm = swarm };
await dht.StartAsync();
try
{
dht.RoutingTable.Add(other);
var request = new DhtMessage
{
Type = MessageType.FindNode,
Key = other.Id.ToArray()
};
var response = dht.ProcessFindNode(request, new DhtMessage());
Assert.AreEqual(1, response.CloserPeers.Length);
var ok = response.CloserPeers[0].TryToPeer(out Peer found);
Assert.IsTrue(ok);
Assert.AreEqual(other, found);
CollectionAssert.AreEqual(
other.Addresses.Select(a => a.WithoutPeerId()).ToArray(),
found.Addresses.Select(a => a.WithoutPeerId()).ToArray());
}
finally
{
await dht.StopAsync();
}
}

[TestMethod]
public async Task ProcessFindNodeMessage_Closest()
{
var swarm = new Swarm { LocalPeer = self };
await swarm.RegisterPeerAsync("/ip4/127.0.0.1/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1a");
await swarm.RegisterPeerAsync("/ip4/127.0.0.2/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1b");
await swarm.RegisterPeerAsync("/ip4/127.0.0.3/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1c");
await swarm.RegisterPeerAsync("/ip4/127.0.0.4/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1d");
await swarm.RegisterPeerAsync("/ip4/127.0.0.5/tcp/4001/ipfs/QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1e");
var dht = new Dht1 { Swarm = swarm, CloserPeerCount = 3 };
await dht.StartAsync();
try
{
dht.RoutingTable.Add(other);
var request = new DhtMessage
{
Type = MessageType.FindNode,
Key = other.Id.ToArray()
};
var response = dht.ProcessFindNode(request, new DhtMessage());
Assert.AreEqual(3, response.CloserPeers.Length);
}
finally
{
await dht.StopAsync();
}
}

[TestMethod]
public async Task ProcessFindNodeMessage_NoOtherPeers()
{
var swarm = new Swarm { LocalPeer = self };
var dht = new Dht1 { Swarm = swarm };
await dht.StartAsync();
try
{
var request = new DhtMessage
{
Type = MessageType.FindNode,
Key = new MultiHash("QmdpwjdB94eNm2Lcvp9JqoCxswo3AKQqjLuNZyLixmCM1h").ToArray()
};
var response = dht.ProcessFindNode(request, new DhtMessage());
Assert.AreEqual(0, response.CloserPeers.Length);
}
finally
{
await dht.StopAsync();
}
}
}
}

0 comments on commit 963affb

Please sign in to comment.