Skip to content

Commit

Permalink
GetBlocks by block index (neo-project#1397)
Browse files Browse the repository at this point in the history
  • Loading branch information
ShawnYun authored and Tommo-L committed Jun 22, 2020
1 parent 50684b8 commit 80ec37f
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 234 deletions.
108 changes: 20 additions & 88 deletions src/neo/Ledger/Blockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,19 @@ private void OnImport(IEnumerable<Block> blocks, bool verify)

private void AddUnverifiedBlockToCache(Block block)
{
// Check if any block proposal for height `block.Index` exists
if (!block_cache_unverified.TryGetValue(block.Index, out LinkedList<Block> blocks))
{
// There are no blocks, a new LinkedList is created and, consequently, the current block is added to the list
blocks = new LinkedList<Block>();
block_cache_unverified.Add(block.Index, blocks);
}

// Check if any block with the hash being added already exists on possible candidates to be processed
foreach (var unverifiedBlock in blocks)
{
if (block.Hash == unverifiedBlock.Hash)
return;
}
blocks.AddLast(block);
}

Expand Down Expand Up @@ -310,102 +317,31 @@ private VerifyResult OnNewBlock(Block block)
{
if (block.Index <= Height)
return VerifyResult.AlreadyExists;
if (block_cache.ContainsKey(block.Hash))
return VerifyResult.AlreadyExists;
if (block.Index - 1 >= header_index.Count)
if (block.Index - 1 > Height)
{
AddUnverifiedBlockToCache(block);
return VerifyResult.UnableToVerify;
}
if (block.Index == header_index.Count)
if (block.Index == Height + 1)
{
if (!block.Verify(currentSnapshot))
return VerifyResult.Invalid;
}
else
{
if (!block.Hash.Equals(header_index[(int)block.Index]))
return VerifyResult.Invalid;
}
block_cache.TryAdd(block.Hash, block);
if (block.Index == Height + 1)
{
Block block_persist = block;
List<Block> blocksToPersistList = new List<Block>();
while (true)
{
blocksToPersistList.Add(block_persist);
if (block_persist.Index + 1 >= header_index.Count) break;
UInt256 hash = header_index[(int)block_persist.Index + 1];
if (!block_cache.TryGetValue(hash, out block_persist)) break;
}

int blocksPersisted = 0;
foreach (Block blockToPersist in blocksToPersistList)
{
block_cache_unverified.Remove(blockToPersist.Index);
Persist(blockToPersist);

// 15000 is the default among of seconds per block, while MilliSecondsPerBlock is the current
uint extraBlocks = (15000 - MillisecondsPerBlock) / 1000;

if (blocksPersisted++ < blocksToPersistList.Count - (2 + Math.Max(0, extraBlocks))) continue;
// Empirically calibrated for relaying the most recent 2 blocks persisted with 15s network
// Increase in the rate of 1 block per second in configurations with faster blocks

if (blockToPersist.Index + 100 >= header_index.Count)
system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = blockToPersist });
}
block_cache.TryAdd(block.Hash, block);
block_cache_unverified.Remove(block.Index);
// We can store the new block in block_cache and tell the new height to other nodes before Persist().
system.LocalNode.Tell(Message.Create(MessageCommand.Ping, PingPayload.Create(Singleton.Height + 1)));
Persist(block);
SaveHeaderHashList();

if (block_cache_unverified.TryGetValue(Height + 1, out LinkedList<Block> unverifiedBlocks))
{
foreach (var unverifiedBlock in unverifiedBlocks)
Self.Tell(unverifiedBlock, ActorRefs.NoSender);
block_cache_unverified.Remove(Height + 1);
}
}
else
{
if (block.Index + 100 >= header_index.Count)
system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = block });
if (block.Index == header_index.Count)
{
header_index.Add(block.Hash);
using (SnapshotView snapshot = GetSnapshot())
{
snapshot.Blocks.Add(block.Hash, block.Header.Trim());
snapshot.HeaderHashIndex.GetAndChange().Set(block);
SaveHeaderHashList(snapshot);
snapshot.Commit();
}
UpdateCurrentSnapshot();
}
}
return VerifyResult.Succeed;
}

private void OnNewHeaders(Header[] headers)
{
using (SnapshotView snapshot = GetSnapshot())
{
foreach (Header header in headers)
{
if (header.Index - 1 >= header_index.Count) break;
if (header.Index < header_index.Count) continue;
if (!header.Verify(snapshot)) break;
header_index.Add(header.Hash);
snapshot.Blocks.Add(header.Hash, header.Trim());
snapshot.HeaderHashIndex.GetAndChange().Hash = header.Hash;
snapshot.HeaderHashIndex.GetAndChange().Index = header.Index;
}
SaveHeaderHashList(snapshot);
snapshot.Commit();
}
UpdateCurrentSnapshot();
system.TaskManager.Tell(new TaskManager.HeaderTaskCompleted(), Sender);
}

