Skip to content

Commit

Permalink
Merge pull request #45 from richardschneider/peer-manager
Browse files Browse the repository at this point in the history
Manage dead peers
  • Loading branch information
richardschneider authored Aug 21, 2019
2 parents d1a1bff + 844a2eb commit 63fde7f
Show file tree
Hide file tree
Showing 22 changed files with 707 additions and 237 deletions.
10 changes: 2 additions & 8 deletions src/BlackList.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,10 @@ public class BlackList<T> : ConcurrentBag<T>, IPolicy<T>
where T : IEquatable<T>
{
/// <inheritdoc />
public Task<bool> IsAllowedAsync(T target, CancellationToken cancel = default(CancellationToken))
public bool IsAllowed(T target)
{
return Task.FromResult(!this.Contains(target));
return !this.Contains(target);
}

/// <inheritdoc />
public async Task<bool> IsNotAllowedAsync(T target, CancellationToken cancel = default(CancellationToken))
{
var q = await IsAllowedAsync(target, cancel).ConfigureAwait(false);
return !q;
}
}
}
24 changes: 3 additions & 21 deletions src/IPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,10 @@ interface IPolicy<T>
/// <param name="target">
/// An object to test against the rule.
/// </param>
/// <param name="cancel">
/// Is used to stop the task. When cancelled, the <see cref="TaskCanceledException"/> is raised.
/// </param>
/// <returns>
/// A task that represents the asynchronous operation. The task's result is
/// <b>true</b> if the <paramref name="target"/> passes the rule.
/// </returns>
Task<bool> IsAllowedAsync(T target, CancellationToken cancel = default(CancellationToken));

/// <summary>
/// Determines if the target fails the rule.
/// </summary>
/// <param name="target">
/// An object to test against the rule.
/// </param>
/// <param name="cancel">
/// Is used to stop the task. When cancelled, the <see cref="TaskCanceledException"/> is raised.
/// </param>
/// <returns>
/// A task that represents the asynchronous operation. The task's result is
/// <b>true</b> if the <paramref name="target"/> fails the rule.
/// <b>true</b> if the <paramref name="target"/> passes the rule;
/// otherwise <b>false</b>.
/// </returns>
Task<bool> IsNotAllowedAsync(T target, CancellationToken cancel = default(CancellationToken));
bool IsAllowed(T target);
}
}
41 changes: 33 additions & 8 deletions src/MultiAddressBlackList.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Ipfs;
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
Expand All @@ -10,17 +11,19 @@
namespace PeerTalk
{
/// <summary>
/// A sequence of filters that are not approved.
/// A collection of filters that are not approved.
/// </summary>
/// <remarks>
/// Only targets that do match a filter will pass.
/// </remarks>
public class MultiAddressBlackList : ConcurrentBag<MultiAddress>, IPolicy<MultiAddress>
public class MultiAddressBlackList : ICollection<MultiAddress>, IPolicy<MultiAddress>
{
ConcurrentDictionary<MultiAddress, MultiAddress> filters = new ConcurrentDictionary<MultiAddress, MultiAddress>();

/// <inheritdoc />
public Task<bool> IsAllowedAsync(MultiAddress target, CancellationToken cancel = default(CancellationToken))
public bool IsAllowed(MultiAddress target)
{
return Task.FromResult(!this.Any(filter => Matches(filter, target)));
return !filters.Any(kvp => Matches(kvp.Key, target));
}

bool Matches(MultiAddress filter, MultiAddress target)
Expand All @@ -31,9 +34,31 @@ bool Matches(MultiAddress filter, MultiAddress target)
}

/// <inheritdoc />
public async Task<bool> IsNotAllowedAsync(MultiAddress target, CancellationToken cancel = default(CancellationToken))
{
return !await IsAllowedAsync(target, cancel).ConfigureAwait(false);
}
public bool Remove(MultiAddress item) => filters.TryRemove(item, out _);

/// <inheritdoc />
public int Count => filters.Count;

/// <inheritdoc />
public bool IsReadOnly => false;

/// <inheritdoc />
public void Add(MultiAddress item) => filters.TryAdd(item, item);

/// <inheritdoc />
public void Clear() => filters.Clear();

/// <inheritdoc />
public bool Contains(MultiAddress item) => filters.Keys.Contains(item);

/// <inheritdoc />
public void CopyTo(MultiAddress[] array, int arrayIndex) => filters.Keys.CopyTo(array, arrayIndex);

/// <inheritdoc />
public IEnumerator<MultiAddress> GetEnumerator() => filters.Keys.GetEnumerator();

/// <inheritdoc />
IEnumerator IEnumerable.GetEnumerator() => filters.Keys.GetEnumerator();

}
}
42 changes: 33 additions & 9 deletions src/MultiAddressWhiteList.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Ipfs;
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
Expand All @@ -16,15 +17,17 @@ namespace PeerTalk
/// Only targets that are a subset of any filters will pass. If no filters are defined, then anything
/// passes.
/// </remarks>
public class MultiAddressWhiteList : ConcurrentBag<MultiAddress>, IPolicy<MultiAddress>
public class MultiAddressWhiteList : ICollection<MultiAddress>, IPolicy<MultiAddress>
{
ConcurrentDictionary<MultiAddress, MultiAddress> filters = new ConcurrentDictionary<MultiAddress, MultiAddress>();

/// <inheritdoc />
public Task<bool> IsAllowedAsync(MultiAddress target, CancellationToken cancel = default(CancellationToken))
public bool IsAllowed(MultiAddress target)
{
if (IsEmpty)
return Task.FromResult(true);
if (filters.IsEmpty)
return true;

return Task.FromResult(this.Any(filter => Matches(filter, target)));
return filters.Any(kvp => Matches(kvp.Key, target));
}

bool Matches(MultiAddress filter, MultiAddress target)
Expand All @@ -35,9 +38,30 @@ bool Matches(MultiAddress filter, MultiAddress target)
}

