Skip to content

Commit

Permalink
Combine RemoteNode and ProtocolHandler (#1520)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikzhang authored Apr 9, 2020
1 parent aa10ca2 commit 8b558d6
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 431 deletions.
1 change: 0 additions & 1 deletion src/neo/NeoSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ public class NeoSystem : IDisposable
$"blockchain-mailbox {{ mailbox-type: \"{typeof(BlockchainMailbox).AssemblyQualifiedName}\" }}" +
$"task-manager-mailbox {{ mailbox-type: \"{typeof(TaskManagerMailbox).AssemblyQualifiedName}\" }}" +
$"remote-node-mailbox {{ mailbox-type: \"{typeof(RemoteNodeMailbox).AssemblyQualifiedName}\" }}" +
$"protocol-handler-mailbox {{ mailbox-type: \"{typeof(ProtocolHandlerMailbox).AssemblyQualifiedName}\" }}" +
$"consensus-service-mailbox {{ mailbox-type: \"{typeof(ConsensusServiceMailbox).AssemblyQualifiedName}\" }}");
public IActorRef Blockchain { get; }
public IActorRef LocalNode { get; }
Expand Down
10 changes: 5 additions & 5 deletions src/neo/Network/P2P/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Neo.Network.P2P
{
public abstract class Connection : UntypedActor
{
internal class Timer { public static Timer Instance = new Timer(); }
internal class Close { public bool Abort; }
internal class Ack : Tcp.Event { public static Ack Instance = new Ack(); }

/// <summary>
Expand All @@ -32,7 +32,7 @@ protected Connection(object connection, IPEndPoint remote, IPEndPoint local)
{
this.Remote = remote;
this.Local = local;
this.timer = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromSeconds(connectionTimeoutLimitStart), Self, Timer.Instance, ActorRefs.NoSender);
this.timer = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromSeconds(connectionTimeoutLimitStart), Self, new Close { Abort = true }, ActorRefs.NoSender);
switch (connection)
{
case IActorRef tcp:
Expand Down Expand Up @@ -89,8 +89,8 @@ protected override void OnReceive(object message)
{
switch (message)
{
case Timer _:
Disconnect(true);
case Close close:
Disconnect(close.Abort);
break;
case Ack _:
OnAck();
Expand All @@ -107,7 +107,7 @@ protected override void OnReceive(object message)
private void OnReceived(ByteString data)
{
timer.CancelIfNotNull();
timer = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromSeconds(connectionTimeoutLimit), Self, Timer.Instance, ActorRefs.NoSender);
timer = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromSeconds(connectionTimeoutLimit), Self, new Close { Abort = true }, ActorRefs.NoSender);
try
{
OnData(data);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
using Akka.Actor;
using Akka.Configuration;
using Neo.Cryptography;
using Neo.IO;
using Neo.IO.Actors;
using Neo.IO.Caching;
using Neo.Ledger;
using Neo.Network.P2P.Capabilities;
using Neo.Network.P2P.Payloads;
using Neo.Persistence;
using Neo.Plugins;
Expand All @@ -17,11 +15,9 @@

namespace Neo.Network.P2P
{
internal class ProtocolHandler : UntypedActor
partial class RemoteNode
{
public class SetFilter { public BloomFilter Filter; }
internal class Timer { }

private class Timer { }
private class PendingKnownHashesCollection : KeyedCollection<UInt256, (UInt256, DateTime)>
{
protected override UInt256 GetKeyForItem((UInt256, DateTime) item)
Expand All @@ -30,11 +26,9 @@ protected override UInt256 GetKeyForItem((UInt256, DateTime) item)
}
}

private readonly NeoSystem system;
private readonly PendingKnownHashesCollection pendingKnownHashes;
private readonly HashSetCache<UInt256> knownHashes;
private readonly HashSetCache<UInt256> sentHashes;
private VersionPayload version;
private readonly PendingKnownHashesCollection pendingKnownHashes = new PendingKnownHashesCollection();
private readonly HashSetCache<UInt256> knownHashes = new HashSetCache<UInt256>(Blockchain.Singleton.MemPool.Capacity * 2 / 5);
private readonly HashSetCache<UInt256> sentHashes = new HashSetCache<UInt256>(Blockchain.Singleton.MemPool.Capacity * 2 / 5);
private bool verack = false;
private BloomFilter bloom_filter;

Expand All @@ -43,33 +37,12 @@ protected override UInt256 GetKeyForItem((UInt256, DateTime) item)

private readonly ICancelable timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimerInterval, TimerInterval, Context.Self, new Timer(), ActorRefs.NoSender);

public ProtocolHandler(NeoSystem system)
{
this.system = system;
this.pendingKnownHashes = new PendingKnownHashesCollection();
this.knownHashes = new HashSetCache<UInt256>(Blockchain.Singleton.MemPool.Capacity * 2 / 5);
this.sentHashes = new HashSetCache<UInt256>(Blockchain.Singleton.MemPool.Capacity * 2 / 5);
}

protected override void OnReceive(object message)
{
switch (message)
{
case Message msg:
OnMessage(msg);
break;
case Timer _:
OnTimer();
break;
}
}

private void OnMessage(Message msg)
{
foreach (IP2PPlugin plugin in Plugin.P2PPlugins)
if (!plugin.OnP2PMessage(msg))
return;
if (version == null)
if (Version == null)
{
if (msg.Command != MessageCommand.Version)
throw new ProtocolViolationException();
Expand Down Expand Up @@ -158,20 +131,17 @@ private void OnAddrMessageReceived(AddrPayload payload)

private void OnFilterAddMessageReceived(FilterAddPayload payload)
{
if (bloom_filter != null)
bloom_filter.Add(payload.Data);
bloom_filter?.Add(payload.Data);
}

private void OnFilterClearMessageReceived()
{
bloom_filter = null;
Context.Parent.Tell(new SetFilter { Filter = null });
}

private void OnFilterLoadMessageReceived(FilterLoadPayload payload)
{
bloom_filter = new BloomFilter(payload.Filter.Length * 8, payload.K, payload.Tweak, payload.Filter);
Context.Parent.Tell(new SetFilter { Filter = bloom_filter });
}

/// <summary>
Expand All @@ -189,7 +159,7 @@ private void OnGetAddrMessageReceived()
.Take(AddrPayload.MaxCountToSend);
NetworkAddressWithTime[] networkAddresses = peers.Select(p => NetworkAddressWithTime.Create(p.Listener.Address, p.Version.Timestamp, p.Version.Capabilities)).ToArray();
if (networkAddresses.Length == 0) return;
Context.Parent.Tell(Message.Create(MessageCommand.Addr, AddrPayload.Create(networkAddresses)));
EnqueueMessage(Message.Create(MessageCommand.Addr, AddrPayload.Create(networkAddresses)));
}

/// <summary>
Expand All @@ -216,7 +186,7 @@ private void OnGetBlocksMessageReceived(GetBlocksPayload payload)
hashes.Add(hash);
}
if (hashes.Count == 0) return;
Context.Parent.Tell(Message.Create(MessageCommand.Inv, InvPayload.Create(InventoryType.Block, hashes.ToArray())));
EnqueueMessage(Message.Create(MessageCommand.Inv, InvPayload.Create(InventoryType.Block, hashes.ToArray())));
}

private void OnGetBlockDataMessageReceived(GetBlockDataPayload payload)
Expand All @@ -229,12 +199,12 @@ private void OnGetBlockDataMessageReceived(GetBlockDataPayload payload)

if (bloom_filter == null)
{
Context.Parent.Tell(Message.Create(MessageCommand.Block, block));
EnqueueMessage(Message.Create(MessageCommand.Block, block));
}
else
{
BitArray flags = new BitArray(block.Transactions.Select(p => bloom_filter.Test(p)).ToArray());
Context.Parent.Tell(Message.Create(MessageCommand.MerkleBlock, MerkleBlockPayload.Create(block, flags)));
EnqueueMessage(Message.Create(MessageCommand.MerkleBlock, MerkleBlockPayload.Create(block, flags)));
}
}
}
Expand All @@ -255,26 +225,26 @@ private void OnGetDataMessageReceived(InvPayload payload)
case InventoryType.TX:
Transaction tx = Blockchain.Singleton.GetTransaction(hash);
if (tx != null)
Context.Parent.Tell(Message.Create(MessageCommand.Transaction, tx));
EnqueueMessage(Message.Create(MessageCommand.Transaction, tx));
break;
case InventoryType.Block:
Block block = Blockchain.Singleton.GetBlock(hash);
if (block != null)
{
if (bloom_filter == null)
{
Context.Parent.Tell(Message.Create(MessageCommand.Block, block));
EnqueueMessage(Message.Create(MessageCommand.Block, block));
}
else
{
BitArray flags = new BitArray(block.Transactions.Select(p => bloom_filter.Test(p)).ToArray());
Context.Parent.Tell(Message.Create(MessageCommand.MerkleBlock, MerkleBlockPayload.Create(block, flags)));
EnqueueMessage(Message.Create(MessageCommand.MerkleBlock, MerkleBlockPayload.Create(block, flags)));
}
}
break;
case InventoryType.Consensus:
if (Blockchain.Singleton.ConsensusRelayCache.TryGet(hash, out IInventory inventoryConsensus))
Context.Parent.Tell(Message.Create(MessageCommand.Consensus, inventoryConsensus));
EnqueueMessage(Message.Create(MessageCommand.Consensus, inventoryConsensus));
break;
}
}
Expand Down Expand Up @@ -304,18 +274,18 @@ private void OnGetHeadersMessageReceived(GetBlocksPayload payload)
headers.Add(header);
}
if (headers.Count == 0) return;
Context.Parent.Tell(Message.Create(MessageCommand.Headers, HeadersPayload.Create(headers.ToArray())));
EnqueueMessage(Message.Create(MessageCommand.Headers, HeadersPayload.Create(headers.ToArray())));
}

