Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/Improve syncing (3x): Adding ping mechanism to TaskManager for replacing StartHeight and PendingKnownHashes strategy #899

Merged
merged 33 commits into from
Dec 1, 2019
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
99ca3f2
Drop the duplicate NewTasks for the TaskManagerMailbox
yongjiema Jul 9, 2019
2048e7b
Try to get headers from remote nodes if the current header is not upd…
yongjiema Jul 9, 2019
079a74b
Merge branch 'master' into fix-sync-stuck
shargon Jul 9, 2019
755ab15
Update TaskManager.cs
shargon Jul 9, 2019
4ed0e22
Update TaskManager.cs
shargon Jul 9, 2019
4b410ad
Merge branch 'master' into fix-sync-stuck
vncoelho Jul 11, 2019
64dd8bd
Merge branch 'master' into fix-sync-stuck
shargon Jul 30, 2019
58d2ff0
Merge branch 'master' into fix-sync-stuck
vncoelho Aug 7, 2019
9e271e1
Merge branch 'master' into fix-sync-stuck
vncoelho Aug 9, 2019
7b32b10
Merge branch 'master' into fix-sync-stuck
shargon Aug 10, 2019
ae46f80
Merge branch 'master' into fix-sync-stuck
shargon Aug 12, 2019
70b2072
Merge branch 'master' into fix-sync-stuck
vncoelho Aug 12, 2019
b44e1eb
Try to get headers from remote nodes even the current header height i…
yongjiema Aug 13, 2019
bf01bab
Correct the timestamp usage according to NEO3
yongjiema Aug 16, 2019
261e7ff
Merge branch 'master' into fix-sync-stuck
erikzhang Aug 18, 2019
152a7cb
Merge branch 'master' into fix-sync-stuck
vncoelho Aug 19, 2019
cd5cb5f
Merge branch 'master' into fix-sync-stuck
vncoelho Aug 28, 2019
692a9ad
Merge branch 'master' into fix-sync-stuck
vncoelho Sep 24, 2019
11e811c
Merge branch 'master' into fix-sync-stuck
lock9 Sep 26, 2019
8196dff
Improve syncing
yongjiema Sep 29, 2019
0e21d78
Merge branch 'master' into fix-sync-stuck
vncoelho Oct 23, 2019
bd3b3fc
Merge branch 'master' into fix-sync-stuck
vncoelho Nov 4, 2019
fe2d98a
Merge branch 'master' into fix-sync-stuck
vncoelho Nov 10, 2019
100b30d
Merge branch 'master' into fix-sync-stuck
vncoelho Nov 13, 2019
ebe25e2
Merge branch 'master' into fix-sync-stuck
vncoelho Nov 13, 2019
fd0e712
Merge branch 'master' into fix-sync-stuck
lock9 Nov 13, 2019
9a73bf8
Merge branch 'master' into fix-sync-stuck
erikzhang Nov 25, 2019
04fdfa8
Update ProtocolHandler.cs
erikzhang Nov 25, 2019
1d985e0
Update ProtocolHandler.cs
erikzhang Nov 25, 2019
daf7141
move projects
erikzhang Nov 26, 2019
75f4470
Merge branch 'master' into fix-sync-stuck
erikzhang Nov 26, 2019
e18184e
Merge branch 'master' into fix-sync-stuck
vncoelho Nov 29, 2019
98f11f8
Merge branch 'master' into fix-sync-stuck
erikzhang Dec 1, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 58 additions & 2 deletions neo/Network/P2P/ProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Net;

Expand All @@ -19,24 +20,52 @@ namespace Neo.Network.P2P
internal class ProtocolHandler : UntypedActor
{
public class SetFilter { public BloomFilter Filter; }
internal class Timer { }

private class PendingKnownHashesCollection : KeyedCollection<UInt256, KeyValuePair<UInt256, DateTime>>
{
protected override UInt256 GetKeyForItem(KeyValuePair<UInt256, DateTime> item)
{
return item.Key;
}
}

private readonly NeoSystem system;
private readonly PendingKnownHashesCollection pendingKnownHashes;
private readonly FIFOSet<UInt256> knownHashes;
private readonly FIFOSet<UInt256> sentHashes;
private VersionPayload version;
private bool verack = false;
private BloomFilter bloom_filter;

private static readonly TimeSpan TimerInterval = TimeSpan.FromSeconds(30);
private static readonly TimeSpan PendingTimeout = TimeSpan.FromMinutes(1);

private readonly ICancelable timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimerInterval, TimerInterval, Context.Self, new Timer(), ActorRefs.NoSender);

public ProtocolHandler(NeoSystem system)
{
this.system = system;
this.pendingKnownHashes = new PendingKnownHashesCollection();
this.knownHashes = new FIFOSet<UInt256>(Blockchain.Singleton.MemPool.Capacity * 2);
this.sentHashes = new FIFOSet<UInt256>(Blockchain.Singleton.MemPool.Capacity * 2);
}

