Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscribe to RelayResult messages in ConsensusService #1647

Merged
merged 2 commits into from
May 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/neo/Consensus/ConsensusService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ internal ConsensusService(IActorRef localNode, IActorRef taskManager, ConsensusC
this.taskManager = taskManager;
this.context = context;
Context.System.EventStream.Subscribe(Self, typeof(Blockchain.PersistCompleted));
Context.System.EventStream.Subscribe(Self, typeof(Blockchain.RelayResult));
}

private bool AddTransaction(Transaction tx, bool verify)
Expand Down Expand Up @@ -506,15 +507,16 @@ protected override void OnReceive(object message)
case Timer timer:
OnTimer(timer);
break;
case ConsensusPayload payload:
OnConsensusPayload(payload);
break;
case Transaction transaction:
OnTransaction(transaction);
break;
case Blockchain.PersistCompleted completed:
OnPersistCompleted(completed.Block);
break;
case Blockchain.RelayResult rr:
if (rr.Result == VerifyResult.Succeed && rr.Inventory is ConsensusPayload payload)
OnConsensusPayload(payload);
break;
}
}
}
Expand Down
20 changes: 9 additions & 11 deletions src/neo/Ledger/Blockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class RelayResult { public IInventory Inventory; public VerifyResult Resu
private uint stored_header_count = 0;
private readonly Dictionary<UInt256, Block> block_cache = new Dictionary<UInt256, Block>();
private readonly Dictionary<uint, LinkedList<Block>> block_cache_unverified = new Dictionary<uint, LinkedList<Block>>();
internal readonly RelayCache ConsensusRelayCache = new RelayCache(100);
internal readonly RelayCache RelayCache = new RelayCache(100);
private SnapshotView currentSnapshot;

public IStore Store { get; }
Expand Down Expand Up @@ -299,8 +299,7 @@ private void OnInventory(IInventory inventory, bool relay = true)
{
Block block => OnNewBlock(block),
Transaction transaction => OnNewTransaction(transaction),
ConsensusPayload payload => OnNewConsensus(payload),
_ => VerifyResult.Unknown
_ => OnNewInventory(inventory)
}
};
if (relay && rr.Result == VerifyResult.Succeed)
Expand Down Expand Up @@ -388,14 +387,6 @@ private VerifyResult OnNewBlock(Block block)
return VerifyResult.Succeed;
}

private VerifyResult OnNewConsensus(ConsensusPayload payload)
{
if (!payload.Verify(currentSnapshot)) return VerifyResult.Invalid;
system.Consensus?.Tell(payload);
ConsensusRelayCache.Add(payload);
return VerifyResult.Succeed;
}

private void OnNewHeaders(Header[] headers)
{
using (SnapshotView snapshot = GetSnapshot())
Expand All @@ -417,6 +408,13 @@ private void OnNewHeaders(Header[] headers)
system.TaskManager.Tell(new TaskManager.HeaderTaskCompleted(), Sender);
}

private VerifyResult OnNewInventory(IInventory inventory)
{
if (!inventory.Verify(currentSnapshot)) return VerifyResult.Invalid;
RelayCache.Add(inventory);
return VerifyResult.Succeed;
}

private VerifyResult OnNewTransaction(Transaction transaction)
{
if (ContainsTransaction(transaction.Hash)) return VerifyResult.AlreadyExists;
Expand Down
6 changes: 3 additions & 3 deletions src/neo/Network/P2P/RemoteNode.ProtocolHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,9 @@ private void OnGetDataMessageReceived(InvPayload payload)
notFound.Add(hash);
}
break;
case InventoryType.Consensus:
if (Blockchain.Singleton.ConsensusRelayCache.TryGet(hash, out IInventory inventoryConsensus))
EnqueueMessage(Message.Create(MessageCommand.Consensus, inventoryConsensus));
default:
if (Blockchain.Singleton.RelayCache.TryGet(hash, out IInventory inventory))
EnqueueMessage(Message.Create((MessageCommand)payload.Type, inventory));
break;
}
}
Expand Down
32 changes: 21 additions & 11 deletions tests/neo.UnitTests/Consensus/UT_Consensus.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Akka.Actor;
using Akka.TestKit;
using Akka.TestKit.Xunit2;
using FluentAssertions;
Expand Down Expand Up @@ -186,7 +187,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
mockContext.Object.PreparationPayloads[prepReq.ValidatorIndex] = null;