private void OnHeadersMessageReceived(HeadersPayload payload)
{
if (payload.Headers.Length == 0) return;
system.Blockchain.Tell(payload.Headers, Context.Parent);
system.Blockchain.Tell(payload.Headers);
}

private void OnInventoryReceived(IInventory inventory)
{
system.TaskManager.Tell(new TaskManager.TaskCompleted { Hash = inventory.Hash }, Context.Parent);
system.TaskManager.Tell(new TaskManager.TaskCompleted { Hash = inventory.Hash });
system.LocalNode.Tell(new LocalNode.Relay { Inventory = inventory });
pendingKnownHashes.Remove(inventory.Hash);
knownHashes.Add(inventory.Hash);
Expand All @@ -339,47 +309,61 @@ private void OnInvMessageReceived(InvPayload payload)
if (hashes.Length == 0) return;
foreach (UInt256 hash in hashes)
pendingKnownHashes.Add((hash, DateTime.UtcNow));
system.TaskManager.Tell(new TaskManager.NewTasks { Payload = InvPayload.Create(payload.Type, hashes) }, Context.Parent);
system.TaskManager.Tell(new TaskManager.NewTasks { Payload = InvPayload.Create(payload.Type, hashes) });
}

private void OnMemPoolMessageReceived()
{
foreach (InvPayload payload in InvPayload.CreateGroup(InventoryType.TX, Blockchain.Singleton.MemPool.GetVerifiedTransactions().Select(p => p.Hash).ToArray()))
Context.Parent.Tell(Message.Create(MessageCommand.Inv, payload));
EnqueueMessage(Message.Create(MessageCommand.Inv, payload));
}

