diff --git a/src/PubSub/FloodRouter.cs b/src/PubSub/FloodRouter.cs index d111002..2ff32b1 100644 --- a/src/PubSub/FloodRouter.cs +++ b/src/PubSub/FloodRouter.cs @@ -5,6 +5,7 @@ using Semver; using System; using System.Collections.Generic; +using System.Collections.Concurrent; using System.IO; using System.Linq; using System.Text; @@ -21,7 +22,7 @@ public class FloodRouter : IPeerProtocol, IMessageRouter static ILog log = LogManager.GetLogger(typeof(FloodRouter)); MessageTracker tracker = new MessageTracker(); - List localTopics = new List(); + ConcurrentDictionary localTopics = new ConcurrentDictionary(); /// /// The topics of interest of other peers. @@ -138,7 +139,7 @@ public IEnumerable InterestedPeers(string topic) /// public async Task JoinTopicAsync(string topic, CancellationToken cancel) { - localTopics.Add(topic); + localTopics.TryAdd(topic, topic); var msg = new PubSubMessage { Subscriptions = new Subscription[] @@ -164,7 +165,7 @@ public async Task JoinTopicAsync(string topic, CancellationToken cancel) /// public async Task LeaveTopicAsync(string topic, CancellationToken cancel) { - localTopics.Remove(topic); + localTopics.TryRemove(topic, out _); var msg = new PubSubMessage { Subscriptions = new Subscription[] @@ -260,7 +261,7 @@ async void Swarm_ConnectionEstablished(object sender, PeerConnection connection) { var hello = new PubSubMessage { - Subscriptions = localTopics + Subscriptions = localTopics.Values .Select(topic => new Subscription { Subscribe = true, diff --git a/src/PubSub/NotificationService.cs b/src/PubSub/NotificationService.cs index 0454aa0..9d54380 100644 --- a/src/PubSub/NotificationService.cs +++ b/src/PubSub/NotificationService.cs @@ -3,6 +3,7 @@ using Ipfs.CoreApi; using System; using System.Collections.Generic; +using System.Collections.Concurrent; using System.IO; using System.Linq; using System.Net; @@ -30,7 +31,7 @@ class TopicHandler } long nextSequenceNumber; - List topicHandlers; + ConcurrentDictionary topicHandlers; MessageTracker tracker = new MessageTracker(); // TODO: A general purpose CancellationTokenSource that stops publishing of @@ -67,7 +68,7 @@ class TopicHandler /// public async Task StartAsync() { - topicHandlers = new List(); + topicHandlers = new ConcurrentDictionary(); // Resolution of 100 nanoseconds. nextSequenceNumber = DateTime.UtcNow.Ticks; @@ -133,7 +134,7 @@ public PublishedMessage CreateMessage(string topic, byte[] data) /// public Task> SubscribedTopicsAsync(CancellationToken cancel = default(CancellationToken)) { - var topics = topicHandlers + var topics = topicHandlers.Values .Select(t => t.Topic) .Distinct(); return Task.FromResult(topics); @@ -179,14 +180,14 @@ public PublishedMessage CreateMessage(string topic, byte[] data) public async Task SubscribeAsync(string topic, Action handler, CancellationToken cancellationToken) { var topicHandler = new TopicHandler { Topic = topic, Handler = handler }; - topicHandlers.Add(topicHandler); + topicHandlers.TryAdd(topicHandler, topicHandler); // TODO: need a better way. #pragma warning disable VSTHRD101 cancellationToken.Register(async () => { - topicHandlers.Remove(topicHandler); - if (topicHandlers.Count(t => t.Topic == topic) == 0) + topicHandlers.TryRemove(topicHandler, out _); + if (topicHandlers.Values.Count(t => t.Topic == topic) == 0) { await Task.WhenAll(Routers.Select(r => r.LeaveTopicAsync(topic, CancellationToken.None))).ConfigureAwait(false); } @@ -194,7 +195,7 @@ public async Task SubscribeAsync(string topic, Action handler #pragma warning restore VSTHRD101 // Tell routers if first time. - if (topicHandlers.Count(t => t.Topic == topic) == 1) + if (topicHandlers.Values.Count(t => t.Topic == topic) == 1) { await Task.WhenAll(Routers.Select(r => r.JoinTopicAsync(topic, CancellationToken.None))).ConfigureAwait(false); } @@ -224,7 +225,7 @@ void Router_MessageReceived(object sender, PublishedMessage msg) } // Call local topic handlers. - var handlers = topicHandlers + var handlers = topicHandlers.Values .Where(th => msg.Topics.Contains(th.Topic)); foreach (var handler in handlers) {