Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore MemoryPool transactions from saved consensus context. #575

Closed
wants to merge 45 commits into from
Closed
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
999251c
Save consensus context each time we change view (so we will be close …
Jan 25, 2019
2ce8b71
Add extension methods for loading and saving the ConsensusContext to …
Jan 25, 2019
ae56b38
Load MemoryPool initially with transactions from saved consensus.
Jan 25, 2019
232e2fd
Fix tests.
Jan 25, 2019
d930e2c
Merge branch 'consensus/improved_dbft' into consensus/recoveryLog2
vncoelho Jan 25, 2019
4ae84f7
Remove unused 'using' statements.
Jan 25, 2019
947aa53
Merge branch 'consensus/improved_dbft' into consensus/recoveryLog2
Jan 27, 2019
91a2e3a
Merge branch 'consensus/improved_dbft' into consensus/recoveryLog2
shargon Jan 28, 2019
dc7482c
Pass consensus store into the NeoSystem constructor instead of StartC…
Jan 29, 2019
411f905
Add one extra check when restoring transactions from saved consensus …
Jan 29, 2019
f43a5ed
Move loading transactions from the last saved ConsensusContext out of…
Jan 29, 2019
28f05eb
More clean-up.
Jan 29, 2019
91f31de
Further decoupling, don't expose the consensus store from NeoSystem.
Jan 29, 2019
eedeffb
Handle saving and restoring consensus conetext when RequestSent, Requ…
Jan 30, 2019
8eaf669
Make unit tests pass again. Consensus unit tests still need attention…
Jan 30, 2019
240c5bb
Don't write the consensus state after resposne sent to reduce writes.
Jan 30, 2019
92e6987
Remove unneeded commented code.
Jan 31, 2019
3b06474
Fix saving context to include the miner transaction after receiving t…
Jan 31, 2019
323384c
Ensure consensus context has snapshot and keypair set upon loading th…
Jan 31, 2019
d312b2c
Fix loading transactions into the MemoryPool from consensus context.
Jan 31, 2019
6c33b62
More precise check when restoring consensus context.
Jan 31, 2019
1c744ea
Don't need to save consensus state on request sent or received since …
Feb 4, 2019
1d81e54
Merge branch 'consensus/improved_dbft' into consensus/recoveryLog2
Feb 16, 2019
700a0aa
Fix test compile.
Feb 16, 2019
62d5e5a
Fix tests.
Feb 16, 2019
5e29fdc
Don't separate the code for obtaining transactions into a separte met…
Feb 16, 2019
48035df
Merge branch 'consensus/improved_dbft' into consensus/recoveryLog2
erikzhang Feb 17, 2019
9b16fb8
Merge branch 'consensus/improved_dbft' into consensus/recoveryLog2
Feb 19, 2019
f955db5
Write consensus state synchronously upon commit. Move constant for co…
Feb 19, 2019
8aeaf80
Use the passed writeOptions for Put to the store.
Feb 19, 2019
82a0f2b
Decouple restoring memory pool from the consensus state from the Bloc…
Feb 19, 2019
3dd2928
Remove unused using statement.
Feb 19, 2019
2fc2139
Clean-up.
Feb 19, 2019
fc4cfe8
More clean-up. Remove unused using statements.
Feb 19, 2019
14a0a2c
Remove one more unused using statement.
Feb 19, 2019
7e0f518
Merge branch 'consensus/improved_dbft' into consensus/recoveryLog2
jsolman Feb 20, 2019
3d6a2b1
Minor changes
erikzhang Feb 20, 2019
438c105
Add method `PutSync()` to class `Store`
erikzhang Feb 20, 2019
e362f55
Merge branch 'consensus/improved_dbft' into consensus/recoveryLog2
Feb 20, 2019
465e587
Merge branch 'consensus/improved_dbft' into consensus/recoveryLog2
erikzhang Feb 21, 2019
3ebb75e
Renaming
erikzhang Feb 21, 2019
75c81d9
Remove unused variable.
Feb 21, 2019
becf0f5
Optimize the serialization of `ConsensusContext`
erikzhang Feb 21, 2019
0f9856e
Merge branch 'consensus/improved_dbft' into consensus/recoveryLog2
erikzhang Feb 21, 2019
b957d1b
Merge branch 'consensus/improved_dbft' into consensus/recoveryLog2
erikzhang Feb 21, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions neo.UnitTests/TestBlockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Neo.Ledger;
using Neo.Persistence;
using System;
using Neo.Persistence.LevelDB;

namespace Neo.UnitTests
{
Expand Down Expand Up @@ -32,6 +33,7 @@ public static NeoSystem InitializeMockNeoSystem()
mockSnapshot.SetupGet(p => p.HeaderHashIndex).Returns(new TestMetaDataCache<HashIndexState>());

var mockStore = new Mock<Store>();
mockStore.Setup(p => p.Get(Prefixes.CN_Context, It.IsAny<byte[]>())).Returns((byte[]) null);

var defaultTx = TestUtils.CreateRandomHashInvocationMockTransaction().Object;
mockStore.Setup(p => p.GetBlocks()).Returns(new TestDataCache<UInt256, BlockState>());
Expand Down
2 changes: 1 addition & 1 deletion neo.UnitTests/UT_Consensus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using Moq;
using Neo.Consensus;
using Neo.Cryptography;
using Neo.IO;
using Neo.Ledger;
using Neo.Network.P2P;
using Neo.Network.P2P.Payloads;
Expand All @@ -16,6 +15,7 @@
using System.Linq;
using System.Numerics;
using System.Security.Cryptography;
using Neo.Persistence.LevelDB;
using ECPoint = Neo.Cryptography.ECC.ECPoint;

namespace Neo.UnitTests
Expand Down
89 changes: 44 additions & 45 deletions neo/Consensus/ConsensusService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
using Neo.Wallets;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

namespace Neo.Consensus
Expand All @@ -22,8 +21,6 @@ public class Start { }
public class SetViewNumber { public byte ViewNumber; }
internal class Timer { public uint Height; public byte ViewNumber; }

private const byte ContextSerializationPrefix = 0xf4;

private readonly IConsensusContext context;
private readonly IActorRef localNode;
private readonly IActorRef taskManager;
Expand Down Expand Up @@ -122,7 +119,7 @@ private void CheckPreparations()
ConsensusPayload payload = context.MakeCommit();
Log($"send commit");
context.State |= ConsensusState.CommitSent;
store.Put(ContextSerializationPrefix, new byte[0], context.ToArray());
context.WriteContextToStore(store);
localNode.Tell(new LocalNode.SendDirectly { Inventory = payload });
// Set timer, so we will resend the commit in case of a networking issue
ChangeTimer(TimeSpan.FromSeconds(Blockchain.SecondsPerBlock));
Expand Down Expand Up @@ -272,6 +269,38 @@ private void OnPersistCompleted(Block block)
InitializeConsensus(0);
}

private void ObtainTransactionsForConsensus()
{
Dictionary<UInt256, Transaction> mempoolVerified =
Blockchain.Singleton.MemPool.GetVerifiedTransactions().ToDictionary(p => p.Hash);
List<Transaction> unverified = new List<Transaction>();
foreach (UInt256 hash in context.TransactionHashes.Skip(1))
{
if (mempoolVerified.TryGetValue(hash, out Transaction tx))
{
if (!AddTransaction(tx, false))
return;
}
else
{
if (Blockchain.Singleton.MemPool.TryGetValue(hash, out tx))
unverified.Add(tx);
}
}

foreach (Transaction tx in unverified)
if (!AddTransaction(tx, true))
return;
if (context.Transactions.Count < context.TransactionHashes.Length)
{
UInt256[] hashes = context.TransactionHashes.Where(i => !context.Transactions.ContainsKey(i)).ToArray();
taskManager.Tell(new TaskManager.RestartTasks
{
Payload = InvPayload.Create(InventoryType.TX, hashes)
});
}
}

private void OnRecoveryMessageReceived(ConsensusPayload payload, RecoveryMessage message)
{
if (message.ViewNumber < context.ViewNumber) return;
Expand Down Expand Up @@ -329,38 +358,15 @@ private void OnPrepareRequestReceived(ConsensusPayload payload, PrepareRequest m
context.PreparationPayloads[i] = null;
context.PreparationPayloads[payload.ValidatorIndex] = payload;
byte[] hashData = context.MakeHeader().GetHashData();

for (int i = 0; i < context.CommitPayloads.Length; i++)
if (context.CommitPayloads[i] != null)
if (!Crypto.Default.VerifySignature(hashData, context.CommitPayloads[i].GetDeserializedMessage<Commit>().Signature, context.Validators[i].EncodePoint(false)))
context.CommitPayloads[i] = null;
Dictionary<UInt256, Transaction> mempoolVerified = Blockchain.Singleton.MemPool.GetVerifiedTransactions().ToDictionary(p => p.Hash);

List<Transaction> unverified = new List<Transaction>();
foreach (UInt256 hash in context.TransactionHashes.Skip(1))
{
if (mempoolVerified.TryGetValue(hash, out Transaction tx))
{
if (!AddTransaction(tx, false))
return;
}
else
{
if (Blockchain.Singleton.MemPool.TryGetValue(hash, out tx))
unverified.Add(tx);
}
}
foreach (Transaction tx in unverified)
if (!AddTransaction(tx, true))
return;
if (!AddTransaction(message.MinerTransaction, true)) return;
if (context.Transactions.Count < context.TransactionHashes.Length)
{
UInt256[] hashes = context.TransactionHashes.Where(i => !context.Transactions.ContainsKey(i)).ToArray();
taskManager.Tell(new TaskManager.RestartTasks
{
Payload = InvPayload.Create(InventoryType.TX, hashes)
});
}

ObtainTransactionsForConsensus();
}

private void OnPrepareResponseReceived(ConsensusPayload payload, PrepareResponse message)
Expand Down Expand Up @@ -412,24 +418,17 @@ private void OnStart()
{
Log("OnStart");
started = true;
byte[] data = store.Get(ContextSerializationPrefix, new byte[0]);
if (data != null)
bool loadedState = context.LoadContextFromStore(store);
if (loadedState && context.State.HasFlag(ConsensusState.CommitSent) && Blockchain.Singleton.Height + 1 == context.BlockIndex)
{
using (MemoryStream ms = new MemoryStream(data, false))
using (BinaryReader reader = new BinaryReader(ms))
{
context.Deserialize(reader);
}
}
if (context.State.HasFlag(ConsensusState.CommitSent) && context.BlockIndex == Blockchain.Singleton.Height + 1)
CheckPreparations();
else
{
InitializeConsensus(0);
// Issue a ChangeView with NewViewNumber of 0 to request recovery messages on start-up.
if (context.BlockIndex == Blockchain.Singleton.HeaderHeight + 1)
localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeChangeView(0) });
return;
}

InitializeConsensus(0);
// Issue a ChangeView with NewViewNumber of 0 to request recovery messages on start-up.
if (context.BlockIndex == Blockchain.Singleton.HeaderHeight + 1)
localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeChangeView(0) });
}

