diff --git a/src/neo/Ledger/Blockchain.cs b/src/neo/Ledger/Blockchain.cs index 2d2fafab04..b10a60e870 100644 --- a/src/neo/Ledger/Blockchain.cs +++ b/src/neo/Ledger/Blockchain.cs @@ -254,12 +254,19 @@ private void OnImport(IEnumerable blocks, bool verify) private void AddUnverifiedBlockToCache(Block block) { + // Check if any block proposal for height `block.Index` exists if (!block_cache_unverified.TryGetValue(block.Index, out LinkedList blocks)) { + // There are no blocks, a new LinkedList is created and, consequently, the current block is added to the list blocks = new LinkedList(); block_cache_unverified.Add(block.Index, blocks); } - + // Check if any block with the hash being added already exists on possible candidates to be processed + foreach (var unverifiedBlock in blocks) + { + if (block.Hash == unverifiedBlock.Hash) + return; + } blocks.AddLast(block); } @@ -310,54 +317,21 @@ private VerifyResult OnNewBlock(Block block) { if (block.Index <= Height) return VerifyResult.AlreadyExists; - if (block_cache.ContainsKey(block.Hash)) - return VerifyResult.AlreadyExists; - if (block.Index - 1 >= header_index.Count) + if (block.Index - 1 > Height) { AddUnverifiedBlockToCache(block); return VerifyResult.UnableToVerify; } - if (block.Index == header_index.Count) + if (block.Index == Height + 1) { if (!block.Verify(currentSnapshot)) return VerifyResult.Invalid; - } - else - { - if (!block.Hash.Equals(header_index[(int)block.Index])) - return VerifyResult.Invalid; - } - block_cache.TryAdd(block.Hash, block); - if (block.Index == Height + 1) - { - Block block_persist = block; - List blocksToPersistList = new List(); - while (true) - { - blocksToPersistList.Add(block_persist); - if (block_persist.Index + 1 >= header_index.Count) break; - UInt256 hash = header_index[(int)block_persist.Index + 1]; - if (!block_cache.TryGetValue(hash, out block_persist)) break; - } - - int blocksPersisted = 0; - foreach (Block blockToPersist in blocksToPersistList) - { - block_cache_unverified.Remove(blockToPersist.Index); - Persist(blockToPersist); - - // 15000 is the default among of seconds per block, while MilliSecondsPerBlock is the current - uint extraBlocks = (15000 - MillisecondsPerBlock) / 1000; - - if (blocksPersisted++ < blocksToPersistList.Count - (2 + Math.Max(0, extraBlocks))) continue; - // Empirically calibrated for relaying the most recent 2 blocks persisted with 15s network - // Increase in the rate of 1 block per second in configurations with faster blocks - - if (blockToPersist.Index + 100 >= header_index.Count) - system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = blockToPersist }); - } + block_cache.TryAdd(block.Hash, block); + block_cache_unverified.Remove(block.Index); + // We can store the new block in block_cache and tell the new height to other nodes before Persist(). + system.LocalNode.Tell(Message.Create(MessageCommand.Ping, PingPayload.Create(Singleton.Height + 1))); + Persist(block); SaveHeaderHashList(); - if (block_cache_unverified.TryGetValue(Height + 1, out LinkedList unverifiedBlocks)) { foreach (var unverifiedBlock in unverifiedBlocks) @@ -365,47 +339,9 @@ private VerifyResult OnNewBlock(Block block) block_cache_unverified.Remove(Height + 1); } } - else - { - if (block.Index + 100 >= header_index.Count) - system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = block }); - if (block.Index == header_index.Count) - { - header_index.Add(block.Hash); - using (SnapshotView snapshot = GetSnapshot()) - { - snapshot.Blocks.Add(block.Hash, block.Header.Trim()); - snapshot.HeaderHashIndex.GetAndChange().Set(block); - SaveHeaderHashList(snapshot); - snapshot.Commit(); - } - UpdateCurrentSnapshot(); - } - } return VerifyResult.Succeed; } - private void OnNewHeaders(Header[] headers) - { - using (SnapshotView snapshot = GetSnapshot()) - { - foreach (Header header in headers) - { - if (header.Index - 1 >= header_index.Count) break; - if (header.Index < header_index.Count) continue; - if (!header.Verify(snapshot)) break; - header_index.Add(header.Hash); - snapshot.Blocks.Add(header.Hash, header.Trim()); - snapshot.HeaderHashIndex.GetAndChange().Hash = header.Hash; - snapshot.HeaderHashIndex.GetAndChange().Index = header.Index; - } - SaveHeaderHashList(snapshot); - snapshot.Commit(); - } - UpdateCurrentSnapshot(); - system.TaskManager.Tell(new TaskManager.HeaderTaskCompleted(), Sender); - } - private VerifyResult OnNewInventory(IInventory inventory) { if (!inventory.Verify(currentSnapshot)) return VerifyResult.Invalid; @@ -433,9 +369,6 @@ protected override void OnReceive(object message) case FillMemoryPool fill: OnFillMemoryPool(fill.Transactions); break; - case Header[] headers: - OnNewHeaders(headers); - break; case Block block: OnInventory(block, false); break; @@ -459,6 +392,11 @@ private void Persist(Block block) { using (SnapshotView snapshot = GetSnapshot()) { + if (block.Index == header_index.Count) + { + header_index.Add(block.Hash); + snapshot.HeaderHashIndex.GetAndChange().Set(block); + } List all_application_executed = new List(); snapshot.PersistingBlock = block; if (block.Index > 0) @@ -504,11 +442,6 @@ private void Persist(Block block) } } snapshot.BlockHashIndex.GetAndChange().Set(block); - if (block.Index == header_index.Count) - { - header_index.Add(block.Hash); - snapshot.HeaderHashIndex.GetAndChange().Set(block); - } foreach (IPersistencePlugin plugin in Plugin.PersistencePlugins) plugin.OnPersist(snapshot, all_application_executed); snapshot.Commit(); @@ -590,7 +523,6 @@ internal protected override bool IsHighPriority(object message) { switch (message) { - case Header[] _: case Block _: case ConsensusPayload _: case Terminated _: diff --git a/src/neo/Network/P2P/MessageCommand.cs b/src/neo/Network/P2P/MessageCommand.cs index 771a98a459..89c21597cf 100644 --- a/src/neo/Network/P2P/MessageCommand.cs +++ b/src/neo/Network/P2P/MessageCommand.cs @@ -20,7 +20,7 @@ public enum MessageCommand : byte Pong = 0x19, //synchronization - [ReflectionCache(typeof(GetBlocksPayload))] + [ReflectionCache(typeof(GetBlockByIndexPayload))] GetHeaders = 0x20, [ReflectionCache(typeof(HeadersPayload))] Headers = 0x21, @@ -31,9 +31,8 @@ public enum MessageCommand : byte Inv = 0x27, [ReflectionCache(typeof(InvPayload))] GetData = 0x28, - [ReflectionCache(typeof(GetBlockDataPayload))] - GetBlockData = 0x29, - [ReflectionCache(typeof(InvPayload))] + [ReflectionCache(typeof(GetBlockByIndexPayload))] + GetBlockByIndex = 0x29, NotFound = 0x2a, [ReflectionCache(typeof(Transaction))] Transaction = 0x2b, diff --git a/src/neo/Network/P2P/Payloads/GetBlockDataPayload.cs b/src/neo/Network/P2P/Payloads/GetBlockByIndexPayload.cs similarity index 54% rename from src/neo/Network/P2P/Payloads/GetBlockDataPayload.cs rename to src/neo/Network/P2P/Payloads/GetBlockByIndexPayload.cs index 9ccd534f34..c07a8a8c0a 100644 --- a/src/neo/Network/P2P/Payloads/GetBlockDataPayload.cs +++ b/src/neo/Network/P2P/Payloads/GetBlockByIndexPayload.cs @@ -4,17 +4,16 @@ namespace Neo.Network.P2P.Payloads { - public class GetBlockDataPayload : ISerializable + public class GetBlockByIndexPayload : ISerializable { - private const ushort MaxBlocksCount = 500; public uint IndexStart; - public ushort Count; + public short Count; - public int Size => sizeof(uint) + sizeof(ushort); + public int Size => sizeof(uint) + sizeof(short); - public static GetBlockDataPayload Create(uint index_start, ushort count) + public static GetBlockByIndexPayload Create(uint index_start, short count = -1) { - return new GetBlockDataPayload + return new GetBlockByIndexPayload { IndexStart = index_start, Count = count @@ -24,8 +23,9 @@ public static GetBlockDataPayload Create(uint index_start, ushort count) void ISerializable.Deserialize(BinaryReader reader) { IndexStart = reader.ReadUInt32(); - Count = reader.ReadUInt16(); - if (Count == 0 || Count > MaxBlocksCount) throw new FormatException(); + Count = reader.ReadInt16(); + if (Count < -1 || Count == 0 || Count > HeadersPayload.MaxHeadersCount) + throw new FormatException(); } void ISerializable.Serialize(BinaryWriter writer) diff --git a/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs b/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs index bdf235bdc0..3a55d84347 100644 --- a/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs +++ b/src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs @@ -82,17 +82,14 @@ private void OnMessage(Message msg) case MessageCommand.GetBlocks: OnGetBlocksMessageReceived((GetBlocksPayload)msg.Payload); break; - case MessageCommand.GetBlockData: - OnGetBlockDataMessageReceived((GetBlockDataPayload)msg.Payload); + case MessageCommand.GetBlockByIndex: + OnGetBlockByIndexMessageReceived((GetBlockByIndexPayload)msg.Payload); break; case MessageCommand.GetData: OnGetDataMessageReceived((InvPayload)msg.Payload); break; case MessageCommand.GetHeaders: - OnGetHeadersMessageReceived((GetBlocksPayload)msg.Payload); - break; - case MessageCommand.Headers: - OnHeadersMessageReceived((HeadersPayload)msg.Payload); + OnGetHeadersMessageReceived((GetBlockByIndexPayload)msg.Payload); break; case MessageCommand.Inv: OnInvMessageReceived((InvPayload)msg.Payload); @@ -114,6 +111,7 @@ private void OnMessage(Message msg) case MessageCommand.Version: throw new ProtocolViolationException(); case MessageCommand.Alert: + case MessageCommand.Headers: case MessageCommand.MerkleBlock: case MessageCommand.NotFound: case MessageCommand.Reject: @@ -189,9 +187,10 @@ private void OnGetBlocksMessageReceived(GetBlocksPayload payload) EnqueueMessage(Message.Create(MessageCommand.Inv, InvPayload.Create(InventoryType.Block, hashes.ToArray()))); } - private void OnGetBlockDataMessageReceived(GetBlockDataPayload payload) + private void OnGetBlockByIndexMessageReceived(GetBlockByIndexPayload payload) { - for (uint i = payload.IndexStart, max = payload.IndexStart + payload.Count; i < max; i++) + uint count = payload.Count == -1 ? InvPayload.MaxHashesCount : Math.Min((uint)payload.Count, InvPayload.MaxHashesCount); + for (uint i = payload.IndexStart, max = payload.IndexStart + count; i < max; i++) { Block block = Blockchain.Singleton.GetBlock(i); if (block == null) @@ -264,24 +263,20 @@ private void OnGetDataMessageReceived(InvPayload payload) /// /// Will be triggered when a MessageCommand.GetHeaders message is received. - /// Tell the specified number of blocks' headers starting with the requested HashStart to RemoteNode actor. + /// Tell the specified number of blocks' headers starting with the requested IndexStart to RemoteNode actor. /// A limit set by HeadersPayload.MaxHeadersCount is also applied to the number of requested Headers, namely payload.Count. /// - /// A GetBlocksPayload including start block Hash and number of blocks' headers requested. - private void OnGetHeadersMessageReceived(GetBlocksPayload payload) + /// A GetBlocksPayload including start block index and number of blocks' headers requested. + private void OnGetHeadersMessageReceived(GetBlockByIndexPayload payload) { - UInt256 hash = payload.HashStart; - int count = payload.Count < 0 || payload.Count > HeadersPayload.MaxHeadersCount ? HeadersPayload.MaxHeadersCount : payload.Count; - DataCache cache = Blockchain.Singleton.View.Blocks; - TrimmedBlock state = cache.TryGet(hash); - if (state == null) return; + uint index = payload.IndexStart; + uint count = payload.Count == -1 ? HeadersPayload.MaxHeadersCount : (uint)payload.Count; + if (index > Blockchain.Singleton.HeaderHeight) + return; List
headers = new List
(); - for (uint i = 1; i <= count; i++) + for (uint i = 0; i < count; i++) { - uint index = state.Index + i; - hash = Blockchain.Singleton.GetBlockHash(index); - if (hash == null) break; - Header header = cache.TryGet(hash)?.Header; + var header = Blockchain.Singleton.GetHeader(index + i); if (header == null) break; headers.Add(header); } @@ -289,15 +284,9 @@ private void OnGetHeadersMessageReceived(GetBlocksPayload payload) EnqueueMessage(Message.Create(MessageCommand.Headers, HeadersPayload.Create(headers.ToArray()))); } - private void OnHeadersMessageReceived(HeadersPayload payload) - { - if (payload.Headers.Length == 0) return; - system.Blockchain.Tell(payload.Headers); - } - private void OnInventoryReceived(IInventory inventory) { - system.TaskManager.Tell(new TaskManager.TaskCompleted { Hash = inventory.Hash }); + system.TaskManager.Tell(inventory); if (inventory is Transaction transaction) system.Consensus?.Tell(transaction); system.Blockchain.Tell(inventory, ActorRefs.NoSender); diff --git a/src/neo/Network/P2P/TaskManager.cs b/src/neo/Network/P2P/TaskManager.cs index 442efc8a68..3eab30ba2e 100644 --- a/src/neo/Network/P2P/TaskManager.cs +++ b/src/neo/Network/P2P/TaskManager.cs @@ -17,43 +17,75 @@ internal class TaskManager : UntypedActor public class Register { public VersionPayload Version; } public class Update { public uint LastBlockIndex; } public class NewTasks { public InvPayload Payload; } - public class TaskCompleted { public UInt256 Hash; } - public class HeaderTaskCompleted { } public class RestartTasks { public InvPayload Payload; } private class Timer { } private static readonly TimeSpan TimerInterval = TimeSpan.FromSeconds(30); private static readonly TimeSpan TaskTimeout = TimeSpan.FromMinutes(1); - private static readonly UInt256 HeaderTaskHash = UInt256.Zero; private static readonly UInt256 MemPoolTaskHash = UInt256.Parse("0x0000000000000000000000000000000000000000000000000000000000000001"); - private readonly NeoSystem system; private const int MaxConncurrentTasks = 3; - + private const int MaxSyncTasksCount = 50; private const int PingCoolingOffPeriod = 60_000; // in ms. + + private readonly NeoSystem system; /// /// A set of known hashes, of inventories or payloads, already received. - /// + /// private readonly HashSetCache knownHashes; private readonly Dictionary globalTasks = new Dictionary(); + private readonly Dictionary receivedBlockIndex = new Dictionary(); + private readonly HashSet failedSyncTasks = new HashSet(); private readonly Dictionary sessions = new Dictionary(); private readonly ICancelable timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimerInterval, TimerInterval, Context.Self, new Timer(), ActorRefs.NoSender); - - private bool HasHeaderTask => globalTasks.ContainsKey(HeaderTaskHash); + private uint lastTaskIndex = 0; public TaskManager(NeoSystem system) { this.system = system; this.knownHashes = new HashSetCache(Blockchain.Singleton.MemPool.Capacity * 2 / 5); + this.lastTaskIndex = Blockchain.Singleton.Height; + Context.System.EventStream.Subscribe(Self, typeof(Blockchain.PersistCompleted)); } - private void OnHeaderTaskCompleted() + private bool AssignSyncTask(uint index, TaskSession filterSession = null) { - if (!sessions.TryGetValue(Sender, out TaskSession session)) - return; - session.Tasks.Remove(HeaderTaskHash); - DecrementGlobalTask(HeaderTaskHash); - RequestTasks(session); + if (index <= Blockchain.Singleton.Height || sessions.Values.Any(p => p != filterSession && p.IndexTasks.ContainsKey(index))) + return true; + Random rand = new Random(); + KeyValuePair remoteNode = sessions.Where(p => p.Value != filterSession && p.Value.LastBlockIndex >= index) + .OrderBy(p => p.Value.IndexTasks.Count) + .ThenBy(s => rand.Next()) + .FirstOrDefault(); + if (remoteNode.Value == null) + { + failedSyncTasks.Add(index); + return false; + } + TaskSession session = remoteNode.Value; + session.IndexTasks.TryAdd(index, TimeProvider.Current.UtcNow); + remoteNode.Key.Tell(Message.Create(MessageCommand.GetBlockByIndex, GetBlockByIndexPayload.Create(index, 1))); + failedSyncTasks.Remove(index); + return true; + } + + private void OnBlock(Block block) + { + var session = sessions.Values.FirstOrDefault(p => p.IndexTasks.ContainsKey(block.Index)); + if (session is null) return; + session.IndexTasks.Remove(block.Index); + receivedBlockIndex.TryAdd(block.Index, session); + RequestTasks(); + } + + private void OnInvalidBlock(Block invalidBlock) + { + receivedBlockIndex.TryGetValue(invalidBlock.Index, out TaskSession session); + if (session is null) return; + session.InvalidBlockCount++; + session.IndexTasks.Remove(invalidBlock.Index); + receivedBlockIndex.Remove(invalidBlock.Index); + AssignSyncTask(invalidBlock.Index, session); } private void OnNewTasks(InvPayload payload) @@ -61,37 +93,33 @@ private void OnNewTasks(InvPayload payload) if (!sessions.TryGetValue(Sender, out TaskSession session)) return; // Do not accept payload of type InventoryType.TX if not synced on best known HeaderHeight - if (payload.Type == InventoryType.TX && Blockchain.Singleton.Height < Blockchain.Singleton.HeaderHeight) - { - RequestTasks(session); + if (payload.Type == InventoryType.TX && Blockchain.Singleton.Height < sessions.Values.Max(p => p.LastBlockIndex)) return; - } HashSet hashes = new HashSet(payload.Hashes); // Remove all previously processed knownHashes from the list that is being requested hashes.Remove(knownHashes); - // Add to AvailableTasks the ones, of type InventoryType.Block, that are global (already under process by other sessions) - if (payload.Type == InventoryType.Block) - session.AvailableTasks.UnionWith(hashes.Where(p => globalTasks.ContainsKey(p))); // Remove those that are already in process by other sessions hashes.Remove(globalTasks); if (hashes.Count == 0) - { - RequestTasks(session); return; - } // Update globalTasks with the ones that will be requested within this current session foreach (UInt256 hash in hashes) { IncrementGlobalTask(hash); - session.Tasks[hash] = DateTime.UtcNow; + session.InvTasks[hash] = DateTime.UtcNow; } foreach (InvPayload group in InvPayload.CreateGroup(payload.Type, hashes.ToArray())) Sender.Tell(Message.Create(MessageCommand.GetData, group)); } + private void OnPersistCompleted(Block block) + { + receivedBlockIndex.Remove(block.Index); + } + protected override void OnReceive(object message) { switch (message) @@ -105,15 +133,22 @@ protected override void OnReceive(object message) case NewTasks tasks: OnNewTasks(tasks.Payload); break; - case TaskCompleted completed: - OnTaskCompleted(completed.Hash); - break; - case HeaderTaskCompleted _: - OnHeaderTaskCompleted(); - break; case RestartTasks restart: OnRestartTasks(restart.Payload); break; + case Block block: + OnBlock(block); + break; + case IInventory inventory: + OnTaskCompleted(inventory.Hash); + break; + case Blockchain.PersistCompleted pc: + OnPersistCompleted(pc.Block); + break; + case Blockchain.RelayResult rr: + if (rr.Inventory is Block invalidBlock && rr.Result == VerifyResult.Invalid) + OnInvalidBlock(invalidBlock); + break; case Timer _: OnTimer(); break; @@ -126,11 +161,11 @@ protected override void OnReceive(object message) private void OnRegister(VersionPayload version) { Context.Watch(Sender); - TaskSession session = new TaskSession(Sender, version); + TaskSession session = new TaskSession(version); if (session.IsFullNode) - session.AvailableTasks.Add(TaskManager.MemPoolTaskHash); - sessions.Add(Sender, session); - RequestTasks(session); + session.InvTasks.TryAdd(MemPoolTaskHash, TimeProvider.Current.UtcNow); + sessions.TryAdd(Sender, session); + RequestTasks(); } private void OnUpdate(uint lastBlockIndex) @@ -138,6 +173,7 @@ private void OnUpdate(uint lastBlockIndex) if (!sessions.TryGetValue(Sender, out TaskSession session)) return; session.LastBlockIndex = lastBlockIndex; + RequestTasks(); } private void OnRestartTasks(InvPayload payload) @@ -153,13 +189,8 @@ private void OnTaskCompleted(UInt256 hash) { knownHashes.Add(hash); globalTasks.Remove(hash); - foreach (TaskSession ms in sessions.Values) - ms.AvailableTasks.Remove(hash); if (sessions.TryGetValue(Sender, out TaskSession session)) - { - session.Tasks.Remove(hash); - RequestTasks(session); - } + session.InvTasks.Remove(hash); } [MethodImpl(MethodImplOptions.AggressiveInlining)] @@ -193,22 +224,38 @@ private void OnTerminated(IActorRef actor) { if (!sessions.TryGetValue(actor, out TaskSession session)) return; - sessions.Remove(actor); - foreach (UInt256 hash in session.Tasks.Keys) + foreach (uint index in session.IndexTasks.Keys) + AssignSyncTask(index, session); + + foreach (UInt256 hash in session.InvTasks.Keys) DecrementGlobalTask(hash); + sessions.Remove(actor); } private void OnTimer() { foreach (TaskSession session in sessions.Values) - foreach (var task in session.Tasks.ToArray()) + { + foreach (KeyValuePair kvp in session.IndexTasks) + { + if (TimeProvider.Current.UtcNow - kvp.Value > TaskTimeout) + { + session.IndexTasks.Remove(kvp.Key); + session.TimeoutTimes++; + AssignSyncTask(kvp.Key, session); + } + } + + foreach (var task in session.InvTasks.ToArray()) + { if (DateTime.UtcNow - task.Value > TaskTimeout) { - if (session.Tasks.Remove(task.Key)) + if (session.InvTasks.Remove(task.Key)) DecrementGlobalTask(task.Key); } - foreach (TaskSession session in sessions.Values) - RequestTasks(session); + } + } + RequestTasks(); } protected override void PostStop() @@ -222,64 +269,47 @@ public static Props Props(NeoSystem system) return Akka.Actor.Props.Create(() => new TaskManager(system)).WithMailbox("task-manager-mailbox"); } - private void RequestTasks(TaskSession session) + private void RequestTasks() { - if (session.HasTask) return; - // If there are pending tasks of InventoryType.Block we should process them - if (session.AvailableTasks.Count > 0) + if (sessions.Count() == 0) return; + + SendPingMessage(); + + while (failedSyncTasks.Count() > 0) { - session.AvailableTasks.Remove(knownHashes); - // Search any similar hash that is on Singleton's knowledge, which means, on the way or already processed - session.AvailableTasks.RemoveWhere(p => Blockchain.Singleton.ContainsBlock(p)); - HashSet hashes = new HashSet(session.AvailableTasks); - hashes.Remove(MemPoolTaskHash); - if (hashes.Count > 0) + var failedTask = failedSyncTasks.First(); + if (failedTask <= Blockchain.Singleton.Height) { - foreach (UInt256 hash in hashes.ToArray()) - { - if (!IncrementGlobalTask(hash)) - hashes.Remove(hash); - } - session.AvailableTasks.Remove(hashes); - foreach (UInt256 hash in hashes) - session.Tasks[hash] = DateTime.UtcNow; - foreach (InvPayload group in InvPayload.CreateGroup(InventoryType.Block, hashes.ToArray())) - session.RemoteNode.Tell(Message.Create(MessageCommand.GetData, group)); - return; + failedSyncTasks.Remove(failedTask); + continue; } + if (!AssignSyncTask(failedTask)) return; } - // When the number of AvailableTasks is no more than 0, no pending tasks of InventoryType.Block, it should process pending the tasks of headers - // If not HeaderTask pending to be processed it should ask for more Blocks - if ((!HasHeaderTask || globalTasks[HeaderTaskHash] < MaxConncurrentTasks) && Blockchain.Singleton.HeaderHeight < session.LastBlockIndex) + int taskCounts = sessions.Values.Sum(p => p.IndexTasks.Count); + var highestBlockIndex = sessions.Values.Max(p => p.LastBlockIndex); + for (; taskCounts < MaxSyncTasksCount; taskCounts++) { - session.Tasks[HeaderTaskHash] = DateTime.UtcNow; - IncrementGlobalTask(HeaderTaskHash); - session.RemoteNode.Tell(Message.Create(MessageCommand.GetHeaders, GetBlocksPayload.Create(Blockchain.Singleton.CurrentHeaderHash))); + if (lastTaskIndex >= highestBlockIndex) break; + if (!AssignSyncTask(++lastTaskIndex)) break; } - else if (Blockchain.Singleton.Height < session.LastBlockIndex) + } + + private void SendPingMessage() + { + foreach (KeyValuePair item in sessions) { - UInt256 hash = Blockchain.Singleton.CurrentBlockHash; - for (uint i = Blockchain.Singleton.Height + 1; i <= Blockchain.Singleton.HeaderHeight; i++) + var node = item.Key; + var session = item.Value; + if (Blockchain.Singleton.Height >= session.LastBlockIndex + && TimeProvider.Current.UtcNow.ToTimestampMS() - PingCoolingOffPeriod >= Blockchain.Singleton.GetBlock(Blockchain.Singleton.CurrentBlockHash)?.Timestamp) { - hash = Blockchain.Singleton.GetBlockHash(i); - if (!globalTasks.ContainsKey(hash)) + if (session.InvTasks.Remove(MemPoolTaskHash)) { - hash = Blockchain.Singleton.GetBlockHash(i - 1); - break; + node.Tell(Message.Create(MessageCommand.Mempool)); } + node.Tell(Message.Create(MessageCommand.Ping, PingPayload.Create(Blockchain.Singleton.Height))); } - session.RemoteNode.Tell(Message.Create(MessageCommand.GetBlocks, GetBlocksPayload.Create(hash))); - } - else if (Blockchain.Singleton.HeaderHeight >= session.LastBlockIndex - && TimeProvider.Current.UtcNow.ToTimestampMS() - PingCoolingOffPeriod >= Blockchain.Singleton.GetBlock(Blockchain.Singleton.CurrentHeaderHash)?.Timestamp) - { - if (session.AvailableTasks.Remove(MemPoolTaskHash)) - { - session.RemoteNode.Tell(Message.Create(MessageCommand.Mempool)); - } - - session.RemoteNode.Tell(Message.Create(MessageCommand.Ping, PingPayload.Create(Blockchain.Singleton.Height))); } } } @@ -296,7 +326,6 @@ internal protected override bool IsHighPriority(object message) switch (message) { case TaskManager.Register _: - case TaskManager.Update _: case TaskManager.RestartTasks _: return true; case TaskManager.NewTasks tasks: diff --git a/src/neo/Network/P2P/TaskSession.cs b/src/neo/Network/P2P/TaskSession.cs index d13c7c82f5..859f282d35 100644 --- a/src/neo/Network/P2P/TaskSession.cs +++ b/src/neo/Network/P2P/TaskSession.cs @@ -1,4 +1,3 @@ -using Akka.Actor; using Neo.Network.P2P.Capabilities; using Neo.Network.P2P.Payloads; using System; @@ -9,25 +8,19 @@ namespace Neo.Network.P2P { internal class TaskSession { - public readonly IActorRef RemoteNode; - public readonly VersionPayload Version; - public readonly Dictionary Tasks = new Dictionary(); - public readonly HashSet AvailableTasks = new HashSet(); + public readonly Dictionary InvTasks = new Dictionary(); + public readonly Dictionary IndexTasks = new Dictionary(); - public bool HasTask => Tasks.Count > 0; - public uint StartHeight { get; } public bool IsFullNode { get; } public uint LastBlockIndex { get; set; } + public uint TimeoutTimes = 0; + public uint InvalidBlockCount = 0; - public TaskSession(IActorRef node, VersionPayload version) + public TaskSession(VersionPayload version) { var fullNode = version.Capabilities.OfType().FirstOrDefault(); - this.IsFullNode = fullNode != null; - this.RemoteNode = node; - this.Version = version; - this.StartHeight = fullNode?.StartHeight ?? 0; - this.LastBlockIndex = this.StartHeight; + this.LastBlockIndex = fullNode.StartHeight; } } }