protected override void OnReceive(object message)
{
if (!(message is Message msg)) return;
switch (message)
{
case Message msg:
OnMessage(msg);
break;
case Timer _:
OnTimer();
break;
}
}

private void OnMessage(Message msg)
{
foreach (IP2PPlugin plugin in Plugin.P2PPlugins)
if (!plugin.OnP2PMessage(msg))
return;
Expand Down Expand Up @@ -264,11 +293,13 @@ private void OnInventoryReceived(IInventory inventory)
{
system.TaskManager.Tell(new TaskManager.TaskCompleted { Hash = inventory.Hash }, Context.Parent);
system.LocalNode.Tell(new LocalNode.Relay { Inventory = inventory });
pendingKnownHashes.Remove(inventory.Hash);
knownHashes.Add(inventory.Hash);
}

private void OnInvMessageReceived(InvPayload payload)
{
UInt256[] hashes = payload.Hashes.Where(p => knownHashes.Add(p) && !sentHashes.Contains(p)).ToArray();
UInt256[] hashes = payload.Hashes.Where(p => !pendingKnownHashes.Contains(p) && !knownHashes.Contains(p) && !sentHashes.Contains(p)).ToArray();
if (hashes.Length == 0) return;
switch (payload.Type)
{
Expand All @@ -282,6 +313,8 @@ private void OnInvMessageReceived(InvPayload payload)
break;
}
if (hashes.Length == 0) return;
foreach (UInt256 hash in hashes)
pendingKnownHashes.Add(new KeyValuePair<UInt256, DateTime>(hash, DateTime.UtcNow));
system.TaskManager.Tell(new TaskManager.NewTasks { Payload = InvPayload.Create(payload.Type, hashes) }, Context.Parent);
}

Expand Down Expand Up @@ -314,6 +347,28 @@ private void OnVersionMessageReceived(VersionPayload payload)
Context.Parent.Tell(payload);
}

private void OnTimer()
{
RefreshPendingKnownHashes();
}

protected override void PostStop()
{
timer.CancelIfNotNull();
base.PostStop();
}

private void RefreshPendingKnownHashes()
{
while (pendingKnownHashes.Count > 0)
{
KeyValuePair<UInt256, DateTime> item = pendingKnownHashes.First();
if (DateTime.UtcNow - item.Value <= PendingTimeout)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use TimeProvider

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think the code is from the existing project, I did want to change it, but I found many places are using DateTime.UtcNow, so I keep it as the original version.

break;
pendingKnownHashes.Remove(item);
}
}

public static Props Props(NeoSystem system)
{
return Akka.Actor.Props.Create(() => new ProtocolHandler(system)).WithMailbox("protocol-handler-mailbox");
Expand Down Expand Up @@ -347,6 +402,7 @@ internal protected override bool IsHighPriority(object message)

internal protected override bool ShallDrop(object message, IEnumerable queue)
{
if (message is ProtocolHandler.Timer) return false;
if (!(message is Message msg)) return true;
switch (msg.Command)
{
Expand Down
3 changes: 3 additions & 0 deletions neo/Network/P2P/RemoteNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,10 @@ protected override void OnReceive(object message)
private void OnPingPayload(PingPayload payload)
{
if (payload.LastBlockIndex > LastBlockIndex)
{
LastBlockIndex = payload.LastBlockIndex;
system.TaskManager.Tell(new TaskManager.Update { LastBlockIndex = LastBlockIndex });
}
}

private void OnRelay(IInventory inventory)
Expand Down
31 changes: 29 additions & 2 deletions neo/Network/P2P/TaskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Neo.Ledger;
using Neo.Network.P2P.Payloads;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
Expand All @@ -14,6 +15,7 @@ namespace Neo.Network.P2P
internal class TaskManager : UntypedActor
{
public class Register { public VersionPayload Version; }
public class Update { public uint LastBlockIndex; }
public class NewTasks { public InvPayload Payload; }
public class TaskCompleted { public UInt256 Hash; }
public class HeaderTaskCompleted { }
Expand All @@ -25,6 +27,7 @@ private class Timer { }

private readonly NeoSystem system;
private const int MaxConncurrentTasks = 3;
private const int PingCoolingOffPeriod = 60; // in secconds.
private readonly FIFOSet<UInt256> knownHashes;
private readonly Dictionary<UInt256, int> globalTasks = new Dictionary<UInt256, int>();
private readonly Dictionary<IActorRef, TaskSession> sessions = new Dictionary<IActorRef, TaskSession>();
Expand Down Expand Up @@ -86,6 +89,9 @@ protected override void OnReceive(object message)
case Register register:
OnRegister(register.Version);
break;
case Update update:
OnUpdate(update.LastBlockIndex);
break;
case NewTasks tasks:
OnNewTasks(tasks.Payload);
break;
Expand Down Expand Up @@ -115,6 +121,13 @@ private void OnRegister(VersionPayload version)
RequestTasks(session);
}

private void OnUpdate(uint lastBlockIndex)
{
if (!sessions.TryGetValue(Sender, out TaskSession session))
return;
session.LastBlockIndex = lastBlockIndex;
}

private void OnRestartTasks(InvPayload payload)
{
knownHashes.ExceptWith(payload.Hashes);
Expand Down Expand Up @@ -221,13 +234,13 @@ private void RequestTasks(TaskSession session)
return;
}
}
if ((!HasHeaderTask || globalTasks[HeaderTaskHash] < MaxConncurrentTasks) && Blockchain.Singleton.HeaderHeight < session.StartHeight)
if ((!HasHeaderTask || globalTasks[HeaderTaskHash] < MaxConncurrentTasks) && Blockchain.Singleton.HeaderHeight < session.LastBlockIndex)
{
session.Tasks[HeaderTaskHash] = DateTime.UtcNow;
IncrementGlobalTask(HeaderTaskHash);
session.RemoteNode.Tell(Message.Create(MessageCommand.GetHeaders, GetBlocksPayload.Create(Blockchain.Singleton.CurrentHeaderHash)));
}
else if (Blockchain.Singleton.Height < session.StartHeight)
else if (Blockchain.Singleton.Height < session.LastBlockIndex)
{
UInt256 hash = Blockchain.Singleton.CurrentBlockHash;
for (uint i = Blockchain.Singleton.Height + 1; i <= Blockchain.Singleton.HeaderHeight; i++)
Expand All @@ -241,6 +254,11 @@ private void RequestTasks(TaskSession session)
}
session.RemoteNode.Tell(Message.Create(MessageCommand.GetBlocks, GetBlocksPayload.Create(hash)));
}
else if (Blockchain.Singleton.HeaderHeight >= session.LastBlockIndex
&& TimeProvider.Current.UtcNow.ToTimestamp() - PingCoolingOffPeriod >= Blockchain.Singleton.GetBlock(Blockchain.Singleton.CurrentHeaderHash)?.Timestamp)
{
session.RemoteNode.Tell(Message.Create(MessageCommand.Ping, PingPayload.Create(Blockchain.Singleton.Height)));
}
}
}

