Skip to content

Commit

Permalink
fix(PubSub): thread safety #48
Browse files Browse the repository at this point in the history
  • Loading branch information
richardschneider committed Aug 24, 2019
1 parent eed1a90 commit a1554af
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
9 changes: 5 additions & 4 deletions src/PubSub/FloodRouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Semver;
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Text;
Expand All @@ -21,7 +22,7 @@ public class FloodRouter : IPeerProtocol, IMessageRouter
static ILog log = LogManager.GetLogger(typeof(FloodRouter));

MessageTracker tracker = new MessageTracker();
List<string> localTopics = new List<string>();
ConcurrentDictionary<string, string> localTopics = new ConcurrentDictionary<string, string>();

/// <summary>
/// The topics of interest of other peers.
Expand Down Expand Up @@ -138,7 +139,7 @@ public IEnumerable<Peer> InterestedPeers(string topic)
/// <inheritdoc />
public async Task JoinTopicAsync(string topic, CancellationToken cancel)
{
localTopics.Add(topic);
localTopics.TryAdd(topic, topic);
var msg = new PubSubMessage
{
Subscriptions = new Subscription[]
Expand All @@ -164,7 +165,7 @@ public async Task JoinTopicAsync(string topic, CancellationToken cancel)
/// <inheritdoc />
public async Task LeaveTopicAsync(string topic, CancellationToken cancel)
{
localTopics.Remove(topic);
localTopics.TryRemove(topic, out _);
var msg = new PubSubMessage
{
Subscriptions = new Subscription[]
Expand Down Expand Up @@ -260,7 +261,7 @@ async void Swarm_ConnectionEstablished(object sender, PeerConnection connection)
{
var hello = new PubSubMessage
{
Subscriptions = localTopics
Subscriptions = localTopics.Values
.Select(topic => new Subscription
{
Subscribe = true,
Expand Down
17 changes: 9 additions & 8 deletions src/PubSub/NotificationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Ipfs.CoreApi;
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Net;
Expand Down Expand Up @@ -30,7 +31,7 @@ class TopicHandler
}

long nextSequenceNumber;
List<TopicHandler> topicHandlers;
ConcurrentDictionary<TopicHandler, TopicHandler> topicHandlers;
MessageTracker tracker = new MessageTracker();

// TODO: A general purpose CancellationTokenSource that stops publishing of
Expand Down Expand Up @@ -67,7 +68,7 @@ class TopicHandler
/// <inheritdoc />
public async Task StartAsync()
{
topicHandlers = new List<TopicHandler>();
topicHandlers = new ConcurrentDictionary<TopicHandler, TopicHandler>();

// Resolution of 100 nanoseconds.
nextSequenceNumber = DateTime.UtcNow.Ticks;
Expand Down Expand Up @@ -133,7 +134,7 @@ public PublishedMessage CreateMessage(string topic, byte[] data)
/// <inheritdoc />
public Task<IEnumerable<string>> SubscribedTopicsAsync(CancellationToken cancel = default(CancellationToken))
{
var topics = topicHandlers
var topics = topicHandlers.Values
.Select(t => t.Topic)
.Distinct();
return Task.FromResult(topics);
Expand Down Expand Up @@ -179,22 +180,22 @@ public PublishedMessage CreateMessage(string topic, byte[] data)
public async Task SubscribeAsync(string topic, Action<IPublishedMessage> handler, CancellationToken cancellationToken)
{
var topicHandler = new TopicHandler { Topic = topic, Handler = handler };
topicHandlers.Add(topicHandler);
topicHandlers.TryAdd(topicHandler, topicHandler);

// TODO: need a better way.
#pragma warning disable VSTHRD101
cancellationToken.Register(async () =>
{
topicHandlers.Remove(topicHandler);
if (topicHandlers.Count(t => t.Topic == topic) == 0)
topicHandlers.TryRemove(topicHandler, out _);
if (topicHandlers.Values.Count(t => t.Topic == topic) == 0)
{
await Task.WhenAll(Routers.Select(r => r.LeaveTopicAsync(topic, CancellationToken.None))).ConfigureAwait(false);
}
});
#pragma warning restore VSTHRD101

// Tell routers if first time.
if (topicHandlers.Count(t => t.Topic == topic) == 1)
if (topicHandlers.Values.Count(t => t.Topic == topic) == 1)
{
await Task.WhenAll(Routers.Select(r => r.JoinTopicAsync(topic, CancellationToken.None))).ConfigureAwait(false);
}
Expand Down Expand Up @@ -224,7 +225,7 @@ void Router_MessageReceived(object sender, PublishedMessage msg)
}

// Call local topic handlers.
var handlers = topicHandlers
var handlers = topicHandlers.Values
.Where(th => msg.Topics.Contains(th.Topic));
foreach (var handler in handlers)
{
Expand Down

0 comments on commit a1554af

Please sign in to comment.