/// <inheritdoc />
public async Task<bool> IsNotAllowedAsync(MultiAddress target, CancellationToken cancel = default(CancellationToken))
{
return !await IsAllowedAsync(target, cancel).ConfigureAwait(false);
}
public bool Remove(MultiAddress item) => filters.TryRemove(item, out _);

/// <inheritdoc />
public int Count => filters.Count;

/// <inheritdoc />
public bool IsReadOnly => false;

/// <inheritdoc />
public void Add(MultiAddress item) => filters.TryAdd(item, item);

/// <inheritdoc />
public void Clear() => filters.Clear();

/// <inheritdoc />
public bool Contains(MultiAddress item) => filters.Keys.Contains(item);

/// <inheritdoc />
public void CopyTo(MultiAddress[] array, int arrayIndex) => filters.Keys.CopyTo(array, arrayIndex);

/// <inheritdoc />
public IEnumerator<MultiAddress> GetEnumerator() => filters.Keys.GetEnumerator();

/// <inheritdoc />
IEnumerator IEnumerable.GetEnumerator() => filters.Keys.GetEnumerator();
}
}
200 changes: 200 additions & 0 deletions src/PeerManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
using Common.Logging;
using Ipfs;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace PeerTalk
{
/// <summary>
/// Manages the peers.
/// </summary>
/// <remarks>
/// Listens to the <see cref="Swarm"/> events to determine the state
/// of a peer.
/// </remarks>
public class PeerManager : IService
{
static ILog log = LogManager.GetLogger(typeof(PeerManager));
CancellationTokenSource cancel;

/// <summary>
/// Initial time to wait before attempting a reconnection
/// to a dead peer.
/// </summary>
/// <value>
/// Defaults to 1 minute.
/// </value>
public TimeSpan InitialBackoff = TimeSpan.FromMinutes(1);

/// <summary>
/// When reached, the peer is considered permanently dead.
/// </summary>
/// <value>
/// Defaults to 64 minutes.
/// </value>
public TimeSpan MaxBackoff = TimeSpan.FromMinutes(64);

/// <summary>
/// Provides access to other peers.
/// </summary>
public Swarm Swarm { get; set; }

/// <summary>
/// The peers that are reachable.
/// </summary>
public ConcurrentDictionary<Peer, DeadPeer> DeadPeers = new ConcurrentDictionary<Peer, DeadPeer>();

/// <inheritdoc />
public Task StartAsync()
{
Swarm.ConnectionEstablished += Swarm_ConnectionEstablished;
Swarm.PeerNotReachable += Swarm_PeerNotReachable;

var thread = new Thread(Phoenix)
{
IsBackground = true
};
cancel = new CancellationTokenSource();
thread.Start();

log.Debug("started");
return Task.CompletedTask;
}

/// <inheritdoc />
public Task StopAsync()
{
Swarm.ConnectionEstablished -= Swarm_ConnectionEstablished;
Swarm.PeerNotReachable -= Swarm_PeerNotReachable;
DeadPeers.Clear();

cancel.Cancel();
cancel.Dispose();

log.Debug("stopped");
return Task.CompletedTask;
}

/// <summary>
/// Indicates that the peer can not be connected to.
/// </summary>
/// <param name="peer"></param>
public void SetNotReachable(Peer peer)
{
var dead = DeadPeers.AddOrUpdate(peer,
new DeadPeer
{
Peer = peer,
Backoff = InitialBackoff,
NextAttempt = DateTime.Now + InitialBackoff
},
(key, existing) =>
{
existing.Backoff += existing.Backoff;
existing.NextAttempt = existing.Backoff <= MaxBackoff
? DateTime.Now + existing.Backoff
: DateTime.MaxValue;
return existing;
});

Swarm.BlackList.Add($"/p2p/{peer.Id}");
if (dead.NextAttempt != DateTime.MaxValue)
{
log.DebugFormat("Dead '{0}' for {1} minutes.", dead.Peer, dead.Backoff.TotalMinutes);
}
else
{
Swarm.DeregisterPeer(dead.Peer);
log.DebugFormat("Permanently dead '{0}'.", dead.Peer);
}
}

/// <summary>
/// Indicates that the peer can be connected to.
/// </summary>
/// <param name="peer"></param>
public void SetReachable(Peer peer)
{
log.DebugFormat("Alive '{0}'.", peer);

DeadPeers.TryRemove(peer, out DeadPeer _);
Swarm.BlackList.Remove($"/p2p/{peer.Id}");
}

/// <summary>
/// Is invoked by the <see cref="Swarm"/> when a peer can not be connected to.
/// </summary>
void Swarm_PeerNotReachable(object sender, Peer peer)
{
SetNotReachable(peer);
}

/// <summary>
/// Is invoked by the <see cref="Swarm"/> when a peer is connected to.
/// </summary>
void Swarm_ConnectionEstablished(object sender, PeerConnection connection)
{
SetReachable(connection.RemotePeer);
}

/// <summary>
/// Background process to try reconnecting to a dead peer.
/// </summary>
async void Phoenix()
{
while (!cancel.IsCancellationRequested)
{
try
{
await Task.Delay(InitialBackoff);
var now = DateTime.Now;
await DeadPeers.Values
.Where(p => p.NextAttempt < now)
.ParallelForEachAsync(async dead =>
{
log.DebugFormat("Attempt reconnect to {0}", dead.Peer);
Swarm.BlackList.Remove($"/p2p/{dead.Peer.Id}");
try
{
await Swarm.ConnectAsync(dead.Peer, cancel.Token);
}
catch
{
// eat it
}
}, maxDoP: 10);
}
catch
{
// eat it.
}
}
}
}

/// <summary>
/// Information on a peer that is not reachable.
/// </summary>
public class DeadPeer
{
/// <summary>
/// The peer that does not respond.
/// </summary>
public Peer Peer { get; set; }

/// <summary>
/// How long to wait before attempting another connect.
/// </summary>
public TimeSpan Backoff { get; set; }

/// <summary>
/// When another connect should be tried.
/// </summary>
public DateTime NextAttempt { get; set; }
}
}
Loading

0 comments on commit 63fde7f

Please sign in to comment.