From 46d865c2ab504bc1cec75d5f1c0680556ec39406 Mon Sep 17 00:00:00 2001 From: Richard Schneider Date: Sun, 19 May 2019 13:32:29 +1200 Subject: [PATCH] fix(Swarm): detecting concurrent connections --- src/Swarm.cs | 36 +++++++++++++++++++++--------------- test/SwarmTest.cs | 33 --------------------------------- 2 files changed, 21 insertions(+), 48 deletions(-) diff --git a/src/Swarm.cs b/src/Swarm.cs index 4e08ce3..28a38e5 100644 --- a/src/Swarm.cs +++ b/src/Swarm.cs @@ -14,6 +14,7 @@ using PeerTalk.Protocols; using PeerTalk.Cryptography; using Ipfs.CoreApi; +using Nito.AsyncEx; namespace PeerTalk { @@ -122,7 +123,7 @@ public Peer LocalPeer /// /// Outstanding connection tasks. /// - ConcurrentDictionary> pendingConnections = new ConcurrentDictionary>(); + ConcurrentDictionary> pendingConnections = new ConcurrentDictionary>(); /// /// Manages the swarm's peer connections. @@ -316,7 +317,7 @@ public Peer RegisterPeer(Peer peer) /// public bool HasPendingConnection(Peer peer) { - return pendingConnections.TryGetValue(peer, out Task _); + return pendingConnections.TryGetValue(peer, out AsyncLazy _); } /// @@ -419,8 +420,8 @@ public async Task StopAsync() /// public async Task ConnectAsync(MultiAddress address, CancellationToken cancel = default(CancellationToken)) { - var peer = await RegisterPeerAsync(address, cancel); - return await ConnectAsync(peer, cancel); + var peer = await RegisterPeerAsync(address, cancel).ConfigureAwait(false); + return await ConnectAsync(peer, cancel).ConfigureAwait(false); } /// @@ -440,27 +441,30 @@ public async Task StopAsync() /// If already connected to the peer and is active on any address, then /// the existing connection is returned. /// - public Task ConnectAsync(Peer peer, CancellationToken cancel = default(CancellationToken)) + public async Task ConnectAsync(Peer peer, CancellationToken cancel = default(CancellationToken)) { peer = RegisterPeer(peer); // If connected and still open, then use the existing connection. if (Manager.TryGet(peer, out PeerConnection conn)) { - return Task.FromResult(conn); + return conn; } // Use a current connection attempt to the peer or create a new one. - return pendingConnections.AddOrUpdate( - peer, - (key) => + try + { + using (var cts = CancellationTokenSource.CreateLinkedTokenSource(swarmCancellation.Token, cancel)) { - var cts = CancellationTokenSource.CreateLinkedTokenSource(swarmCancellation.Token, cancel); - var task = Dial(peer, peer.Addresses, cts.Token); - task.ContinueWith(x => pendingConnections.TryRemove(peer, out Task _)); - return task; - }, - (key, task) => task); + return await pendingConnections + .GetOrAdd(peer, (key) => new AsyncLazy(() => Dial(peer, peer.Addresses, cts.Token))) + .ConfigureAwait(false); + } + } + finally + { + pendingConnections.TryRemove(peer, out AsyncLazy _); + } } /// @@ -524,6 +528,8 @@ public async Task StopAsync() /// async Task Dial(Peer remote, IEnumerable addrs, CancellationToken cancel) { + log.Debug($"Dialing {remote}"); + if (remote == LocalPeer) { throw new Exception("Cannot dial self."); diff --git a/test/SwarmTest.cs b/test/SwarmTest.cs index 7e8d48d..a0cfc3e 100644 --- a/test/SwarmTest.cs +++ b/test/SwarmTest.cs @@ -208,39 +208,6 @@ public async Task Connect_Disconnect() } } - [TestMethod] - public async Task Concurent_Connect_SameTask() - { - var swarm = new Swarm { LocalPeer = self }; - var venusA = new Peer - { - Id = "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", - Addresses = new MultiAddress[] - { - "/ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io - } - }; - var venusB = new Peer - { - Id = "QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", - Addresses = new MultiAddress[] - { - "/ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io - } - }; - await swarm.StartAsync(); - try - { - var a = swarm.ConnectAsync(venusA); - var b = swarm.ConnectAsync(venusB); - Assert.AreSame(a, b); - } - finally - { - await swarm.StopAsync(); - } - } - [TestMethod] public async Task Connect_CancelsOnStop() {