Console.WriteLine("will tell prepare request!");
actorConsensus.Tell(prepReq);
TellConsensusPayload(actorConsensus, prepReq);
Console.WriteLine("Waiting for something related to the PrepRequest...\nNothing happens...Recovery will come due to failed nodes");
var backupOnRecoveryDueToFailedNodesII = subscriber.ExpectMsg<LocalNode.SendDirectly>();
var recoveryPayloadII = (ConsensusPayload)backupOnRecoveryDueToFailedNodesII.Inventory;
Expand All @@ -201,7 +202,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
// cleaning old try with Self ValidatorIndex
mockContext.Object.PreparationPayloads[mockContext.Object.MyIndex] = null;

actorConsensus.Tell(prepReq);
TellConsensusPayload(actorConsensus, prepReq);
var OnPrepResponse = subscriber.ExpectMsg<LocalNode.SendDirectly>();
var prepResponsePayload = (ConsensusPayload)OnPrepResponse.Inventory;
PrepareResponse prm = (PrepareResponse)prepResponsePayload.ConsensusMessage;
Expand All @@ -212,7 +213,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
mockContext.Object.CountFailed.Should().Be(5);

// Simulating CN 3
actorConsensus.Tell(GetPayloadAndModifyValidator(prepResponsePayload, 2));
TellConsensusPayload(actorConsensus, GetPayloadAndModifyValidator(prepResponsePayload, 2));
//Waiting for RecoveryRequest for a more deterministic UT
backupOnRecoveryDueToFailedNodes = subscriber.ExpectMsg<LocalNode.SendDirectly>();
recoveryPayload = (ConsensusPayload)backupOnRecoveryDueToFailedNodes.Inventory;
Expand All @@ -225,7 +226,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
mockContext.Object.CountFailed.Should().Be(4);

// Simulating CN 5
actorConsensus.Tell(GetPayloadAndModifyValidator(prepResponsePayload, 4));
TellConsensusPayload(actorConsensus, GetPayloadAndModifyValidator(prepResponsePayload, 4));
//Waiting for RecoveryRequest for a more deterministic UT
backupOnRecoveryDueToFailedNodes = subscriber.ExpectMsg<LocalNode.SendDirectly>();
recoveryPayload = (ConsensusPayload)backupOnRecoveryDueToFailedNodes.Inventory;
Expand All @@ -238,7 +239,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
mockContext.Object.CountFailed.Should().Be(3);

// Simulating CN 4
actorConsensus.Tell(GetPayloadAndModifyValidator(prepResponsePayload, 3));
TellConsensusPayload(actorConsensus, GetPayloadAndModifyValidator(prepResponsePayload, 3));
var onCommitPayload = subscriber.ExpectMsg<LocalNode.SendDirectly>();
var commitPayload = (ConsensusPayload)onCommitPayload.Inventory;
Commit cm = (Commit)commitPayload.ConsensusMessage;
Expand Down Expand Up @@ -300,7 +301,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm

Console.WriteLine("\n==========================");
Console.WriteLine("\nCN7 simulation time");
actorConsensus.Tell(cmPayloadTemp);
TellConsensusPayload(actorConsensus, cmPayloadTemp);
var tempPayloadToBlockAndWait = subscriber.ExpectMsg<LocalNode.SendDirectly>();
var rmPayload = (ConsensusPayload)tempPayloadToBlockAndWait.Inventory;
RecoveryMessage rmm = (RecoveryMessage)rmPayload.ConsensusMessage;
Expand All @@ -310,7 +311,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
mockContext.Object.CountFailed.Should().Be(1);

