Skip to content

Commit

Permalink
Merge pull request #47 from richardschneider/40-threading
Browse files Browse the repository at this point in the history
Threading analyzer
  • Loading branch information
richardschneider authored Aug 23, 2019
2 parents ec7ba4a + 937d84b commit eed1a90
Show file tree
Hide file tree
Showing 17 changed files with 84 additions and 66 deletions.
62 changes: 30 additions & 32 deletions src/AutoDialer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public void Dispose()
/// </remarks>
public int MinConnections { get; set; } = DefaultMinConnections;

#pragma warning disable VSTHRD100 // Avoid async void methods
/// <summary>
/// Called when the swarm has a new peer.
/// </summary>
Expand All @@ -91,31 +92,30 @@ public void Dispose()
/// If the <see cref="MinConnections"/> is not reached, then the
/// <paramref name="peer"/> is dialed.
/// </remarks>
void OnPeerDiscovered(object sender, Peer peer)
async void OnPeerDiscovered(object sender, Peer peer)
#pragma warning restore VSTHRD100 // Avoid async void methods
{
var n = swarm.Manager.Connections.Count() + pendingConnects;
if (swarm.IsRunning && n < MinConnections)
{
Interlocked.Increment(ref pendingConnects);
Task.Run(async () =>
log.Debug($"Dialing new {peer}");
try
{
await swarm.ConnectAsync(peer).ConfigureAwait(false);
}
catch(Exception)
{
log.Debug($"Dialing new {peer}");
try
{
await swarm.ConnectAsync(peer).ConfigureAwait(false);
}
catch(Exception)
{
log.Warn($"Failed to dial {peer}");
}
finally
{
Interlocked.Decrement(ref pendingConnects);
}
});
log.Warn($"Failed to dial {peer}");
}
finally
{
Interlocked.Decrement(ref pendingConnects);
}
}
}

#pragma warning disable VSTHRD100 // Avoid async void methods
/// <summary>
/// Called when the swarm has lost a connection to a peer.
/// </summary>
Expand All @@ -129,7 +129,8 @@ void OnPeerDiscovered(object sender, Peer peer)
/// If the <see cref="MinConnections"/> is not reached, then another
/// peer is dialed.
/// </remarks>
void OnPeerDisconnected(object sender, Peer disconnectedPeer)
async void OnPeerDisconnected(object sender, Peer disconnectedPeer)
#pragma warning restore VSTHRD100 // Avoid async void methods
{
var n = swarm.Manager.Connections.Count() + pendingConnects;
if (!swarm.IsRunning || n >= MinConnections)
Expand All @@ -146,22 +147,19 @@ void OnPeerDisconnected(object sender, Peer disconnectedPeer)
var peer = peers[rng.Next(peers.Count())];

Interlocked.Increment(ref pendingConnects);
Task.Run(async () =>
log.Debug($"Dialing {peer}");
try
{
log.Debug($"Dialing {peer}");
try
{
await swarm.ConnectAsync(peer).ConfigureAwait(false);
}
catch (Exception)
{
log.Warn($"Failed to dial {peer}");
}
finally
{
Interlocked.Decrement(ref pendingConnects);
}
});
await swarm.ConnectAsync(peer).ConfigureAwait(false);
}
catch (Exception)
{
log.Warn($"Failed to dial {peer}");
}
finally
{
Interlocked.Decrement(ref pendingConnects);
}
}

}
Expand Down
4 changes: 4 additions & 0 deletions src/Multiplex/Substream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ public void NoMoreData()
/// <inheritdoc />
public override int Read(byte[] buffer, int offset, int count)
{
#pragma warning disable VSTHRD002
return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
#pragma warning restore VSTHRD002
}

/// <inheritdoc />
Expand Down Expand Up @@ -164,7 +166,9 @@ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
/// <inheritdoc />
public override void Flush()
{
#pragma warning disable VSTHRD002
FlushAsync().GetAwaiter().GetResult();
#pragma warning restore VSTHRD002
}

/// <inheritdoc />
Expand Down
8 changes: 4 additions & 4 deletions src/PeerConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,10 @@ public async Task InitiateAsync(
Initiator = true,
Connection = this
};
muxer.SubstreamCreated += (s, e) => ReadMessages(e, CancellationToken.None);
muxer.SubstreamCreated += (s, e) => _ = ReadMessagesAsync(e, CancellationToken.None);
this.MuxerEstablished.SetResult(muxer);