private void OnTimer(Timer timer)
Expand Down
44 changes: 44 additions & 0 deletions neo/Consensus/Helper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System.IO;
using Neo.IO;
using Neo.Ledger;
using Neo.Persistence;
using Neo.Persistence.LevelDB;

namespace Neo.Consensus
{
public static class Helper
{
internal static void WriteContextToStore(this IConsensusContext context, Store store)
{
store.Put(Prefixes.CN_Context, new byte[0], context.ToArray());
}

internal static bool LoadContextFromStore(this IConsensusContext context, Store store, bool shouldReset=true)
{
byte[] data = store.Get(Prefixes.CN_Context, new byte[0]);
if (data != null)
{
if (shouldReset) context.Reset(0);
using (MemoryStream ms = new MemoryStream(data, false))
using (BinaryReader reader = new BinaryReader(ms))
{
context.Deserialize(reader);
return true;
}
}
return false;
}

internal static void LoadTransactionsToMemoryPoolFromSavedConsensusContext(MemoryPool memoryPool, Store store, Store consensusStore)
{
IConsensusContext context = new ConsensusContext(null);
context.LoadContextFromStore(consensusStore, false);
if (context.Transactions == null) return;
foreach (var tx in context.Transactions.Values)
{
if (store.ContainsTransaction(tx.Hash)) continue;
memoryPool.TryAdd(tx.Hash, tx);
}
}
}
}
10 changes: 6 additions & 4 deletions neo/Ledger/Blockchain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,13 @@ static Blockchain()
GenesisBlock.RebuildMerkleRoot();
}

