From e153188e800d01195b3bd3a7ed7d7a47e8836083 Mon Sep 17 00:00:00 2001 From: Shargon Date: Sat, 24 Aug 2019 11:16:51 +0200 Subject: [PATCH 1/4] Fix consensus (CRLF) --- neo/Consensus/ConsensusContext.cs | 795 +++++++++-------- neo/Consensus/ConsensusService.cs | 1332 +++++++++++++++-------------- 2 files changed, 1070 insertions(+), 1057 deletions(-) diff --git a/neo/Consensus/ConsensusContext.cs b/neo/Consensus/ConsensusContext.cs index a06c6c965b..0bd773889f 100644 --- a/neo/Consensus/ConsensusContext.cs +++ b/neo/Consensus/ConsensusContext.cs @@ -1,221 +1,221 @@ -using Neo.Cryptography; -using Neo.Cryptography.ECC; -using Neo.IO; -using Neo.Ledger; -using Neo.Network.P2P.Payloads; -using Neo.Persistence; -using Neo.SmartContract; -using Neo.SmartContract.Native; +using Neo.Cryptography; +using Neo.Cryptography.ECC; +using Neo.IO; +using Neo.Ledger; +using Neo.Network.P2P.Payloads; +using Neo.Persistence; +using Neo.SmartContract; +using Neo.SmartContract.Native; using Neo.VM; -using Neo.Wallets; -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Runtime.CompilerServices; - -namespace Neo.Consensus -{ - internal class ConsensusContext : IDisposable, ISerializable - { - /// - /// Prefix for saving consensus state. - /// - public const byte CN_Context = 0xf4; - - public Block Block; - public byte ViewNumber; - public ECPoint[] Validators; - public int MyIndex; - public UInt256[] TransactionHashes; - public Dictionary Transactions; - public ConsensusPayload[] PreparationPayloads; - public ConsensusPayload[] CommitPayloads; - public ConsensusPayload[] ChangeViewPayloads; - public ConsensusPayload[] LastChangeViewPayloads; - // LastSeenMessage array stores the height of the last seen message, for each validator. - // if this node never heard from validator i, LastSeenMessage[i] will be -1. - public int[] LastSeenMessage; - - public Snapshot Snapshot { get; private set; } - private KeyPair keyPair; +using Neo.Wallets; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Runtime.CompilerServices; + +namespace Neo.Consensus +{ + internal class ConsensusContext : IDisposable, ISerializable + { + /// + /// Prefix for saving consensus state. + /// + public const byte CN_Context = 0xf4; + + public Block Block; + public byte ViewNumber; + public ECPoint[] Validators; + public int MyIndex; + public UInt256[] TransactionHashes; + public Dictionary Transactions; + public ConsensusPayload[] PreparationPayloads; + public ConsensusPayload[] CommitPayloads; + public ConsensusPayload[] ChangeViewPayloads; + public ConsensusPayload[] LastChangeViewPayloads; + // LastSeenMessage array stores the height of the last seen message, for each validator. + // if this node never heard from validator i, LastSeenMessage[i] will be -1. + public int[] LastSeenMessage; + + public Snapshot Snapshot { get; private set; } + private KeyPair keyPair; private int _witnessSize; - private readonly Wallet wallet; - private readonly Store store; - private readonly Random random = new Random(); - - public int F => (Validators.Length - 1) / 3; - public int M => Validators.Length - F; - public bool IsPrimary => MyIndex == Block.ConsensusData.PrimaryIndex; - public bool IsBackup => MyIndex >= 0 && MyIndex != Block.ConsensusData.PrimaryIndex; - public bool WatchOnly => MyIndex < 0; - public Header PrevHeader => Snapshot.GetHeader(Block.PrevHash); - public int CountCommitted => CommitPayloads.Count(p => p != null); - public int CountFailed => LastSeenMessage.Count(p => p < (((int)Block.Index) - 1)); - - #region Consensus States - public bool RequestSentOrReceived => PreparationPayloads[Block.ConsensusData.PrimaryIndex] != null; - public bool ResponseSent => !WatchOnly && PreparationPayloads[MyIndex] != null; - public bool CommitSent => !WatchOnly && CommitPayloads[MyIndex] != null; - public bool BlockSent => Block.Transactions != null; - public bool ViewChanging => !WatchOnly && ChangeViewPayloads[MyIndex]?.GetDeserializedMessage().NewViewNumber > ViewNumber; - public bool NotAcceptingPayloadsDueToViewChanging => ViewChanging && !MoreThanFNodesCommittedOrLost; - // A possible attack can happen if the last node to commit is malicious and either sends change view after his - // commit to stall nodes in a higher view, or if he refuses to send recovery messages. In addition, if a node - // asking change views loses network or crashes and comes back when nodes are committed in more than one higher - // numbered view, it is possible for the node accepting recovery to commit in any of the higher views, thus - // potentially splitting nodes among views and stalling the network. - public bool MoreThanFNodesCommittedOrLost => (CountCommitted + CountFailed) > F; - #endregion - - public int Size => throw new NotImplementedException(); - - public ConsensusContext(Wallet wallet, Store store) - { - this.wallet = wallet; - this.store = store; - } - - public Block CreateBlock() - { - Contract contract = Contract.CreateMultiSigContract(M, Validators); - ContractParametersContext sc = new ContractParametersContext(Block); - for (int i = 0, j = 0; i < Validators.Length && j < M; i++) - { - if (CommitPayloads[i]?.ConsensusMessage.ViewNumber != ViewNumber) continue; - sc.AddSignature(contract, Validators[i], CommitPayloads[i].GetDeserializedMessage().Signature); - j++; - } - Block.Witness = sc.GetWitnesses()[0]; - Block.Transactions = TransactionHashes.Select(p => Transactions[p]).ToArray(); - return Block; - } - - public void Deserialize(BinaryReader reader) - { - Reset(0); - if (reader.ReadUInt32() != Block.Version) throw new FormatException(); - if (reader.ReadUInt32() != Block.Index) throw new InvalidOperationException(); - Block.Timestamp = reader.ReadUInt64(); - Block.NextConsensus = reader.ReadSerializable(); - if (Block.NextConsensus.Equals(UInt160.Zero)) - Block.NextConsensus = null; - Block.ConsensusData = reader.ReadSerializable(); - ViewNumber = reader.ReadByte(); - TransactionHashes = reader.ReadSerializableArray(); - if (TransactionHashes.Length == 0) - TransactionHashes = null; - Transaction[] transactions = reader.ReadSerializableArray(Block.MaxTransactionsPerBlock); - Transactions = transactions.Length == 0 ? null : transactions.ToDictionary(p => p.Hash); - PreparationPayloads = new ConsensusPayload[reader.ReadVarInt(Blockchain.MaxValidators)]; - for (int i = 0; i < PreparationPayloads.Length; i++) - PreparationPayloads[i] = reader.ReadBoolean() ? reader.ReadSerializable() : null; - CommitPayloads = new ConsensusPayload[reader.ReadVarInt(Blockchain.MaxValidators)]; - for (int i = 0; i < CommitPayloads.Length; i++) - CommitPayloads[i] = reader.ReadBoolean() ? reader.ReadSerializable() : null; - ChangeViewPayloads = new ConsensusPayload[reader.ReadVarInt(Blockchain.MaxValidators)]; - for (int i = 0; i < ChangeViewPayloads.Length; i++) - ChangeViewPayloads[i] = reader.ReadBoolean() ? reader.ReadSerializable() : null; - LastChangeViewPayloads = new ConsensusPayload[reader.ReadVarInt(Blockchain.MaxValidators)]; - for (int i = 0; i < LastChangeViewPayloads.Length; i++) - LastChangeViewPayloads[i] = reader.ReadBoolean() ? reader.ReadSerializable() : null; - } - - public void Dispose() - { - Snapshot?.Dispose(); - } - - public Block EnsureHeader() - { - if (TransactionHashes == null) return null; - if (Block.MerkleRoot is null) - Block.MerkleRoot = Block.CalculateMerkleRoot(Block.ConsensusData.Hash, TransactionHashes); - return Block; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public uint GetPrimaryIndex(byte viewNumber) - { - int p = ((int)Block.Index - viewNumber) % Validators.Length; - return p >= 0 ? (uint)p : (uint)(p + Validators.Length); - } - - public bool Load() - { - byte[] data = store.Get(CN_Context, new byte[0]); - if (data is null || data.Length == 0) return false; - using (MemoryStream ms = new MemoryStream(data, false)) - using (BinaryReader reader = new BinaryReader(ms)) - { - try - { - Deserialize(reader); - } - catch - { - return false; - } - return true; - } - } - - public ConsensusPayload MakeChangeView(ChangeViewReason reason) - { - return ChangeViewPayloads[MyIndex] = MakeSignedPayload(new ChangeView - { - Reason = reason, - Timestamp = TimeProvider.Current.UtcNow.ToTimestampMS() - }); - } - - public ConsensusPayload MakeCommit() - { - return CommitPayloads[MyIndex] ?? (CommitPayloads[MyIndex] = MakeSignedPayload(new Commit - { - Signature = EnsureHeader().Sign(keyPair) - })); - } - - private ConsensusPayload MakeSignedPayload(ConsensusMessage message) - { - message.ViewNumber = ViewNumber; - ConsensusPayload payload = new ConsensusPayload - { - Version = Block.Version, - PrevHash = Block.PrevHash, - BlockIndex = Block.Index, - ValidatorIndex = (ushort)MyIndex, - ConsensusMessage = message - }; - SignPayload(payload); - return payload; - } - - private void SignPayload(ConsensusPayload payload) - { - ContractParametersContext sc; - try - { - sc = new ContractParametersContext(payload); - wallet.Sign(sc); - } - catch (InvalidOperationException) - { - return; - } - payload.Witness = sc.GetWitnesses()[0]; + private readonly Wallet wallet; + private readonly Store store; + private readonly Random random = new Random(); + + public int F => (Validators.Length - 1) / 3; + public int M => Validators.Length - F; + public bool IsPrimary => MyIndex == Block.ConsensusData.PrimaryIndex; + public bool IsBackup => MyIndex >= 0 && MyIndex != Block.ConsensusData.PrimaryIndex; + public bool WatchOnly => MyIndex < 0; + public Header PrevHeader => Snapshot.GetHeader(Block.PrevHash); + public int CountCommitted => CommitPayloads.Count(p => p != null); + public int CountFailed => LastSeenMessage.Count(p => p < (((int)Block.Index) - 1)); + + #region Consensus States + public bool RequestSentOrReceived => PreparationPayloads[Block.ConsensusData.PrimaryIndex] != null; + public bool ResponseSent => !WatchOnly && PreparationPayloads[MyIndex] != null; + public bool CommitSent => !WatchOnly && CommitPayloads[MyIndex] != null; + public bool BlockSent => Block.Transactions != null; + public bool ViewChanging => !WatchOnly && ChangeViewPayloads[MyIndex]?.GetDeserializedMessage().NewViewNumber > ViewNumber; + public bool NotAcceptingPayloadsDueToViewChanging => ViewChanging && !MoreThanFNodesCommittedOrLost; + // A possible attack can happen if the last node to commit is malicious and either sends change view after his + // commit to stall nodes in a higher view, or if he refuses to send recovery messages. In addition, if a node + // asking change views loses network or crashes and comes back when nodes are committed in more than one higher + // numbered view, it is possible for the node accepting recovery to commit in any of the higher views, thus + // potentially splitting nodes among views and stalling the network. + public bool MoreThanFNodesCommittedOrLost => (CountCommitted + CountFailed) > F; + #endregion + + public int Size => throw new NotImplementedException(); + + public ConsensusContext(Wallet wallet, Store store) + { + this.wallet = wallet; + this.store = store; } - /// - /// Return the expected block size - /// + public Block CreateBlock() + { + Contract contract = Contract.CreateMultiSigContract(M, Validators); + ContractParametersContext sc = new ContractParametersContext(Block); + for (int i = 0, j = 0; i < Validators.Length && j < M; i++) + { + if (CommitPayloads[i]?.ConsensusMessage.ViewNumber != ViewNumber) continue; + sc.AddSignature(contract, Validators[i], CommitPayloads[i].GetDeserializedMessage().Signature); + j++; + } + Block.Witness = sc.GetWitnesses()[0]; + Block.Transactions = TransactionHashes.Select(p => Transactions[p]).ToArray(); + return Block; + } + + public void Deserialize(BinaryReader reader) + { + Reset(0); + if (reader.ReadUInt32() != Block.Version) throw new FormatException(); + if (reader.ReadUInt32() != Block.Index) throw new InvalidOperationException(); + Block.Timestamp = reader.ReadUInt64(); + Block.NextConsensus = reader.ReadSerializable(); + if (Block.NextConsensus.Equals(UInt160.Zero)) + Block.NextConsensus = null; + Block.ConsensusData = reader.ReadSerializable(); + ViewNumber = reader.ReadByte(); + TransactionHashes = reader.ReadSerializableArray(); + if (TransactionHashes.Length == 0) + TransactionHashes = null; + Transaction[] transactions = reader.ReadSerializableArray(Block.MaxTransactionsPerBlock); + Transactions = transactions.Length == 0 ? null : transactions.ToDictionary(p => p.Hash); + PreparationPayloads = new ConsensusPayload[reader.ReadVarInt(Blockchain.MaxValidators)]; + for (int i = 0; i < PreparationPayloads.Length; i++) + PreparationPayloads[i] = reader.ReadBoolean() ? reader.ReadSerializable() : null; + CommitPayloads = new ConsensusPayload[reader.ReadVarInt(Blockchain.MaxValidators)]; + for (int i = 0; i < CommitPayloads.Length; i++) + CommitPayloads[i] = reader.ReadBoolean() ? reader.ReadSerializable() : null; + ChangeViewPayloads = new ConsensusPayload[reader.ReadVarInt(Blockchain.MaxValidators)]; + for (int i = 0; i < ChangeViewPayloads.Length; i++) + ChangeViewPayloads[i] = reader.ReadBoolean() ? reader.ReadSerializable() : null; + LastChangeViewPayloads = new ConsensusPayload[reader.ReadVarInt(Blockchain.MaxValidators)]; + for (int i = 0; i < LastChangeViewPayloads.Length; i++) + LastChangeViewPayloads[i] = reader.ReadBoolean() ? reader.ReadSerializable() : null; + } + + public void Dispose() + { + Snapshot?.Dispose(); + } + + public Block EnsureHeader() + { + if (TransactionHashes == null) return null; + if (Block.MerkleRoot is null) + Block.MerkleRoot = Block.CalculateMerkleRoot(Block.ConsensusData.Hash, TransactionHashes); + return Block; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public uint GetPrimaryIndex(byte viewNumber) + { + int p = ((int)Block.Index - viewNumber) % Validators.Length; + return p >= 0 ? (uint)p : (uint)(p + Validators.Length); + } + + public bool Load() + { + byte[] data = store.Get(CN_Context, new byte[0]); + if (data is null || data.Length == 0) return false; + using (MemoryStream ms = new MemoryStream(data, false)) + using (BinaryReader reader = new BinaryReader(ms)) + { + try + { + Deserialize(reader); + } + catch + { + return false; + } + return true; + } + } + + public ConsensusPayload MakeChangeView(ChangeViewReason reason) + { + return ChangeViewPayloads[MyIndex] = MakeSignedPayload(new ChangeView + { + Reason = reason, + Timestamp = TimeProvider.Current.UtcNow.ToTimestampMS() + }); + } + + public ConsensusPayload MakeCommit() + { + return CommitPayloads[MyIndex] ?? (CommitPayloads[MyIndex] = MakeSignedPayload(new Commit + { + Signature = EnsureHeader().Sign(keyPair) + })); + } + + private ConsensusPayload MakeSignedPayload(ConsensusMessage message) + { + message.ViewNumber = ViewNumber; + ConsensusPayload payload = new ConsensusPayload + { + Version = Block.Version, + PrevHash = Block.PrevHash, + BlockIndex = Block.Index, + ValidatorIndex = (ushort)MyIndex, + ConsensusMessage = message + }; + SignPayload(payload); + return payload; + } + + private void SignPayload(ConsensusPayload payload) + { + ContractParametersContext sc; + try + { + sc = new ContractParametersContext(payload); + wallet.Sign(sc); + } + catch (InvalidOperationException) + { + return; + } + payload.Witness = sc.GetWitnesses()[0]; + } + + /// + /// Return the expected block size + /// internal int GetExpectedBlockSize() { return GetExpectedBlockSizeWithoutTransactions(Transactions.Count) + // Base size Transactions.Values.Sum(u => u.Size); // Sum Txs - } - + } + /// /// Return the expected block size without txs /// @@ -239,115 +239,114 @@ internal int GetExpectedBlockSizeWithoutTransactions(int expectedTransactions) IO.Helper.GetVarSize(expectedTransactions + 1); //Transactions count return blockSize; - } - - /// - /// Prevent that block exceed the max size - /// - /// Ordered transactions - internal void EnsureMaxBlockSize(IEnumerable txs) - { + } + + /// + /// Prevent that block exceed the max size + /// + /// Ordered transactions + internal void EnsureMaxBlockSize(IEnumerable txs) + { uint maxBlockSize = NativeContract.Policy.GetMaxBlockSize(Snapshot); uint maxTransactionsPerBlock = NativeContract.Policy.GetMaxTransactionsPerBlock(Snapshot); - - // Limit Speaker proposal to the limit `MaxTransactionsPerBlock` or all available transactions of the mempool - txs = txs.Take((int)maxTransactionsPerBlock); - List hashes = new List(); - Transactions = new Dictionary(); - Block.Transactions = new Transaction[0]; + + // Limit Speaker proposal to the limit `MaxTransactionsPerBlock` or all available transactions of the mempool + txs = txs.Take((int)maxTransactionsPerBlock); + List hashes = new List(); + Transactions = new Dictionary(); // We need to know the expected block size var blockSize = GetExpectedBlockSizeWithoutTransactions(txs.Count()); - - // Iterate transaction until reach the size - - foreach (Transaction tx in txs) - { - // Check if maximum block size has been already exceeded with the current selected set - blockSize += tx.Size; - if (blockSize > maxBlockSize) break; - - hashes.Add(tx.Hash); - Transactions.Add(tx.Hash, tx); - } + + // Iterate transaction until reach the size + + foreach (Transaction tx in txs) + { + // Check if maximum block size has been already exceeded with the current selected set + blockSize += tx.Size; + if (blockSize > maxBlockSize) break; + + hashes.Add(tx.Hash); + Transactions.Add(tx.Hash, tx); + } TransactionHashes = hashes.ToArray(); - } - - public ConsensusPayload MakePrepareRequest() - { - byte[] buffer = new byte[sizeof(ulong)]; - random.NextBytes(buffer); - Block.ConsensusData.Nonce = BitConverter.ToUInt64(buffer, 0); - EnsureMaxBlockSize(Blockchain.Singleton.MemPool.GetSortedVerifiedTransactions()); + } + + public ConsensusPayload MakePrepareRequest() + { + byte[] buffer = new byte[sizeof(ulong)]; + random.NextBytes(buffer); + Block.ConsensusData.Nonce = BitConverter.ToUInt64(buffer, 0); + EnsureMaxBlockSize(Blockchain.Singleton.MemPool.GetSortedVerifiedTransactions()); Block.Timestamp = Math.Max(TimeProvider.Current.UtcNow.ToTimestampMS(), PrevHeader.Timestamp + 1); - return PreparationPayloads[MyIndex] = MakeSignedPayload(new PrepareRequest - { - Timestamp = Block.Timestamp, - Nonce = Block.ConsensusData.Nonce, - TransactionHashes = TransactionHashes - }); - } - - public ConsensusPayload MakeRecoveryRequest() - { - return MakeSignedPayload(new RecoveryRequest - { - Timestamp = TimeProvider.Current.UtcNow.ToTimestampMS() - }); - } - - public ConsensusPayload MakeRecoveryMessage() - { - PrepareRequest prepareRequestMessage = null; - if (TransactionHashes != null) - { - prepareRequestMessage = new PrepareRequest - { - ViewNumber = ViewNumber, - Timestamp = Block.Timestamp, - Nonce = Block.ConsensusData.Nonce, - TransactionHashes = TransactionHashes - }; - } - return MakeSignedPayload(new RecoveryMessage() - { - ChangeViewMessages = LastChangeViewPayloads.Where(p => p != null).Select(p => RecoveryMessage.ChangeViewPayloadCompact.FromPayload(p)).Take(M).ToDictionary(p => (int)p.ValidatorIndex), - PrepareRequestMessage = prepareRequestMessage, - // We only need a PreparationHash set if we don't have the PrepareRequest information. - PreparationHash = TransactionHashes == null ? PreparationPayloads.Where(p => p != null).GroupBy(p => p.GetDeserializedMessage().PreparationHash, (k, g) => new { Hash = k, Count = g.Count() }).OrderByDescending(p => p.Count).Select(p => p.Hash).FirstOrDefault() : null, - PreparationMessages = PreparationPayloads.Where(p => p != null).Select(p => RecoveryMessage.PreparationPayloadCompact.FromPayload(p)).ToDictionary(p => (int)p.ValidatorIndex), - CommitMessages = CommitSent - ? CommitPayloads.Where(p => p != null).Select(p => RecoveryMessage.CommitPayloadCompact.FromPayload(p)).ToDictionary(p => (int)p.ValidatorIndex) - : new Dictionary() - }); - } - - public ConsensusPayload MakePrepareResponse() - { - return PreparationPayloads[MyIndex] = MakeSignedPayload(new PrepareResponse - { - PreparationHash = PreparationPayloads[Block.ConsensusData.PrimaryIndex].Hash - }); - } - - public void Reset(byte viewNumber) - { - if (viewNumber == 0) - { - Snapshot?.Dispose(); - Snapshot = Blockchain.Singleton.GetSnapshot(); - Block = new Block - { - PrevHash = Snapshot.CurrentBlockHash, - Index = Snapshot.Height + 1, - NextConsensus = Blockchain.GetConsensusAddress(NativeContract.NEO.GetValidators(Snapshot).ToArray()), - ConsensusData = new ConsensusData() - }; - var pv = Validators; - Validators = NativeContract.NEO.GetNextBlockValidators(Snapshot); + return PreparationPayloads[MyIndex] = MakeSignedPayload(new PrepareRequest + { + Timestamp = Block.Timestamp, + Nonce = Block.ConsensusData.Nonce, + TransactionHashes = TransactionHashes + }); + } + + public ConsensusPayload MakeRecoveryRequest() + { + return MakeSignedPayload(new RecoveryRequest + { + Timestamp = TimeProvider.Current.UtcNow.ToTimestampMS() + }); + } + + public ConsensusPayload MakeRecoveryMessage() + { + PrepareRequest prepareRequestMessage = null; + if (TransactionHashes != null) + { + prepareRequestMessage = new PrepareRequest + { + ViewNumber = ViewNumber, + Timestamp = Block.Timestamp, + Nonce = Block.ConsensusData.Nonce, + TransactionHashes = TransactionHashes + }; + } + return MakeSignedPayload(new RecoveryMessage() + { + ChangeViewMessages = LastChangeViewPayloads.Where(p => p != null).Select(p => RecoveryMessage.ChangeViewPayloadCompact.FromPayload(p)).Take(M).ToDictionary(p => (int)p.ValidatorIndex), + PrepareRequestMessage = prepareRequestMessage, + // We only need a PreparationHash set if we don't have the PrepareRequest information. + PreparationHash = TransactionHashes == null ? PreparationPayloads.Where(p => p != null).GroupBy(p => p.GetDeserializedMessage().PreparationHash, (k, g) => new { Hash = k, Count = g.Count() }).OrderByDescending(p => p.Count).Select(p => p.Hash).FirstOrDefault() : null, + PreparationMessages = PreparationPayloads.Where(p => p != null).Select(p => RecoveryMessage.PreparationPayloadCompact.FromPayload(p)).ToDictionary(p => (int)p.ValidatorIndex), + CommitMessages = CommitSent + ? CommitPayloads.Where(p => p != null).Select(p => RecoveryMessage.CommitPayloadCompact.FromPayload(p)).ToDictionary(p => (int)p.ValidatorIndex) + : new Dictionary() + }); + } + + public ConsensusPayload MakePrepareResponse() + { + return PreparationPayloads[MyIndex] = MakeSignedPayload(new PrepareResponse + { + PreparationHash = PreparationPayloads[Block.ConsensusData.PrimaryIndex].Hash + }); + } + + public void Reset(byte viewNumber) + { + if (viewNumber == 0) + { + Snapshot?.Dispose(); + Snapshot = Blockchain.Singleton.GetSnapshot(); + Block = new Block + { + PrevHash = Snapshot.CurrentBlockHash, + Index = Snapshot.Height + 1, + NextConsensus = Blockchain.GetConsensusAddress(NativeContract.NEO.GetValidators(Snapshot).ToArray()), + ConsensusData = new ConsensusData() + }; + var pv = Validators; + Validators = NativeContract.NEO.GetNextBlockValidators(Snapshot); if (_witnessSize == 0 || (pv != null && pv.Length != Validators.Length)) { // Compute the expected size of the witness @@ -363,92 +362,92 @@ public void Reset(byte viewNumber) VerificationScript = Contract.CreateMultiSigRedeemScript(M, Validators) }.Size; } - } - MyIndex = -1; - ChangeViewPayloads = new ConsensusPayload[Validators.Length]; - LastChangeViewPayloads = new ConsensusPayload[Validators.Length]; - CommitPayloads = new ConsensusPayload[Validators.Length]; - if (LastSeenMessage == null) - { - LastSeenMessage = new int[Validators.Length]; - for (int i = 0; i < Validators.Length; i++) - LastSeenMessage[i] = -1; - } - keyPair = null; - for (int i = 0; i < Validators.Length; i++) - { - WalletAccount account = wallet?.GetAccount(Validators[i]); - if (account?.HasKey != true) continue; - MyIndex = i; - keyPair = account.GetKey(); - break; - } - } - else - { - for (int i = 0; i < LastChangeViewPayloads.Length; i++) - if (ChangeViewPayloads[i]?.GetDeserializedMessage().NewViewNumber >= viewNumber) - LastChangeViewPayloads[i] = ChangeViewPayloads[i]; - else - LastChangeViewPayloads[i] = null; - } - ViewNumber = viewNumber; - Block.ConsensusData.PrimaryIndex = GetPrimaryIndex(viewNumber); - Block.MerkleRoot = null; - Block.Timestamp = 0; - Block.Transactions = null; - TransactionHashes = null; - PreparationPayloads = new ConsensusPayload[Validators.Length]; - if (MyIndex >= 0) LastSeenMessage[MyIndex] = (int)Block.Index; - } - - public void Save() - { - store.PutSync(CN_Context, new byte[0], this.ToArray()); - } - - public void Serialize(BinaryWriter writer) - { - writer.Write(Block.Version); - writer.Write(Block.Index); - writer.Write(Block.Timestamp); - writer.Write(Block.NextConsensus ?? UInt160.Zero); - writer.Write(Block.ConsensusData); - writer.Write(ViewNumber); - writer.Write(TransactionHashes ?? new UInt256[0]); - writer.Write(Transactions?.Values.ToArray() ?? new Transaction[0]); - writer.WriteVarInt(PreparationPayloads.Length); - foreach (var payload in PreparationPayloads) - { - bool hasPayload = !(payload is null); - writer.Write(hasPayload); - if (!hasPayload) continue; - writer.Write(payload); - } - writer.WriteVarInt(CommitPayloads.Length); - foreach (var payload in CommitPayloads) - { - bool hasPayload = !(payload is null); - writer.Write(hasPayload); - if (!hasPayload) continue; - writer.Write(payload); - } - writer.WriteVarInt(ChangeViewPayloads.Length); - foreach (var payload in ChangeViewPayloads) - { - bool hasPayload = !(payload is null); - writer.Write(hasPayload); - if (!hasPayload) continue; - writer.Write(payload); - } - writer.WriteVarInt(LastChangeViewPayloads.Length); - foreach (var payload in LastChangeViewPayloads) - { - bool hasPayload = !(payload is null); - writer.Write(hasPayload); - if (!hasPayload) continue; - writer.Write(payload); - } - } - } -} + } + MyIndex = -1; + ChangeViewPayloads = new ConsensusPayload[Validators.Length]; + LastChangeViewPayloads = new ConsensusPayload[Validators.Length]; + CommitPayloads = new ConsensusPayload[Validators.Length]; + if (LastSeenMessage == null) + { + LastSeenMessage = new int[Validators.Length]; + for (int i = 0; i < Validators.Length; i++) + LastSeenMessage[i] = -1; + } + keyPair = null; + for (int i = 0; i < Validators.Length; i++) + { + WalletAccount account = wallet?.GetAccount(Validators[i]); + if (account?.HasKey != true) continue; + MyIndex = i; + keyPair = account.GetKey(); + break; + } + } + else + { + for (int i = 0; i < LastChangeViewPayloads.Length; i++) + if (ChangeViewPayloads[i]?.GetDeserializedMessage().NewViewNumber >= viewNumber) + LastChangeViewPayloads[i] = ChangeViewPayloads[i]; + else + LastChangeViewPayloads[i] = null; + } + ViewNumber = viewNumber; + Block.ConsensusData.PrimaryIndex = GetPrimaryIndex(viewNumber); + Block.MerkleRoot = null; + Block.Timestamp = 0; + Block.Transactions = null; + TransactionHashes = null; + PreparationPayloads = new ConsensusPayload[Validators.Length]; + if (MyIndex >= 0) LastSeenMessage[MyIndex] = (int)Block.Index; + } + + public void Save() + { + store.PutSync(CN_Context, new byte[0], this.ToArray()); + } + + public void Serialize(BinaryWriter writer) + { + writer.Write(Block.Version); + writer.Write(Block.Index); + writer.Write(Block.Timestamp); + writer.Write(Block.NextConsensus ?? UInt160.Zero); + writer.Write(Block.ConsensusData); + writer.Write(ViewNumber); + writer.Write(TransactionHashes ?? new UInt256[0]); + writer.Write(Transactions?.Values.ToArray() ?? new Transaction[0]); + writer.WriteVarInt(PreparationPayloads.Length); + foreach (var payload in PreparationPayloads) + { + bool hasPayload = !(payload is null); + writer.Write(hasPayload); + if (!hasPayload) continue; + writer.Write(payload); + } + writer.WriteVarInt(CommitPayloads.Length); + foreach (var payload in CommitPayloads) + { + bool hasPayload = !(payload is null); + writer.Write(hasPayload); + if (!hasPayload) continue; + writer.Write(payload); + } + writer.WriteVarInt(ChangeViewPayloads.Length); + foreach (var payload in ChangeViewPayloads) + { + bool hasPayload = !(payload is null); + writer.Write(hasPayload); + if (!hasPayload) continue; + writer.Write(payload); + } + writer.WriteVarInt(LastChangeViewPayloads.Length); + foreach (var payload in LastChangeViewPayloads) + { + bool hasPayload = !(payload is null); + writer.Write(hasPayload); + if (!hasPayload) continue; + writer.Write(payload); + } + } + } +} diff --git a/neo/Consensus/ConsensusService.cs b/neo/Consensus/ConsensusService.cs index ee5c91d635..9ef4302e69 100644 --- a/neo/Consensus/ConsensusService.cs +++ b/neo/Consensus/ConsensusService.cs @@ -1,659 +1,673 @@ -using Akka.Actor; -using Akka.Configuration; -using Neo.Cryptography; -using Neo.IO; -using Neo.IO.Actors; -using Neo.Ledger; -using Neo.Network.P2P; -using Neo.Network.P2P.Payloads; -using Neo.Persistence; -using Neo.Plugins; -using Neo.SmartContract.Native; -using Neo.Wallets; -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; - -namespace Neo.Consensus -{ - public sealed class ConsensusService : UntypedActor - { - public class Start { public bool IgnoreRecoveryLogs; } - public class SetViewNumber { public byte ViewNumber; } - internal class Timer { public uint Height; public byte ViewNumber; } - - private readonly ConsensusContext context; - private readonly IActorRef localNode; - private readonly IActorRef taskManager; - private ICancelable timer_token; - private DateTime block_received_time; - private bool started = false; - - /// - /// This will record the information from last scheduled timer - /// - private DateTime clock_started = TimeProvider.Current.UtcNow; - private TimeSpan expected_delay = TimeSpan.Zero; - - /// - /// This will be cleared every block (so it will not grow out of control, but is used to prevent repeatedly - /// responding to the same message. - /// - private readonly HashSet knownHashes = new HashSet(); - /// - /// This variable is only true during OnRecoveryMessageReceived - /// - private bool isRecovering = false; - - public ConsensusService(IActorRef localNode, IActorRef taskManager, Store store, Wallet wallet) - : this(localNode, taskManager, new ConsensusContext(wallet, store)) - { - } - - internal ConsensusService(IActorRef localNode, IActorRef taskManager, ConsensusContext context) - { - this.localNode = localNode; - this.taskManager = taskManager; - this.context = context; - Context.System.EventStream.Subscribe(Self, typeof(Blockchain.PersistCompleted)); - } - - private bool AddTransaction(Transaction tx, bool verify) - { - if (verify && !tx.Verify(context.Snapshot, context.Transactions.Values)) - { - Log($"Invalid transaction: {tx.Hash}{Environment.NewLine}{tx.ToArray().ToHexString()}", LogLevel.Warning); - RequestChangeView(ChangeViewReason.TxInvalid); - return false; - } - if (!NativeContract.Policy.CheckPolicy(tx, context.Snapshot)) - { - Log($"reject tx: {tx.Hash}{Environment.NewLine}{tx.ToArray().ToHexString()}", LogLevel.Warning); - RequestChangeView(ChangeViewReason.TxRejectedByPolicy); - return false; - } - context.Transactions[tx.Hash] = tx; - if (context.TransactionHashes.Length == context.Transactions.Count) - { - // if we are the primary for this view, but acting as a backup because we recovered our own - // previously sent prepare request, then we don't want to send a prepare response. - if (context.IsPrimary || context.WatchOnly) return true; - - // Check maximum block size via Native Contract policy - if (context.GetExpectedBlockSize() > NativeContract.Policy.GetMaxBlockSize(context.Snapshot)) - { - Log($"rejected block: {context.Block.Index}{Environment.NewLine} The size exceed the policy", LogLevel.Warning); - RequestChangeView(ChangeViewReason.BlockRejectedByPolicy); - return false; - } - - // Timeout extension due to prepare response sent - // around 2*15/M=30.0/5 ~ 40% block time (for M=5) - ExtendTimerByFactor(2); - - Log($"send prepare response"); - localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakePrepareResponse() }); - CheckPreparations(); - } - return true; - } - - private void ChangeTimer(TimeSpan delay) - { - clock_started = TimeProvider.Current.UtcNow; - expected_delay = delay; - timer_token.CancelIfNotNull(); - timer_token = Context.System.Scheduler.ScheduleTellOnceCancelable(delay, Self, new Timer - { - Height = context.Block.Index, - ViewNumber = context.ViewNumber - }, ActorRefs.NoSender); - } - - private void CheckCommits() - { - if (context.CommitPayloads.Count(p => p?.ConsensusMessage.ViewNumber == context.ViewNumber) >= context.M && context.TransactionHashes.All(p => context.Transactions.ContainsKey(p))) - { - Block block = context.CreateBlock(); - Log($"relay block: height={block.Index} hash={block.Hash} tx={block.Transactions.Length}"); - localNode.Tell(new LocalNode.Relay { Inventory = block }); - } - } - - private void CheckExpectedView(byte viewNumber) - { - if (context.ViewNumber >= viewNumber) return; - // if there are `M` change view payloads with NewViewNumber greater than viewNumber, then, it is safe to move - if (context.ChangeViewPayloads.Count(p => p != null && p.GetDeserializedMessage().NewViewNumber >= viewNumber) >= context.M) - { - if (!context.WatchOnly) - { - ChangeView message = context.ChangeViewPayloads[context.MyIndex]?.GetDeserializedMessage(); - // Communicate the network about my agreement to move to `viewNumber` - // if my last change view payload, `message`, has NewViewNumber lower than current view to change - if (message is null || message.NewViewNumber < viewNumber) - localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeChangeView(ChangeViewReason.ChangeAgreement) }); - } - InitializeConsensus(viewNumber); - } - } - - private void CheckPreparations() - { - if (context.PreparationPayloads.Count(p => p != null) >= context.M && context.TransactionHashes.All(p => context.Transactions.ContainsKey(p))) - { - ConsensusPayload payload = context.MakeCommit(); - Log($"send commit"); - context.Save(); - localNode.Tell(new LocalNode.SendDirectly { Inventory = payload }); - // Set timer, so we will resend the commit in case of a networking issue - ChangeTimer(TimeSpan.FromMilliseconds(Blockchain.MillisecondsPerBlock)); - CheckCommits(); - } - } - - private void InitializeConsensus(byte viewNumber) - { - context.Reset(viewNumber); - if (viewNumber > 0) - Log($"changeview: view={viewNumber} primary={context.Validators[context.GetPrimaryIndex((byte)(viewNumber - 1u))]}", LogLevel.Warning); - Log($"initialize: height={context.Block.Index} view={viewNumber} index={context.MyIndex} role={(context.IsPrimary ? "Primary" : context.WatchOnly ? "WatchOnly" : "Backup")}"); - if (context.WatchOnly) return; - if (context.IsPrimary) - { - if (isRecovering) - { - ChangeTimer(TimeSpan.FromMilliseconds(Blockchain.MillisecondsPerBlock << (viewNumber + 1))); - } - else - { - TimeSpan span = TimeProvider.Current.UtcNow - block_received_time; - if (span >= Blockchain.TimePerBlock) - ChangeTimer(TimeSpan.Zero); - else - ChangeTimer(Blockchain.TimePerBlock - span); - } - } - else - { - ChangeTimer(TimeSpan.FromMilliseconds(Blockchain.MillisecondsPerBlock << (viewNumber + 1))); - } - } - - private void Log(string message, LogLevel level = LogLevel.Info) - { - Plugin.Log(nameof(ConsensusService), level, message); - } - - private void OnChangeViewReceived(ConsensusPayload payload, ChangeView message) - { - if (message.NewViewNumber <= context.ViewNumber) - OnRecoveryRequestReceived(payload); - - if (context.CommitSent) return; - - var expectedView = context.ChangeViewPayloads[payload.ValidatorIndex]?.GetDeserializedMessage().NewViewNumber ?? (byte)0; - if (message.NewViewNumber <= expectedView) - return; - - Log($"{nameof(OnChangeViewReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex} nv={message.NewViewNumber}"); - context.ChangeViewPayloads[payload.ValidatorIndex] = payload; - CheckExpectedView(message.NewViewNumber); - } - - private void OnCommitReceived(ConsensusPayload payload, Commit commit) - { - ref ConsensusPayload existingCommitPayload = ref context.CommitPayloads[payload.ValidatorIndex]; - if (existingCommitPayload != null) - { - if (existingCommitPayload.Hash != payload.Hash) - Log($"{nameof(OnCommitReceived)}: different commit from validator! height={payload.BlockIndex} index={payload.ValidatorIndex} view={commit.ViewNumber} existingView={existingCommitPayload.ConsensusMessage.ViewNumber}", LogLevel.Warning); - return; - } - - // Timeout extension: commit has been received with success - // around 4*15s/M=60.0s/5=12.0s ~ 80% block time (for M=5) - ExtendTimerByFactor(4); - - if (commit.ViewNumber == context.ViewNumber) - { - Log($"{nameof(OnCommitReceived)}: height={payload.BlockIndex} view={commit.ViewNumber} index={payload.ValidatorIndex} nc={context.CountCommitted} nf={context.CountFailed}"); - - byte[] hashData = context.EnsureHeader()?.GetHashData(); - if (hashData == null) - { - existingCommitPayload = payload; - } - else if (Crypto.Default.VerifySignature(hashData, commit.Signature, - context.Validators[payload.ValidatorIndex].EncodePoint(false))) - { - existingCommitPayload = payload; - CheckCommits(); - } - return; - } - // Receiving commit from another view - Log($"{nameof(OnCommitReceived)}: record commit for different view={commit.ViewNumber} index={payload.ValidatorIndex} height={payload.BlockIndex}"); - existingCommitPayload = payload; - } - - // this function increases existing timer (never decreases) with a value proportional to `maxDelayInBlockTimes`*`Blockchain.MillisecondsPerBlock` - private void ExtendTimerByFactor(int maxDelayInBlockTimes) - { - TimeSpan nextDelay = expected_delay - (TimeProvider.Current.UtcNow - clock_started) + TimeSpan.FromMilliseconds(maxDelayInBlockTimes * Blockchain.MillisecondsPerBlock / context.M); - if (!context.WatchOnly && !context.ViewChanging && !context.CommitSent && (nextDelay > TimeSpan.Zero)) - ChangeTimer(nextDelay); - } - - private void OnConsensusPayload(ConsensusPayload payload) - { - if (context.BlockSent) return; - if (payload.Version != context.Block.Version) return; - if (payload.PrevHash != context.Block.PrevHash || payload.BlockIndex != context.Block.Index) - { - if (context.Block.Index < payload.BlockIndex) - { - Log($"chain sync: expected={payload.BlockIndex} current={context.Block.Index - 1} nodes={LocalNode.Singleton.ConnectedCount}", LogLevel.Warning); - } - return; - } - if (payload.ValidatorIndex >= context.Validators.Length) return; - ConsensusMessage message; - try - { - message = payload.ConsensusMessage; - } - catch (FormatException) - { - return; - } - catch (IOException) - { - return; - } - context.LastSeenMessage[payload.ValidatorIndex] = (int)payload.BlockIndex; - foreach (IP2PPlugin plugin in Plugin.P2PPlugins) - if (!plugin.OnConsensusMessage(payload)) - return; - switch (message) - { - case ChangeView view: - OnChangeViewReceived(payload, view); - break; - case PrepareRequest request: - OnPrepareRequestReceived(payload, request); - break; - case PrepareResponse response: - OnPrepareResponseReceived(payload, response); - break; - case Commit commit: - OnCommitReceived(payload, commit); - break; - case RecoveryRequest _: - OnRecoveryRequestReceived(payload); - break; - case RecoveryMessage recovery: - OnRecoveryMessageReceived(payload, recovery); - break; - } - } - - private void OnPersistCompleted(Block block) - { - Log($"persist block: height={block.Index} hash={block.Hash} tx={block.Transactions.Length}"); - block_received_time = TimeProvider.Current.UtcNow; - knownHashes.Clear(); - InitializeConsensus(0); - } - - private void OnRecoveryMessageReceived(ConsensusPayload payload, RecoveryMessage message) - { - // isRecovering is always set to false again after OnRecoveryMessageReceived - isRecovering = true; - int validChangeViews = 0, totalChangeViews = 0, validPrepReq = 0, totalPrepReq = 0; - int validPrepResponses = 0, totalPrepResponses = 0, validCommits = 0, totalCommits = 0; - - Log($"{nameof(OnRecoveryMessageReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex}"); - try - { - if (message.ViewNumber > context.ViewNumber) - { - if (context.CommitSent) return; - ConsensusPayload[] changeViewPayloads = message.GetChangeViewPayloads(context, payload); - totalChangeViews = changeViewPayloads.Length; - foreach (ConsensusPayload changeViewPayload in changeViewPayloads) - if (ReverifyAndProcessPayload(changeViewPayload)) validChangeViews++; - } - if (message.ViewNumber == context.ViewNumber && !context.NotAcceptingPayloadsDueToViewChanging && !context.CommitSent) - { - if (!context.RequestSentOrReceived) - { - ConsensusPayload prepareRequestPayload = message.GetPrepareRequestPayload(context, payload); - if (prepareRequestPayload != null) - { - totalPrepReq = 1; - if (ReverifyAndProcessPayload(prepareRequestPayload)) validPrepReq++; - } - else if (context.IsPrimary) - SendPrepareRequest(); - } - ConsensusPayload[] prepareResponsePayloads = message.GetPrepareResponsePayloads(context, payload); - totalPrepResponses = prepareResponsePayloads.Length; - foreach (ConsensusPayload prepareResponsePayload in prepareResponsePayloads) - if (ReverifyAndProcessPayload(prepareResponsePayload)) validPrepResponses++; - } - if (message.ViewNumber <= context.ViewNumber) - { - // Ensure we know about all commits from lower view numbers. - ConsensusPayload[] commitPayloads = message.GetCommitPayloadsFromRecoveryMessage(context, payload); - totalCommits = commitPayloads.Length; - foreach (ConsensusPayload commitPayload in commitPayloads) - if (ReverifyAndProcessPayload(commitPayload)) validCommits++; - } - } - finally - { - Log($"{nameof(OnRecoveryMessageReceived)}: finished (valid/total) " + - $"ChgView: {validChangeViews}/{totalChangeViews} " + - $"PrepReq: {validPrepReq}/{totalPrepReq} " + - $"PrepResp: {validPrepResponses}/{totalPrepResponses} " + - $"Commits: {validCommits}/{totalCommits}"); - isRecovering = false; - } - } - - private void OnRecoveryRequestReceived(ConsensusPayload payload) - { - // We keep track of the payload hashes received in this block, and don't respond with recovery - // in response to the same payload that we already responded to previously. - // ChangeView messages include a Timestamp when the change view is sent, thus if a node restarts - // and issues a change view for the same view, it will have a different hash and will correctly respond - // again; however replay attacks of the ChangeView message from arbitrary nodes will not trigger an - // additional recovery message response. - if (!knownHashes.Add(payload.Hash)) return; - - Log($"On{payload.ConsensusMessage.GetType().Name}Received: height={payload.BlockIndex} index={payload.ValidatorIndex} view={payload.ConsensusMessage.ViewNumber}"); - if (context.WatchOnly) return; - if (!context.CommitSent) - { - bool shouldSendRecovery = false; - int allowedRecoveryNodeCount = context.F; - // Limit recoveries to be sent from an upper limit of `f` nodes - for (int i = 1; i <= allowedRecoveryNodeCount; i++) - { - var chosenIndex = (payload.ValidatorIndex + i) % context.Validators.Length; - if (chosenIndex != context.MyIndex) continue; - shouldSendRecovery = true; - break; - } - - if (!shouldSendRecovery) return; - } - Log($"send recovery: view={context.ViewNumber}"); - localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeRecoveryMessage() }); - } - - private void OnPrepareRequestReceived(ConsensusPayload payload, PrepareRequest message) - { - if (context.RequestSentOrReceived || context.NotAcceptingPayloadsDueToViewChanging) return; - if (payload.ValidatorIndex != context.Block.ConsensusData.PrimaryIndex || message.ViewNumber != context.ViewNumber) return; - Log($"{nameof(OnPrepareRequestReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex} tx={message.TransactionHashes.Length}"); - if (message.Timestamp <= context.PrevHeader.Timestamp || message.Timestamp > TimeProvider.Current.UtcNow.AddMinutes(10).ToTimestampMS()) - { - Log($"Timestamp incorrect: {message.Timestamp}", LogLevel.Warning); - return; - } - if (message.TransactionHashes.Any(p => context.Snapshot.ContainsTransaction(p))) - { - Log($"Invalid request: transaction already exists", LogLevel.Warning); - return; - } - - // Timeout extension: prepare request has been received with success - // around 2*15/M=30.0/5 ~ 40% block time (for M=5) - ExtendTimerByFactor(2); - - context.Block.Timestamp = message.Timestamp; - context.Block.ConsensusData.Nonce = message.Nonce; - context.TransactionHashes = message.TransactionHashes; - context.Transactions = new Dictionary(); - for (int i = 0; i < context.PreparationPayloads.Length; i++) - if (context.PreparationPayloads[i] != null) - if (!context.PreparationPayloads[i].GetDeserializedMessage().PreparationHash.Equals(payload.Hash)) - context.PreparationPayloads[i] = null; - context.PreparationPayloads[payload.ValidatorIndex] = payload; - byte[] hashData = context.EnsureHeader().GetHashData(); - for (int i = 0; i < context.CommitPayloads.Length; i++) - if (context.CommitPayloads[i]?.ConsensusMessage.ViewNumber == context.ViewNumber) - if (!Crypto.Default.VerifySignature(hashData, context.CommitPayloads[i].GetDeserializedMessage().Signature, context.Validators[i].EncodePoint(false))) - context.CommitPayloads[i] = null; - Dictionary mempoolVerified = Blockchain.Singleton.MemPool.GetVerifiedTransactions().ToDictionary(p => p.Hash); - List unverified = new List(); - foreach (UInt256 hash in context.TransactionHashes) - { - if (mempoolVerified.TryGetValue(hash, out Transaction tx)) - { - if (!AddTransaction(tx, false)) - return; - } - else - { - if (Blockchain.Singleton.MemPool.TryGetValue(hash, out tx)) - unverified.Add(tx); - } - } - foreach (Transaction tx in unverified) - if (!AddTransaction(tx, true)) - return; - if (context.Transactions.Count < context.TransactionHashes.Length) - { - UInt256[] hashes = context.TransactionHashes.Where(i => !context.Transactions.ContainsKey(i)).ToArray(); - taskManager.Tell(new TaskManager.RestartTasks - { - Payload = InvPayload.Create(InventoryType.TX, hashes) - }); - } - } - - private void OnPrepareResponseReceived(ConsensusPayload payload, PrepareResponse message) - { - if (message.ViewNumber != context.ViewNumber) return; - if (context.PreparationPayloads[payload.ValidatorIndex] != null || context.NotAcceptingPayloadsDueToViewChanging) return; - if (context.PreparationPayloads[context.Block.ConsensusData.PrimaryIndex] != null && !message.PreparationHash.Equals(context.PreparationPayloads[context.Block.ConsensusData.PrimaryIndex].Hash)) - return; - - // Timeout extension: prepare response has been received with success - // around 2*15/M=30.0/5 ~ 40% block time (for M=5) - ExtendTimerByFactor(2); - - Log($"{nameof(OnPrepareResponseReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex}"); - context.PreparationPayloads[payload.ValidatorIndex] = payload; - if (context.WatchOnly || context.CommitSent) return; - if (context.RequestSentOrReceived) - CheckPreparations(); - } - - protected override void OnReceive(object message) - { - if (message is Start options) - { - if (started) return; - OnStart(options); - } - else - { - if (!started) return; - switch (message) - { - case SetViewNumber setView: - InitializeConsensus(setView.ViewNumber); - break; - case Timer timer: - OnTimer(timer); - break; - case ConsensusPayload payload: - OnConsensusPayload(payload); - break; - case Transaction transaction: - OnTransaction(transaction); - break; - case Blockchain.PersistCompleted completed: - OnPersistCompleted(completed.Block); - break; - } - } - } - - private void RequestRecovery() - { - if (context.Block.Index == Blockchain.Singleton.HeaderHeight + 1) - localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeRecoveryRequest() }); - } - - private void OnStart(Start options) - { - Log("OnStart"); - started = true; - if (!options.IgnoreRecoveryLogs && context.Load()) - { - if (context.Transactions != null) - { - Sender.Ask(new Blockchain.FillMemoryPool - { - Transactions = context.Transactions.Values - }).Wait(); - } - if (context.CommitSent) - { - CheckPreparations(); - return; - } - } - InitializeConsensus(0); - // Issue a ChangeView with NewViewNumber of 0 to request recovery messages on start-up. - if (!context.WatchOnly) - RequestRecovery(); - } - - private void OnTimer(Timer timer) - { - if (context.WatchOnly || context.BlockSent) return; - if (timer.Height != context.Block.Index || timer.ViewNumber != context.ViewNumber) return; - Log($"timeout: height={timer.Height} view={timer.ViewNumber}"); - if (context.IsPrimary && !context.RequestSentOrReceived) - { - SendPrepareRequest(); - } - else if ((context.IsPrimary && context.RequestSentOrReceived) || context.IsBackup) - { - if (context.CommitSent) - { - // Re-send commit periodically by sending recover message in case of a network issue. - Log($"send recovery to resend commit"); - localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeRecoveryMessage() }); - ChangeTimer(TimeSpan.FromMilliseconds(Blockchain.MillisecondsPerBlock << 1)); - } - else - { - var reason = ChangeViewReason.Timeout; - - if (context.Block != null && context.TransactionHashes?.Count() > context.Transactions?.Count) - { - reason = ChangeViewReason.TxNotFound; - } - - RequestChangeView(reason); - } - } - } - - private void OnTransaction(Transaction transaction) - { - if (!context.IsBackup || context.NotAcceptingPayloadsDueToViewChanging || !context.RequestSentOrReceived || context.ResponseSent || context.BlockSent) - return; - if (context.Transactions.ContainsKey(transaction.Hash)) return; - if (!context.TransactionHashes.Contains(transaction.Hash)) return; - AddTransaction(transaction, true); - } - - protected override void PostStop() - { - Log("OnStop"); - started = false; - Context.System.EventStream.Unsubscribe(Self); - context.Dispose(); - base.PostStop(); - } - - public static Props Props(IActorRef localNode, IActorRef taskManager, Store store, Wallet wallet) - { - return Akka.Actor.Props.Create(() => new ConsensusService(localNode, taskManager, store, wallet)).WithMailbox("consensus-service-mailbox"); - } - - private void RequestChangeView(ChangeViewReason reason) - { - if (context.WatchOnly) return; - // Request for next view is always one view more than the current context.ViewNumber - // Nodes will not contribute for changing to a view higher than (context.ViewNumber+1), unless they are recovered - // The latter may happen by nodes in higher views with, at least, `M` proofs - byte expectedView = context.ViewNumber; - expectedView++; - ChangeTimer(TimeSpan.FromMilliseconds(Blockchain.MillisecondsPerBlock << (expectedView + 1))); - if ((context.CountCommitted + context.CountFailed) > context.F) - { - Log($"Skip requesting change view to nv={expectedView} because nc={context.CountCommitted} nf={context.CountFailed}"); - RequestRecovery(); - return; - } - Log($"request change view: height={context.Block.Index} view={context.ViewNumber} nv={expectedView} nc={context.CountCommitted} nf={context.CountFailed}"); - localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeChangeView(reason) }); - CheckExpectedView(expectedView); - } - - private bool ReverifyAndProcessPayload(ConsensusPayload payload) - { - if (!payload.Verify(context.Snapshot)) return false; - OnConsensusPayload(payload); - return true; - } - - private void SendPrepareRequest() - { - Log($"send prepare request: height={context.Block.Index} view={context.ViewNumber}"); - localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakePrepareRequest() }); - - if (context.Validators.Length == 1) - CheckPreparations(); - - if (context.TransactionHashes.Length > 0) - { - foreach (InvPayload payload in InvPayload.CreateGroup(InventoryType.TX, context.TransactionHashes)) - localNode.Tell(Message.Create(MessageCommand.Inv, payload)); - } - ChangeTimer(TimeSpan.FromMilliseconds((Blockchain.MillisecondsPerBlock << (context.ViewNumber + 1)) - (context.ViewNumber == 0 ? Blockchain.MillisecondsPerBlock : 0))); - } - } - - internal class ConsensusServiceMailbox : PriorityMailbox - { - public ConsensusServiceMailbox(Akka.Actor.Settings settings, Config config) - : base(settings, config) - { - } - - internal protected override bool IsHighPriority(object message) - { - switch (message) - { - case ConsensusPayload _: - case ConsensusService.SetViewNumber _: - case ConsensusService.Timer _: - case Blockchain.PersistCompleted _: - return true; - default: - return false; - } - } - } -} +using Akka.Actor; +using Akka.Configuration; +using Neo.Cryptography; +using Neo.IO; +using Neo.IO.Actors; +using Neo.Ledger; +using Neo.Network.P2P; +using Neo.Network.P2P.Payloads; +using Neo.Persistence; +using Neo.Plugins; +using Neo.SmartContract.Native; +using Neo.Wallets; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; + +namespace Neo.Consensus +{ + public sealed class ConsensusService : UntypedActor + { + public class Start { public bool IgnoreRecoveryLogs; } + public class SetViewNumber { public byte ViewNumber; } + internal class Timer { public uint Height; public byte ViewNumber; } + + private readonly ConsensusContext context; + private readonly IActorRef localNode; + private readonly IActorRef taskManager; + private ICancelable timer_token; + private DateTime block_received_time; + private bool started = false; + + /// + /// This will record the information from last scheduled timer + /// + private DateTime clock_started = TimeProvider.Current.UtcNow; + private TimeSpan expected_delay = TimeSpan.Zero; + + /// + /// This will be cleared every block (so it will not grow out of control, but is used to prevent repeatedly + /// responding to the same message. + /// + private readonly HashSet knownHashes = new HashSet(); + /// + /// This variable is only true during OnRecoveryMessageReceived + /// + private bool isRecovering = false; + + public ConsensusService(IActorRef localNode, IActorRef taskManager, Store store, Wallet wallet) + : this(localNode, taskManager, new ConsensusContext(wallet, store)) + { + } + + internal ConsensusService(IActorRef localNode, IActorRef taskManager, ConsensusContext context) + { + this.localNode = localNode; + this.taskManager = taskManager; + this.context = context; + Context.System.EventStream.Subscribe(Self, typeof(Blockchain.PersistCompleted)); + } + + private bool AddTransaction(Transaction tx, bool verify) + { + if (verify && !tx.Verify(context.Snapshot, context.Transactions.Values)) + { + Log($"Invalid transaction: {tx.Hash}{Environment.NewLine}{tx.ToArray().ToHexString()}", LogLevel.Warning); + RequestChangeView(ChangeViewReason.TxInvalid); + return false; + } + if (!NativeContract.Policy.CheckPolicy(tx, context.Snapshot)) + { + Log($"reject tx: {tx.Hash}{Environment.NewLine}{tx.ToArray().ToHexString()}", LogLevel.Warning); + RequestChangeView(ChangeViewReason.TxRejectedByPolicy); + return false; + } + context.Transactions[tx.Hash] = tx; + return CheckPrepareResponse(); + } + + private bool CheckPrepareResponse() + { + if (context.TransactionHashes.Length == context.Transactions.Count) + { + // if we are the primary for this view, but acting as a backup because we recovered our own + // previously sent prepare request, then we don't want to send a prepare response. + if (context.IsPrimary || context.WatchOnly) return true; + + // Check maximum block size via Native Contract policy + if (context.GetExpectedBlockSize() > NativeContract.Policy.GetMaxBlockSize(context.Snapshot)) + { + Log($"rejected block: {context.Block.Index}{Environment.NewLine} The size exceed the policy", LogLevel.Warning); + RequestChangeView(ChangeViewReason.BlockRejectedByPolicy); + return false; + } + + // Timeout extension due to prepare response sent + // around 2*15/M=30.0/5 ~ 40% block time (for M=5) + ExtendTimerByFactor(2); + + Log($"send prepare response"); + localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakePrepareResponse() }); + CheckPreparations(); + } + + return true; + } + + private void ChangeTimer(TimeSpan delay) + { + clock_started = TimeProvider.Current.UtcNow; + expected_delay = delay; + timer_token.CancelIfNotNull(); + timer_token = Context.System.Scheduler.ScheduleTellOnceCancelable(delay, Self, new Timer + { + Height = context.Block.Index, + ViewNumber = context.ViewNumber + }, ActorRefs.NoSender); + } + + private void CheckCommits() + { + if (context.CommitPayloads.Count(p => p?.ConsensusMessage.ViewNumber == context.ViewNumber) >= context.M && context.TransactionHashes.All(p => context.Transactions.ContainsKey(p))) + { + Block block = context.CreateBlock(); + Log($"relay block: height={block.Index} hash={block.Hash} tx={block.Transactions.Length}"); + localNode.Tell(new LocalNode.Relay { Inventory = block }); + } + } + + private void CheckExpectedView(byte viewNumber) + { + if (context.ViewNumber >= viewNumber) return; + // if there are `M` change view payloads with NewViewNumber greater than viewNumber, then, it is safe to move + if (context.ChangeViewPayloads.Count(p => p != null && p.GetDeserializedMessage().NewViewNumber >= viewNumber) >= context.M) + { + if (!context.WatchOnly) + { + ChangeView message = context.ChangeViewPayloads[context.MyIndex]?.GetDeserializedMessage(); + // Communicate the network about my agreement to move to `viewNumber` + // if my last change view payload, `message`, has NewViewNumber lower than current view to change + if (message is null || message.NewViewNumber < viewNumber) + localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeChangeView(ChangeViewReason.ChangeAgreement) }); + } + InitializeConsensus(viewNumber); + } + } + + private void CheckPreparations() + { + if (context.PreparationPayloads.Count(p => p != null) >= context.M && context.TransactionHashes.All(p => context.Transactions.ContainsKey(p))) + { + ConsensusPayload payload = context.MakeCommit(); + Log($"send commit"); + context.Save(); + localNode.Tell(new LocalNode.SendDirectly { Inventory = payload }); + // Set timer, so we will resend the commit in case of a networking issue + ChangeTimer(TimeSpan.FromMilliseconds(Blockchain.MillisecondsPerBlock)); + CheckCommits(); + } + } + + private void InitializeConsensus(byte viewNumber) + { + context.Reset(viewNumber); + if (viewNumber > 0) + Log($"changeview: view={viewNumber} primary={context.Validators[context.GetPrimaryIndex((byte)(viewNumber - 1u))]}", LogLevel.Warning); + Log($"initialize: height={context.Block.Index} view={viewNumber} index={context.MyIndex} role={(context.IsPrimary ? "Primary" : context.WatchOnly ? "WatchOnly" : "Backup")}"); + if (context.WatchOnly) return; + if (context.IsPrimary) + { + if (isRecovering) + { + ChangeTimer(TimeSpan.FromMilliseconds(Blockchain.MillisecondsPerBlock << (viewNumber + 1))); + } + else + { + TimeSpan span = TimeProvider.Current.UtcNow - block_received_time; + if (span >= Blockchain.TimePerBlock) + ChangeTimer(TimeSpan.Zero); + else + ChangeTimer(Blockchain.TimePerBlock - span); + } + } + else + { + ChangeTimer(TimeSpan.FromMilliseconds(Blockchain.MillisecondsPerBlock << (viewNumber + 1))); + } + } + + private void Log(string message, LogLevel level = LogLevel.Info) + { + Plugin.Log(nameof(ConsensusService), level, message); + } + + private void OnChangeViewReceived(ConsensusPayload payload, ChangeView message) + { + if (message.NewViewNumber <= context.ViewNumber) + OnRecoveryRequestReceived(payload); + + if (context.CommitSent) return; + + var expectedView = context.ChangeViewPayloads[payload.ValidatorIndex]?.GetDeserializedMessage().NewViewNumber ?? (byte)0; + if (message.NewViewNumber <= expectedView) + return; + + Log($"{nameof(OnChangeViewReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex} nv={message.NewViewNumber}"); + context.ChangeViewPayloads[payload.ValidatorIndex] = payload; + CheckExpectedView(message.NewViewNumber); + } + + private void OnCommitReceived(ConsensusPayload payload, Commit commit) + { + ref ConsensusPayload existingCommitPayload = ref context.CommitPayloads[payload.ValidatorIndex]; + if (existingCommitPayload != null) + { + if (existingCommitPayload.Hash != payload.Hash) + Log($"{nameof(OnCommitReceived)}: different commit from validator! height={payload.BlockIndex} index={payload.ValidatorIndex} view={commit.ViewNumber} existingView={existingCommitPayload.ConsensusMessage.ViewNumber}", LogLevel.Warning); + return; + } + + // Timeout extension: commit has been received with success + // around 4*15s/M=60.0s/5=12.0s ~ 80% block time (for M=5) + ExtendTimerByFactor(4); + + if (commit.ViewNumber == context.ViewNumber) + { + Log($"{nameof(OnCommitReceived)}: height={payload.BlockIndex} view={commit.ViewNumber} index={payload.ValidatorIndex} nc={context.CountCommitted} nf={context.CountFailed}"); + + byte[] hashData = context.EnsureHeader()?.GetHashData(); + if (hashData == null) + { + existingCommitPayload = payload; + } + else if (Crypto.Default.VerifySignature(hashData, commit.Signature, + context.Validators[payload.ValidatorIndex].EncodePoint(false))) + { + existingCommitPayload = payload; + CheckCommits(); + } + return; + } + // Receiving commit from another view + Log($"{nameof(OnCommitReceived)}: record commit for different view={commit.ViewNumber} index={payload.ValidatorIndex} height={payload.BlockIndex}"); + existingCommitPayload = payload; + } + + // this function increases existing timer (never decreases) with a value proportional to `maxDelayInBlockTimes`*`Blockchain.MillisecondsPerBlock` + private void ExtendTimerByFactor(int maxDelayInBlockTimes) + { + TimeSpan nextDelay = expected_delay - (TimeProvider.Current.UtcNow - clock_started) + TimeSpan.FromMilliseconds(maxDelayInBlockTimes * Blockchain.MillisecondsPerBlock / context.M); + if (!context.WatchOnly && !context.ViewChanging && !context.CommitSent && (nextDelay > TimeSpan.Zero)) + ChangeTimer(nextDelay); + } + + private void OnConsensusPayload(ConsensusPayload payload) + { + if (context.BlockSent) return; + if (payload.Version != context.Block.Version) return; + if (payload.PrevHash != context.Block.PrevHash || payload.BlockIndex != context.Block.Index) + { + if (context.Block.Index < payload.BlockIndex) + { + Log($"chain sync: expected={payload.BlockIndex} current={context.Block.Index - 1} nodes={LocalNode.Singleton.ConnectedCount}", LogLevel.Warning); + } + return; + } + if (payload.ValidatorIndex >= context.Validators.Length) return; + ConsensusMessage message; + try + { + message = payload.ConsensusMessage; + } + catch (FormatException) + { + return; + } + catch (IOException) + { + return; + } + context.LastSeenMessage[payload.ValidatorIndex] = (int)payload.BlockIndex; + foreach (IP2PPlugin plugin in Plugin.P2PPlugins) + if (!plugin.OnConsensusMessage(payload)) + return; + switch (message) + { + case ChangeView view: + OnChangeViewReceived(payload, view); + break; + case PrepareRequest request: + OnPrepareRequestReceived(payload, request); + break; + case PrepareResponse response: + OnPrepareResponseReceived(payload, response); + break; + case Commit commit: + OnCommitReceived(payload, commit); + break; + case RecoveryRequest _: + OnRecoveryRequestReceived(payload); + break; + case RecoveryMessage recovery: + OnRecoveryMessageReceived(payload, recovery); + break; + } + } + + private void OnPersistCompleted(Block block) + { + Log($"persist block: height={block.Index} hash={block.Hash} tx={block.Transactions.Length}"); + block_received_time = TimeProvider.Current.UtcNow; + knownHashes.Clear(); + InitializeConsensus(0); + } + + private void OnRecoveryMessageReceived(ConsensusPayload payload, RecoveryMessage message) + { + // isRecovering is always set to false again after OnRecoveryMessageReceived + isRecovering = true; + int validChangeViews = 0, totalChangeViews = 0, validPrepReq = 0, totalPrepReq = 0; + int validPrepResponses = 0, totalPrepResponses = 0, validCommits = 0, totalCommits = 0; + + Log($"{nameof(OnRecoveryMessageReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex}"); + try + { + if (message.ViewNumber > context.ViewNumber) + { + if (context.CommitSent) return; + ConsensusPayload[] changeViewPayloads = message.GetChangeViewPayloads(context, payload); + totalChangeViews = changeViewPayloads.Length; + foreach (ConsensusPayload changeViewPayload in changeViewPayloads) + if (ReverifyAndProcessPayload(changeViewPayload)) validChangeViews++; + } + if (message.ViewNumber == context.ViewNumber && !context.NotAcceptingPayloadsDueToViewChanging && !context.CommitSent) + { + if (!context.RequestSentOrReceived) + { + ConsensusPayload prepareRequestPayload = message.GetPrepareRequestPayload(context, payload); + if (prepareRequestPayload != null) + { + totalPrepReq = 1; + if (ReverifyAndProcessPayload(prepareRequestPayload)) validPrepReq++; + } + else if (context.IsPrimary) + SendPrepareRequest(); + } + ConsensusPayload[] prepareResponsePayloads = message.GetPrepareResponsePayloads(context, payload); + totalPrepResponses = prepareResponsePayloads.Length; + foreach (ConsensusPayload prepareResponsePayload in prepareResponsePayloads) + if (ReverifyAndProcessPayload(prepareResponsePayload)) validPrepResponses++; + } + if (message.ViewNumber <= context.ViewNumber) + { + // Ensure we know about all commits from lower view numbers. + ConsensusPayload[] commitPayloads = message.GetCommitPayloadsFromRecoveryMessage(context, payload); + totalCommits = commitPayloads.Length; + foreach (ConsensusPayload commitPayload in commitPayloads) + if (ReverifyAndProcessPayload(commitPayload)) validCommits++; + } + } + finally + { + Log($"{nameof(OnRecoveryMessageReceived)}: finished (valid/total) " + + $"ChgView: {validChangeViews}/{totalChangeViews} " + + $"PrepReq: {validPrepReq}/{totalPrepReq} " + + $"PrepResp: {validPrepResponses}/{totalPrepResponses} " + + $"Commits: {validCommits}/{totalCommits}"); + isRecovering = false; + } + } + + private void OnRecoveryRequestReceived(ConsensusPayload payload) + { + // We keep track of the payload hashes received in this block, and don't respond with recovery + // in response to the same payload that we already responded to previously. + // ChangeView messages include a Timestamp when the change view is sent, thus if a node restarts + // and issues a change view for the same view, it will have a different hash and will correctly respond + // again; however replay attacks of the ChangeView message from arbitrary nodes will not trigger an + // additional recovery message response. + if (!knownHashes.Add(payload.Hash)) return; + + Log($"On{payload.ConsensusMessage.GetType().Name}Received: height={payload.BlockIndex} index={payload.ValidatorIndex} view={payload.ConsensusMessage.ViewNumber}"); + if (context.WatchOnly) return; + if (!context.CommitSent) + { + bool shouldSendRecovery = false; + int allowedRecoveryNodeCount = context.F; + // Limit recoveries to be sent from an upper limit of `f` nodes + for (int i = 1; i <= allowedRecoveryNodeCount; i++) + { + var chosenIndex = (payload.ValidatorIndex + i) % context.Validators.Length; + if (chosenIndex != context.MyIndex) continue; + shouldSendRecovery = true; + break; + } + + if (!shouldSendRecovery) return; + } + Log($"send recovery: view={context.ViewNumber}"); + localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeRecoveryMessage() }); + } + + private void OnPrepareRequestReceived(ConsensusPayload payload, PrepareRequest message) + { + if (context.RequestSentOrReceived || context.NotAcceptingPayloadsDueToViewChanging) return; + if (payload.ValidatorIndex != context.Block.ConsensusData.PrimaryIndex || message.ViewNumber != context.ViewNumber) return; + Log($"{nameof(OnPrepareRequestReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex} tx={message.TransactionHashes.Length}"); + if (message.Timestamp <= context.PrevHeader.Timestamp || message.Timestamp > TimeProvider.Current.UtcNow.AddMinutes(10).ToTimestampMS()) + { + Log($"Timestamp incorrect: {message.Timestamp}", LogLevel.Warning); + return; + } + if (message.TransactionHashes.Any(p => context.Snapshot.ContainsTransaction(p))) + { + Log($"Invalid request: transaction already exists", LogLevel.Warning); + return; + } + + // Timeout extension: prepare request has been received with success + // around 2*15/M=30.0/5 ~ 40% block time (for M=5) + ExtendTimerByFactor(2); + + context.Block.Timestamp = message.Timestamp; + context.Block.ConsensusData.Nonce = message.Nonce; + context.TransactionHashes = message.TransactionHashes; + context.Transactions = new Dictionary(); + for (int i = 0; i < context.PreparationPayloads.Length; i++) + if (context.PreparationPayloads[i] != null) + if (!context.PreparationPayloads[i].GetDeserializedMessage().PreparationHash.Equals(payload.Hash)) + context.PreparationPayloads[i] = null; + context.PreparationPayloads[payload.ValidatorIndex] = payload; + byte[] hashData = context.EnsureHeader().GetHashData(); + for (int i = 0; i < context.CommitPayloads.Length; i++) + if (context.CommitPayloads[i]?.ConsensusMessage.ViewNumber == context.ViewNumber) + if (!Crypto.Default.VerifySignature(hashData, context.CommitPayloads[i].GetDeserializedMessage().Signature, context.Validators[i].EncodePoint(false))) + context.CommitPayloads[i] = null; + + if (context.TransactionHashes.Length == 0) + { + // There are no tx so we should act like if all the transactions was filled + CheckPrepareResponse(); + return; + } + + Dictionary mempoolVerified = Blockchain.Singleton.MemPool.GetVerifiedTransactions().ToDictionary(p => p.Hash); + List unverified = new List(); + foreach (UInt256 hash in context.TransactionHashes) + { + if (mempoolVerified.TryGetValue(hash, out Transaction tx)) + { + if (!AddTransaction(tx, false)) + return; + } + else + { + if (Blockchain.Singleton.MemPool.TryGetValue(hash, out tx)) + unverified.Add(tx); + } + } + foreach (Transaction tx in unverified) + if (!AddTransaction(tx, true)) + return; + if (context.Transactions.Count < context.TransactionHashes.Length) + { + UInt256[] hashes = context.TransactionHashes.Where(i => !context.Transactions.ContainsKey(i)).ToArray(); + taskManager.Tell(new TaskManager.RestartTasks + { + Payload = InvPayload.Create(InventoryType.TX, hashes) + }); + } + } + + private void OnPrepareResponseReceived(ConsensusPayload payload, PrepareResponse message) + { + if (message.ViewNumber != context.ViewNumber) return; + if (context.PreparationPayloads[payload.ValidatorIndex] != null || context.NotAcceptingPayloadsDueToViewChanging) return; + if (context.PreparationPayloads[context.Block.ConsensusData.PrimaryIndex] != null && !message.PreparationHash.Equals(context.PreparationPayloads[context.Block.ConsensusData.PrimaryIndex].Hash)) + return; + + // Timeout extension: prepare response has been received with success + // around 2*15/M=30.0/5 ~ 40% block time (for M=5) + ExtendTimerByFactor(2); + + Log($"{nameof(OnPrepareResponseReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex}"); + context.PreparationPayloads[payload.ValidatorIndex] = payload; + if (context.WatchOnly || context.CommitSent) return; + if (context.RequestSentOrReceived) + CheckPreparations(); + } + + protected override void OnReceive(object message) + { + if (message is Start options) + { + if (started) return; + OnStart(options); + } + else + { + if (!started) return; + switch (message) + { + case SetViewNumber setView: + InitializeConsensus(setView.ViewNumber); + break; + case Timer timer: + OnTimer(timer); + break; + case ConsensusPayload payload: + OnConsensusPayload(payload); + break; + case Transaction transaction: + OnTransaction(transaction); + break; + case Blockchain.PersistCompleted completed: + OnPersistCompleted(completed.Block); + break; + } + } + } + + private void RequestRecovery() + { + if (context.Block.Index == Blockchain.Singleton.HeaderHeight + 1) + localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeRecoveryRequest() }); + } + + private void OnStart(Start options) + { + Log("OnStart"); + started = true; + if (!options.IgnoreRecoveryLogs && context.Load()) + { + if (context.Transactions != null) + { + Sender.Ask(new Blockchain.FillMemoryPool + { + Transactions = context.Transactions.Values + }).Wait(); + } + if (context.CommitSent) + { + CheckPreparations(); + return; + } + } + InitializeConsensus(0); + // Issue a ChangeView with NewViewNumber of 0 to request recovery messages on start-up. + if (!context.WatchOnly) + RequestRecovery(); + } + + private void OnTimer(Timer timer) + { + if (context.WatchOnly || context.BlockSent) return; + if (timer.Height != context.Block.Index || timer.ViewNumber != context.ViewNumber) return; + Log($"timeout: height={timer.Height} view={timer.ViewNumber}"); + if (context.IsPrimary && !context.RequestSentOrReceived) + { + SendPrepareRequest(); + } + else if ((context.IsPrimary && context.RequestSentOrReceived) || context.IsBackup) + { + if (context.CommitSent) + { + // Re-send commit periodically by sending recover message in case of a network issue. + Log($"send recovery to resend commit"); + localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeRecoveryMessage() }); + ChangeTimer(TimeSpan.FromMilliseconds(Blockchain.MillisecondsPerBlock << 1)); + } + else + { + var reason = ChangeViewReason.Timeout; + + if (context.Block != null && context.TransactionHashes?.Count() > context.Transactions?.Count) + { + reason = ChangeViewReason.TxNotFound; + } + + RequestChangeView(reason); + } + } + } + + private void OnTransaction(Transaction transaction) + { + if (!context.IsBackup || context.NotAcceptingPayloadsDueToViewChanging || !context.RequestSentOrReceived || context.ResponseSent || context.BlockSent) + return; + if (context.Transactions.ContainsKey(transaction.Hash)) return; + if (!context.TransactionHashes.Contains(transaction.Hash)) return; + AddTransaction(transaction, true); + } + + protected override void PostStop() + { + Log("OnStop"); + started = false; + Context.System.EventStream.Unsubscribe(Self); + context.Dispose(); + base.PostStop(); + } + + public static Props Props(IActorRef localNode, IActorRef taskManager, Store store, Wallet wallet) + { + return Akka.Actor.Props.Create(() => new ConsensusService(localNode, taskManager, store, wallet)).WithMailbox("consensus-service-mailbox"); + } + + private void RequestChangeView(ChangeViewReason reason) + { + if (context.WatchOnly) return; + // Request for next view is always one view more than the current context.ViewNumber + // Nodes will not contribute for changing to a view higher than (context.ViewNumber+1), unless they are recovered + // The latter may happen by nodes in higher views with, at least, `M` proofs + byte expectedView = context.ViewNumber; + expectedView++; + ChangeTimer(TimeSpan.FromMilliseconds(Blockchain.MillisecondsPerBlock << (expectedView + 1))); + if ((context.CountCommitted + context.CountFailed) > context.F) + { + Log($"Skip requesting change view to nv={expectedView} because nc={context.CountCommitted} nf={context.CountFailed}"); + RequestRecovery(); + return; + } + Log($"request change view: height={context.Block.Index} view={context.ViewNumber} nv={expectedView} nc={context.CountCommitted} nf={context.CountFailed}"); + localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeChangeView(reason) }); + CheckExpectedView(expectedView); + } + + private bool ReverifyAndProcessPayload(ConsensusPayload payload) + { + if (!payload.Verify(context.Snapshot)) return false; + OnConsensusPayload(payload); + return true; + } + + private void SendPrepareRequest() + { + Log($"send prepare request: height={context.Block.Index} view={context.ViewNumber}"); + localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakePrepareRequest() }); + + if (context.Validators.Length == 1) + CheckPreparations(); + + if (context.TransactionHashes.Length > 0) + { + foreach (InvPayload payload in InvPayload.CreateGroup(InventoryType.TX, context.TransactionHashes)) + localNode.Tell(Message.Create(MessageCommand.Inv, payload)); + } + ChangeTimer(TimeSpan.FromMilliseconds((Blockchain.MillisecondsPerBlock << (context.ViewNumber + 1)) - (context.ViewNumber == 0 ? Blockchain.MillisecondsPerBlock : 0))); + } + } + + internal class ConsensusServiceMailbox : PriorityMailbox + { + public ConsensusServiceMailbox(Akka.Actor.Settings settings, Config config) + : base(settings, config) + { + } + + internal protected override bool IsHighPriority(object message) + { + switch (message) + { + case ConsensusPayload _: + case ConsensusService.SetViewNumber _: + case ConsensusService.Timer _: + case Blockchain.PersistCompleted _: + return true; + default: + return false; + } + } + } +} From 5171efc1107ba32f1643f5c9239e200f477687d3 Mon Sep 17 00:00:00 2001 From: Vitor Date: Sat, 24 Aug 2019 13:57:35 -0300 Subject: [PATCH 2/4] Removing unnecessary lines changes - Cleaning PR --- neo/Consensus/ConsensusContext.cs | 905 ++++++++++--------- neo/Consensus/ConsensusService.cs | 1345 ++++++++++++++--------------- 2 files changed, 1124 insertions(+), 1126 deletions(-) diff --git a/neo/Consensus/ConsensusContext.cs b/neo/Consensus/ConsensusContext.cs index 0bd773889f..b81d16a923 100644 --- a/neo/Consensus/ConsensusContext.cs +++ b/neo/Consensus/ConsensusContext.cs @@ -1,453 +1,452 @@ -using Neo.Cryptography; -using Neo.Cryptography.ECC; -using Neo.IO; -using Neo.Ledger; -using Neo.Network.P2P.Payloads; -using Neo.Persistence; -using Neo.SmartContract; -using Neo.SmartContract.Native; -using Neo.VM; -using Neo.Wallets; -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Runtime.CompilerServices; - -namespace Neo.Consensus -{ - internal class ConsensusContext : IDisposable, ISerializable - { - /// - /// Prefix for saving consensus state. - /// - public const byte CN_Context = 0xf4; - - public Block Block; - public byte ViewNumber; - public ECPoint[] Validators; - public int MyIndex; - public UInt256[] TransactionHashes; - public Dictionary Transactions; - public ConsensusPayload[] PreparationPayloads; - public ConsensusPayload[] CommitPayloads; - public ConsensusPayload[] ChangeViewPayloads; - public ConsensusPayload[] LastChangeViewPayloads; - // LastSeenMessage array stores the height of the last seen message, for each validator. - // if this node never heard from validator i, LastSeenMessage[i] will be -1. - public int[] LastSeenMessage; - - public Snapshot Snapshot { get; private set; } - private KeyPair keyPair; - private int _witnessSize; - private readonly Wallet wallet; - private readonly Store store; - private readonly Random random = new Random(); - - public int F => (Validators.Length - 1) / 3; - public int M => Validators.Length - F; - public bool IsPrimary => MyIndex == Block.ConsensusData.PrimaryIndex; - public bool IsBackup => MyIndex >= 0 && MyIndex != Block.ConsensusData.PrimaryIndex; - public bool WatchOnly => MyIndex < 0; - public Header PrevHeader => Snapshot.GetHeader(Block.PrevHash); - public int CountCommitted => CommitPayloads.Count(p => p != null); - public int CountFailed => LastSeenMessage.Count(p => p < (((int)Block.Index) - 1)); - - #region Consensus States - public bool RequestSentOrReceived => PreparationPayloads[Block.ConsensusData.PrimaryIndex] != null; - public bool ResponseSent => !WatchOnly && PreparationPayloads[MyIndex] != null; - public bool CommitSent => !WatchOnly && CommitPayloads[MyIndex] != null; - public bool BlockSent => Block.Transactions != null; - public bool ViewChanging => !WatchOnly && ChangeViewPayloads[MyIndex]?.GetDeserializedMessage().NewViewNumber > ViewNumber; - public bool NotAcceptingPayloadsDueToViewChanging => ViewChanging && !MoreThanFNodesCommittedOrLost; - // A possible attack can happen if the last node to commit is malicious and either sends change view after his - // commit to stall nodes in a higher view, or if he refuses to send recovery messages. In addition, if a node - // asking change views loses network or crashes and comes back when nodes are committed in more than one higher - // numbered view, it is possible for the node accepting recovery to commit in any of the higher views, thus - // potentially splitting nodes among views and stalling the network. - public bool MoreThanFNodesCommittedOrLost => (CountCommitted + CountFailed) > F; - #endregion - - public int Size => throw new NotImplementedException(); - - public ConsensusContext(Wallet wallet, Store store) - { - this.wallet = wallet; - this.store = store; - } - - public Block CreateBlock() - { - Contract contract = Contract.CreateMultiSigContract(M, Validators); - ContractParametersContext sc = new ContractParametersContext(Block); - for (int i = 0, j = 0; i < Validators.Length && j < M; i++) - { - if (CommitPayloads[i]?.ConsensusMessage.ViewNumber != ViewNumber) continue; - sc.AddSignature(contract, Validators[i], CommitPayloads[i].GetDeserializedMessage().Signature); - j++; - } - Block.Witness = sc.GetWitnesses()[0]; - Block.Transactions = TransactionHashes.Select(p => Transactions[p]).ToArray(); - return Block; - } - - public void Deserialize(BinaryReader reader) - { - Reset(0); - if (reader.ReadUInt32() != Block.Version) throw new FormatException(); - if (reader.ReadUInt32() != Block.Index) throw new InvalidOperationException(); - Block.Timestamp = reader.ReadUInt64(); - Block.NextConsensus = reader.ReadSerializable(); - if (Block.NextConsensus.Equals(UInt160.Zero)) - Block.NextConsensus = null; - Block.ConsensusData = reader.ReadSerializable(); - ViewNumber = reader.ReadByte(); - TransactionHashes = reader.ReadSerializableArray(); - if (TransactionHashes.Length == 0) - TransactionHashes = null; - Transaction[] transactions = reader.ReadSerializableArray(Block.MaxTransactionsPerBlock); - Transactions = transactions.Length == 0 ? null : transactions.ToDictionary(p => p.Hash); - PreparationPayloads = new ConsensusPayload[reader.ReadVarInt(Blockchain.MaxValidators)]; - for (int i = 0; i < PreparationPayloads.Length; i++) - PreparationPayloads[i] = reader.ReadBoolean() ? reader.ReadSerializable() : null; - CommitPayloads = new ConsensusPayload[reader.ReadVarInt(Blockchain.MaxValidators)]; - for (int i = 0; i < CommitPayloads.Length; i++) - CommitPayloads[i] = reader.ReadBoolean() ? reader.ReadSerializable() : null; - ChangeViewPayloads = new ConsensusPayload[reader.ReadVarInt(Blockchain.MaxValidators)]; - for (int i = 0; i < ChangeViewPayloads.Length; i++) - ChangeViewPayloads[i] = reader.ReadBoolean() ? reader.ReadSerializable() : null; - LastChangeViewPayloads = new ConsensusPayload[reader.ReadVarInt(Blockchain.MaxValidators)]; - for (int i = 0; i < LastChangeViewPayloads.Length; i++) - LastChangeViewPayloads[i] = reader.ReadBoolean() ? reader.ReadSerializable() : null; - } - - public void Dispose() - { - Snapshot?.Dispose(); - } - - public Block EnsureHeader() - { - if (TransactionHashes == null) return null; - if (Block.MerkleRoot is null) - Block.MerkleRoot = Block.CalculateMerkleRoot(Block.ConsensusData.Hash, TransactionHashes); - return Block; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public uint GetPrimaryIndex(byte viewNumber) - { - int p = ((int)Block.Index - viewNumber) % Validators.Length; - return p >= 0 ? (uint)p : (uint)(p + Validators.Length); - } - - public bool Load() - { - byte[] data = store.Get(CN_Context, new byte[0]); - if (data is null || data.Length == 0) return false; - using (MemoryStream ms = new MemoryStream(data, false)) - using (BinaryReader reader = new BinaryReader(ms)) - { - try - { - Deserialize(reader); - } - catch - { - return false; - } - return true; - } - } - - public ConsensusPayload MakeChangeView(ChangeViewReason reason) - { - return ChangeViewPayloads[MyIndex] = MakeSignedPayload(new ChangeView - { - Reason = reason, - Timestamp = TimeProvider.Current.UtcNow.ToTimestampMS() - }); - } - - public ConsensusPayload MakeCommit() - { - return CommitPayloads[MyIndex] ?? (CommitPayloads[MyIndex] = MakeSignedPayload(new Commit - { - Signature = EnsureHeader().Sign(keyPair) - })); - } - - private ConsensusPayload MakeSignedPayload(ConsensusMessage message) - { - message.ViewNumber = ViewNumber; - ConsensusPayload payload = new ConsensusPayload - { - Version = Block.Version, - PrevHash = Block.PrevHash, - BlockIndex = Block.Index, - ValidatorIndex = (ushort)MyIndex, - ConsensusMessage = message - }; - SignPayload(payload); - return payload; - } - - private void SignPayload(ConsensusPayload payload) - { - ContractParametersContext sc; - try - { - sc = new ContractParametersContext(payload); - wallet.Sign(sc); - } - catch (InvalidOperationException) - { - return; - } - payload.Witness = sc.GetWitnesses()[0]; - } - - /// - /// Return the expected block size - /// - internal int GetExpectedBlockSize() - { - return GetExpectedBlockSizeWithoutTransactions(Transactions.Count) + // Base size - Transactions.Values.Sum(u => u.Size); // Sum Txs - } - - /// - /// Return the expected block size without txs - /// - /// Expected transactions - internal int GetExpectedBlockSizeWithoutTransactions(int expectedTransactions) - { - var blockSize = - // BlockBase - sizeof(uint) + //Version - UInt256.Length + //PrevHash - UInt256.Length + //MerkleRoot - sizeof(ulong) + //Timestamp - sizeof(uint) + //Index - UInt160.Length + //NextConsensus - 1 + // - _witnessSize; //Witness - - blockSize += - // Block - Block.ConsensusData.Size + //ConsensusData - IO.Helper.GetVarSize(expectedTransactions + 1); //Transactions count - - return blockSize; - } - - /// - /// Prevent that block exceed the max size - /// - /// Ordered transactions - internal void EnsureMaxBlockSize(IEnumerable txs) - { - uint maxBlockSize = NativeContract.Policy.GetMaxBlockSize(Snapshot); - uint maxTransactionsPerBlock = NativeContract.Policy.GetMaxTransactionsPerBlock(Snapshot); - - // Limit Speaker proposal to the limit `MaxTransactionsPerBlock` or all available transactions of the mempool - txs = txs.Take((int)maxTransactionsPerBlock); - List hashes = new List(); - Transactions = new Dictionary(); - - // We need to know the expected block size - - var blockSize = GetExpectedBlockSizeWithoutTransactions(txs.Count()); - - // Iterate transaction until reach the size - - foreach (Transaction tx in txs) - { - // Check if maximum block size has been already exceeded with the current selected set - blockSize += tx.Size; - if (blockSize > maxBlockSize) break; - - hashes.Add(tx.Hash); - Transactions.Add(tx.Hash, tx); - } - - TransactionHashes = hashes.ToArray(); - } - - public ConsensusPayload MakePrepareRequest() - { - byte[] buffer = new byte[sizeof(ulong)]; - random.NextBytes(buffer); - Block.ConsensusData.Nonce = BitConverter.ToUInt64(buffer, 0); - EnsureMaxBlockSize(Blockchain.Singleton.MemPool.GetSortedVerifiedTransactions()); - Block.Timestamp = Math.Max(TimeProvider.Current.UtcNow.ToTimestampMS(), PrevHeader.Timestamp + 1); - - return PreparationPayloads[MyIndex] = MakeSignedPayload(new PrepareRequest - { - Timestamp = Block.Timestamp, - Nonce = Block.ConsensusData.Nonce, - TransactionHashes = TransactionHashes - }); - } - - public ConsensusPayload MakeRecoveryRequest() - { - return MakeSignedPayload(new RecoveryRequest - { - Timestamp = TimeProvider.Current.UtcNow.ToTimestampMS() - }); - } - - public ConsensusPayload MakeRecoveryMessage() - { - PrepareRequest prepareRequestMessage = null; - if (TransactionHashes != null) - { - prepareRequestMessage = new PrepareRequest - { - ViewNumber = ViewNumber, - Timestamp = Block.Timestamp, - Nonce = Block.ConsensusData.Nonce, - TransactionHashes = TransactionHashes - }; - } - return MakeSignedPayload(new RecoveryMessage() - { - ChangeViewMessages = LastChangeViewPayloads.Where(p => p != null).Select(p => RecoveryMessage.ChangeViewPayloadCompact.FromPayload(p)).Take(M).ToDictionary(p => (int)p.ValidatorIndex), - PrepareRequestMessage = prepareRequestMessage, - // We only need a PreparationHash set if we don't have the PrepareRequest information. - PreparationHash = TransactionHashes == null ? PreparationPayloads.Where(p => p != null).GroupBy(p => p.GetDeserializedMessage().PreparationHash, (k, g) => new { Hash = k, Count = g.Count() }).OrderByDescending(p => p.Count).Select(p => p.Hash).FirstOrDefault() : null, - PreparationMessages = PreparationPayloads.Where(p => p != null).Select(p => RecoveryMessage.PreparationPayloadCompact.FromPayload(p)).ToDictionary(p => (int)p.ValidatorIndex), - CommitMessages = CommitSent - ? CommitPayloads.Where(p => p != null).Select(p => RecoveryMessage.CommitPayloadCompact.FromPayload(p)).ToDictionary(p => (int)p.ValidatorIndex) - : new Dictionary() - }); - } - - public ConsensusPayload MakePrepareResponse() - { - return PreparationPayloads[MyIndex] = MakeSignedPayload(new PrepareResponse - { - PreparationHash = PreparationPayloads[Block.ConsensusData.PrimaryIndex].Hash - }); - } - - public void Reset(byte viewNumber) - { - if (viewNumber == 0) - { - Snapshot?.Dispose(); - Snapshot = Blockchain.Singleton.GetSnapshot(); - Block = new Block - { - PrevHash = Snapshot.CurrentBlockHash, - Index = Snapshot.Height + 1, - NextConsensus = Blockchain.GetConsensusAddress(NativeContract.NEO.GetValidators(Snapshot).ToArray()), - ConsensusData = new ConsensusData() - }; - var pv = Validators; - Validators = NativeContract.NEO.GetNextBlockValidators(Snapshot); - if (_witnessSize == 0 || (pv != null && pv.Length != Validators.Length)) - { - // Compute the expected size of the witness - using (ScriptBuilder sb = new ScriptBuilder()) - { - for (int x = 0; x < M; x++) - { - sb.EmitPush(new byte[64]); - } - _witnessSize = new Witness - { - InvocationScript = sb.ToArray(), - VerificationScript = Contract.CreateMultiSigRedeemScript(M, Validators) - }.Size; - } - } - MyIndex = -1; - ChangeViewPayloads = new ConsensusPayload[Validators.Length]; - LastChangeViewPayloads = new ConsensusPayload[Validators.Length]; - CommitPayloads = new ConsensusPayload[Validators.Length]; - if (LastSeenMessage == null) - { - LastSeenMessage = new int[Validators.Length]; - for (int i = 0; i < Validators.Length; i++) - LastSeenMessage[i] = -1; - } - keyPair = null; - for (int i = 0; i < Validators.Length; i++) - { - WalletAccount account = wallet?.GetAccount(Validators[i]); - if (account?.HasKey != true) continue; - MyIndex = i; - keyPair = account.GetKey(); - break; - } - } - else - { - for (int i = 0; i < LastChangeViewPayloads.Length; i++) - if (ChangeViewPayloads[i]?.GetDeserializedMessage().NewViewNumber >= viewNumber) - LastChangeViewPayloads[i] = ChangeViewPayloads[i]; - else - LastChangeViewPayloads[i] = null; - } - ViewNumber = viewNumber; - Block.ConsensusData.PrimaryIndex = GetPrimaryIndex(viewNumber); - Block.MerkleRoot = null; - Block.Timestamp = 0; - Block.Transactions = null; - TransactionHashes = null; - PreparationPayloads = new ConsensusPayload[Validators.Length]; - if (MyIndex >= 0) LastSeenMessage[MyIndex] = (int)Block.Index; - } - - public void Save() - { - store.PutSync(CN_Context, new byte[0], this.ToArray()); - } - - public void Serialize(BinaryWriter writer) - { - writer.Write(Block.Version); - writer.Write(Block.Index); - writer.Write(Block.Timestamp); - writer.Write(Block.NextConsensus ?? UInt160.Zero); - writer.Write(Block.ConsensusData); - writer.Write(ViewNumber); - writer.Write(TransactionHashes ?? new UInt256[0]); - writer.Write(Transactions?.Values.ToArray() ?? new Transaction[0]); - writer.WriteVarInt(PreparationPayloads.Length); - foreach (var payload in PreparationPayloads) - { - bool hasPayload = !(payload is null); - writer.Write(hasPayload); - if (!hasPayload) continue; - writer.Write(payload); - } - writer.WriteVarInt(CommitPayloads.Length); - foreach (var payload in CommitPayloads) - { - bool hasPayload = !(payload is null); - writer.Write(hasPayload); - if (!hasPayload) continue; - writer.Write(payload); - } - writer.WriteVarInt(ChangeViewPayloads.Length); - foreach (var payload in ChangeViewPayloads) - { - bool hasPayload = !(payload is null); - writer.Write(hasPayload); - if (!hasPayload) continue; - writer.Write(payload); - } - writer.WriteVarInt(LastChangeViewPayloads.Length); - foreach (var payload in LastChangeViewPayloads) - { - bool hasPayload = !(payload is null); - writer.Write(hasPayload); - if (!hasPayload) continue; - writer.Write(payload); - } - } - } -} +using Neo.Cryptography; +using Neo.Cryptography.ECC; +using Neo.IO; +using Neo.Ledger; +using Neo.Network.P2P.Payloads; +using Neo.Persistence; +using Neo.SmartContract; +using Neo.SmartContract.Native; +using Neo.VM; +using Neo.Wallets; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Runtime.CompilerServices; + +namespace Neo.Consensus +{ + internal class ConsensusContext : IDisposable, ISerializable + { + /// + /// Prefix for saving consensus state. + /// + public const byte CN_Context = 0xf4; + + public Block Block; + public byte ViewNumber; + public ECPoint[] Validators; + public int MyIndex; + public UInt256[] TransactionHashes; + public Dictionary Transactions; + public ConsensusPayload[] PreparationPayloads; + public ConsensusPayload[] CommitPayloads; + public ConsensusPayload[] ChangeViewPayloads; + public ConsensusPayload[] LastChangeViewPayloads; + // LastSeenMessage array stores the height of the last seen message, for each validator. + // if this node never heard from validator i, LastSeenMessage[i] will be -1. + public int[] LastSeenMessage; + + public Snapshot Snapshot { get; private set; } + private KeyPair keyPair; + private int _witnessSize; + private readonly Wallet wallet; + private readonly Store store; + private readonly Random random = new Random(); + + public int F => (Validators.Length - 1) / 3; + public int M => Validators.Length - F; + public bool IsPrimary => MyIndex == Block.ConsensusData.PrimaryIndex; + public bool IsBackup => MyIndex >= 0 && MyIndex != Block.ConsensusData.PrimaryIndex; + public bool WatchOnly => MyIndex < 0; + public Header PrevHeader => Snapshot.GetHeader(Block.PrevHash); + public int CountCommitted => CommitPayloads.Count(p => p != null); + public int CountFailed => LastSeenMessage.Count(p => p < (((int)Block.Index) - 1)); + + #region Consensus States + public bool RequestSentOrReceived => PreparationPayloads[Block.ConsensusData.PrimaryIndex] != null; + public bool ResponseSent => !WatchOnly && PreparationPayloads[MyIndex] != null; + public bool CommitSent => !WatchOnly && CommitPayloads[MyIndex] != null; + public bool BlockSent => Block.Transactions != null; + public bool ViewChanging => !WatchOnly && ChangeViewPayloads[MyIndex]?.GetDeserializedMessage().NewViewNumber > ViewNumber; + public bool NotAcceptingPayloadsDueToViewChanging => ViewChanging && !MoreThanFNodesCommittedOrLost; + // A possible attack can happen if the last node to commit is malicious and either sends change view after his + // commit to stall nodes in a higher view, or if he refuses to send recovery messages. In addition, if a node + // asking change views loses network or crashes and comes back when nodes are committed in more than one higher + // numbered view, it is possible for the node accepting recovery to commit in any of the higher views, thus + // potentially splitting nodes among views and stalling the network. + public bool MoreThanFNodesCommittedOrLost => (CountCommitted + CountFailed) > F; + #endregion + + public int Size => throw new NotImplementedException(); + + public ConsensusContext(Wallet wallet, Store store) + { + this.wallet = wallet; + this.store = store; + } + + public Block CreateBlock() + { + Contract contract = Contract.CreateMultiSigContract(M, Validators); + ContractParametersContext sc = new ContractParametersContext(Block); + for (int i = 0, j = 0; i < Validators.Length && j < M; i++) + { + if (CommitPayloads[i]?.ConsensusMessage.ViewNumber != ViewNumber) continue; + sc.AddSignature(contract, Validators[i], CommitPayloads[i].GetDeserializedMessage().Signature); + j++; + } + Block.Witness = sc.GetWitnesses()[0]; + Block.Transactions = TransactionHashes.Select(p => Transactions[p]).ToArray(); + return Block; + } + + public void Deserialize(BinaryReader reader) + { + Reset(0); + if (reader.ReadUInt32() != Block.Version) throw new FormatException(); + if (reader.ReadUInt32() != Block.Index) throw new InvalidOperationException(); + Block.Timestamp = reader.ReadUInt64(); + Block.NextConsensus = reader.ReadSerializable(); + if (Block.NextConsensus.Equals(UInt160.Zero)) + Block.NextConsensus = null; + Block.ConsensusData = reader.ReadSerializable(); + ViewNumber = reader.ReadByte(); + TransactionHashes = reader.ReadSerializableArray(); + if (TransactionHashes.Length == 0) + TransactionHashes = null; + Transaction[] transactions = reader.ReadSerializableArray(Block.MaxTransactionsPerBlock); + Transactions = transactions.Length == 0 ? null : transactions.ToDictionary(p => p.Hash); + PreparationPayloads = new ConsensusPayload[reader.ReadVarInt(Blockchain.MaxValidators)]; + for (int i = 0; i < PreparationPayloads.Length; i++) + PreparationPayloads[i] = reader.ReadBoolean() ? reader.ReadSerializable() : null; + CommitPayloads = new ConsensusPayload[reader.ReadVarInt(Blockchain.MaxValidators)]; + for (int i = 0; i < CommitPayloads.Length; i++) + CommitPayloads[i] = reader.ReadBoolean() ? reader.ReadSerializable() : null; + ChangeViewPayloads = new ConsensusPayload[reader.ReadVarInt(Blockchain.MaxValidators)]; + for (int i = 0; i < ChangeViewPayloads.Length; i++) + ChangeViewPayloads[i] = reader.ReadBoolean() ? reader.ReadSerializable() : null; + LastChangeViewPayloads = new ConsensusPayload[reader.ReadVarInt(Blockchain.MaxValidators)]; + for (int i = 0; i < LastChangeViewPayloads.Length; i++) + LastChangeViewPayloads[i] = reader.ReadBoolean() ? reader.ReadSerializable() : null; + } + + public void Dispose() + { + Snapshot?.Dispose(); + } + + public Block EnsureHeader() + { + if (TransactionHashes == null) return null; + if (Block.MerkleRoot is null) + Block.MerkleRoot = Block.CalculateMerkleRoot(Block.ConsensusData.Hash, TransactionHashes); + return Block; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public uint GetPrimaryIndex(byte viewNumber) + { + int p = ((int)Block.Index - viewNumber) % Validators.Length; + return p >= 0 ? (uint)p : (uint)(p + Validators.Length); + } + + public bool Load() + { + byte[] data = store.Get(CN_Context, new byte[0]); + if (data is null || data.Length == 0) return false; + using (MemoryStream ms = new MemoryStream(data, false)) + using (BinaryReader reader = new BinaryReader(ms)) + { + try + { + Deserialize(reader); + } + catch + { + return false; + } + return true; + } + } + + public ConsensusPayload MakeChangeView(ChangeViewReason reason) + { + return ChangeViewPayloads[MyIndex] = MakeSignedPayload(new ChangeView + { + Reason = reason, + Timestamp = TimeProvider.Current.UtcNow.ToTimestampMS() + }); + } + + public ConsensusPayload MakeCommit() + { + return CommitPayloads[MyIndex] ?? (CommitPayloads[MyIndex] = MakeSignedPayload(new Commit + { + Signature = EnsureHeader().Sign(keyPair) + })); + } + + private ConsensusPayload MakeSignedPayload(ConsensusMessage message) + { + message.ViewNumber = ViewNumber; + ConsensusPayload payload = new ConsensusPayload + { + Version = Block.Version, + PrevHash = Block.PrevHash, + BlockIndex = Block.Index, + ValidatorIndex = (ushort)MyIndex, + ConsensusMessage = message + }; + SignPayload(payload); + return payload; + } + + private void SignPayload(ConsensusPayload payload) + { + ContractParametersContext sc; + try + { + sc = new ContractParametersContext(payload); + wallet.Sign(sc); + } + catch (InvalidOperationException) + { + return; + } + payload.Witness = sc.GetWitnesses()[0]; + } + + /// + /// Return the expected block size + /// + internal int GetExpectedBlockSize() + { + return GetExpectedBlockSizeWithoutTransactions(Transactions.Count) + // Base size + Transactions.Values.Sum(u => u.Size); // Sum Txs + } + + /// + /// Return the expected block size without txs + /// + /// Expected transactions + internal int GetExpectedBlockSizeWithoutTransactions(int expectedTransactions) + { + var blockSize = + // BlockBase + sizeof(uint) + //Version + UInt256.Length + //PrevHash + UInt256.Length + //MerkleRoot + sizeof(ulong) + //Timestamp + sizeof(uint) + //Index + UInt160.Length + //NextConsensus + 1 + // + _witnessSize; //Witness + + blockSize += + // Block + Block.ConsensusData.Size + //ConsensusData + IO.Helper.GetVarSize(expectedTransactions + 1); //Transactions count + + return blockSize; + } + + /// + /// Prevent that block exceed the max size + /// + /// Ordered transactions + internal void EnsureMaxBlockSize(IEnumerable txs) + { + uint maxBlockSize = NativeContract.Policy.GetMaxBlockSize(Snapshot); + uint maxTransactionsPerBlock = NativeContract.Policy.GetMaxTransactionsPerBlock(Snapshot); + + // Limit Speaker proposal to the limit `MaxTransactionsPerBlock` or all available transactions of the mempool + txs = txs.Take((int)maxTransactionsPerBlock); + List hashes = new List(); + Transactions = new Dictionary(); + + // Expected block size + var blockSize = GetExpectedBlockSizeWithoutTransactions(txs.Count()); + + // Iterate transaction until reach the size + + foreach (Transaction tx in txs) + { + // Check if maximum block size has been already exceeded with the current selected set + blockSize += tx.Size; + if (blockSize > maxBlockSize) break; + + hashes.Add(tx.Hash); + Transactions.Add(tx.Hash, tx); + } + + TransactionHashes = hashes.ToArray(); + } + + public ConsensusPayload MakePrepareRequest() + { + byte[] buffer = new byte[sizeof(ulong)]; + random.NextBytes(buffer); + Block.ConsensusData.Nonce = BitConverter.ToUInt64(buffer, 0); + EnsureMaxBlockSize(Blockchain.Singleton.MemPool.GetSortedVerifiedTransactions()); + Block.Timestamp = Math.Max(TimeProvider.Current.UtcNow.ToTimestampMS(), PrevHeader.Timestamp + 1); + + return PreparationPayloads[MyIndex] = MakeSignedPayload(new PrepareRequest + { + Timestamp = Block.Timestamp, + Nonce = Block.ConsensusData.Nonce, + TransactionHashes = TransactionHashes + }); + } + + public ConsensusPayload MakeRecoveryRequest() + { + return MakeSignedPayload(new RecoveryRequest + { + Timestamp = TimeProvider.Current.UtcNow.ToTimestampMS() + }); + } + + public ConsensusPayload MakeRecoveryMessage() + { + PrepareRequest prepareRequestMessage = null; + if (TransactionHashes != null) + { + prepareRequestMessage = new PrepareRequest + { + ViewNumber = ViewNumber, + Timestamp = Block.Timestamp, + Nonce = Block.ConsensusData.Nonce, + TransactionHashes = TransactionHashes + }; + } + return MakeSignedPayload(new RecoveryMessage() + { + ChangeViewMessages = LastChangeViewPayloads.Where(p => p != null).Select(p => RecoveryMessage.ChangeViewPayloadCompact.FromPayload(p)).Take(M).ToDictionary(p => (int)p.ValidatorIndex), + PrepareRequestMessage = prepareRequestMessage, + // We only need a PreparationHash set if we don't have the PrepareRequest information. + PreparationHash = TransactionHashes == null ? PreparationPayloads.Where(p => p != null).GroupBy(p => p.GetDeserializedMessage().PreparationHash, (k, g) => new { Hash = k, Count = g.Count() }).OrderByDescending(p => p.Count).Select(p => p.Hash).FirstOrDefault() : null, + PreparationMessages = PreparationPayloads.Where(p => p != null).Select(p => RecoveryMessage.PreparationPayloadCompact.FromPayload(p)).ToDictionary(p => (int)p.ValidatorIndex), + CommitMessages = CommitSent + ? CommitPayloads.Where(p => p != null).Select(p => RecoveryMessage.CommitPayloadCompact.FromPayload(p)).ToDictionary(p => (int)p.ValidatorIndex) + : new Dictionary() + }); + } + + public ConsensusPayload MakePrepareResponse() + { + return PreparationPayloads[MyIndex] = MakeSignedPayload(new PrepareResponse + { + PreparationHash = PreparationPayloads[Block.ConsensusData.PrimaryIndex].Hash + }); + } + + public void Reset(byte viewNumber) + { + if (viewNumber == 0) + { + Snapshot?.Dispose(); + Snapshot = Blockchain.Singleton.GetSnapshot(); + Block = new Block + { + PrevHash = Snapshot.CurrentBlockHash, + Index = Snapshot.Height + 1, + NextConsensus = Blockchain.GetConsensusAddress(NativeContract.NEO.GetValidators(Snapshot).ToArray()), + ConsensusData = new ConsensusData() + }; + var pv = Validators; + Validators = NativeContract.NEO.GetNextBlockValidators(Snapshot); + if (_witnessSize == 0 || (pv != null && pv.Length != Validators.Length)) + { + // Compute the expected size of the witness + using (ScriptBuilder sb = new ScriptBuilder()) + { + for (int x = 0; x < M; x++) + { + sb.EmitPush(new byte[64]); + } + _witnessSize = new Witness + { + InvocationScript = sb.ToArray(), + VerificationScript = Contract.CreateMultiSigRedeemScript(M, Validators) + }.Size; + } + } + MyIndex = -1; + ChangeViewPayloads = new ConsensusPayload[Validators.Length]; + LastChangeViewPayloads = new ConsensusPayload[Validators.Length]; + CommitPayloads = new ConsensusPayload[Validators.Length]; + if (LastSeenMessage == null) + { + LastSeenMessage = new int[Validators.Length]; + for (int i = 0; i < Validators.Length; i++) + LastSeenMessage[i] = -1; + } + keyPair = null; + for (int i = 0; i < Validators.Length; i++) + { + WalletAccount account = wallet?.GetAccount(Validators[i]); + if (account?.HasKey != true) continue; + MyIndex = i; + keyPair = account.GetKey(); + break; + } + } + else + { + for (int i = 0; i < LastChangeViewPayloads.Length; i++) + if (ChangeViewPayloads[i]?.GetDeserializedMessage().NewViewNumber >= viewNumber) + LastChangeViewPayloads[i] = ChangeViewPayloads[i]; + else + LastChangeViewPayloads[i] = null; + } + ViewNumber = viewNumber; + Block.ConsensusData.PrimaryIndex = GetPrimaryIndex(viewNumber); + Block.MerkleRoot = null; + Block.Timestamp = 0; + Block.Transactions = null; + TransactionHashes = null; + PreparationPayloads = new ConsensusPayload[Validators.Length]; + if (MyIndex >= 0) LastSeenMessage[MyIndex] = (int)Block.Index; + } + + public void Save() + { + store.PutSync(CN_Context, new byte[0], this.ToArray()); + } + + public void Serialize(BinaryWriter writer) + { + writer.Write(Block.Version); + writer.Write(Block.Index); + writer.Write(Block.Timestamp); + writer.Write(Block.NextConsensus ?? UInt160.Zero); + writer.Write(Block.ConsensusData); + writer.Write(ViewNumber); + writer.Write(TransactionHashes ?? new UInt256[0]); + writer.Write(Transactions?.Values.ToArray() ?? new Transaction[0]); + writer.WriteVarInt(PreparationPayloads.Length); + foreach (var payload in PreparationPayloads) + { + bool hasPayload = !(payload is null); + writer.Write(hasPayload); + if (!hasPayload) continue; + writer.Write(payload); + } + writer.WriteVarInt(CommitPayloads.Length); + foreach (var payload in CommitPayloads) + { + bool hasPayload = !(payload is null); + writer.Write(hasPayload); + if (!hasPayload) continue; + writer.Write(payload); + } + writer.WriteVarInt(ChangeViewPayloads.Length); + foreach (var payload in ChangeViewPayloads) + { + bool hasPayload = !(payload is null); + writer.Write(hasPayload); + if (!hasPayload) continue; + writer.Write(payload); + } + writer.WriteVarInt(LastChangeViewPayloads.Length); + foreach (var payload in LastChangeViewPayloads) + { + bool hasPayload = !(payload is null); + writer.Write(hasPayload); + if (!hasPayload) continue; + writer.Write(payload); + } + } + } +} diff --git a/neo/Consensus/ConsensusService.cs b/neo/Consensus/ConsensusService.cs index 9ef4302e69..2e881cdff0 100644 --- a/neo/Consensus/ConsensusService.cs +++ b/neo/Consensus/ConsensusService.cs @@ -1,673 +1,672 @@ -using Akka.Actor; -using Akka.Configuration; -using Neo.Cryptography; -using Neo.IO; -using Neo.IO.Actors; -using Neo.Ledger; -using Neo.Network.P2P; -using Neo.Network.P2P.Payloads; -using Neo.Persistence; -using Neo.Plugins; -using Neo.SmartContract.Native; -using Neo.Wallets; -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; - -namespace Neo.Consensus -{ - public sealed class ConsensusService : UntypedActor - { - public class Start { public bool IgnoreRecoveryLogs; } - public class SetViewNumber { public byte ViewNumber; } - internal class Timer { public uint Height; public byte ViewNumber; } - - private readonly ConsensusContext context; - private readonly IActorRef localNode; - private readonly IActorRef taskManager; - private ICancelable timer_token; - private DateTime block_received_time; - private bool started = false; - - /// - /// This will record the information from last scheduled timer - /// - private DateTime clock_started = TimeProvider.Current.UtcNow; - private TimeSpan expected_delay = TimeSpan.Zero; - - /// - /// This will be cleared every block (so it will not grow out of control, but is used to prevent repeatedly - /// responding to the same message. - /// - private readonly HashSet knownHashes = new HashSet(); - /// - /// This variable is only true during OnRecoveryMessageReceived - /// - private bool isRecovering = false; - - public ConsensusService(IActorRef localNode, IActorRef taskManager, Store store, Wallet wallet) - : this(localNode, taskManager, new ConsensusContext(wallet, store)) - { - } - - internal ConsensusService(IActorRef localNode, IActorRef taskManager, ConsensusContext context) - { - this.localNode = localNode; - this.taskManager = taskManager; - this.context = context; - Context.System.EventStream.Subscribe(Self, typeof(Blockchain.PersistCompleted)); - } - - private bool AddTransaction(Transaction tx, bool verify) - { - if (verify && !tx.Verify(context.Snapshot, context.Transactions.Values)) - { - Log($"Invalid transaction: {tx.Hash}{Environment.NewLine}{tx.ToArray().ToHexString()}", LogLevel.Warning); - RequestChangeView(ChangeViewReason.TxInvalid); - return false; - } - if (!NativeContract.Policy.CheckPolicy(tx, context.Snapshot)) - { - Log($"reject tx: {tx.Hash}{Environment.NewLine}{tx.ToArray().ToHexString()}", LogLevel.Warning); - RequestChangeView(ChangeViewReason.TxRejectedByPolicy); - return false; - } - context.Transactions[tx.Hash] = tx; - return CheckPrepareResponse(); - } - - private bool CheckPrepareResponse() - { - if (context.TransactionHashes.Length == context.Transactions.Count) - { - // if we are the primary for this view, but acting as a backup because we recovered our own - // previously sent prepare request, then we don't want to send a prepare response. - if (context.IsPrimary || context.WatchOnly) return true; - - // Check maximum block size via Native Contract policy - if (context.GetExpectedBlockSize() > NativeContract.Policy.GetMaxBlockSize(context.Snapshot)) - { - Log($"rejected block: {context.Block.Index}{Environment.NewLine} The size exceed the policy", LogLevel.Warning); - RequestChangeView(ChangeViewReason.BlockRejectedByPolicy); - return false; - } - - // Timeout extension due to prepare response sent - // around 2*15/M=30.0/5 ~ 40% block time (for M=5) - ExtendTimerByFactor(2); - - Log($"send prepare response"); - localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakePrepareResponse() }); - CheckPreparations(); - } - - return true; - } - - private void ChangeTimer(TimeSpan delay) - { - clock_started = TimeProvider.Current.UtcNow; - expected_delay = delay; - timer_token.CancelIfNotNull(); - timer_token = Context.System.Scheduler.ScheduleTellOnceCancelable(delay, Self, new Timer - { - Height = context.Block.Index, - ViewNumber = context.ViewNumber - }, ActorRefs.NoSender); - } - - private void CheckCommits() - { - if (context.CommitPayloads.Count(p => p?.ConsensusMessage.ViewNumber == context.ViewNumber) >= context.M && context.TransactionHashes.All(p => context.Transactions.ContainsKey(p))) - { - Block block = context.CreateBlock(); - Log($"relay block: height={block.Index} hash={block.Hash} tx={block.Transactions.Length}"); - localNode.Tell(new LocalNode.Relay { Inventory = block }); - } - } - - private void CheckExpectedView(byte viewNumber) - { - if (context.ViewNumber >= viewNumber) return; - // if there are `M` change view payloads with NewViewNumber greater than viewNumber, then, it is safe to move - if (context.ChangeViewPayloads.Count(p => p != null && p.GetDeserializedMessage().NewViewNumber >= viewNumber) >= context.M) - { - if (!context.WatchOnly) - { - ChangeView message = context.ChangeViewPayloads[context.MyIndex]?.GetDeserializedMessage(); - // Communicate the network about my agreement to move to `viewNumber` - // if my last change view payload, `message`, has NewViewNumber lower than current view to change - if (message is null || message.NewViewNumber < viewNumber) - localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeChangeView(ChangeViewReason.ChangeAgreement) }); - } - InitializeConsensus(viewNumber); - } - } - - private void CheckPreparations() - { - if (context.PreparationPayloads.Count(p => p != null) >= context.M && context.TransactionHashes.All(p => context.Transactions.ContainsKey(p))) - { - ConsensusPayload payload = context.MakeCommit(); - Log($"send commit"); - context.Save(); - localNode.Tell(new LocalNode.SendDirectly { Inventory = payload }); - // Set timer, so we will resend the commit in case of a networking issue - ChangeTimer(TimeSpan.FromMilliseconds(Blockchain.MillisecondsPerBlock)); - CheckCommits(); - } - } - - private void InitializeConsensus(byte viewNumber) - { - context.Reset(viewNumber); - if (viewNumber > 0) - Log($"changeview: view={viewNumber} primary={context.Validators[context.GetPrimaryIndex((byte)(viewNumber - 1u))]}", LogLevel.Warning); - Log($"initialize: height={context.Block.Index} view={viewNumber} index={context.MyIndex} role={(context.IsPrimary ? "Primary" : context.WatchOnly ? "WatchOnly" : "Backup")}"); - if (context.WatchOnly) return; - if (context.IsPrimary) - { - if (isRecovering) - { - ChangeTimer(TimeSpan.FromMilliseconds(Blockchain.MillisecondsPerBlock << (viewNumber + 1))); - } - else - { - TimeSpan span = TimeProvider.Current.UtcNow - block_received_time; - if (span >= Blockchain.TimePerBlock) - ChangeTimer(TimeSpan.Zero); - else - ChangeTimer(Blockchain.TimePerBlock - span); - } - } - else - { - ChangeTimer(TimeSpan.FromMilliseconds(Blockchain.MillisecondsPerBlock << (viewNumber + 1))); - } - } - - private void Log(string message, LogLevel level = LogLevel.Info) - { - Plugin.Log(nameof(ConsensusService), level, message); - } - - private void OnChangeViewReceived(ConsensusPayload payload, ChangeView message) - { - if (message.NewViewNumber <= context.ViewNumber) - OnRecoveryRequestReceived(payload); - - if (context.CommitSent) return; - - var expectedView = context.ChangeViewPayloads[payload.ValidatorIndex]?.GetDeserializedMessage().NewViewNumber ?? (byte)0; - if (message.NewViewNumber <= expectedView) - return; - - Log($"{nameof(OnChangeViewReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex} nv={message.NewViewNumber}"); - context.ChangeViewPayloads[payload.ValidatorIndex] = payload; - CheckExpectedView(message.NewViewNumber); - } - - private void OnCommitReceived(ConsensusPayload payload, Commit commit) - { - ref ConsensusPayload existingCommitPayload = ref context.CommitPayloads[payload.ValidatorIndex]; - if (existingCommitPayload != null) - { - if (existingCommitPayload.Hash != payload.Hash) - Log($"{nameof(OnCommitReceived)}: different commit from validator! height={payload.BlockIndex} index={payload.ValidatorIndex} view={commit.ViewNumber} existingView={existingCommitPayload.ConsensusMessage.ViewNumber}", LogLevel.Warning); - return; - } - - // Timeout extension: commit has been received with success - // around 4*15s/M=60.0s/5=12.0s ~ 80% block time (for M=5) - ExtendTimerByFactor(4); - - if (commit.ViewNumber == context.ViewNumber) - { - Log($"{nameof(OnCommitReceived)}: height={payload.BlockIndex} view={commit.ViewNumber} index={payload.ValidatorIndex} nc={context.CountCommitted} nf={context.CountFailed}"); - - byte[] hashData = context.EnsureHeader()?.GetHashData(); - if (hashData == null) - { - existingCommitPayload = payload; - } - else if (Crypto.Default.VerifySignature(hashData, commit.Signature, - context.Validators[payload.ValidatorIndex].EncodePoint(false))) - { - existingCommitPayload = payload; - CheckCommits(); - } - return; - } - // Receiving commit from another view - Log($"{nameof(OnCommitReceived)}: record commit for different view={commit.ViewNumber} index={payload.ValidatorIndex} height={payload.BlockIndex}"); - existingCommitPayload = payload; - } - - // this function increases existing timer (never decreases) with a value proportional to `maxDelayInBlockTimes`*`Blockchain.MillisecondsPerBlock` - private void ExtendTimerByFactor(int maxDelayInBlockTimes) - { - TimeSpan nextDelay = expected_delay - (TimeProvider.Current.UtcNow - clock_started) + TimeSpan.FromMilliseconds(maxDelayInBlockTimes * Blockchain.MillisecondsPerBlock / context.M); - if (!context.WatchOnly && !context.ViewChanging && !context.CommitSent && (nextDelay > TimeSpan.Zero)) - ChangeTimer(nextDelay); - } - - private void OnConsensusPayload(ConsensusPayload payload) - { - if (context.BlockSent) return; - if (payload.Version != context.Block.Version) return; - if (payload.PrevHash != context.Block.PrevHash || payload.BlockIndex != context.Block.Index) - { - if (context.Block.Index < payload.BlockIndex) - { - Log($"chain sync: expected={payload.BlockIndex} current={context.Block.Index - 1} nodes={LocalNode.Singleton.ConnectedCount}", LogLevel.Warning); - } - return; - } - if (payload.ValidatorIndex >= context.Validators.Length) return; - ConsensusMessage message; - try - { - message = payload.ConsensusMessage; - } - catch (FormatException) - { - return; - } - catch (IOException) - { - return; - } - context.LastSeenMessage[payload.ValidatorIndex] = (int)payload.BlockIndex; - foreach (IP2PPlugin plugin in Plugin.P2PPlugins) - if (!plugin.OnConsensusMessage(payload)) - return; - switch (message) - { - case ChangeView view: - OnChangeViewReceived(payload, view); - break; - case PrepareRequest request: - OnPrepareRequestReceived(payload, request); - break; - case PrepareResponse response: - OnPrepareResponseReceived(payload, response); - break; - case Commit commit: - OnCommitReceived(payload, commit); - break; - case RecoveryRequest _: - OnRecoveryRequestReceived(payload); - break; - case RecoveryMessage recovery: - OnRecoveryMessageReceived(payload, recovery); - break; - } - } - - private void OnPersistCompleted(Block block) - { - Log($"persist block: height={block.Index} hash={block.Hash} tx={block.Transactions.Length}"); - block_received_time = TimeProvider.Current.UtcNow; - knownHashes.Clear(); - InitializeConsensus(0); - } - - private void OnRecoveryMessageReceived(ConsensusPayload payload, RecoveryMessage message) - { - // isRecovering is always set to false again after OnRecoveryMessageReceived - isRecovering = true; - int validChangeViews = 0, totalChangeViews = 0, validPrepReq = 0, totalPrepReq = 0; - int validPrepResponses = 0, totalPrepResponses = 0, validCommits = 0, totalCommits = 0; - - Log($"{nameof(OnRecoveryMessageReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex}"); - try - { - if (message.ViewNumber > context.ViewNumber) - { - if (context.CommitSent) return; - ConsensusPayload[] changeViewPayloads = message.GetChangeViewPayloads(context, payload); - totalChangeViews = changeViewPayloads.Length; - foreach (ConsensusPayload changeViewPayload in changeViewPayloads) - if (ReverifyAndProcessPayload(changeViewPayload)) validChangeViews++; - } - if (message.ViewNumber == context.ViewNumber && !context.NotAcceptingPayloadsDueToViewChanging && !context.CommitSent) - { - if (!context.RequestSentOrReceived) - { - ConsensusPayload prepareRequestPayload = message.GetPrepareRequestPayload(context, payload); - if (prepareRequestPayload != null) - { - totalPrepReq = 1; - if (ReverifyAndProcessPayload(prepareRequestPayload)) validPrepReq++; - } - else if (context.IsPrimary) - SendPrepareRequest(); - } - ConsensusPayload[] prepareResponsePayloads = message.GetPrepareResponsePayloads(context, payload); - totalPrepResponses = prepareResponsePayloads.Length; - foreach (ConsensusPayload prepareResponsePayload in prepareResponsePayloads) - if (ReverifyAndProcessPayload(prepareResponsePayload)) validPrepResponses++; - } - if (message.ViewNumber <= context.ViewNumber) - { - // Ensure we know about all commits from lower view numbers. - ConsensusPayload[] commitPayloads = message.GetCommitPayloadsFromRecoveryMessage(context, payload); - totalCommits = commitPayloads.Length; - foreach (ConsensusPayload commitPayload in commitPayloads) - if (ReverifyAndProcessPayload(commitPayload)) validCommits++; - } - } - finally - { - Log($"{nameof(OnRecoveryMessageReceived)}: finished (valid/total) " + - $"ChgView: {validChangeViews}/{totalChangeViews} " + - $"PrepReq: {validPrepReq}/{totalPrepReq} " + - $"PrepResp: {validPrepResponses}/{totalPrepResponses} " + - $"Commits: {validCommits}/{totalCommits}"); - isRecovering = false; - } - } - - private void OnRecoveryRequestReceived(ConsensusPayload payload) - { - // We keep track of the payload hashes received in this block, and don't respond with recovery - // in response to the same payload that we already responded to previously. - // ChangeView messages include a Timestamp when the change view is sent, thus if a node restarts - // and issues a change view for the same view, it will have a different hash and will correctly respond - // again; however replay attacks of the ChangeView message from arbitrary nodes will not trigger an - // additional recovery message response. - if (!knownHashes.Add(payload.Hash)) return; - - Log($"On{payload.ConsensusMessage.GetType().Name}Received: height={payload.BlockIndex} index={payload.ValidatorIndex} view={payload.ConsensusMessage.ViewNumber}"); - if (context.WatchOnly) return; - if (!context.CommitSent) - { - bool shouldSendRecovery = false; - int allowedRecoveryNodeCount = context.F; - // Limit recoveries to be sent from an upper limit of `f` nodes - for (int i = 1; i <= allowedRecoveryNodeCount; i++) - { - var chosenIndex = (payload.ValidatorIndex + i) % context.Validators.Length; - if (chosenIndex != context.MyIndex) continue; - shouldSendRecovery = true; - break; - } - - if (!shouldSendRecovery) return; - } - Log($"send recovery: view={context.ViewNumber}"); - localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeRecoveryMessage() }); - } - - private void OnPrepareRequestReceived(ConsensusPayload payload, PrepareRequest message) - { - if (context.RequestSentOrReceived || context.NotAcceptingPayloadsDueToViewChanging) return; - if (payload.ValidatorIndex != context.Block.ConsensusData.PrimaryIndex || message.ViewNumber != context.ViewNumber) return; - Log($"{nameof(OnPrepareRequestReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex} tx={message.TransactionHashes.Length}"); - if (message.Timestamp <= context.PrevHeader.Timestamp || message.Timestamp > TimeProvider.Current.UtcNow.AddMinutes(10).ToTimestampMS()) - { - Log($"Timestamp incorrect: {message.Timestamp}", LogLevel.Warning); - return; - } - if (message.TransactionHashes.Any(p => context.Snapshot.ContainsTransaction(p))) - { - Log($"Invalid request: transaction already exists", LogLevel.Warning); - return; - } - - // Timeout extension: prepare request has been received with success - // around 2*15/M=30.0/5 ~ 40% block time (for M=5) - ExtendTimerByFactor(2); - - context.Block.Timestamp = message.Timestamp; - context.Block.ConsensusData.Nonce = message.Nonce; - context.TransactionHashes = message.TransactionHashes; - context.Transactions = new Dictionary(); - for (int i = 0; i < context.PreparationPayloads.Length; i++) - if (context.PreparationPayloads[i] != null) - if (!context.PreparationPayloads[i].GetDeserializedMessage().PreparationHash.Equals(payload.Hash)) - context.PreparationPayloads[i] = null; - context.PreparationPayloads[payload.ValidatorIndex] = payload; - byte[] hashData = context.EnsureHeader().GetHashData(); - for (int i = 0; i < context.CommitPayloads.Length; i++) - if (context.CommitPayloads[i]?.ConsensusMessage.ViewNumber == context.ViewNumber) - if (!Crypto.Default.VerifySignature(hashData, context.CommitPayloads[i].GetDeserializedMessage().Signature, context.Validators[i].EncodePoint(false))) - context.CommitPayloads[i] = null; - - if (context.TransactionHashes.Length == 0) - { - // There are no tx so we should act like if all the transactions was filled - CheckPrepareResponse(); - return; - } - - Dictionary mempoolVerified = Blockchain.Singleton.MemPool.GetVerifiedTransactions().ToDictionary(p => p.Hash); - List unverified = new List(); - foreach (UInt256 hash in context.TransactionHashes) - { - if (mempoolVerified.TryGetValue(hash, out Transaction tx)) - { - if (!AddTransaction(tx, false)) - return; - } - else - { - if (Blockchain.Singleton.MemPool.TryGetValue(hash, out tx)) - unverified.Add(tx); - } - } - foreach (Transaction tx in unverified) - if (!AddTransaction(tx, true)) - return; - if (context.Transactions.Count < context.TransactionHashes.Length) - { - UInt256[] hashes = context.TransactionHashes.Where(i => !context.Transactions.ContainsKey(i)).ToArray(); - taskManager.Tell(new TaskManager.RestartTasks - { - Payload = InvPayload.Create(InventoryType.TX, hashes) - }); - } - } - - private void OnPrepareResponseReceived(ConsensusPayload payload, PrepareResponse message) - { - if (message.ViewNumber != context.ViewNumber) return; - if (context.PreparationPayloads[payload.ValidatorIndex] != null || context.NotAcceptingPayloadsDueToViewChanging) return; - if (context.PreparationPayloads[context.Block.ConsensusData.PrimaryIndex] != null && !message.PreparationHash.Equals(context.PreparationPayloads[context.Block.ConsensusData.PrimaryIndex].Hash)) - return; - - // Timeout extension: prepare response has been received with success - // around 2*15/M=30.0/5 ~ 40% block time (for M=5) - ExtendTimerByFactor(2); - - Log($"{nameof(OnPrepareResponseReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex}"); - context.PreparationPayloads[payload.ValidatorIndex] = payload; - if (context.WatchOnly || context.CommitSent) return; - if (context.RequestSentOrReceived) - CheckPreparations(); - } - - protected override void OnReceive(object message) - { - if (message is Start options) - { - if (started) return; - OnStart(options); - } - else - { - if (!started) return; - switch (message) - { - case SetViewNumber setView: - InitializeConsensus(setView.ViewNumber); - break; - case Timer timer: - OnTimer(timer); - break; - case ConsensusPayload payload: - OnConsensusPayload(payload); - break; - case Transaction transaction: - OnTransaction(transaction); - break; - case Blockchain.PersistCompleted completed: - OnPersistCompleted(completed.Block); - break; - } - } - } - - private void RequestRecovery() - { - if (context.Block.Index == Blockchain.Singleton.HeaderHeight + 1) - localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeRecoveryRequest() }); - } - - private void OnStart(Start options) - { - Log("OnStart"); - started = true; - if (!options.IgnoreRecoveryLogs && context.Load()) - { - if (context.Transactions != null) - { - Sender.Ask(new Blockchain.FillMemoryPool - { - Transactions = context.Transactions.Values - }).Wait(); - } - if (context.CommitSent) - { - CheckPreparations(); - return; - } - } - InitializeConsensus(0); - // Issue a ChangeView with NewViewNumber of 0 to request recovery messages on start-up. - if (!context.WatchOnly) - RequestRecovery(); - } - - private void OnTimer(Timer timer) - { - if (context.WatchOnly || context.BlockSent) return; - if (timer.Height != context.Block.Index || timer.ViewNumber != context.ViewNumber) return; - Log($"timeout: height={timer.Height} view={timer.ViewNumber}"); - if (context.IsPrimary && !context.RequestSentOrReceived) - { - SendPrepareRequest(); - } - else if ((context.IsPrimary && context.RequestSentOrReceived) || context.IsBackup) - { - if (context.CommitSent) - { - // Re-send commit periodically by sending recover message in case of a network issue. - Log($"send recovery to resend commit"); - localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeRecoveryMessage() }); - ChangeTimer(TimeSpan.FromMilliseconds(Blockchain.MillisecondsPerBlock << 1)); - } - else - { - var reason = ChangeViewReason.Timeout; - - if (context.Block != null && context.TransactionHashes?.Count() > context.Transactions?.Count) - { - reason = ChangeViewReason.TxNotFound; - } - - RequestChangeView(reason); - } - } - } - - private void OnTransaction(Transaction transaction) - { - if (!context.IsBackup || context.NotAcceptingPayloadsDueToViewChanging || !context.RequestSentOrReceived || context.ResponseSent || context.BlockSent) - return; - if (context.Transactions.ContainsKey(transaction.Hash)) return; - if (!context.TransactionHashes.Contains(transaction.Hash)) return; - AddTransaction(transaction, true); - } - - protected override void PostStop() - { - Log("OnStop"); - started = false; - Context.System.EventStream.Unsubscribe(Self); - context.Dispose(); - base.PostStop(); - } - - public static Props Props(IActorRef localNode, IActorRef taskManager, Store store, Wallet wallet) - { - return Akka.Actor.Props.Create(() => new ConsensusService(localNode, taskManager, store, wallet)).WithMailbox("consensus-service-mailbox"); - } - - private void RequestChangeView(ChangeViewReason reason) - { - if (context.WatchOnly) return; - // Request for next view is always one view more than the current context.ViewNumber - // Nodes will not contribute for changing to a view higher than (context.ViewNumber+1), unless they are recovered - // The latter may happen by nodes in higher views with, at least, `M` proofs - byte expectedView = context.ViewNumber; - expectedView++; - ChangeTimer(TimeSpan.FromMilliseconds(Blockchain.MillisecondsPerBlock << (expectedView + 1))); - if ((context.CountCommitted + context.CountFailed) > context.F) - { - Log($"Skip requesting change view to nv={expectedView} because nc={context.CountCommitted} nf={context.CountFailed}"); - RequestRecovery(); - return; - } - Log($"request change view: height={context.Block.Index} view={context.ViewNumber} nv={expectedView} nc={context.CountCommitted} nf={context.CountFailed}"); - localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeChangeView(reason) }); - CheckExpectedView(expectedView); - } - - private bool ReverifyAndProcessPayload(ConsensusPayload payload) - { - if (!payload.Verify(context.Snapshot)) return false; - OnConsensusPayload(payload); - return true; - } - - private void SendPrepareRequest() - { - Log($"send prepare request: height={context.Block.Index} view={context.ViewNumber}"); - localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakePrepareRequest() }); - - if (context.Validators.Length == 1) - CheckPreparations(); - - if (context.TransactionHashes.Length > 0) - { - foreach (InvPayload payload in InvPayload.CreateGroup(InventoryType.TX, context.TransactionHashes)) - localNode.Tell(Message.Create(MessageCommand.Inv, payload)); - } - ChangeTimer(TimeSpan.FromMilliseconds((Blockchain.MillisecondsPerBlock << (context.ViewNumber + 1)) - (context.ViewNumber == 0 ? Blockchain.MillisecondsPerBlock : 0))); - } - } - - internal class ConsensusServiceMailbox : PriorityMailbox - { - public ConsensusServiceMailbox(Akka.Actor.Settings settings, Config config) - : base(settings, config) - { - } - - internal protected override bool IsHighPriority(object message) - { - switch (message) - { - case ConsensusPayload _: - case ConsensusService.SetViewNumber _: - case ConsensusService.Timer _: - case Blockchain.PersistCompleted _: - return true; - default: - return false; - } - } - } -} +using Akka.Actor; +using Akka.Configuration; +using Neo.Cryptography; +using Neo.IO; +using Neo.IO.Actors; +using Neo.Ledger; +using Neo.Network.P2P; +using Neo.Network.P2P.Payloads; +using Neo.Persistence; +using Neo.Plugins; +using Neo.SmartContract.Native; +using Neo.Wallets; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; + +namespace Neo.Consensus +{ + public sealed class ConsensusService : UntypedActor + { + public class Start { public bool IgnoreRecoveryLogs; } + public class SetViewNumber { public byte ViewNumber; } + internal class Timer { public uint Height; public byte ViewNumber; } + + private readonly ConsensusContext context; + private readonly IActorRef localNode; + private readonly IActorRef taskManager; + private ICancelable timer_token; + private DateTime block_received_time; + private bool started = false; + + /// + /// This will record the information from last scheduled timer + /// + private DateTime clock_started = TimeProvider.Current.UtcNow; + private TimeSpan expected_delay = TimeSpan.Zero; + + /// + /// This will be cleared every block (so it will not grow out of control, but is used to prevent repeatedly + /// responding to the same message. + /// + private readonly HashSet knownHashes = new HashSet(); + /// + /// This variable is only true during OnRecoveryMessageReceived + /// + private bool isRecovering = false; + + public ConsensusService(IActorRef localNode, IActorRef taskManager, Store store, Wallet wallet) + : this(localNode, taskManager, new ConsensusContext(wallet, store)) + { + } + + internal ConsensusService(IActorRef localNode, IActorRef taskManager, ConsensusContext context) + { + this.localNode = localNode; + this.taskManager = taskManager; + this.context = context; + Context.System.EventStream.Subscribe(Self, typeof(Blockchain.PersistCompleted)); + } + + private bool AddTransaction(Transaction tx, bool verify) + { + if (verify && !tx.Verify(context.Snapshot, context.Transactions.Values)) + { + Log($"Invalid transaction: {tx.Hash}{Environment.NewLine}{tx.ToArray().ToHexString()}", LogLevel.Warning); + RequestChangeView(ChangeViewReason.TxInvalid); + return false; + } + if (!NativeContract.Policy.CheckPolicy(tx, context.Snapshot)) + { + Log($"reject tx: {tx.Hash}{Environment.NewLine}{tx.ToArray().ToHexString()}", LogLevel.Warning); + RequestChangeView(ChangeViewReason.TxRejectedByPolicy); + return false; + } + context.Transactions[tx.Hash] = tx; + return CheckPrepareResponse(); + } + + private bool CheckPrepareResponse() + { + if (context.TransactionHashes.Length == context.Transactions.Count) + { + // if we are the primary for this view, but acting as a backup because we recovered our own + // previously sent prepare request, then we don't want to send a prepare response. + if (context.IsPrimary || context.WatchOnly) return true; + + // Check maximum block size via Native Contract policy + if (context.GetExpectedBlockSize() > NativeContract.Policy.GetMaxBlockSize(context.Snapshot)) + { + Log($"rejected block: {context.Block.Index}{Environment.NewLine} The size exceed the policy", LogLevel.Warning); + RequestChangeView(ChangeViewReason.BlockRejectedByPolicy); + return false; + } + + // Timeout extension due to prepare response sent + // around 2*15/M=30.0/5 ~ 40% block time (for M=5) + ExtendTimerByFactor(2); + + Log($"send prepare response"); + localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakePrepareResponse() }); + CheckPreparations(); + } + return true; + } + + private void ChangeTimer(TimeSpan delay) + { + clock_started = TimeProvider.Current.UtcNow; + expected_delay = delay; + timer_token.CancelIfNotNull(); + timer_token = Context.System.Scheduler.ScheduleTellOnceCancelable(delay, Self, new Timer + { + Height = context.Block.Index, + ViewNumber = context.ViewNumber + }, ActorRefs.NoSender); + } + + private void CheckCommits() + { + if (context.CommitPayloads.Count(p => p?.ConsensusMessage.ViewNumber == context.ViewNumber) >= context.M && context.TransactionHashes.All(p => context.Transactions.ContainsKey(p))) + { + Block block = context.CreateBlock(); + Log($"relay block: height={block.Index} hash={block.Hash} tx={block.Transactions.Length}"); + localNode.Tell(new LocalNode.Relay { Inventory = block }); + } + } + + private void CheckExpectedView(byte viewNumber) + { + if (context.ViewNumber >= viewNumber) return; + // if there are `M` change view payloads with NewViewNumber greater than viewNumber, then, it is safe to move + if (context.ChangeViewPayloads.Count(p => p != null && p.GetDeserializedMessage().NewViewNumber >= viewNumber) >= context.M) + { + if (!context.WatchOnly) + { + ChangeView message = context.ChangeViewPayloads[context.MyIndex]?.GetDeserializedMessage(); + // Communicate the network about my agreement to move to `viewNumber` + // if my last change view payload, `message`, has NewViewNumber lower than current view to change + if (message is null || message.NewViewNumber < viewNumber) + localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeChangeView(ChangeViewReason.ChangeAgreement) }); + } + InitializeConsensus(viewNumber); + } + } + + private void CheckPreparations() + { + if (context.PreparationPayloads.Count(p => p != null) >= context.M && context.TransactionHashes.All(p => context.Transactions.ContainsKey(p))) + { + ConsensusPayload payload = context.MakeCommit(); + Log($"send commit"); + context.Save(); + localNode.Tell(new LocalNode.SendDirectly { Inventory = payload }); + // Set timer, so we will resend the commit in case of a networking issue + ChangeTimer(TimeSpan.FromMilliseconds(Blockchain.MillisecondsPerBlock)); + CheckCommits(); + } + } + + private void InitializeConsensus(byte viewNumber) + { + context.Reset(viewNumber); + if (viewNumber > 0) + Log($"changeview: view={viewNumber} primary={context.Validators[context.GetPrimaryIndex((byte)(viewNumber - 1u))]}", LogLevel.Warning); + Log($"initialize: height={context.Block.Index} view={viewNumber} index={context.MyIndex} role={(context.IsPrimary ? "Primary" : context.WatchOnly ? "WatchOnly" : "Backup")}"); + if (context.WatchOnly) return; + if (context.IsPrimary) + { + if (isRecovering) + { + ChangeTimer(TimeSpan.FromMilliseconds(Blockchain.MillisecondsPerBlock << (viewNumber + 1))); + } + else + { + TimeSpan span = TimeProvider.Current.UtcNow - block_received_time; + if (span >= Blockchain.TimePerBlock) + ChangeTimer(TimeSpan.Zero); + else + ChangeTimer(Blockchain.TimePerBlock - span); + } + } + else + { + ChangeTimer(TimeSpan.FromMilliseconds(Blockchain.MillisecondsPerBlock << (viewNumber + 1))); + } + } + + private void Log(string message, LogLevel level = LogLevel.Info) + { + Plugin.Log(nameof(ConsensusService), level, message); + } + + private void OnChangeViewReceived(ConsensusPayload payload, ChangeView message) + { + if (message.NewViewNumber <= context.ViewNumber) + OnRecoveryRequestReceived(payload); + + if (context.CommitSent) return; + + var expectedView = context.ChangeViewPayloads[payload.ValidatorIndex]?.GetDeserializedMessage().NewViewNumber ?? (byte)0; + if (message.NewViewNumber <= expectedView) + return; + + Log($"{nameof(OnChangeViewReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex} nv={message.NewViewNumber}"); + context.ChangeViewPayloads[payload.ValidatorIndex] = payload; + CheckExpectedView(message.NewViewNumber); + } + + private void OnCommitReceived(ConsensusPayload payload, Commit commit) + { + ref ConsensusPayload existingCommitPayload = ref context.CommitPayloads[payload.ValidatorIndex]; + if (existingCommitPayload != null) + { + if (existingCommitPayload.Hash != payload.Hash) + Log($"{nameof(OnCommitReceived)}: different commit from validator! height={payload.BlockIndex} index={payload.ValidatorIndex} view={commit.ViewNumber} existingView={existingCommitPayload.ConsensusMessage.ViewNumber}", LogLevel.Warning); + return; + } + + // Timeout extension: commit has been received with success + // around 4*15s/M=60.0s/5=12.0s ~ 80% block time (for M=5) + ExtendTimerByFactor(4); + + if (commit.ViewNumber == context.ViewNumber) + { + Log($"{nameof(OnCommitReceived)}: height={payload.BlockIndex} view={commit.ViewNumber} index={payload.ValidatorIndex} nc={context.CountCommitted} nf={context.CountFailed}"); + + byte[] hashData = context.EnsureHeader()?.GetHashData(); + if (hashData == null) + { + existingCommitPayload = payload; + } + else if (Crypto.Default.VerifySignature(hashData, commit.Signature, + context.Validators[payload.ValidatorIndex].EncodePoint(false))) + { + existingCommitPayload = payload; + CheckCommits(); + } + return; + } + // Receiving commit from another view + Log($"{nameof(OnCommitReceived)}: record commit for different view={commit.ViewNumber} index={payload.ValidatorIndex} height={payload.BlockIndex}"); + existingCommitPayload = payload; + } + + // this function increases existing timer (never decreases) with a value proportional to `maxDelayInBlockTimes`*`Blockchain.MillisecondsPerBlock` + private void ExtendTimerByFactor(int maxDelayInBlockTimes) + { + TimeSpan nextDelay = expected_delay - (TimeProvider.Current.UtcNow - clock_started) + TimeSpan.FromMilliseconds(maxDelayInBlockTimes * Blockchain.MillisecondsPerBlock / context.M); + if (!context.WatchOnly && !context.ViewChanging && !context.CommitSent && (nextDelay > TimeSpan.Zero)) + ChangeTimer(nextDelay); + } + + private void OnConsensusPayload(ConsensusPayload payload) + { + if (context.BlockSent) return; + if (payload.Version != context.Block.Version) return; + if (payload.PrevHash != context.Block.PrevHash || payload.BlockIndex != context.Block.Index) + { + if (context.Block.Index < payload.BlockIndex) + { + Log($"chain sync: expected={payload.BlockIndex} current={context.Block.Index - 1} nodes={LocalNode.Singleton.ConnectedCount}", LogLevel.Warning); + } + return; + } + if (payload.ValidatorIndex >= context.Validators.Length) return; + ConsensusMessage message; + try + { + message = payload.ConsensusMessage; + } + catch (FormatException) + { + return; + } + catch (IOException) + { + return; + } + context.LastSeenMessage[payload.ValidatorIndex] = (int)payload.BlockIndex; + foreach (IP2PPlugin plugin in Plugin.P2PPlugins) + if (!plugin.OnConsensusMessage(payload)) + return; + switch (message) + { + case ChangeView view: + OnChangeViewReceived(payload, view); + break; + case PrepareRequest request: + OnPrepareRequestReceived(payload, request); + break; + case PrepareResponse response: + OnPrepareResponseReceived(payload, response); + break; + case Commit commit: + OnCommitReceived(payload, commit); + break; + case RecoveryRequest _: + OnRecoveryRequestReceived(payload); + break; + case RecoveryMessage recovery: + OnRecoveryMessageReceived(payload, recovery); + break; + } + } + + private void OnPersistCompleted(Block block) + { + Log($"persist block: height={block.Index} hash={block.Hash} tx={block.Transactions.Length}"); + block_received_time = TimeProvider.Current.UtcNow; + knownHashes.Clear(); + InitializeConsensus(0); + } + + private void OnRecoveryMessageReceived(ConsensusPayload payload, RecoveryMessage message) + { + // isRecovering is always set to false again after OnRecoveryMessageReceived + isRecovering = true; + int validChangeViews = 0, totalChangeViews = 0, validPrepReq = 0, totalPrepReq = 0; + int validPrepResponses = 0, totalPrepResponses = 0, validCommits = 0, totalCommits = 0; + + Log($"{nameof(OnRecoveryMessageReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex}"); + try + { + if (message.ViewNumber > context.ViewNumber) + { + if (context.CommitSent) return; + ConsensusPayload[] changeViewPayloads = message.GetChangeViewPayloads(context, payload); + totalChangeViews = changeViewPayloads.Length; + foreach (ConsensusPayload changeViewPayload in changeViewPayloads) + if (ReverifyAndProcessPayload(changeViewPayload)) validChangeViews++; + } + if (message.ViewNumber == context.ViewNumber && !context.NotAcceptingPayloadsDueToViewChanging && !context.CommitSent) + { + if (!context.RequestSentOrReceived) + { + ConsensusPayload prepareRequestPayload = message.GetPrepareRequestPayload(context, payload); + if (prepareRequestPayload != null) + { + totalPrepReq = 1; + if (ReverifyAndProcessPayload(prepareRequestPayload)) validPrepReq++; + } + else if (context.IsPrimary) + SendPrepareRequest(); + } + ConsensusPayload[] prepareResponsePayloads = message.GetPrepareResponsePayloads(context, payload); + totalPrepResponses = prepareResponsePayloads.Length; + foreach (ConsensusPayload prepareResponsePayload in prepareResponsePayloads) + if (ReverifyAndProcessPayload(prepareResponsePayload)) validPrepResponses++; + } + if (message.ViewNumber <= context.ViewNumber) + { + // Ensure we know about all commits from lower view numbers. + ConsensusPayload[] commitPayloads = message.GetCommitPayloadsFromRecoveryMessage(context, payload); + totalCommits = commitPayloads.Length; + foreach (ConsensusPayload commitPayload in commitPayloads) + if (ReverifyAndProcessPayload(commitPayload)) validCommits++; + } + } + finally + { + Log($"{nameof(OnRecoveryMessageReceived)}: finished (valid/total) " + + $"ChgView: {validChangeViews}/{totalChangeViews} " + + $"PrepReq: {validPrepReq}/{totalPrepReq} " + + $"PrepResp: {validPrepResponses}/{totalPrepResponses} " + + $"Commits: {validCommits}/{totalCommits}"); + isRecovering = false; + } + } + + private void OnRecoveryRequestReceived(ConsensusPayload payload) + { + // We keep track of the payload hashes received in this block, and don't respond with recovery + // in response to the same payload that we already responded to previously. + // ChangeView messages include a Timestamp when the change view is sent, thus if a node restarts + // and issues a change view for the same view, it will have a different hash and will correctly respond + // again; however replay attacks of the ChangeView message from arbitrary nodes will not trigger an + // additional recovery message response. + if (!knownHashes.Add(payload.Hash)) return; + + Log($"On{payload.ConsensusMessage.GetType().Name}Received: height={payload.BlockIndex} index={payload.ValidatorIndex} view={payload.ConsensusMessage.ViewNumber}"); + if (context.WatchOnly) return; + if (!context.CommitSent) + { + bool shouldSendRecovery = false; + int allowedRecoveryNodeCount = context.F; + // Limit recoveries to be sent from an upper limit of `f` nodes + for (int i = 1; i <= allowedRecoveryNodeCount; i++) + { + var chosenIndex = (payload.ValidatorIndex + i) % context.Validators.Length; + if (chosenIndex != context.MyIndex) continue; + shouldSendRecovery = true; + break; + } + + if (!shouldSendRecovery) return; + } + Log($"send recovery: view={context.ViewNumber}"); + localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeRecoveryMessage() }); + } + + private void OnPrepareRequestReceived(ConsensusPayload payload, PrepareRequest message) + { + if (context.RequestSentOrReceived || context.NotAcceptingPayloadsDueToViewChanging) return; + if (payload.ValidatorIndex != context.Block.ConsensusData.PrimaryIndex || message.ViewNumber != context.ViewNumber) return; + Log($"{nameof(OnPrepareRequestReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex} tx={message.TransactionHashes.Length}"); + if (message.Timestamp <= context.PrevHeader.Timestamp || message.Timestamp > TimeProvider.Current.UtcNow.AddMinutes(10).ToTimestampMS()) + { + Log($"Timestamp incorrect: {message.Timestamp}", LogLevel.Warning); + return; + } + if (message.TransactionHashes.Any(p => context.Snapshot.ContainsTransaction(p))) + { + Log($"Invalid request: transaction already exists", LogLevel.Warning); + return; + } + + // Timeout extension: prepare request has been received with success + // around 2*15/M=30.0/5 ~ 40% block time (for M=5) + ExtendTimerByFactor(2); + + context.Block.Timestamp = message.Timestamp; + context.Block.ConsensusData.Nonce = message.Nonce; + context.TransactionHashes = message.TransactionHashes; + context.Transactions = new Dictionary(); + for (int i = 0; i < context.PreparationPayloads.Length; i++) + if (context.PreparationPayloads[i] != null) + if (!context.PreparationPayloads[i].GetDeserializedMessage().PreparationHash.Equals(payload.Hash)) + context.PreparationPayloads[i] = null; + context.PreparationPayloads[payload.ValidatorIndex] = payload; + byte[] hashData = context.EnsureHeader().GetHashData(); + for (int i = 0; i < context.CommitPayloads.Length; i++) + if (context.CommitPayloads[i]?.ConsensusMessage.ViewNumber == context.ViewNumber) + if (!Crypto.Default.VerifySignature(hashData, context.CommitPayloads[i].GetDeserializedMessage().Signature, context.Validators[i].EncodePoint(false))) + context.CommitPayloads[i] = null; + + if (context.TransactionHashes.Length == 0) + { + // There are no tx so we should act like if all the transactions were filled + CheckPrepareResponse(); + return; + } + + Dictionary mempoolVerified = Blockchain.Singleton.MemPool.GetVerifiedTransactions().ToDictionary(p => p.Hash); + List unverified = new List(); + foreach (UInt256 hash in context.TransactionHashes) + { + if (mempoolVerified.TryGetValue(hash, out Transaction tx)) + { + if (!AddTransaction(tx, false)) + return; + } + else + { + if (Blockchain.Singleton.MemPool.TryGetValue(hash, out tx)) + unverified.Add(tx); + } + } + foreach (Transaction tx in unverified) + if (!AddTransaction(tx, true)) + return; + if (context.Transactions.Count < context.TransactionHashes.Length) + { + UInt256[] hashes = context.TransactionHashes.Where(i => !context.Transactions.ContainsKey(i)).ToArray(); + taskManager.Tell(new TaskManager.RestartTasks + { + Payload = InvPayload.Create(InventoryType.TX, hashes) + }); + } + } + + private void OnPrepareResponseReceived(ConsensusPayload payload, PrepareResponse message) + { + if (message.ViewNumber != context.ViewNumber) return; + if (context.PreparationPayloads[payload.ValidatorIndex] != null || context.NotAcceptingPayloadsDueToViewChanging) return; + if (context.PreparationPayloads[context.Block.ConsensusData.PrimaryIndex] != null && !message.PreparationHash.Equals(context.PreparationPayloads[context.Block.ConsensusData.PrimaryIndex].Hash)) + return; + + // Timeout extension: prepare response has been received with success + // around 2*15/M=30.0/5 ~ 40% block time (for M=5) + ExtendTimerByFactor(2); + + Log($"{nameof(OnPrepareResponseReceived)}: height={payload.BlockIndex} view={message.ViewNumber} index={payload.ValidatorIndex}"); + context.PreparationPayloads[payload.ValidatorIndex] = payload; + if (context.WatchOnly || context.CommitSent) return; + if (context.RequestSentOrReceived) + CheckPreparations(); + } + + protected override void OnReceive(object message) + { + if (message is Start options) + { + if (started) return; + OnStart(options); + } + else + { + if (!started) return; + switch (message) + { + case SetViewNumber setView: + InitializeConsensus(setView.ViewNumber); + break; + case Timer timer: + OnTimer(timer); + break; + case ConsensusPayload payload: + OnConsensusPayload(payload); + break; + case Transaction transaction: + OnTransaction(transaction); + break; + case Blockchain.PersistCompleted completed: + OnPersistCompleted(completed.Block); + break; + } + } + } + + private void RequestRecovery() + { + if (context.Block.Index == Blockchain.Singleton.HeaderHeight + 1) + localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeRecoveryRequest() }); + } + + private void OnStart(Start options) + { + Log("OnStart"); + started = true; + if (!options.IgnoreRecoveryLogs && context.Load()) + { + if (context.Transactions != null) + { + Sender.Ask(new Blockchain.FillMemoryPool + { + Transactions = context.Transactions.Values + }).Wait(); + } + if (context.CommitSent) + { + CheckPreparations(); + return; + } + } + InitializeConsensus(0); + // Issue a ChangeView with NewViewNumber of 0 to request recovery messages on start-up. + if (!context.WatchOnly) + RequestRecovery(); + } + + private void OnTimer(Timer timer) + { + if (context.WatchOnly || context.BlockSent) return; + if (timer.Height != context.Block.Index || timer.ViewNumber != context.ViewNumber) return; + Log($"timeout: height={timer.Height} view={timer.ViewNumber}"); + if (context.IsPrimary && !context.RequestSentOrReceived) + { + SendPrepareRequest(); + } + else if ((context.IsPrimary && context.RequestSentOrReceived) || context.IsBackup) + { + if (context.CommitSent) + { + // Re-send commit periodically by sending recover message in case of a network issue. + Log($"send recovery to resend commit"); + localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeRecoveryMessage() }); + ChangeTimer(TimeSpan.FromMilliseconds(Blockchain.MillisecondsPerBlock << 1)); + } + else + { + var reason = ChangeViewReason.Timeout; + + if (context.Block != null && context.TransactionHashes?.Count() > context.Transactions?.Count) + { + reason = ChangeViewReason.TxNotFound; + } + + RequestChangeView(reason); + } + } + } + + private void OnTransaction(Transaction transaction) + { + if (!context.IsBackup || context.NotAcceptingPayloadsDueToViewChanging || !context.RequestSentOrReceived || context.ResponseSent || context.BlockSent) + return; + if (context.Transactions.ContainsKey(transaction.Hash)) return; + if (!context.TransactionHashes.Contains(transaction.Hash)) return; + AddTransaction(transaction, true); + } + + protected override void PostStop() + { + Log("OnStop"); + started = false; + Context.System.EventStream.Unsubscribe(Self); + context.Dispose(); + base.PostStop(); + } + + public static Props Props(IActorRef localNode, IActorRef taskManager, Store store, Wallet wallet) + { + return Akka.Actor.Props.Create(() => new ConsensusService(localNode, taskManager, store, wallet)).WithMailbox("consensus-service-mailbox"); + } + + private void RequestChangeView(ChangeViewReason reason) + { + if (context.WatchOnly) return; + // Request for next view is always one view more than the current context.ViewNumber + // Nodes will not contribute for changing to a view higher than (context.ViewNumber+1), unless they are recovered + // The latter may happen by nodes in higher views with, at least, `M` proofs + byte expectedView = context.ViewNumber; + expectedView++; + ChangeTimer(TimeSpan.FromMilliseconds(Blockchain.MillisecondsPerBlock << (expectedView + 1))); + if ((context.CountCommitted + context.CountFailed) > context.F) + { + Log($"Skip requesting change view to nv={expectedView} because nc={context.CountCommitted} nf={context.CountFailed}"); + RequestRecovery(); + return; + } + Log($"request change view: height={context.Block.Index} view={context.ViewNumber} nv={expectedView} nc={context.CountCommitted} nf={context.CountFailed}"); + localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakeChangeView(reason) }); + CheckExpectedView(expectedView); + } + + private bool ReverifyAndProcessPayload(ConsensusPayload payload) + { + if (!payload.Verify(context.Snapshot)) return false; + OnConsensusPayload(payload); + return true; + } + + private void SendPrepareRequest() + { + Log($"send prepare request: height={context.Block.Index} view={context.ViewNumber}"); + localNode.Tell(new LocalNode.SendDirectly { Inventory = context.MakePrepareRequest() }); + + if (context.Validators.Length == 1) + CheckPreparations(); + + if (context.TransactionHashes.Length > 0) + { + foreach (InvPayload payload in InvPayload.CreateGroup(InventoryType.TX, context.TransactionHashes)) + localNode.Tell(Message.Create(MessageCommand.Inv, payload)); + } + ChangeTimer(TimeSpan.FromMilliseconds((Blockchain.MillisecondsPerBlock << (context.ViewNumber + 1)) - (context.ViewNumber == 0 ? Blockchain.MillisecondsPerBlock : 0))); + } + } + + internal class ConsensusServiceMailbox : PriorityMailbox + { + public ConsensusServiceMailbox(Akka.Actor.Settings settings, Config config) + : base(settings, config) + { + } + + internal protected override bool IsHighPriority(object message) + { + switch (message) + { + case ConsensusPayload _: + case ConsensusService.SetViewNumber _: + case ConsensusService.Timer _: + case Blockchain.PersistCompleted _: + return true; + default: + return false; + } + } + } +} From 28167aae19098936e4e142e98e4f618ef0b470c7 Mon Sep 17 00:00:00 2001 From: Vitor Date: Sat, 24 Aug 2019 14:01:53 -0300 Subject: [PATCH 3/4] Reverting all changes on CC --- neo/Consensus/ConsensusContext.cs | 114 +++++++++++++++--------------- 1 file changed, 58 insertions(+), 56 deletions(-) diff --git a/neo/Consensus/ConsensusContext.cs b/neo/Consensus/ConsensusContext.cs index b81d16a923..a06c6c965b 100644 --- a/neo/Consensus/ConsensusContext.cs +++ b/neo/Consensus/ConsensusContext.cs @@ -6,7 +6,7 @@ using Neo.Persistence; using Neo.SmartContract; using Neo.SmartContract.Native; -using Neo.VM; +using Neo.VM; using Neo.Wallets; using System; using System.Collections.Generic; @@ -39,11 +39,11 @@ internal class ConsensusContext : IDisposable, ISerializable public Snapshot Snapshot { get; private set; } private KeyPair keyPair; - private int _witnessSize; + private int _witnessSize; private readonly Wallet wallet; private readonly Store store; private readonly Random random = new Random(); - + public int F => (Validators.Length - 1) / 3; public int M => Validators.Length - F; public bool IsPrimary => MyIndex == Block.ConsensusData.PrimaryIndex; @@ -205,40 +205,40 @@ private void SignPayload(ConsensusPayload payload) return; } payload.Witness = sc.GetWitnesses()[0]; - } - + } + /// /// Return the expected block size /// - internal int GetExpectedBlockSize() - { - return GetExpectedBlockSizeWithoutTransactions(Transactions.Count) + // Base size - Transactions.Values.Sum(u => u.Size); // Sum Txs + internal int GetExpectedBlockSize() + { + return GetExpectedBlockSizeWithoutTransactions(Transactions.Count) + // Base size + Transactions.Values.Sum(u => u.Size); // Sum Txs } - /// - /// Return the expected block size without txs - /// - /// Expected transactions - internal int GetExpectedBlockSizeWithoutTransactions(int expectedTransactions) - { - var blockSize = - // BlockBase - sizeof(uint) + //Version - UInt256.Length + //PrevHash - UInt256.Length + //MerkleRoot - sizeof(ulong) + //Timestamp - sizeof(uint) + //Index - UInt160.Length + //NextConsensus - 1 + // - _witnessSize; //Witness - - blockSize += - // Block - Block.ConsensusData.Size + //ConsensusData - IO.Helper.GetVarSize(expectedTransactions + 1); //Transactions count - - return blockSize; + /// + /// Return the expected block size without txs + /// + /// Expected transactions + internal int GetExpectedBlockSizeWithoutTransactions(int expectedTransactions) + { + var blockSize = + // BlockBase + sizeof(uint) + //Version + UInt256.Length + //PrevHash + UInt256.Length + //MerkleRoot + sizeof(ulong) + //Timestamp + sizeof(uint) + //Index + UInt160.Length + //NextConsensus + 1 + // + _witnessSize; //Witness + + blockSize += + // Block + Block.ConsensusData.Size + //ConsensusData + IO.Helper.GetVarSize(expectedTransactions + 1); //Transactions count + + return blockSize; } /// @@ -247,16 +247,18 @@ internal int GetExpectedBlockSizeWithoutTransactions(int expectedTransactions) /// Ordered transactions internal void EnsureMaxBlockSize(IEnumerable txs) { - uint maxBlockSize = NativeContract.Policy.GetMaxBlockSize(Snapshot); - uint maxTransactionsPerBlock = NativeContract.Policy.GetMaxTransactionsPerBlock(Snapshot); + uint maxBlockSize = NativeContract.Policy.GetMaxBlockSize(Snapshot); + uint maxTransactionsPerBlock = NativeContract.Policy.GetMaxTransactionsPerBlock(Snapshot); // Limit Speaker proposal to the limit `MaxTransactionsPerBlock` or all available transactions of the mempool txs = txs.Take((int)maxTransactionsPerBlock); List hashes = new List(); Transactions = new Dictionary(); - - // Expected block size - var blockSize = GetExpectedBlockSizeWithoutTransactions(txs.Count()); + Block.Transactions = new Transaction[0]; + + // We need to know the expected block size + + var blockSize = GetExpectedBlockSizeWithoutTransactions(txs.Count()); // Iterate transaction until reach the size @@ -269,8 +271,8 @@ internal void EnsureMaxBlockSize(IEnumerable txs) hashes.Add(tx.Hash); Transactions.Add(tx.Hash, tx); } - - TransactionHashes = hashes.ToArray(); + + TransactionHashes = hashes.ToArray(); } public ConsensusPayload MakePrepareRequest() @@ -279,8 +281,8 @@ public ConsensusPayload MakePrepareRequest() random.NextBytes(buffer); Block.ConsensusData.Nonce = BitConverter.ToUInt64(buffer, 0); EnsureMaxBlockSize(Blockchain.Singleton.MemPool.GetSortedVerifiedTransactions()); - Block.Timestamp = Math.Max(TimeProvider.Current.UtcNow.ToTimestampMS(), PrevHeader.Timestamp + 1); - + Block.Timestamp = Math.Max(TimeProvider.Current.UtcNow.ToTimestampMS(), PrevHeader.Timestamp + 1); + return PreparationPayloads[MyIndex] = MakeSignedPayload(new PrepareRequest { Timestamp = Block.Timestamp, @@ -346,21 +348,21 @@ public void Reset(byte viewNumber) }; var pv = Validators; Validators = NativeContract.NEO.GetNextBlockValidators(Snapshot); - if (_witnessSize == 0 || (pv != null && pv.Length != Validators.Length)) - { - // Compute the expected size of the witness - using (ScriptBuilder sb = new ScriptBuilder()) - { - for (int x = 0; x < M; x++) - { - sb.EmitPush(new byte[64]); - } - _witnessSize = new Witness - { - InvocationScript = sb.ToArray(), - VerificationScript = Contract.CreateMultiSigRedeemScript(M, Validators) - }.Size; - } + if (_witnessSize == 0 || (pv != null && pv.Length != Validators.Length)) + { + // Compute the expected size of the witness + using (ScriptBuilder sb = new ScriptBuilder()) + { + for (int x = 0; x < M; x++) + { + sb.EmitPush(new byte[64]); + } + _witnessSize = new Witness + { + InvocationScript = sb.ToArray(), + VerificationScript = Contract.CreateMultiSigRedeemScript(M, Validators) + }.Size; + } } MyIndex = -1; ChangeViewPayloads = new ConsensusPayload[Validators.Length]; From 8c89ca8d78637424b4775fc4142a0a606cda6f2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vitor=20Naz=C3=A1rio=20Coelho?= Date: Sat, 24 Aug 2019 19:26:32 -0300 Subject: [PATCH 4/4] Modifying ConsensusContext with proposed changes --- neo/Consensus/ConsensusContext.cs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/neo/Consensus/ConsensusContext.cs b/neo/Consensus/ConsensusContext.cs index fbb21d8cd7..4574d70f7e 100644 --- a/neo/Consensus/ConsensusContext.cs +++ b/neo/Consensus/ConsensusContext.cs @@ -254,14 +254,11 @@ internal void EnsureMaxBlockSize(IEnumerable txs) txs = txs.Take((int)maxTransactionsPerBlock); List hashes = new List(); Transactions = new Dictionary(); - Block.Transactions = new Transaction[0]; - - // We need to know the expected block size + // Expected block size var blockSize = GetExpectedBlockSizeWithoutTransactions(txs.Count()); // Iterate transaction until reach the size - foreach (Transaction tx in txs) { // Check if maximum block size has been already exceeded with the current selected set