private void OnPingMessageReceived(PingPayload payload)
{
Context.Parent.Tell(payload);
Context.Parent.Tell(Message.Create(MessageCommand.Pong, PingPayload.Create(Blockchain.Singleton.Height, payload.Nonce)));
UpdateLastBlockIndex(payload);
EnqueueMessage(Message.Create(MessageCommand.Pong, PingPayload.Create(Blockchain.Singleton.Height, payload.Nonce)));
}

private void OnPongMessageReceived(PingPayload payload)
{
Context.Parent.Tell(payload);
UpdateLastBlockIndex(payload);
}

private void OnVerackMessageReceived()
{
verack = true;
Context.Parent.Tell(MessageCommand.Verack);
system.TaskManager.Tell(new TaskManager.Register { Version = Version });
CheckMessageQueue();
}

private void OnVersionMessageReceived(VersionPayload payload)
{
version = payload;
Context.Parent.Tell(payload);
}

private void OnTimer()
{
RefreshPendingKnownHashes();
}

protected override void PostStop()
{
timer.CancelIfNotNull();
base.PostStop();
Version = payload;
foreach (NodeCapability capability in payload.Capabilities)
{
switch (capability)
{
case FullNodeCapability fullNodeCapability:
IsFullNode = true;
LastBlockIndex = fullNodeCapability.StartHeight;
break;
case ServerCapability serverCapability:
if (serverCapability.Type == NodeCapabilityType.TcpServer)
ListenerTcpPort = serverCapability.Port;
break;
}
}
if (payload.Nonce == LocalNode.Nonce || payload.Magic != ProtocolSettings.Default.Magic)
{
Disconnect(true);
return;
}
if (LocalNode.Singleton.RemoteNodes.Values.Where(p => p != this).Any(p => p.Remote.Address.Equals(Remote.Address) && p.Version?.Nonce == payload.Nonce))
{
Disconnect(true);
return;
}
SendMessage(Message.Create(MessageCommand.Verack));
}

private void RefreshPendingKnownHashes()
Expand All @@ -393,50 +377,12 @@ private void RefreshPendingKnownHashes()
}
}

public static Props Props(NeoSystem system)
{
return Akka.Actor.Props.Create(() => new ProtocolHandler(system)).WithMailbox("protocol-handler-mailbox");
}
}

internal class ProtocolHandlerMailbox : PriorityMailbox
{
public ProtocolHandlerMailbox(Settings settings, Config config)
: base(settings, config)
{
}

internal protected override bool IsHighPriority(object message)
private void UpdateLastBlockIndex(PingPayload payload)
{
if (!(message is Message msg)) return false;
switch (msg.Command)
if (payload.LastBlockIndex > LastBlockIndex)
{
case MessageCommand.Consensus:
case MessageCommand.FilterAdd:
case MessageCommand.FilterClear:
case MessageCommand.FilterLoad:
case MessageCommand.Verack:
case MessageCommand.Version:
case MessageCommand.Alert:
return true;
default:
return false;
}
}

internal protected override bool ShallDrop(object message, IEnumerable queue)
{
if (message is ProtocolHandler.Timer) return false;
if (!(message is Message msg)) return true;
switch (msg.Command)
{
case MessageCommand.GetAddr:
case MessageCommand.GetBlocks:
case MessageCommand.GetHeaders:
case MessageCommand.Mempool:
return queue.OfType<Message>().Any(p => p.Command == msg.Command);
default:
return false;
LastBlockIndex = payload.LastBlockIndex;
system.TaskManager.Tell(new TaskManager.Update { LastBlockIndex = LastBlockIndex });
}
}
}
Expand Down
Loading

0 comments on commit 8b558d6

Please sign in to comment.