public Blockchain(NeoSystem system, Store store)
public Blockchain(NeoSystem system, Store store, Store consensusStore)
{
this.system = system;
this.MemPool = new MemoryPool(system, MemoryPoolMaxTransactions);
this.Store = store;
this.MemPool = new MemoryPool(system, MemoryPoolMaxTransactions);
Consensus.Helper.LoadTransactionsToMemoryPoolFromSavedConsensusContext(MemPool, store, consensusStore);

lock (lockObj)
{
if (singleton != null)
Expand Down Expand Up @@ -683,9 +685,9 @@ internal static void ProcessValidatorStateDescriptor(StateDescriptor descriptor,
}
}

public static Props Props(NeoSystem system, Store store)
public static Props Props(NeoSystem system, Store store, Store consensusStore)
{
return Akka.Actor.Props.Create(() => new Blockchain(system, store)).WithMailbox("blockchain-mailbox");
return Akka.Actor.Props.Create(() => new Blockchain(system, store, consensusStore)).WithMailbox("blockchain-mailbox");
}

private void SaveHeaderHashList(Snapshot snapshot = null)
Expand Down
12 changes: 6 additions & 6 deletions neo/NeoSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ public class NeoSystem : IDisposable
public IActorRef Consensus { get; private set; }
public RpcServer RpcServer { get; private set; }

private readonly Store store;
private readonly Store consensusStore;
private Peer.Start start_message = null;
private bool suspend = false;

public NeoSystem(Store store)
public NeoSystem(Store store, Store consensusStore = null)
{
this.store = store;
this.Blockchain = ActorSystem.ActorOf(Ledger.Blockchain.Props(this, store));
this.consensusStore = consensusStore ?? store;
this.Blockchain = ActorSystem.ActorOf(Ledger.Blockchain.Props(this, store, this.consensusStore));
this.LocalNode = ActorSystem.ActorOf(Network.P2P.LocalNode.Props(this));
this.TaskManager = ActorSystem.ActorOf(Network.P2P.TaskManager.Props(this));
Plugin.LoadPlugins(this);
Expand All @@ -56,9 +56,9 @@ internal void ResumeNodeStartup()
}
}

public void StartConsensus(Wallet wallet, Store consensus_store = null)
public void StartConsensus(Wallet wallet)
{
Consensus = ActorSystem.ActorOf(ConsensusService.Props(this.LocalNode, this.TaskManager, consensus_store ?? store, wallet));
Consensus = ActorSystem.ActorOf(ConsensusService.Props(this.LocalNode, this.TaskManager, consensusStore, wallet));
Consensus.Tell(new ConsensusService.Start());
}

Expand Down
1 change: 1 addition & 0 deletions neo/Persistence/LevelDB/Prefixes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ internal static class Prefixes
public const byte IX_CurrentHeader = 0xc1;

public const byte SYS_Version = 0xf0;
public const byte CN_Context = 0xf4;
}
}