private VerifyResult OnNewInventory(IInventory inventory)
{
if (!inventory.Verify(currentSnapshot)) return VerifyResult.Invalid;
Expand Down Expand Up @@ -433,9 +369,6 @@ protected override void OnReceive(object message)
case FillMemoryPool fill:
OnFillMemoryPool(fill.Transactions);
break;
case Header[] headers:
OnNewHeaders(headers);
break;
case Block block:
OnInventory(block, false);
break;
Expand All @@ -459,6 +392,11 @@ private void Persist(Block block)
{
using (SnapshotView snapshot = GetSnapshot())
{
if (block.Index == header_index.Count)
{
header_index.Add(block.Hash);
snapshot.HeaderHashIndex.GetAndChange().Set(block);
}
List<ApplicationExecuted> all_application_executed = new List<ApplicationExecuted>();
snapshot.PersistingBlock = block;
if (block.Index > 0)
Expand Down Expand Up @@ -504,11 +442,6 @@ private void Persist(Block block)
}
}
snapshot.BlockHashIndex.GetAndChange().Set(block);
if (block.Index == header_index.Count)
{
header_index.Add(block.Hash);
snapshot.HeaderHashIndex.GetAndChange().Set(block);
}
foreach (IPersistencePlugin plugin in Plugin.PersistencePlugins)
plugin.OnPersist(snapshot, all_application_executed);
snapshot.Commit();
Expand Down Expand Up @@ -590,7 +523,6 @@ internal protected override bool IsHighPriority(object message)
{
switch (message)
{
case Header[] _:
case Block _:
case ConsensusPayload _:
case Terminated _:
Expand Down
7 changes: 3 additions & 4 deletions src/neo/Network/P2P/MessageCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public enum MessageCommand : byte
Pong = 0x19,

//synchronization
[ReflectionCache(typeof(GetBlocksPayload))]
[ReflectionCache(typeof(GetBlockByIndexPayload))]
GetHeaders = 0x20,
[ReflectionCache(typeof(HeadersPayload))]
Headers = 0x21,
Expand All @@ -31,9 +31,8 @@ public enum MessageCommand : byte
Inv = 0x27,
[ReflectionCache(typeof(InvPayload))]
GetData = 0x28,
[ReflectionCache(typeof(GetBlockDataPayload))]
GetBlockData = 0x29,
[ReflectionCache(typeof(InvPayload))]
[ReflectionCache(typeof(GetBlockByIndexPayload))]
GetBlockByIndex = 0x29,
NotFound = 0x2a,
[ReflectionCache(typeof(Transaction))]
Transaction = 0x2b,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@

namespace Neo.Network.P2P.Payloads
{
public class GetBlockDataPayload : ISerializable
public class GetBlockByIndexPayload : ISerializable
{
private const ushort MaxBlocksCount = 500;
public uint IndexStart;
public ushort Count;
public short Count;

public int Size => sizeof(uint) + sizeof(ushort);
public int Size => sizeof(uint) + sizeof(short);

public static GetBlockDataPayload Create(uint index_start, ushort count)
public static GetBlockByIndexPayload Create(uint index_start, short count = -1)
{
return new GetBlockDataPayload
return new GetBlockByIndexPayload
{
IndexStart = index_start,
Count = count
Expand All @@ -24,8 +23,9 @@ public static GetBlockDataPayload Create(uint index_start, ushort count)
void ISerializable.Deserialize(BinaryReader reader)
{
IndexStart = reader.ReadUInt32();
Count = reader.ReadUInt16();
if (Count == 0 || Count > MaxBlocksCount) throw new FormatException();
Count = reader.ReadInt16();
if (Count < -1 || Count == 0 || Count > HeadersPayload.MaxHeadersCount)
throw new FormatException();
}

void ISerializable.Serialize(BinaryWriter writer)
Expand Down
45 changes: 17 additions & 28 deletions src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,14 @@ private void OnMessage(Message msg)
case MessageCommand.GetBlocks:
OnGetBlocksMessageReceived((GetBlocksPayload)msg.Payload);
break;
case MessageCommand.GetBlockData:
OnGetBlockDataMessageReceived((GetBlockDataPayload)msg.Payload);
case MessageCommand.GetBlockByIndex:
OnGetBlockByIndexMessageReceived((GetBlockByIndexPayload)msg.Payload);
break;
case MessageCommand.GetData:
OnGetDataMessageReceived((InvPayload)msg.Payload);
break;
case MessageCommand.GetHeaders:
OnGetHeadersMessageReceived((GetBlocksPayload)msg.Payload);
break;
case MessageCommand.Headers:
OnHeadersMessageReceived((HeadersPayload)msg.Payload);
OnGetHeadersMessageReceived((GetBlockByIndexPayload)msg.Payload);
break;
case MessageCommand.Inv:
OnInvMessageReceived((InvPayload)msg.Payload);
Expand All @@ -114,6 +111,7 @@ private void OnMessage(Message msg)
case MessageCommand.Version:
throw new ProtocolViolationException();
case MessageCommand.Alert:
case MessageCommand.Headers:
case MessageCommand.MerkleBlock:
case MessageCommand.NotFound:
case MessageCommand.Reject:
Expand Down Expand Up @@ -189,9 +187,10 @@ private void OnGetBlocksMessageReceived(GetBlocksPayload payload)
EnqueueMessage(Message.Create(MessageCommand.Inv, InvPayload.Create(InventoryType.Block, hashes.ToArray())));
}

private void OnGetBlockDataMessageReceived(GetBlockDataPayload payload)
private void OnGetBlockByIndexMessageReceived(GetBlockByIndexPayload payload)
{
for (uint i = payload.IndexStart, max = payload.IndexStart + payload.Count; i < max; i++)
uint count = payload.Count == -1 ? InvPayload.MaxHashesCount : Math.Min((uint)payload.Count, InvPayload.MaxHashesCount);
for (uint i = payload.IndexStart, max = payload.IndexStart + count; i < max; i++)
{
Block block = Blockchain.Singleton.GetBlock(i);
if (block == null)
Expand Down Expand Up @@ -264,40 +263,30 @@ private void OnGetDataMessageReceived(InvPayload payload)

/// <summary>
/// Will be triggered when a MessageCommand.GetHeaders message is received.
/// Tell the specified number of blocks' headers starting with the requested HashStart to RemoteNode actor.
/// Tell the specified number of blocks' headers starting with the requested IndexStart to RemoteNode actor.
/// A limit set by HeadersPayload.MaxHeadersCount is also applied to the number of requested Headers, namely payload.Count.
/// </summary>
/// <param name="payload">A GetBlocksPayload including start block Hash and number of blocks' headers requested.</param>
private void OnGetHeadersMessageReceived(GetBlocksPayload payload)
/// <param name="payload">A GetBlocksPayload including start block index and number of blocks' headers requested.</param>
private void OnGetHeadersMessageReceived(GetBlockByIndexPayload payload)
{
UInt256 hash = payload.HashStart;
int count = payload.Count < 0 || payload.Count > HeadersPayload.MaxHeadersCount ? HeadersPayload.MaxHeadersCount : payload.Count;
DataCache<UInt256, TrimmedBlock> cache = Blockchain.Singleton.View.Blocks;
TrimmedBlock state = cache.TryGet(hash);
if (state == null) return;
uint index = payload.IndexStart;
uint count = payload.Count == -1 ? HeadersPayload.MaxHeadersCount : (uint)payload.Count;
if (index > Blockchain.Singleton.HeaderHeight)
return;
List<Header> headers = new List<Header>();
for (uint i = 1; i <= count; i++)
for (uint i = 0; i < count; i++)
{
uint index = state.Index + i;
hash = Blockchain.Singleton.GetBlockHash(index);
if (hash == null) break;
Header header = cache.TryGet(hash)?.Header;
var header = Blockchain.Singleton.GetHeader(index + i);
if (header == null) break;
headers.Add(header);
}
if (headers.Count == 0) return;
EnqueueMessage(Message.Create(MessageCommand.Headers, HeadersPayload.Create(headers.ToArray())));
}

private void OnHeadersMessageReceived(HeadersPayload payload)
{
if (payload.Headers.Length == 0) return;
system.Blockchain.Tell(payload.Headers);
}

private void OnInventoryReceived(IInventory inventory)
{
system.TaskManager.Tell(new TaskManager.TaskCompleted { Hash = inventory.Hash });
system.TaskManager.Tell(inventory);
if (inventory is Transaction transaction)
system.Consensus?.Tell(transaction);
system.Blockchain.Tell(inventory, ActorRefs.NoSender);
Expand Down
Loading

0 comments on commit 80ec37f

Please sign in to comment.