Skip to content

Commit

Permalink
fix(Swarm): detecting concurrent connections
Browse files Browse the repository at this point in the history
  • Loading branch information
richardschneider committed May 19, 2019
1 parent 65f941f commit 46d865c
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 48 deletions.
36 changes: 21 additions & 15 deletions src/Swarm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using PeerTalk.Protocols;
using PeerTalk.Cryptography;
using Ipfs.CoreApi;
using Nito.AsyncEx;

namespace PeerTalk
{
Expand Down Expand Up @@ -122,7 +123,7 @@ public Peer LocalPeer
/// <summary>
/// Outstanding connection tasks.
/// </summary>
ConcurrentDictionary<Peer, Task<PeerConnection>> pendingConnections = new ConcurrentDictionary<Peer, Task<PeerConnection>>();
ConcurrentDictionary<Peer, AsyncLazy<PeerConnection>> pendingConnections = new ConcurrentDictionary<Peer, AsyncLazy<PeerConnection>>();

/// <summary>
/// Manages the swarm's peer connections.
Expand Down Expand Up @@ -316,7 +317,7 @@ public Peer RegisterPeer(Peer peer)
/// </returns>
public bool HasPendingConnection(Peer peer)
{
return pendingConnections.TryGetValue(peer, out Task<PeerConnection> _);
return pendingConnections.TryGetValue(peer, out AsyncLazy<PeerConnection> _);
}

/// <summary>
Expand Down Expand Up @@ -419,8 +420,8 @@ public async Task StopAsync()
/// </remarks>
public async Task<PeerConnection> ConnectAsync(MultiAddress address, CancellationToken cancel = default(CancellationToken))
{
var peer = await RegisterPeerAsync(address, cancel);
return await ConnectAsync(peer, cancel);
var peer = await RegisterPeerAsync(address, cancel).ConfigureAwait(false);
return await ConnectAsync(peer, cancel).ConfigureAwait(false);
}

/// <summary>
Expand All @@ -440,27 +441,30 @@ public async Task StopAsync()
/// If already connected to the peer and is active on any address, then
/// the existing connection is returned.
/// </remarks>
public Task<PeerConnection> ConnectAsync(Peer peer, CancellationToken cancel = default(CancellationToken))
public async Task<PeerConnection> ConnectAsync(Peer peer, CancellationToken cancel = default(CancellationToken))
{
peer = RegisterPeer(peer);

// If connected and still open, then use the existing connection.
if (Manager.TryGet(peer, out PeerConnection conn))
{
return Task.FromResult(conn);
return conn;
}

// Use a current connection attempt to the peer or create a new one.
return pendingConnections.AddOrUpdate(
peer,
(key) =>
try
{
using (var cts = CancellationTokenSource.CreateLinkedTokenSource(swarmCancellation.Token, cancel))
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(swarmCancellation.Token, cancel);
var task = Dial(peer, peer.Addresses, cts.Token);
task.ContinueWith(x => pendingConnections.TryRemove(peer, out Task<PeerConnection> _));
return task;
},
(key, task) => task);
return await pendingConnections
.GetOrAdd(peer, (key) => new AsyncLazy<PeerConnection>(() => Dial(peer, peer.Addresses, cts.Token)))
.ConfigureAwait(false);
}
}
finally
{
pendingConnections.TryRemove(peer, out AsyncLazy<PeerConnection> _);
}
}

/// <summary>
Expand Down Expand Up @@ -524,6 +528,8 @@ public async Task StopAsync()
/// <returns></returns>
async Task<PeerConnection> Dial(Peer remote, IEnumerable<MultiAddress> addrs, CancellationToken cancel)
{
log.Debug($"Dialing {remote}");

if (remote == LocalPeer)
{
throw new Exception("Cannot dial self.");
Expand Down
33 changes: 0 additions & 33 deletions test/SwarmTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,39 +208,6 @@ public async Task Connect_Disconnect()
}
}

[TestMethod]
public async Task Concurent_Connect_SameTask()
{
var swarm = new Swarm { LocalPeer = self };
var venusA = new Peer
{
Id = "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",
Addresses = new MultiAddress[]
{
"/ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io
}
};
var venusB = new Peer
{
Id = "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",
Addresses = new MultiAddress[]
{
"/ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io
}
};
await swarm.StartAsync();
try
{
var a = swarm.ConnectAsync(venusA);
var b = swarm.ConnectAsync(venusB);
Assert.AreSame(a, b);
}
finally
{
await swarm.StopAsync();
}
}

[TestMethod]
public async Task Connect_CancelsOnStop()
{
Expand Down

0 comments on commit 46d865c

Please sign in to comment.