diff --git a/neo/Consensus/ConsensusContext.cs b/neo/Consensus/ConsensusContext.cs index d7d9b1bcfd..924709fc9e 100644 --- a/neo/Consensus/ConsensusContext.cs +++ b/neo/Consensus/ConsensusContext.cs @@ -202,7 +202,7 @@ public void Reset() public void Fill() { - IEnumerable mem_pool = Blockchain.Singleton.GetMemoryPool(); + IEnumerable mem_pool = Blockchain.Singleton.MemPool.GetSortedVerifiedTransactions(); foreach (IPolicyPlugin plugin in Plugin.Policies) mem_pool = plugin.FilterForBlock(mem_pool); List transactions = mem_pool.ToList(); diff --git a/neo/Consensus/ConsensusService.cs b/neo/Consensus/ConsensusService.cs index 77b727e0c8..6cd80b38f9 100644 --- a/neo/Consensus/ConsensusService.cs +++ b/neo/Consensus/ConsensusService.cs @@ -219,19 +219,20 @@ private void OnPrepareRequestReceived(ConsensusPayload payload, PrepareRequest m if (!Crypto.Default.VerifySignature(hashData, context.Signatures[i], context.Validators[i].EncodePoint(false))) context.Signatures[i] = null; context.Signatures[payload.ValidatorIndex] = message.Signature; - Dictionary mempool = Blockchain.Singleton.GetMemoryPool().ToDictionary(p => p.Hash); + Dictionary mempoolVerified = Blockchain.Singleton.MemPool.GetVerifiedTransactions().ToDictionary(p => p.Hash); + List unverified = new List(); foreach (UInt256 hash in context.TransactionHashes.Skip(1)) { - if (mempool.TryGetValue(hash, out Transaction tx)) + if (mempoolVerified.TryGetValue(hash, out Transaction tx)) { if (!AddTransaction(tx, false)) return; } else { - tx = Blockchain.Singleton.GetUnverifiedTransaction(hash); - if (tx != null) + + if (Blockchain.Singleton.MemPool.TryGetValue(hash, out tx)) unverified.Add(tx); } } diff --git a/neo/Ledger/Blockchain.cs b/neo/Ledger/Blockchain.cs index db9ecb1cf5..fa9d7dc5fb 100644 --- a/neo/Ledger/Blockchain.cs +++ b/neo/Ledger/Blockchain.cs @@ -1,6 +1,5 @@ using Akka.Actor; using Akka.Configuration; -using Neo.Cryptography; using Neo.Cryptography.ECC; using Neo.IO; using Neo.IO.Actors; @@ -12,10 +11,8 @@ using Neo.SmartContract; using Neo.VM; using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; -using System.Numerics; using System.Threading; namespace Neo.Ledger @@ -114,19 +111,20 @@ public class ImportCompleted { } } }; + private const int MemoryPoolMaxTransactions = 50_000; + private const int MaxTxToReverifyPerIdle = 10; private static readonly object lockObj = new object(); private readonly NeoSystem system; private readonly List header_index = new List(); private uint stored_header_count = 0; private readonly Dictionary block_cache = new Dictionary(); private readonly Dictionary> block_cache_unverified = new Dictionary>(); - private readonly MemoryPool mem_pool = new MemoryPool(50_000); - private readonly ConcurrentDictionary mem_pool_unverified = new ConcurrentDictionary(); internal readonly RelayCache RelayCache = new RelayCache(100); private readonly HashSet subscribers = new HashSet(); private Snapshot currentSnapshot; public Store Store { get; } + public MemoryPool MemPool { get; } public uint Height => currentSnapshot.Height; public uint HeaderHeight => (uint)header_index.Count - 1; public UInt256 CurrentBlockHash => currentSnapshot.CurrentBlockHash; @@ -150,6 +148,7 @@ static Blockchain() public Blockchain(NeoSystem system, Store store) { this.system = system; + this.MemPool = new MemoryPool(system, MemoryPoolMaxTransactions); this.Store = store; lock (lockObj) { @@ -190,7 +189,7 @@ public bool ContainsBlock(UInt256 hash) public bool ContainsTransaction(UInt256 hash) { - if (mem_pool.ContainsKey(hash)) return true; + if (MemPool.ContainsKey(hash)) return true; return Store.ContainsTransaction(hash); } @@ -218,11 +217,6 @@ public static UInt160 GetConsensusAddress(ECPoint[] validators) return Contract.CreateMultiSigRedeemScript(validators.Length - (validators.Length - 1) / 3, validators).ToScriptHash(); } - public IEnumerable GetMemoryPool() - { - return mem_pool; - } - public Snapshot GetSnapshot() { return Store.GetSnapshot(); @@ -230,17 +224,11 @@ public Snapshot GetSnapshot() public Transaction GetTransaction(UInt256 hash) { - if (mem_pool.TryGetValue(hash, out Transaction transaction)) + if (MemPool.TryGetValue(hash, out Transaction transaction)) return transaction; return Store.GetTransaction(hash); } - internal Transaction GetUnverifiedTransaction(UInt256 hash) - { - mem_pool_unverified.TryGetValue(hash, out Transaction transaction); - return transaction; - } - private void OnImport(IEnumerable blocks) { foreach (Block block in blocks) @@ -264,7 +252,7 @@ private void AddUnverifiedBlockToCache(Block block) blocks.AddLast(block); } - + private RelayResultReason OnNewBlock(Block block) { if (block.Index <= Height) @@ -289,7 +277,7 @@ private RelayResultReason OnNewBlock(Block block) if (block.Index == Height + 1) { Block block_persist = block; - List blocksToPersistList = new List(); + List blocksToPersistList = new List(); while (true) { blocksToPersistList.Add(block_persist); @@ -315,7 +303,7 @@ private RelayResultReason OnNewBlock(Block block) if (block_cache_unverified.TryGetValue(Height + 1, out LinkedList unverifiedBlocks)) { foreach (var unverifiedBlock in unverifiedBlocks) - Self.Tell(unverifiedBlock, ActorRefs.NoSender); + Self.Tell(unverifiedBlock, ActorRefs.NoSender); block_cache_unverified.Remove(Height + 1); } } @@ -385,12 +373,14 @@ private RelayResultReason OnNewTransaction(Transaction transaction) return RelayResultReason.Invalid; if (ContainsTransaction(transaction.Hash)) return RelayResultReason.AlreadyExists; - if (!transaction.Verify(currentSnapshot, GetMemoryPool())) + if (!MemPool.CanTransactionFitInPool(transaction)) + return RelayResultReason.OutOfMemory; + if (!transaction.Verify(currentSnapshot, MemPool.GetVerifiedTransactions())) return RelayResultReason.Invalid; if (!Plugin.CheckPolicy(transaction)) return RelayResultReason.PolicyFail; - if (!mem_pool.TryAdd(transaction.Hash, transaction)) + if (!MemPool.TryAdd(transaction.Hash, transaction)) return RelayResultReason.OutOfMemory; system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = transaction }); @@ -400,18 +390,7 @@ private RelayResultReason OnNewTransaction(Transaction transaction) private void OnPersistCompleted(Block block) { block_cache.Remove(block.Hash); - foreach (Transaction tx in block.Transactions) - mem_pool.TryRemove(tx.Hash, out _); - mem_pool_unverified.Clear(); - foreach (Transaction tx in mem_pool - .OrderByDescending(p => p.NetworkFee / p.Size) - .ThenByDescending(p => p.NetworkFee) - .ThenByDescending(p => new BigInteger(p.Hash.ToArray()))) - { - mem_pool_unverified.TryAdd(tx.Hash, tx); - Self.Tell(tx, ActorRefs.NoSender); - } - mem_pool.Clear(); + MemPool.UpdatePoolForBlockPersisted(block, currentSnapshot); PersistCompleted completed = new PersistCompleted { Block = block }; system.Consensus?.Tell(completed); Distribute(completed); @@ -439,6 +418,10 @@ protected override void OnReceive(object message) case ConsensusPayload payload: Sender.Tell(OnNewConsensus(payload)); break; + case Idle _: + if (MemPool.ReVerifyTopUnverifiedTransactionsIfNeeded(MaxTxToReverifyPerIdle, currentSnapshot)) + Self.Tell(Idle.Instance, ActorRefs.NoSender); + break; case Terminated terminated: subscribers.Remove(terminated.ActorRef); break; diff --git a/neo/Ledger/MemoryPool.cs b/neo/Ledger/MemoryPool.cs index 28aea62113..1d05679934 100644 --- a/neo/Ledger/MemoryPool.cs +++ b/neo/Ledger/MemoryPool.cs @@ -1,165 +1,513 @@ using Neo.Network.P2P.Payloads; using System; using System.Collections; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; -using System.Numerics; +using System.Runtime.CompilerServices; +using System.Threading; +using Akka.Util.Internal; +using Neo.Network.P2P; +using Neo.Persistence; +using Neo.Plugins; namespace Neo.Ledger { - internal class MemoryPool : IReadOnlyCollection + public class MemoryPool : IReadOnlyCollection { - private class PoolItem + private class PoolItem : IComparable { public readonly Transaction Transaction; public readonly DateTime Timestamp; + public DateTime LastBroadcastTimestamp; public PoolItem(Transaction tx) { Transaction = tx; Timestamp = DateTime.UtcNow; + LastBroadcastTimestamp = Timestamp; + } + + public int CompareTo(Transaction tx) + { + if (tx == null) return 1; + int ret = Transaction.FeePerByte.CompareTo(tx.FeePerByte); + if (ret != 0) return ret; + ret = Transaction.NetworkFee.CompareTo(tx.NetworkFee); + if (ret != 0) return ret; + + return Transaction.Hash.CompareTo(tx.Hash); + } + + public int CompareTo(PoolItem otherItem) + { + if (otherItem == null) return 1; + return CompareTo(otherItem.Transaction); } } - private readonly ConcurrentDictionary _mem_pool_fee = new ConcurrentDictionary(); - private readonly ConcurrentDictionary _mem_pool_free = new ConcurrentDictionary(); + // Allow reverified transactions to be rebroadcast if it has been this many block times since last broadcast. + private const int BlocksTillRebroadcastLowPriorityPoolTx = 30; + private const int BlocksTillRebroadcastHighPriorityPoolTx = 10; + + private static readonly double MaxSecondsToReverifyHighPrioTx = (double) Blockchain.SecondsPerBlock / 3; + private static readonly double MaxSecondsToReverifyLowPrioTx = (double) Blockchain.SecondsPerBlock / 5; + + // These two are not expected to be hit, they are just safegaurds. + private static readonly double MaxSecondsToReverifyHighPrioTxPerIdle = (double) Blockchain.SecondsPerBlock / 15; + private static readonly double MaxSecondsToReverifyLowPrioTxPerIdle = (double) Blockchain.SecondsPerBlock / 30; + private readonly NeoSystem _system; + + // + /// + /// Guarantees consistency of the pool data structures. + /// + /// Note: The data structures are only modified from the `Blockchain` actor; so operations guaranteed to be + /// performed by the blockchain actor do not need to acquire the read lock; they only need the write + /// lock for write operations. + /// + private readonly ReaderWriterLockSlim _txRwLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion); + + /// + /// Store all verified unsorted transactions currently in the pool. + /// + private readonly Dictionary _unsortedTransactions = new Dictionary(); + /// + /// Stores the verified low priority sorted transactions currently in the pool. + /// + private readonly SortedSet _sortedLowPrioTransactions = new SortedSet(); + /// + /// Stores the verified high priority sorted transactins currently in the pool. + /// + private readonly SortedSet _sortedHighPrioTransactions = new SortedSet(); + + /// + /// Store the unverified transactions currently in the pool. + /// + /// Transactions in this data structure were valid in some prior block, but may no longer be valid. + /// The top ones that could make it into the next block get verified and moved into the verified data structures + /// (_unsortedTransactions, _sortedLowPrioTransactions, and _sortedHighPrioTransactions) after each block. + /// + private readonly Dictionary _unverifiedTransactions = new Dictionary(); + private readonly SortedSet _unverifiedSortedHighPriorityTransactions = new SortedSet(); + private readonly SortedSet _unverifiedSortedLowPriorityTransactions = new SortedSet(); + + private int _maxTxPerBlock; + private int _maxLowPriorityTxPerBlock; + + /// + /// Total maximum capacity of transactions the pool can hold. + /// public int Capacity { get; } - public int Count => _mem_pool_fee.Count + _mem_pool_free.Count; - public MemoryPool(int capacity) + /// + /// Total count of transactions in the pool. + /// + public int Count { + get + { + _txRwLock.EnterReadLock(); + try + { + return _unsortedTransactions.Count + _unverifiedTransactions.Count; + } + finally + { + _txRwLock.ExitReadLock(); + } + } + } + + /// + /// Total count of verified transactions in the pool. + /// + public int VerifiedCount => _unsortedTransactions.Count; // read of 32 bit type is atomic (no lock) + + public int UnVerifiedCount => _unverifiedTransactions.Count; + + public MemoryPool(NeoSystem system, int capacity) + { + _system = system; Capacity = capacity; + LoadMaxTxLimitsFromPolicyPlugins(); } - public void Clear() + public void LoadMaxTxLimitsFromPolicyPlugins() { - _mem_pool_free.Clear(); - _mem_pool_fee.Clear(); + _maxTxPerBlock = int.MaxValue; + _maxLowPriorityTxPerBlock = int.MaxValue; + foreach (IPolicyPlugin plugin in Plugin.Policies) + { + _maxTxPerBlock = Math.Min(_maxTxPerBlock, plugin.MaxTxPerBlock); + _maxLowPriorityTxPerBlock = Math.Min(_maxLowPriorityTxPerBlock, plugin.MaxLowPriorityTxPerBlock); + } + } + + /// + /// Determine whether the pool is holding this transaction and has at some point verified it. + /// Note: The pool may not have verified it since the last block was persisted. To get only the + /// transactions that have been verified during this block use GetVerifiedTransactions() + /// + /// the transaction hash + /// true if the MemoryPool contain the transaction + public bool ContainsKey(UInt256 hash) + { + _txRwLock.EnterReadLock(); + try + { + return _unsortedTransactions.ContainsKey(hash) + || _unverifiedTransactions.ContainsKey(hash); + } + finally + { + _txRwLock.ExitReadLock(); + } } - public bool ContainsKey(UInt256 hash) => _mem_pool_free.ContainsKey(hash) || _mem_pool_fee.ContainsKey(hash); + public bool TryGetValue(UInt256 hash, out Transaction tx) + { + _txRwLock.EnterReadLock(); + try + { + bool ret = _unsortedTransactions.TryGetValue(hash, out PoolItem item) + || _unverifiedTransactions.TryGetValue(hash, out item); + tx = ret ? item.Transaction : null; + return ret; + } + finally + { + _txRwLock.ExitReadLock(); + } + } + // Note: This isn't used in Fill during consensus, fill uses GetSortedVerifiedTransactions() public IEnumerator GetEnumerator() { - return - _mem_pool_fee.Select(p => p.Value.Transaction) - .Concat(_mem_pool_free.Select(p => p.Value.Transaction)) - .GetEnumerator(); + _txRwLock.EnterReadLock(); + try + { + return _unsortedTransactions.Select(p => p.Value.Transaction) + .Concat(_unverifiedTransactions.Select(p => p.Value.Transaction)) + .ToList() + .GetEnumerator(); + } + finally + { + _txRwLock.ExitReadLock(); + } } IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); - static void RemoveLowestFee(ConcurrentDictionary pool, int count) + public IEnumerable GetVerifiedTransactions() { - if (count <= 0) return; - if (count >= pool.Count) + _txRwLock.EnterReadLock(); + try { - pool.Clear(); + return _unsortedTransactions.Select(p => p.Value.Transaction).ToArray(); } - else + finally { - UInt256[] delete = pool.AsParallel() - .OrderBy(p => p.Value.Transaction.NetworkFee / p.Value.Transaction.Size) - .ThenBy(p => p.Value.Transaction.NetworkFee) - .ThenBy(p => new BigInteger(p.Key.ToArray())) - .Take(count) - .Select(p => p.Key) - .ToArray(); + _txRwLock.ExitReadLock(); + } + } - foreach (UInt256 hash in delete) - { - pool.TryRemove(hash, out _); - } + public void GetVerifiedAndUnverifiedTransactions(out IEnumerable verifiedTransactions, + out IEnumerable unverifiedTransactions) + { + _txRwLock.EnterReadLock(); + try + { + verifiedTransactions = _sortedHighPrioTransactions.Select(p => p.Transaction) + .Concat(_sortedLowPrioTransactions.Select(p => p.Transaction)).ToArray(); + unverifiedTransactions = _unverifiedTransactions.Select(p => p.Value.Transaction).ToArray(); + } + finally + { + _txRwLock.ExitReadLock(); + } + } + + public IEnumerable GetSortedVerifiedTransactions() + { + _txRwLock.EnterReadLock(); + try + { + return _sortedHighPrioTransactions.Select(p => p.Transaction) + .Concat(_sortedLowPrioTransactions.Select(p => p.Transaction)) + .ToArray(); + } + finally + { + _txRwLock.ExitReadLock(); } } - static void RemoveOldest(ConcurrentDictionary pool, DateTime time) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private PoolItem GetLowestFeeTransaction(SortedSet verifiedTxSorted, + SortedSet unverifiedTxSorted, out SortedSet sortedPool) { - UInt256[] hashes = pool - .Where(p => p.Value.Timestamp < time) - .Select(p => p.Key) - .ToArray(); + PoolItem minItem = unverifiedTxSorted.Min; + sortedPool = minItem != null ? unverifiedTxSorted : null; + + PoolItem verifiedMin = verifiedTxSorted.Min; + if (verifiedMin == null) return minItem; + + if (minItem != null && verifiedMin.CompareTo(minItem) >= 0) + return minItem; - foreach (UInt256 hash in hashes) + sortedPool = verifiedTxSorted; + minItem = verifiedMin; + + return minItem; + } + + private PoolItem GetLowestFeeTransaction(out Dictionary unsortedTxPool, out SortedSet sortedPool) + { + var minItem = GetLowestFeeTransaction(_sortedLowPrioTransactions, _unverifiedSortedLowPriorityTransactions, + out sortedPool); + + if (minItem != null) { - pool.TryRemove(hash, out _); + unsortedTxPool = sortedPool == _unverifiedSortedLowPriorityTransactions + ? _unverifiedTransactions : _unsortedTransactions; + return minItem; + } + + try + { + return GetLowestFeeTransaction(_sortedHighPrioTransactions, _unverifiedSortedHighPriorityTransactions, + out sortedPool); + } + finally + { + unsortedTxPool = sortedPool == _unverifiedSortedHighPriorityTransactions + ? _unverifiedTransactions : _unsortedTransactions; } } - public bool TryAdd(UInt256 hash, Transaction tx) + // Note: this must only be called from a single thread (the Blockchain actor) + internal bool CanTransactionFitInPool(Transaction tx) { - ConcurrentDictionary pool; + if (Count < Capacity) return true; + + return GetLowestFeeTransaction(out _, out _).CompareTo(tx) <= 0; + } - if (tx.IsLowPriority) + /// + /// + /// Note: This must only be called from a single thread (the Blockchain actor) to add a transaction to the pool + /// one should tell the Blockchain actor about the transaction. + /// + /// + /// + /// + internal bool TryAdd(UInt256 hash, Transaction tx) + { + var poolItem = new PoolItem(tx); + + if (_unsortedTransactions.ContainsKey(hash)) return false; + + _txRwLock.EnterWriteLock(); + try { - pool = _mem_pool_free; + _unsortedTransactions.Add(hash, poolItem); + + SortedSet pool = tx.IsLowPriority ? _sortedLowPrioTransactions : _sortedHighPrioTransactions; + pool.Add(poolItem); + RemoveOverCapacity(); } - else + finally { - pool = _mem_pool_fee; + _txRwLock.ExitWriteLock(); } - pool.TryAdd(hash, new PoolItem(tx)); + return _unsortedTransactions.ContainsKey(hash); + } - if (Count > Capacity) + private void RemoveOverCapacity() + { + while (Count > Capacity) { - RemoveOldest(_mem_pool_free, DateTime.UtcNow.AddSeconds(-Blockchain.SecondsPerBlock * 20)); + PoolItem minItem = GetLowestFeeTransaction(out var unsortedPool, out var sortedPool); - var exceed = Count - Capacity; + unsortedPool.Remove(minItem.Transaction.Hash); + sortedPool.Remove(minItem); + } + } - if (exceed > 0) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private bool TryRemoveVerified(UInt256 hash, out PoolItem item) + { + if (!_unsortedTransactions.TryGetValue(hash, out item)) + return false; + + _unsortedTransactions.Remove(hash); + SortedSet pool = item.Transaction.IsLowPriority + ? _sortedLowPrioTransactions : _sortedHighPrioTransactions; + pool.Remove(item); + return true; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private bool TryRemoveUnVerified(UInt256 hash, out PoolItem item) + { + if (!_unverifiedTransactions.TryGetValue(hash, out item)) + return false; + + _unverifiedTransactions.Remove(hash); + SortedSet pool = item.Transaction.IsLowPriority + ? _unverifiedSortedLowPriorityTransactions : _unverifiedSortedHighPriorityTransactions; + pool.Remove(item); + return true; + } + + // Note: this must only be called from a single thread (the Blockchain actor) + internal void UpdatePoolForBlockPersisted(Block block, Snapshot snapshot) + { + _txRwLock.EnterWriteLock(); + try + { + // First remove the transactions verified in the block. + foreach (Transaction tx in block.Transactions) { - RemoveLowestFee(_mem_pool_free, exceed); - exceed = Count - Capacity; + if (TryRemoveVerified(tx.Hash, out _)) continue; + TryRemoveUnVerified(tx.Hash, out _); + } - if (exceed > 0) - { - RemoveLowestFee(_mem_pool_fee, exceed); - } + // Add all the previously verified transactions back to the unverified transactions + foreach (PoolItem item in _sortedHighPrioTransactions) + { + if (_unverifiedTransactions.TryAdd(item.Transaction.Hash, item)) + _unverifiedSortedHighPriorityTransactions.Add(item); } + + foreach (PoolItem item in _sortedLowPrioTransactions) + { + if (_unverifiedTransactions.TryAdd(item.Transaction.Hash, item)) + _unverifiedSortedLowPriorityTransactions.Add(item); + } + + // Clear the verified transactions now, since they all must be reverified. + _unsortedTransactions.Clear(); + _sortedHighPrioTransactions.Clear(); + _sortedLowPrioTransactions.Clear(); + } + finally + { + _txRwLock.ExitWriteLock(); } - return pool.ContainsKey(hash); + // If we know about headers of future blocks, no point in verifying transactions from the unverified tx pool + // until we get caught up. + if (block.Index < Blockchain.Singleton.HeaderHeight) + return; + + if (Plugin.Policies.Count == 0) + return; + + LoadMaxTxLimitsFromPolicyPlugins(); + + ReverifyTransactions(_sortedHighPrioTransactions, _unverifiedSortedHighPriorityTransactions, + _maxTxPerBlock, MaxSecondsToReverifyHighPrioTx, snapshot); + ReverifyTransactions(_sortedLowPrioTransactions, _unverifiedSortedLowPriorityTransactions, + _maxLowPriorityTxPerBlock, MaxSecondsToReverifyLowPrioTx, snapshot); } - public bool TryRemove(UInt256 hash, out Transaction tx) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private int ReverifyTransactions(SortedSet verifiedSortedTxPool, + SortedSet unverifiedSortedTxPool, int count, double secondsTimeout, Snapshot snapshot) { - if (_mem_pool_free.TryRemove(hash, out PoolItem item)) + DateTime reverifyCutOffTimeStamp = DateTime.UtcNow.AddSeconds(secondsTimeout); + + List reverifiedItems = new List(count); + List invalidItems = new List(); + foreach (PoolItem item in unverifiedSortedTxPool.Reverse().Take(count)) { - tx = item.Transaction; - return true; + // Re-verify the top fee max high priority transactions that can be verified in a block + if (item.Transaction.Verify(snapshot, _unsortedTransactions.Select(p => p.Value.Transaction))) + reverifiedItems.Add(item); + else // Transaction no longer valid -- will be removed from unverifiedTxPool. + invalidItems.Add(item); + + if (DateTime.UtcNow > reverifyCutOffTimeStamp) break; } - else if (_mem_pool_fee.TryRemove(hash, out item)) + + _txRwLock.EnterWriteLock(); + try { - tx = item.Transaction; - return true; + int blocksTillRebroadcast = unverifiedSortedTxPool == _sortedHighPrioTransactions + ? BlocksTillRebroadcastHighPriorityPoolTx : BlocksTillRebroadcastLowPriorityPoolTx; + + var rebroadcastCutOffTime = DateTime.UtcNow.AddSeconds( + -Blockchain.SecondsPerBlock * blocksTillRebroadcast); + foreach (PoolItem item in reverifiedItems) + { + if (_unsortedTransactions.TryAdd(item.Transaction.Hash, item)) + { + verifiedSortedTxPool.Add(item); + + if (item.LastBroadcastTimestamp < rebroadcastCutOffTime) + { + _system.LocalNode.Tell(new LocalNode.RelayDirectly { Inventory = item.Transaction }, _system.Blockchain); + item.LastBroadcastTimestamp = DateTime.UtcNow; + } + } + + _unverifiedTransactions.Remove(item.Transaction.Hash); + unverifiedSortedTxPool.Remove(item); + } + + foreach (PoolItem item in invalidItems) + { + _unverifiedTransactions.Remove(item.Transaction.Hash); + unverifiedSortedTxPool.Remove(item); + } } - else + finally { - tx = null; - return false; + _txRwLock.ExitWriteLock(); } + + return reverifiedItems.Count; } - public bool TryGetValue(UInt256 hash, out Transaction tx) + /// + /// Reverify up to a given maximum count of transactions. Verifies less at a time once the max that can be + /// persisted per block has been reached. + /// + /// Note: this must only be called from a single thread (the Blockchain actor) + /// + /// Max transactions to reverify, the value passed should be >=2. If 1 is passed it + /// will still potentially use 2. + /// The snapshot to use for verifying. + /// true if more unsorted messages exist, otherwise false + internal bool ReVerifyTopUnverifiedTransactionsIfNeeded(int maxToVerify, Snapshot snapshot) { - if (_mem_pool_free.TryGetValue(hash, out PoolItem item)) - { - tx = item.Transaction; - return true; - } - else if (_mem_pool_fee.TryGetValue(hash, out item)) + if (Blockchain.Singleton.Height < Blockchain.Singleton.HeaderHeight) + return false; + + if (_unverifiedSortedHighPriorityTransactions.Count > 0) { - tx = item.Transaction; - return true; + // Always leave at least 1 tx for low priority tx + int verifyCount = _sortedHighPrioTransactions.Count > _maxTxPerBlock || maxToVerify == 1 + ? 1 : maxToVerify - 1; + maxToVerify -= ReverifyTransactions(_sortedHighPrioTransactions, _unverifiedSortedHighPriorityTransactions, + verifyCount, MaxSecondsToReverifyHighPrioTxPerIdle, snapshot); + + if (maxToVerify == 0) maxToVerify++; } - else + + if (_unverifiedSortedLowPriorityTransactions.Count > 0) { - tx = null; - return false; + int verifyCount = _sortedLowPrioTransactions.Count > _maxLowPriorityTxPerBlock + ? 1 : maxToVerify; + ReverifyTransactions(_sortedLowPrioTransactions, _unverifiedSortedLowPriorityTransactions, + verifyCount, MaxSecondsToReverifyLowPrioTxPerIdle, snapshot); } + + return _unverifiedTransactions.Count > 0; } } } diff --git a/neo/Network/P2P/ProtocolHandler.cs b/neo/Network/P2P/ProtocolHandler.cs index 572c9b92fe..94766528de 100644 --- a/neo/Network/P2P/ProtocolHandler.cs +++ b/neo/Network/P2P/ProtocolHandler.cs @@ -264,7 +264,7 @@ private void OnInvMessageReceived(InvPayload payload) private void OnMemPoolMessageReceived() { - foreach (InvPayload payload in InvPayload.CreateGroup(InventoryType.TX, Blockchain.Singleton.GetMemoryPool().Select(p => p.Hash).ToArray())) + foreach (InvPayload payload in InvPayload.CreateGroup(InventoryType.TX, Blockchain.Singleton.MemPool.GetVerifiedTransactions().Select(p => p.Hash).ToArray())) Context.Parent.Tell(Message.Create("inv", payload)); } diff --git a/neo/Network/RPC/RpcServer.cs b/neo/Network/RPC/RpcServer.cs index 0c090c770f..0f22ad4851 100644 --- a/neo/Network/RPC/RpcServer.cs +++ b/neo/Network/RPC/RpcServer.cs @@ -298,7 +298,20 @@ private JObject Process(string method, JArray _params) return json; } case "getrawmempool": - return new JArray(Blockchain.Singleton.GetMemoryPool().Select(p => (JObject)p.Hash.ToString())); + { + bool shouldGetUnverified = _params.Count >= 1 && _params[0].AsBooleanOrDefault(false); + if (!shouldGetUnverified) + return new JArray(Blockchain.Singleton.MemPool.GetVerifiedTransactions().Select(p => (JObject)p.Hash.ToString())); + + JObject json = new JObject(); + json["height"] = Blockchain.Singleton.Height; + Blockchain.Singleton.MemPool.GetVerifiedAndUnverifiedTransactions( + out IEnumerable verifiedTransactions, + out IEnumerable unverifiedTransactions); + json["verified"] = new JArray(verifiedTransactions.Select(p => (JObject) p.Hash.ToString())); + json["unverified"] = new JArray(unverifiedTransactions.Select(p => (JObject) p.Hash.ToString())); + return json; + } case "getrawtransaction": { UInt256 hash = UInt256.Parse(_params[0].AsString()); diff --git a/neo/Plugins/IPolicyPlugin.cs b/neo/Plugins/IPolicyPlugin.cs index 812d418ee2..9952cf2532 100644 --- a/neo/Plugins/IPolicyPlugin.cs +++ b/neo/Plugins/IPolicyPlugin.cs @@ -7,5 +7,7 @@ public interface IPolicyPlugin { bool FilterForMemoryPool(Transaction tx); IEnumerable FilterForBlock(IEnumerable transactions); + int MaxTxPerBlock { get; } + int MaxLowPriorityTxPerBlock { get; } } }