var _ = muxer.ProcessRequestsAsync();
_ = muxer.ProcessRequestsAsync();
}

/// <summary>
Expand Down Expand Up @@ -267,7 +267,7 @@ public Task EstablishProtocolAsync(string name, CancellationToken cancel)
/// <summary>
/// Starts reading messages from the remote peer.
/// </summary>
public async void ReadMessages(CancellationToken cancel)
public async Task ReadMessagesAsync(CancellationToken cancel)
{
log.Debug($"start reading messsages from {RemoteAddress}");

Expand Down Expand Up @@ -310,7 +310,7 @@ public async void ReadMessages(CancellationToken cancel)
/// <summary>
/// Starts reading messages from the remote peer on the specified stream.
/// </summary>
public async void ReadMessages(Stream stream, CancellationToken cancel)
public async Task ReadMessagesAsync(Stream stream, CancellationToken cancel)
{
IPeerProtocol protocol = new Multistream1();
try
Expand Down
10 changes: 3 additions & 7 deletions src/PeerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,8 @@ public Task StartAsync()
Swarm.ConnectionEstablished += Swarm_ConnectionEstablished;
Swarm.PeerNotReachable += Swarm_PeerNotReachable;

var thread = new Thread(Phoenix)
{
IsBackground = true
};
cancel = new CancellationTokenSource();
thread.Start();
var _ = PhoenixAsync(cancel.Token);

log.Debug("started");
return Task.CompletedTask;
Expand Down Expand Up @@ -145,9 +141,9 @@ void Swarm_ConnectionEstablished(object sender, PeerConnection connection)
/// <summary>
/// Background process to try reconnecting to a dead peer.
/// </summary>
async void Phoenix()
async Task PhoenixAsync(CancellationToken cancellation)
{
while (!cancel.IsCancellationRequested)
while (!cancellation.IsCancellationRequested)
{
try
{
Expand Down
9 changes: 6 additions & 3 deletions src/PeerTalk.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
<RootNamespace>PeerTalk</RootNamespace>
<DocumentationFile>bin\$(Configuration)\$(TargetFramework)\$(AssemblyName).xml</DocumentationFile>
<DebugType>portable</DebugType>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>

<!-- developer build is always 0.42 -->
<AssemblyVersion>0.42</AssemblyVersion>
Expand Down Expand Up @@ -59,10 +60,12 @@
<PackageReference Include="semver" Version="2.0.4" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.9.0" />
<PackageReference Include="Portable.BouncyCastle" Version="1.8.5" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-beta2-19270-01">
<PrivateAssets>all</PrivateAssets>
</PackageReference>
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-beta2-19270-01" PrivateAssets="All" />
<PackageReference Include="Microsoft.VisualStudio.Threading.Analyzers" Version="16.3.13" PrivateAssets="All" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'netstandard14'">
<PackageReference Include="System.Net.NameResolution" Version="4.3.0.0" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion src/Protocols/Identify1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public override string ToString()
/// </param>
/// <param name="cancel"></param>
/// <returns></returns>
public async Task<Peer> GetRemotePeer(PeerConnection connection, CancellationToken cancel)
public async Task<Peer> GetRemotePeerAsync(PeerConnection connection, CancellationToken cancel)
{
var muxer = await connection.MuxerEstablished.Task.ConfigureAwait(false);
log.Debug("Get remote identity");
Expand Down
2 changes: 1 addition & 1 deletion src/Protocols/Mplex67.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public override string ToString()
Connection = connection,
Receiver = true
};
muxer.SubstreamCreated += (s, e) => connection.ReadMessages(e, CancellationToken.None);
muxer.SubstreamCreated += (s, e) => _ = connection.ReadMessagesAsync(e, CancellationToken.None);

// Attach muxer to the connection. It now becomes the message reader.
connection.MuxerEstablished.SetResult(muxer);
Expand Down
2 changes: 2 additions & 0 deletions src/PubSub/FloodRouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ async Task SendAsync(byte[] message, Peer peer, CancellationToken cancel)
}
}

#pragma warning disable VSTHRD100 // Avoid async void methods
/// <summary>
/// Raised when a connection is established to a remote peer.
/// </summary>
Expand All @@ -250,6 +251,7 @@ async Task SendAsync(byte[] message, Peer peer, CancellationToken cancel)
/// all topics that are of interest to the local peer.
/// </remarks>
async void Swarm_ConnectionEstablished(object sender, PeerConnection connection)
#pragma warning restore VSTHRD100 // Avoid async void methods
{
if (localTopics.Count == 0)
return;
Expand Down
8 changes: 7 additions & 1 deletion src/PubSub/NotificationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ public PublishedMessage CreateMessage(string topic, byte[] data)
{
using (var ms = new MemoryStream())
{
#pragma warning disable VSTHRD103
message.CopyTo(ms);
#pragma warning disable VSTHRD103
return PublishAsync(topic, ms.ToArray(), cancel);
}
}
Expand All @@ -178,6 +180,9 @@ public async Task SubscribeAsync(string topic, Action<IPublishedMessage> handler
{
var topicHandler = new TopicHandler { Topic = topic, Handler = handler };
topicHandlers.Add(topicHandler);

// TODO: need a better way.
#pragma warning disable VSTHRD101
cancellationToken.Register(async () =>
{
topicHandlers.Remove(topicHandler);
Expand All @@ -186,6 +191,7 @@ public async Task SubscribeAsync(string topic, Action<IPublishedMessage> handler
await Task.WhenAll(Routers.Select(r => r.LeaveTopicAsync(topic, CancellationToken.None))).ConfigureAwait(false);
}
});
#pragma warning restore VSTHRD101

// Tell routers if first time.
if (topicHandlers.Count(t => t.Topic == topic) == 1)
Expand Down Expand Up @@ -233,7 +239,7 @@ void Router_MessageReceived(object sender, PublishedMessage msg)
}

// Tell other message routers.
Task.WhenAll(Routers
_ = Task.WhenAll(Routers
.Where(r => r != sender)
.Select(r => r.PublishAsync(msg, CancellationToken.None))
);
Expand Down
4 changes: 2 additions & 2 deletions src/Routing/Dht1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public async Task<IEnumerable<Peer>> FindProvidersAsync(
/// </remarks>
public void Advertise(Cid cid)
{
Task.Run(async () =>
_ = Task.Run(async () =>
{
int advertsNeeded = 4;
var message = new DhtMessage
Expand Down Expand Up @@ -278,7 +278,7 @@ public void Advertise(Cid cid)
using (var stream = await Swarm.DialAsync(peer, this.ToString()))
{
ProtoBuf.Serializer.SerializeWithLengthPrefix(stream, message, PrefixStyle.Base128);
stream.Flush();
await stream.FlushAsync();
}
if (--advertsNeeded == 0)
break;
Expand Down
1 change: 1 addition & 0 deletions src/SecureCommunication/Secio1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public override string ToString()
await EncryptAsync(connection, cancel).ConfigureAwait(false);
}

#pragma warning disable VSTHRD103
/// <inheritdoc />
public async Task<Stream> EncryptAsync(PeerConnection connection, CancellationToken cancel = default(CancellationToken))
{
Expand Down
8 changes: 6 additions & 2 deletions src/SecureCommunication/Secio1Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ public override void SetLength(long value)
/// <inheritdoc />
public override int Read(byte[] buffer, int offset, int count)
{
#pragma warning disable VSTHRD002
return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
#pragma warning restore VSTHRD002
}

/// <inheritdoc />
Expand Down Expand Up @@ -202,7 +204,9 @@ async Task<byte[]> ReadPacketBytesAsync(int count, CancellationToken cancel)
/// <inheritdoc />
public override void Flush()
{
#pragma warning disable VSTHRD002
FlushAsync().GetAwaiter().GetResult();
#pragma warning restore VSTHRD002
}

/// <inheritdoc />
Expand All @@ -225,8 +229,8 @@ public override async Task FlushAsync(CancellationToken cancel)
stream.WriteByte((byte)(length >> 16));
stream.WriteByte((byte)(length >> 8));
stream.WriteByte((byte)(length));
stream.Write(data, 0, data.Length);
stream.Write(mac, 0, mac.Length);
await stream.WriteAsync(data, 0, data.Length);
await stream.WriteAsync(mac, 0, mac.Length);
await stream.FlushAsync(cancel).ConfigureAwait(false);

outStream.SetLength(0);
Expand Down
2 changes: 1 addition & 1 deletion src/StatsStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class StatsStream : Stream

static StatsStream()
{
Task.Run(async () =>
_ = Task.Run(async () =>
{
while (true)
{
Expand Down
Loading

0 comments on commit eed1a90

Please sign in to comment.