Expand All @@ -256,6 +274,7 @@ internal protected override bool IsHighPriority(object message)
switch (message)
{
case TaskManager.Register _:
case TaskManager.Update _:
case TaskManager.RestartTasks _:
return true;
case TaskManager.NewTasks tasks:
Expand All @@ -266,5 +285,13 @@ internal protected override bool IsHighPriority(object message)
return false;
}
}

internal protected override bool ShallDrop(object message, IEnumerable queue)
{
if (!(message is TaskManager.NewTasks tasks)) return false;
// Remove duplicate tasks
if (queue.OfType<TaskManager.NewTasks>().Any(x => x.Payload.Type == tasks.Payload.Type && x.Payload.Hashes.SequenceEqual(tasks.Payload.Hashes))) return true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should think more about this line.

Node A sends an inv for a block, and node B sends the same. In this case, the second message will be dropped. But if node A doesn't send block, we will stop sync.

Copy link
Member

@vncoelho vncoelho Aug 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

O.o, makes sense, @erikzhang. Nice catch and visualization!
We are destroying master2x... Just kidding, I think it will be ok there for now! ahueahuea

Can we personalize this IEnumerable queue a little bit more? If we set timestamps for the objects. Maybe also some improvements could come on the management of ProtocolHandler.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then maybe we need the duplicates, i think that with the FIFO's patch and the own messages patch, will be enought, it was tested with this both patches and without this one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am testing a change for known hashes behaviour in ProtocolHandler related it, will share the code after the testing is done.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check the PR #1024 which is for 2x.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think we should remove this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, let's remove this and open it in another PR.

I think that we added it before on master2.x. But we can separate it here and analyse in another moment.

Copy link
Member

@vncoelho vncoelho Nov 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should think more about this line.

Node A sends an inv for a block, and node B sends the same. In this case, the second message will be dropped. But if node A doesn't send block, we will stop sync.

@erikzhang, however, maybe, with PendingHashes it will not happen infinitely, since the hash will expire.

return false;
}
}
}
2 changes: 2 additions & 0 deletions neo/Network/P2P/TaskSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ internal class TaskSession

public bool HasTask => Tasks.Count > 0;
public uint StartHeight { get; }
public uint LastBlockIndex { get; set; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a summary for these properties?


public TaskSession(IActorRef node, VersionPayload version)
{
Expand All @@ -24,6 +25,7 @@ public TaskSession(IActorRef node, VersionPayload version)
this.StartHeight = version.Capabilities
.OfType<FullNodeCapability>()
.FirstOrDefault()?.StartHeight ?? 0;
this.LastBlockIndex = this.StartHeight;
}
}
}