Console.WriteLine("\nCN6 simulation time");
actorConsensus.Tell(GetCommitPayloadModifiedAndSignedCopy(commitPayload, 5, kp_array[5], updatedBlockHashData));
TellConsensusPayload(actorConsensus, GetCommitPayloadModifiedAndSignedCopy(commitPayload, 5, kp_array[5], updatedBlockHashData));
tempPayloadToBlockAndWait = subscriber.ExpectMsg<LocalNode.SendDirectly>();
rmPayload = (ConsensusPayload)tempPayloadToBlockAndWait.Inventory;
rmm = (RecoveryMessage)rmPayload.ConsensusMessage;
Expand All @@ -320,7 +321,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
mockContext.Object.CountFailed.Should().Be(0);

Console.WriteLine("\nCN5 simulation time");
actorConsensus.Tell(GetCommitPayloadModifiedAndSignedCopy(commitPayload, 4, kp_array[4], updatedBlockHashData));
TellConsensusPayload(actorConsensus, GetCommitPayloadModifiedAndSignedCopy(commitPayload, 4, kp_array[4], updatedBlockHashData));
tempPayloadToBlockAndWait = subscriber.ExpectMsg<LocalNode.SendDirectly>();
Console.WriteLine("\nAsserting CountCommitted is 4...");
mockContext.Object.CountCommitted.Should().Be(4);
Expand All @@ -329,7 +330,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
// Testing commit with wrong signature not valid
// It will be invalid signature because we did not change ECPoint
Console.WriteLine("\nCN4 simulation time. Wrong signature, KeyPair is not known");
actorConsensus.Tell(GetPayloadAndModifyValidator(commitPayload, 3));
TellConsensusPayload(actorConsensus, GetPayloadAndModifyValidator(commitPayload, 3));
Console.WriteLine("\nWaiting for recovery due to failed nodes... ");
var backupOnRecoveryMessageAfterCommit = subscriber.ExpectMsg<LocalNode.SendDirectly>();
rmPayload = (ConsensusPayload)backupOnRecoveryMessageAfterCommit.Inventory;
Expand Down Expand Up @@ -358,7 +359,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
Console.WriteLine($"\nNew Hash is {mockContext.Object.Block.GetHashData().ToScriptHash()}");

Console.WriteLine("\nCN4 simulation time - Final needed signatures");
actorConsensus.Tell(GetCommitPayloadModifiedAndSignedCopy(commitPayload, 3, kp_array[3], mockContext.Object.Block.GetHashData()));
TellConsensusPayload(actorConsensus, GetCommitPayloadModifiedAndSignedCopy(commitPayload, 3, kp_array[3], mockContext.Object.Block.GetHashData()));

Console.WriteLine("\nWait for subscriber Local.Node Relay");
var onBlockRelay = subscriber.ExpectMsg<LocalNode.Relay>();
Expand Down Expand Up @@ -388,7 +389,7 @@ public void ConsensusService_SingleNodeActors_OnStart_PrepReq_PrepResponses_Comm
mockContext.Object.LastSeenMessage = new int[] { -1, -1, -1, -1, -1, -1, -1 };
mockContext.Object.CountFailed.Should().Be(7);

actorConsensus.Tell(rmPayload);
TellConsensusPayload(actorConsensus, rmPayload);

Console.WriteLine("\nWaiting for RecoveryRequest before final asserts...");
var onRecoveryRequestAfterRecovery = subscriber.ExpectMsg<LocalNode.SendDirectly>();
Expand Down Expand Up @@ -921,5 +922,14 @@ private StorageKey CreateStorageKeyForNativeNeo(byte prefix)
storageKey.Key[0] = prefix;
return storageKey;
}

private void TellConsensusPayload(IActorRef actor, ConsensusPayload payload)
{
actor.Tell(new Blockchain.RelayResult
{
Inventory = payload,
Result = VerifyResult.Succeed
});
}
}
}