diff --git a/src/AutoDialer.cs b/src/AutoDialer.cs index 4a3a086..6382602 100644 --- a/src/AutoDialer.cs +++ b/src/AutoDialer.cs @@ -78,6 +78,7 @@ public void Dispose() /// public int MinConnections { get; set; } = DefaultMinConnections; +#pragma warning disable VSTHRD100 // Avoid async void methods /// /// Called when the swarm has a new peer. /// @@ -91,31 +92,30 @@ public void Dispose() /// If the is not reached, then the /// is dialed. /// - void OnPeerDiscovered(object sender, Peer peer) + async void OnPeerDiscovered(object sender, Peer peer) +#pragma warning restore VSTHRD100 // Avoid async void methods { var n = swarm.Manager.Connections.Count() + pendingConnects; if (swarm.IsRunning && n < MinConnections) { Interlocked.Increment(ref pendingConnects); - Task.Run(async () => + log.Debug($"Dialing new {peer}"); + try + { + await swarm.ConnectAsync(peer).ConfigureAwait(false); + } + catch(Exception) { - log.Debug($"Dialing new {peer}"); - try - { - await swarm.ConnectAsync(peer).ConfigureAwait(false); - } - catch(Exception) - { - log.Warn($"Failed to dial {peer}"); - } - finally - { - Interlocked.Decrement(ref pendingConnects); - } - }); + log.Warn($"Failed to dial {peer}"); + } + finally + { + Interlocked.Decrement(ref pendingConnects); + } } } +#pragma warning disable VSTHRD100 // Avoid async void methods /// /// Called when the swarm has lost a connection to a peer. /// @@ -129,7 +129,8 @@ void OnPeerDiscovered(object sender, Peer peer) /// If the is not reached, then another /// peer is dialed. /// - void OnPeerDisconnected(object sender, Peer disconnectedPeer) + async void OnPeerDisconnected(object sender, Peer disconnectedPeer) +#pragma warning restore VSTHRD100 // Avoid async void methods { var n = swarm.Manager.Connections.Count() + pendingConnects; if (!swarm.IsRunning || n >= MinConnections) @@ -146,22 +147,19 @@ void OnPeerDisconnected(object sender, Peer disconnectedPeer) var peer = peers[rng.Next(peers.Count())]; Interlocked.Increment(ref pendingConnects); - Task.Run(async () => + log.Debug($"Dialing {peer}"); + try { - log.Debug($"Dialing {peer}"); - try - { - await swarm.ConnectAsync(peer).ConfigureAwait(false); - } - catch (Exception) - { - log.Warn($"Failed to dial {peer}"); - } - finally - { - Interlocked.Decrement(ref pendingConnects); - } - }); + await swarm.ConnectAsync(peer).ConfigureAwait(false); + } + catch (Exception) + { + log.Warn($"Failed to dial {peer}"); + } + finally + { + Interlocked.Decrement(ref pendingConnects); + } } } diff --git a/src/Multiplex/Substream.cs b/src/Multiplex/Substream.cs index defd73e..2e5d7fa 100644 --- a/src/Multiplex/Substream.cs +++ b/src/Multiplex/Substream.cs @@ -125,7 +125,9 @@ public void NoMoreData() /// public override int Read(byte[] buffer, int offset, int count) { +#pragma warning disable VSTHRD002 return ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); +#pragma warning restore VSTHRD002 } /// @@ -164,7 +166,9 @@ public override async Task ReadAsync(byte[] buffer, int offset, int count, /// public override void Flush() { +#pragma warning disable VSTHRD002 FlushAsync().GetAwaiter().GetResult(); +#pragma warning restore VSTHRD002 } /// diff --git a/src/PeerConnection.cs b/src/PeerConnection.cs index 779d8c4..e5337c6 100644 --- a/src/PeerConnection.cs +++ b/src/PeerConnection.cs @@ -217,10 +217,10 @@ public async Task InitiateAsync( Initiator = true, Connection = this }; - muxer.SubstreamCreated += (s, e) => ReadMessages(e, CancellationToken.None); + muxer.SubstreamCreated += (s, e) => _ = ReadMessagesAsync(e, CancellationToken.None); this.MuxerEstablished.SetResult(muxer); - var _ = muxer.ProcessRequestsAsync(); + _ = muxer.ProcessRequestsAsync(); } /// @@ -267,7 +267,7 @@ public Task EstablishProtocolAsync(string name, CancellationToken cancel) /// /// Starts reading messages from the remote peer. /// - public async void ReadMessages(CancellationToken cancel) + public async Task ReadMessagesAsync(CancellationToken cancel) { log.Debug($"start reading messsages from {RemoteAddress}"); @@ -310,7 +310,7 @@ public async void ReadMessages(CancellationToken cancel) /// /// Starts reading messages from the remote peer on the specified stream. /// - public async void ReadMessages(Stream stream, CancellationToken cancel) + public async Task ReadMessagesAsync(Stream stream, CancellationToken cancel) { IPeerProtocol protocol = new Multistream1(); try diff --git a/src/PeerManager.cs b/src/PeerManager.cs index b286258..96ffd75 100644 --- a/src/PeerManager.cs +++ b/src/PeerManager.cs @@ -55,12 +55,8 @@ public Task StartAsync() Swarm.ConnectionEstablished += Swarm_ConnectionEstablished; Swarm.PeerNotReachable += Swarm_PeerNotReachable; - var thread = new Thread(Phoenix) - { - IsBackground = true - }; cancel = new CancellationTokenSource(); - thread.Start(); + var _ = PhoenixAsync(cancel.Token); log.Debug("started"); return Task.CompletedTask; @@ -145,9 +141,9 @@ void Swarm_ConnectionEstablished(object sender, PeerConnection connection) /// /// Background process to try reconnecting to a dead peer. /// - async void Phoenix() + async Task PhoenixAsync(CancellationToken cancellation) { - while (!cancel.IsCancellationRequested) + while (!cancellation.IsCancellationRequested) { try { diff --git a/src/PeerTalk.csproj b/src/PeerTalk.csproj index 3523c3a..d53a244 100644 --- a/src/PeerTalk.csproj +++ b/src/PeerTalk.csproj @@ -6,6 +6,7 @@ PeerTalk bin\$(Configuration)\$(TargetFramework)\$(AssemblyName).xml portable + true 0.42 @@ -59,10 +60,12 @@ - - all - + + + + + diff --git a/src/Protocols/Identify1.cs b/src/Protocols/Identify1.cs index c71377c..57a4e3b 100644 --- a/src/Protocols/Identify1.cs +++ b/src/Protocols/Identify1.cs @@ -65,7 +65,7 @@ public override string ToString() /// /// /// - public async Task GetRemotePeer(PeerConnection connection, CancellationToken cancel) + public async Task GetRemotePeerAsync(PeerConnection connection, CancellationToken cancel) { var muxer = await connection.MuxerEstablished.Task.ConfigureAwait(false); log.Debug("Get remote identity"); diff --git a/src/Protocols/Mplex67.cs b/src/Protocols/Mplex67.cs index 0b6ab7c..2581d27 100644 --- a/src/Protocols/Mplex67.cs +++ b/src/Protocols/Mplex67.cs @@ -42,7 +42,7 @@ public override string ToString() Connection = connection, Receiver = true }; - muxer.SubstreamCreated += (s, e) => connection.ReadMessages(e, CancellationToken.None); + muxer.SubstreamCreated += (s, e) => _ = connection.ReadMessagesAsync(e, CancellationToken.None); // Attach muxer to the connection. It now becomes the message reader. connection.MuxerEstablished.SetResult(muxer); diff --git a/src/PubSub/FloodRouter.cs b/src/PubSub/FloodRouter.cs index e202016..d111002 100644 --- a/src/PubSub/FloodRouter.cs +++ b/src/PubSub/FloodRouter.cs @@ -240,6 +240,7 @@ async Task SendAsync(byte[] message, Peer peer, CancellationToken cancel) } } +#pragma warning disable VSTHRD100 // Avoid async void methods /// /// Raised when a connection is established to a remote peer. /// @@ -250,6 +251,7 @@ async Task SendAsync(byte[] message, Peer peer, CancellationToken cancel) /// all topics that are of interest to the local peer. /// async void Swarm_ConnectionEstablished(object sender, PeerConnection connection) +#pragma warning restore VSTHRD100 // Avoid async void methods { if (localTopics.Count == 0) return; diff --git a/src/PubSub/NotificationService.cs b/src/PubSub/NotificationService.cs index f979e38..0454aa0 100644 --- a/src/PubSub/NotificationService.cs +++ b/src/PubSub/NotificationService.cs @@ -159,7 +159,9 @@ public PublishedMessage CreateMessage(string topic, byte[] data) { using (var ms = new MemoryStream()) { +#pragma warning disable VSTHRD103 message.CopyTo(ms); +#pragma warning disable VSTHRD103 return PublishAsync(topic, ms.ToArray(), cancel); } } @@ -178,6 +180,9 @@ public async Task SubscribeAsync(string topic, Action handler { var topicHandler = new TopicHandler { Topic = topic, Handler = handler }; topicHandlers.Add(topicHandler); + + // TODO: need a better way. +#pragma warning disable VSTHRD101 cancellationToken.Register(async () => { topicHandlers.Remove(topicHandler); @@ -186,6 +191,7 @@ public async Task SubscribeAsync(string topic, Action handler await Task.WhenAll(Routers.Select(r => r.LeaveTopicAsync(topic, CancellationToken.None))).ConfigureAwait(false); } }); +#pragma warning restore VSTHRD101 // Tell routers if first time. if (topicHandlers.Count(t => t.Topic == topic) == 1) @@ -233,7 +239,7 @@ void Router_MessageReceived(object sender, PublishedMessage msg) } // Tell other message routers. - Task.WhenAll(Routers + _ = Task.WhenAll(Routers .Where(r => r != sender) .Select(r => r.PublishAsync(msg, CancellationToken.None)) ); diff --git a/src/Routing/Dht1.cs b/src/Routing/Dht1.cs index a8e05d1..d4dddfe 100644 --- a/src/Routing/Dht1.cs +++ b/src/Routing/Dht1.cs @@ -250,7 +250,7 @@ public async Task> FindProvidersAsync( /// public void Advertise(Cid cid) { - Task.Run(async () => + _ = Task.Run(async () => { int advertsNeeded = 4; var message = new DhtMessage @@ -278,7 +278,7 @@ public void Advertise(Cid cid) using (var stream = await Swarm.DialAsync(peer, this.ToString())) { ProtoBuf.Serializer.SerializeWithLengthPrefix(stream, message, PrefixStyle.Base128); - stream.Flush(); + await stream.FlushAsync(); } if (--advertsNeeded == 0) break; diff --git a/src/SecureCommunication/Secio1.cs b/src/SecureCommunication/Secio1.cs index 2bb9958..232314c 100644 --- a/src/SecureCommunication/Secio1.cs +++ b/src/SecureCommunication/Secio1.cs @@ -41,6 +41,7 @@ public override string ToString() await EncryptAsync(connection, cancel).ConfigureAwait(false); } +#pragma warning disable VSTHRD103 /// public async Task EncryptAsync(PeerConnection connection, CancellationToken cancel = default(CancellationToken)) { diff --git a/src/SecureCommunication/Secio1Stream.cs b/src/SecureCommunication/Secio1Stream.cs index 4bd886d..f814c9f 100644 --- a/src/SecureCommunication/Secio1Stream.cs +++ b/src/SecureCommunication/Secio1Stream.cs @@ -121,7 +121,9 @@ public override void SetLength(long value) /// public override int Read(byte[] buffer, int offset, int count) { +#pragma warning disable VSTHRD002 return ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); +#pragma warning restore VSTHRD002 } /// @@ -202,7 +204,9 @@ async Task ReadPacketBytesAsync(int count, CancellationToken cancel) /// public override void Flush() { +#pragma warning disable VSTHRD002 FlushAsync().GetAwaiter().GetResult(); +#pragma warning restore VSTHRD002 } /// @@ -225,8 +229,8 @@ public override async Task FlushAsync(CancellationToken cancel) stream.WriteByte((byte)(length >> 16)); stream.WriteByte((byte)(length >> 8)); stream.WriteByte((byte)(length)); - stream.Write(data, 0, data.Length); - stream.Write(mac, 0, mac.Length); + await stream.WriteAsync(data, 0, data.Length); + await stream.WriteAsync(mac, 0, mac.Length); await stream.FlushAsync(cancel).ConfigureAwait(false); outStream.SetLength(0); diff --git a/src/StatsStream.cs b/src/StatsStream.cs index 0efd0ca..63f7d28 100644 --- a/src/StatsStream.cs +++ b/src/StatsStream.cs @@ -30,7 +30,7 @@ public class StatsStream : Stream static StatsStream() { - Task.Run(async () => + _ = Task.Run(async () => { while (true) { diff --git a/src/Swarm.cs b/src/Swarm.cs index 994a052..e523be9 100644 --- a/src/Swarm.cs +++ b/src/Swarm.cs @@ -495,7 +495,7 @@ public async Task StopAsync() using (var cts = CancellationTokenSource.CreateLinkedTokenSource(swarmCancellation.Token, cancel)) { return await pendingConnections - .GetOrAdd(peer, (key) => new AsyncLazy(() => Dial(peer, peer.Addresses, cts.Token))) + .GetOrAdd(peer, (key) => new AsyncLazy(() => DialAsync(peer, peer.Addresses, cts.Token))) .ConfigureAwait(false); } } @@ -569,7 +569,7 @@ public async Task StopAsync() /// /// /// - async Task Dial(Peer remote, IEnumerable addrs, CancellationToken cancel) + async Task DialAsync(Peer remote, IEnumerable addrs, CancellationToken cancel) { log.Debug($"Dialing {remote}"); @@ -611,7 +611,7 @@ async Task Dial(Peer remote, IEnumerable addrs, Ca { var attempts = possibleAddresses .Select(a => DialAsync(remote, a, cts.Token)); - connection = await TaskHelper.WhenAnyResult(attempts, cts.Token).ConfigureAwait(false); + connection = await TaskHelper.WhenAnyResultAsync(attempts, cts.Token).ConfigureAwait(false); cts.Cancel(); // stop other dialing tasks. } } @@ -638,7 +638,7 @@ async Task Dial(Peer remote, IEnumerable addrs, Ca { identify = protocols.OfType().First(); } - await identify.GetRemotePeer(connection, cancel).ConfigureAwait(false); + await identify.GetRemotePeerAsync(connection, cancel).ConfigureAwait(false); } catch (Exception) { @@ -843,6 +843,7 @@ public Task StartListeningAsync(MultiAddress address) return Task.FromResult(addresses.First()); } +#pragma warning disable VSTHRD100 // Avoid async void methods /// /// Called when a remote peer is connecting to the local peer. /// @@ -860,6 +861,7 @@ public Task StartListeningAsync(MultiAddress address) /// logged as warning. /// async void OnRemoteConnect(Stream stream, MultiAddress local, MultiAddress remote) +#pragma warning restore VSTHRD100 // Avoid async void methods { if (!IsRunning) { @@ -920,7 +922,7 @@ async void OnRemoteConnect(Stream stream, MultiAddress local, MultiAddress remot // Start the handshake // TODO: Isn't connection cancel token required. - connection.ReadMessages(default(CancellationToken)); + _ = connection.ReadMessagesAsync(default(CancellationToken)); // Wait for security to be established. await connection.SecurityEstablished.Task.ConfigureAwait(false); @@ -935,7 +937,7 @@ async void OnRemoteConnect(Stream stream, MultiAddress local, MultiAddress remot { identify = protocols.OfType().First(); } - connection.RemotePeer = await identify.GetRemotePeer(connection, default(CancellationToken)).ConfigureAwait(false); + connection.RemotePeer = await identify.GetRemotePeerAsync(connection, default(CancellationToken)).ConfigureAwait(false); connection.RemotePeer = RegisterPeer(connection.RemotePeer); connection.RemoteAddress = new MultiAddress($"{remote}/ipfs/{connection.RemotePeer.Id}"); diff --git a/src/TaskHelper.cs b/src/TaskHelper.cs index 4152233..06df447 100644 --- a/src/TaskHelper.cs +++ b/src/TaskHelper.cs @@ -33,7 +33,7 @@ public static class TaskHelper /// Returns the result of the first task that is not /// faulted or canceled. /// - public static async Task WhenAnyResult( + public static async Task WhenAnyResultAsync( IEnumerable> tasks, CancellationToken cancel) { @@ -45,7 +45,7 @@ public static async Task WhenAnyResult( var winner = await Task.WhenAny(running).ConfigureAwait(false); if (!winner.IsCanceled && !winner.IsFaulted) { - return winner.Result; + return await winner; } if (winner.IsFaulted) { diff --git a/src/Transports/DatagramStream.cs b/src/Transports/DatagramStream.cs index 9459561..e543fa4 100644 --- a/src/Transports/DatagramStream.cs +++ b/src/Transports/DatagramStream.cs @@ -66,7 +66,9 @@ protected override void Dispose(bool disposing) public override void Flush() { +#pragma warning disable VSTHRD002 FlushAsync().GetAwaiter().GetResult(); +#pragma warning restore VSTHRD002 } public override async Task FlushAsync(CancellationToken cancellationToken) @@ -81,7 +83,9 @@ public override async Task FlushAsync(CancellationToken cancellationToken) public override int Read(byte[] buffer, int offset, int count) { +#pragma warning disable VSTHRD002 return ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); +#pragma warning restore VSTHRD002 } public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) @@ -93,7 +97,7 @@ public override async Task ReadAsync(byte[] buffer, int offset, int count, receiveBuffer.Position = 0; receiveBuffer.SetLength(0); var size = socket.Receive(datagram); - receiveBuffer.Write(datagram, 0, size); + await receiveBuffer.WriteAsync(datagram, 0, size); receiveBuffer.Position = 0; } return receiveBuffer.Read(buffer, offset, count); diff --git a/src/Transports/Tcp.cs b/src/Transports/Tcp.cs index 6f97ef1..82e418d 100644 --- a/src/Transports/Tcp.cs +++ b/src/Transports/Tcp.cs @@ -160,9 +160,7 @@ public MultiAddress Listen(MultiAddress address, Action ProcessConnection(socket, address, handler, cancel)); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + _ = Task.Run(() => ProcessConnection(socket, address, handler, cancel)); return address; }