From 0c3ac1a7f74a4b76ea313faf416996ae1ab61fc1 Mon Sep 17 00:00:00 2001 From: Gav Date: Thu, 17 May 2018 10:55:11 +0200 Subject: [PATCH 1/3] Add extra bootnode for poc-1. --- polkadot/service/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index bcb454260e933..1fce482a12923 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -186,6 +186,7 @@ fn poc_1_testnet_config() -> ChainConfig { "enode://a93a29fa68d965452bf0ff8c1910f5992fe2273a72a1ee8d3a3482f68512a61974211ba32bb33f051ceb1530b8ba3527fc36224ba6b9910329025e6d9153cf50@104.211.54.233:30333".into(), "enode://051b18f63a316c4c5fef4631f8c550ae0adba179153588406fac3e5bbbbf534ebeda1bf475dceda27a531f6cdef3846ab6a010a269aa643a1fec7bff51af66bd@104.211.48.51:30333".into(), "enode://c831ec9011d2c02d2c4620fc88db6d897a40d2f88fd75f47b9e4cf3b243999acb6f01b7b7343474650b34eeb1363041a422a91f1fc3850e43482983ee15aa582@104.211.48.247:30333".into(), + "enode://4d10d3422f611f07c03d38227b8d3aa88e11e9c2b5d5dd2e9519085836f3cd0c978a5dce647441b272469b0fb0c8ec1cc0d35453fe623b3a98276ffea7a794d0@40.117.153.33:30333".into(), ]; ChainConfig { genesis_config, boot_nodes } } From 6fe12237e6e29f8221f7d975223c708ffc20ac3f Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 17 May 2018 13:47:53 +0200 Subject: [PATCH 2/3] Archive Mode and groundwork for state-preserving handles (#166) * less eager deletion of DB values * archive mode --- polkadot/consensus/src/service.rs | 1 - substrate/client/db/src/lib.rs | 153 ++++++++++++++++++++----- substrate/client/src/backend.rs | 9 +- substrate/client/src/in_mem.rs | 36 +++--- substrate/state-machine/src/backend.rs | 59 +++++++--- substrate/state-machine/src/lib.rs | 9 +- 6 files changed, 202 insertions(+), 65 deletions(-) diff --git a/polkadot/consensus/src/service.rs b/polkadot/consensus/src/service.rs index ce2b562a475cd..97384040e9c86 100644 --- a/polkadot/consensus/src/service.rs +++ b/polkadot/consensus/src/service.rs @@ -198,7 +198,6 @@ fn start_bft( } }; - let input = Messages { network_stream: network.bft_messages(parent_hash), local_id: bft_service.local_id(), diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs index 9d2b875701404..5a21021862944 100644 --- a/substrate/client/db/src/lib.rs +++ b/substrate/client/db/src/lib.rs @@ -96,12 +96,6 @@ struct PendingBlock { is_best: bool, } -/// Database transaction -pub struct BlockImportOperation { - pending_state: DbState, - pending_block: Option, -} - #[derive(Clone)] struct Meta { best_hash: HeaderHash, @@ -261,11 +255,18 @@ impl client::blockchain::Backend for BlockchainDb { } } +/// Database transaction +pub struct BlockImportOperation { + old_state: DbState, + updates: MemoryDB, + pending_block: Option, +} + impl client::backend::BlockImportOperation for BlockImportOperation { type State = DbState; fn state(&self) -> Result<&Self::State, client::error::Error> { - Ok(&self.pending_state) + Ok(&self.old_state) } fn set_block_data(&mut self, header: block::Header, body: Option, justification: Option, is_best: bool) -> Result<(), client::error::Error> { @@ -280,14 +281,14 @@ impl client::backend::BlockImportOperation for BlockImportOperation { } fn update_storage(&mut self, update: MemoryDB) -> Result<(), client::error::Error> { - self.pending_state.commit(update); + self.updates = update; Ok(()) } fn reset_storage, Vec)>>(&mut self, iter: I) -> Result<(), client::error::Error> { // TODO: wipe out existing trie. - let (_, update) = self.pending_state.storage_root(iter.into_iter().map(|(k, v)| (k, Some(v)))); - self.pending_state.commit(update); + let (_, update) = self.old_state.storage_root(iter.into_iter().map(|(k, v)| (k, Some(v)))); + self.updates = update; Ok(()) } } @@ -341,10 +342,10 @@ impl<'a> HashDB for Ephemeral<'a> { } /// DB-backed patricia trie state, transaction type is an overlay of changes to commit. +#[derive(Clone)] pub struct DbState { db: Arc, root: TrieH256, - updates: MemoryDB, } impl state_machine::Backend for DbState { @@ -364,10 +365,6 @@ impl state_machine::Backend for DbState { .get(key).map(|x| x.map(|val| val.to_vec())).map_err(map_e) } - fn commit(&mut self, transaction: MemoryDB) { - self.updates = transaction; - } - fn pairs(&self) -> Vec<(Vec, Vec)> { let mut read_overlay = MemoryDB::default(); let eph = Ephemeral { @@ -423,10 +420,12 @@ impl state_machine::Backend for DbState { } } -/// In-memory backend. Keeps all states and blocks in memory. Useful for testing. +/// Disk backend. Keeps data in a key-value store. In archive mode, trie nodes are kept from all blocks. +/// Otherwise, trie nodes are kept only from the most recent block. pub struct Backend { db: Arc, blockchain: BlockchainDb, + archive: bool, } impl Backend { @@ -438,22 +437,23 @@ impl Backend { let path = config.path.to_str().ok_or_else(|| client::error::ErrorKind::Backend("Invalid database path".into()))?; let db = Arc::new(Database::open(&db_config, &path).map_err(db_err)?); - Backend::from_kvdb(db as Arc<_>) + Backend::from_kvdb(db as Arc<_>, true) } #[cfg(test)] fn new_test() -> Backend { let db = Arc::new(::kvdb_memorydb::create(columns::NUM_COLUMNS)); - Backend::from_kvdb(db as Arc<_>).expect("failed to create test-db") + Backend::from_kvdb(db as Arc<_>, false).expect("failed to create test-db") } - fn from_kvdb(db: Arc) -> Result { + fn from_kvdb(db: Arc, archive: bool) -> Result { let blockchain = BlockchainDb::new(db.clone())?; Ok(Backend { db, blockchain, + archive }) } } @@ -467,7 +467,8 @@ impl client::backend::Backend for Backend { let state = self.state_at(block)?; Ok(BlockImportOperation { pending_block: None, - pending_state: state, + old_state: state, + updates: MemoryDB::default(), }) } @@ -488,10 +489,10 @@ impl client::backend::Backend for Backend { if pending_block.is_best { transaction.put(columns::META, meta::BEST_BLOCK, &key); } - for (key, (val, rc)) in operation.pending_state.updates.drain() { + for (key, (val, rc)) in operation.updates.drain() { if rc > 0 { transaction.put(columns::STATE, &key.0[..], &val); - } else { + } else if rc < 0 && !self.archive { transaction.delete(columns::STATE, &key.0[..]); } } @@ -518,7 +519,6 @@ impl client::backend::Backend for Backend { return Ok(DbState { db: self.db.clone(), - updates: Default::default(), root, }) } @@ -528,7 +528,6 @@ impl client::backend::Backend for Backend { self.blockchain.header(block).and_then(|maybe_hdr| maybe_hdr.map(|hdr| { DbState { db: self.db.clone(), - updates: Default::default(), root: hdr.state_root.0.into(), } }).ok_or_else(|| client::error::ErrorKind::UnknownBlock(block).into())) @@ -595,7 +594,7 @@ mod tests { (vec![1, 2, 3], vec![9, 9, 9]), ]; - header.state_root = op.pending_state.storage_root(storage + header.state_root = op.old_state.storage_root(storage .iter() .cloned() .map(|(x, y)| (x, Some(y))) @@ -634,7 +633,7 @@ mod tests { (vec![5, 5, 5], Some(vec![4, 5, 6])), ]; - let (root, overlay) = op.pending_state.storage_root(storage.iter().cloned()); + let (root, overlay) = op.old_state.storage_root(storage.iter().cloned()); op.update_storage(overlay).unwrap(); header.state_root = root.into(); @@ -654,4 +653,106 @@ mod tests { assert_eq!(state.storage(&[5, 5, 5]).unwrap(), Some(vec![4, 5, 6])); } } + + #[test] + fn delete_only_when_negative_rc() { + let key; + let db = Backend::new_test(); + + { + let mut op = db.begin_operation(BlockId::Hash(Default::default())).unwrap(); + let mut header = block::Header { + number: 0, + parent_hash: Default::default(), + state_root: Default::default(), + digest: Default::default(), + extrinsics_root: Default::default(), + }; + + let storage: Vec<(_, _)> = vec![]; + + header.state_root = op.old_state.storage_root(storage + .iter() + .cloned() + .map(|(x, y)| (x, Some(y))) + ).0.into(); + + op.reset_storage(storage.iter().cloned()).unwrap(); + + key = op.updates.insert(b"hello"); + op.set_block_data( + header, + Some(vec![]), + None, + true + ).unwrap(); + + db.commit_operation(op).unwrap(); + + assert_eq!(db.db.get(::columns::STATE, &key.0[..]).unwrap().unwrap(), &b"hello"[..]); + } + + { + let mut op = db.begin_operation(BlockId::Number(0)).unwrap(); + let mut header = block::Header { + number: 1, + parent_hash: Default::default(), + state_root: Default::default(), + digest: Default::default(), + extrinsics_root: Default::default(), + }; + + let storage: Vec<(_, _)> = vec![]; + + header.state_root = op.old_state.storage_root(storage + .iter() + .cloned() + .map(|(x, y)| (x, Some(y))) + ).0.into(); + + op.updates.insert(b"hello"); + op.updates.remove(&key); + op.set_block_data( + header, + Some(vec![]), + None, + true + ).unwrap(); + + db.commit_operation(op).unwrap(); + + assert_eq!(db.db.get(::columns::STATE, &key.0[..]).unwrap().unwrap(), &b"hello"[..]); + } + + { + let mut op = db.begin_operation(BlockId::Number(1)).unwrap(); + let mut header = block::Header { + number: 1, + parent_hash: Default::default(), + state_root: Default::default(), + digest: Default::default(), + extrinsics_root: Default::default(), + }; + + let storage: Vec<(_, _)> = vec![]; + + header.state_root = op.old_state.storage_root(storage + .iter() + .cloned() + .map(|(x, y)| (x, Some(y))) + ).0.into(); + + op.updates.remove(&key); + op.set_block_data( + header, + Some(vec![]), + None, + true + ).unwrap(); + + db.commit_operation(op).unwrap(); + + assert!(db.db.get(::columns::STATE, &key.0[..]).unwrap().is_none()); + } + } } diff --git a/substrate/client/src/backend.rs b/substrate/client/src/backend.rs index 1890668c26bae..dad95dc0fdbee 100644 --- a/substrate/client/src/backend.rs +++ b/substrate/client/src/backend.rs @@ -37,6 +37,13 @@ pub trait BlockImportOperation { } /// Client backend. Manages the data layer. +/// +/// Note on state pruning: while an object from `state_at` is alive, the state +/// should not be pruned. The backend should internally reference-count +/// its state objects. +/// +/// The same applies for live `BlockImportOperation`s: while an import operation building on a parent `P` +/// is alive, the state for `P` should not be pruned. pub trait Backend { /// Associated block insertion operation type. type BlockImportOperation: BlockImportOperation; @@ -52,6 +59,6 @@ pub trait Backend { fn commit_operation(&self, transaction: Self::BlockImportOperation) -> error::Result<()>; /// Returns reference to blockchain backend. fn blockchain(&self) -> &Self::Blockchain; - /// Returns state backend for specified block. + /// Returns state backend with post-state of given block. fn state_at(&self, block: BlockId) -> error::Result; } diff --git a/substrate/client/src/in_mem.rs b/substrate/client/src/in_mem.rs index e3989b8ac4d8e..138b6f27a24f8 100644 --- a/substrate/client/src/in_mem.rs +++ b/substrate/client/src/in_mem.rs @@ -18,14 +18,13 @@ use std::collections::HashMap; use parking_lot::RwLock; -use state_machine; use error; use backend; use runtime_support::Hashable; use primitives; use primitives::block::{self, Id as BlockId, HeaderHash}; use blockchain::{self, BlockStatus}; -use state_machine::backend::Backend as StateBackend; +use state_machine::backend::{Backend as StateBackend, InMemory}; fn header_hash(header: &block::Header) -> block::HeaderHash { header.blake2_256().into() @@ -43,12 +42,6 @@ struct Block { body: Option, } -/// In-memory operation. -pub struct BlockImportOperation { - pending_block: Option, - pending_state: state_machine::backend::InMemory, -} - #[derive(Clone)] struct BlockchainStorage { blocks: HashMap, @@ -160,11 +153,18 @@ impl blockchain::Backend for Blockchain { } } +/// In-memory operation. +pub struct BlockImportOperation { + pending_block: Option, + old_state: InMemory, + new_state: Option, +} + impl backend::BlockImportOperation for BlockImportOperation { - type State = state_machine::backend::InMemory; + type State = InMemory; fn state(&self) -> error::Result<&Self::State> { - Ok(&self.pending_state) + Ok(&self.old_state) } fn set_block_data(&mut self, header: block::Header, body: Option, justification: Option, is_new_best: bool) -> error::Result<()> { @@ -180,20 +180,20 @@ impl backend::BlockImportOperation for BlockImportOperation { Ok(()) } - fn update_storage(&mut self, update: ::Transaction) -> error::Result<()> { - self.pending_state.commit(update); + fn update_storage(&mut self, update: ::Transaction) -> error::Result<()> { + self.new_state = Some(self.old_state.update(update)); Ok(()) } fn reset_storage, Vec)>>(&mut self, iter: I) -> error::Result<()> { - self.pending_state = state_machine::backend::InMemory::from(iter.collect()); + self.new_state = Some(InMemory::from(iter.collect::>())); Ok(()) } } /// In-memory backend. Keeps all states and blocks in memory. Useful for testing. pub struct Backend { - states: RwLock>, + states: RwLock>, blockchain: Blockchain, } @@ -210,7 +210,7 @@ impl Backend { impl backend::Backend for Backend { type BlockImportOperation = BlockImportOperation; type Blockchain = Blockchain; - type State = state_machine::backend::InMemory; + type State = InMemory; fn begin_operation(&self, block: BlockId) -> error::Result { let state = match block { @@ -220,14 +220,16 @@ impl backend::Backend for Backend { Ok(BlockImportOperation { pending_block: None, - pending_state: state, + old_state: state, + new_state: None, }) } fn commit_operation(&self, operation: Self::BlockImportOperation) -> error::Result<()> { if let Some(pending_block) = operation.pending_block { let hash = header_hash(&pending_block.block.header); - self.states.write().insert(hash, operation.pending_state); + let old_state = &operation.old_state; + self.states.write().insert(hash, operation.new_state.unwrap_or_else(|| old_state.clone())); self.blockchain.insert(hash, pending_block.block.header, pending_block.block.justification, pending_block.block.body, pending_block.is_best); } Ok(()) diff --git a/substrate/state-machine/src/backend.rs b/substrate/state-machine/src/backend.rs index 9c90a47be161a..da68b932feff3 100644 --- a/substrate/state-machine/src/backend.rs +++ b/substrate/state-machine/src/backend.rs @@ -18,10 +18,13 @@ use std::{error, fmt}; use std::collections::HashMap; +use std::sync::Arc; /// A state backend is used to read state data and can have changes committed /// to it. -pub trait Backend { +/// +/// The clone operation should be cheap. +pub trait Backend: Clone { /// An error type when fetching data is not possible. type Error: super::Error; @@ -36,9 +39,6 @@ pub trait Backend { fn storage_root(&self, delta: I) -> ([u8; 32], Self::Transaction) where I: IntoIterator, Option>)>; - /// Commit updates to the backend and get new state. - fn commit(&mut self, tx: Self::Transaction); - /// Get all key/value pairs into a Vec. fn pairs(&self) -> Vec<(Vec, Vec)>; } @@ -60,20 +60,54 @@ impl error::Error for Void { /// In-memory backend. Fully recomputes tries on each commit but useful for /// tests. -pub type InMemory = HashMap, Vec>; +#[derive(Clone, PartialEq, Eq)] +pub struct InMemory { + inner: Arc, Vec>>, +} + +impl Default for InMemory { + fn default() -> Self { + InMemory { + inner: Arc::new(Default::default()), + } + } +} + +impl InMemory { + /// Copy the state, with applied updates + pub fn update(&self, changes: ::Transaction) -> Self { + let mut inner: HashMap<_, _> = (&*self.inner).clone(); + for (key, val) in changes { + match val { + Some(v) => { inner.insert(key, v); }, + None => { inner.remove(&key); }, + } + } + + inner.into() + } +} + +impl From, Vec>> for InMemory { + fn from(inner: HashMap, Vec>) -> Self { + InMemory { + inner: Arc::new(inner), + } + } +} impl Backend for InMemory { type Error = Void; type Transaction = Vec<(Vec, Option>)>; fn storage(&self, key: &[u8]) -> Result>, Self::Error> { - Ok(self.get(key).map(Clone::clone)) + Ok(self.inner.get(key).map(Clone::clone)) } fn storage_root(&self, delta: I) -> ([u8; 32], Self::Transaction) where I: IntoIterator, Option>)> { - let existing_pairs = self.iter().map(|(k, v)| (k.clone(), Some(v.clone()))); + let existing_pairs = self.inner.iter().map(|(k, v)| (k.clone(), Some(v.clone()))); let transaction: Vec<_> = delta.into_iter().collect(); let root = ::triehash::trie_root(existing_pairs.chain(transaction.iter().cloned()) @@ -85,17 +119,8 @@ impl Backend for InMemory { (root, transaction) } - fn commit(&mut self, changes: Self::Transaction) { - for (key, val) in changes { - match val { - Some(v) => { self.insert(key, v); }, - None => { self.remove(&key); }, - } - } - } - fn pairs(&self) -> Vec<(Vec, Vec)> { - self.iter().map(|(k, v)| (k.clone(), v.clone())).collect() + self.inner.iter().map(|(k, v)| (k.clone(), v.clone())).collect() } } diff --git a/substrate/state-machine/src/lib.rs b/substrate/state-machine/src/lib.rs index 72fa063dbb554..c8aa4900980ed 100644 --- a/substrate/state-machine/src/lib.rs +++ b/substrate/state-machine/src/lib.rs @@ -145,11 +145,13 @@ pub trait CodeExecutor: Sized + Send + Sync { } /// Execute a call using the given state backend, overlayed changes, and call executor. +/// Produces a state-backend-specific "transaction" which can be used to apply the changes +/// to the backing store, such as the disk. /// /// On an error, no prospective changes are written to the overlay. /// /// Note: changes to code will be in place if this call is made again. For running partial -/// blocks (e.g. a transaction at a time), ensure a differrent method is used. +/// blocks (e.g. a transaction at a time), ensure a different method is used. pub fn execute( backend: &B, overlay: &mut OverlayedChanges, @@ -228,12 +230,13 @@ mod tests { #[test] fn overlayed_storage_root_works() { - let backend = InMemory::from(map![ + let initial: HashMap<_, _> = map![ b"doe".to_vec() => b"reindeer".to_vec(), b"dog".to_vec() => b"puppyXXX".to_vec(), b"dogglesworth".to_vec() => b"catXXX".to_vec(), b"doug".to_vec() => b"notadog".to_vec() - ]); + ]; + let backend = InMemory::from(initial); let mut overlay = OverlayedChanges { committed: map![ b"dog".to_vec() => Some(b"puppy".to_vec()), From 512931b8cecd0aa1d9df5424015b5694400f72e2 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Thu, 31 May 2018 18:31:03 +0200 Subject: [PATCH 3/3] Backport of #191 (#193) * Merge branch 'master' into gav-result-dispatch * Fix merge. --- substrate/client/src/client.rs | 41 ++++++++-- substrate/network/src/consensus.rs | 117 +++++++++++++++++++++-------- substrate/network/src/protocol.rs | 8 +- 3 files changed, 121 insertions(+), 45 deletions(-) diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index 30ae5a261134a..17fa6a8a51816 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -17,9 +17,9 @@ //! Substrate Client use futures::sync::mpsc; -use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; use primitives::{self, block, AuthorityId}; -use primitives::block::Id as BlockId; +use primitives::block::{Id as BlockId, HeaderHash}; use primitives::storage::{StorageKey, StorageData}; use runtime_support::Hashable; use codec::{KeyedVec, Slicable}; @@ -37,6 +37,8 @@ pub struct Client { backend: B, executor: E, import_notification_sinks: Mutex>>, + import_lock: Mutex<()>, + importing_block: RwLock>, // holds the block hash currently being imported. TODO: replace this with block queue } /// A source of blockchain evenets. @@ -181,6 +183,8 @@ impl Client where backend, executor, import_notification_sinks: Mutex::new(Vec::new()), + import_lock: Mutex::new(()), + importing_block: RwLock::new(None), }) } @@ -288,13 +292,32 @@ impl Client where header: JustifiedHeader, body: Option, ) -> error::Result { - // TODO: import lock - // TODO: import justification. let (header, justification) = header.into_inner(); match self.backend.blockchain().status(BlockId::Hash(header.parent_hash))? { - blockchain::BlockStatus::InChain => (), + blockchain::BlockStatus::InChain => {}, blockchain::BlockStatus::Unknown => return Ok(ImportResult::UnknownParent), } + let hash: block::HeaderHash = header.blake2_256().into(); + + let _import_lock = self.import_lock.lock(); + *self.importing_block.write() = Some(hash); + let result = self.execute_and_import_block(origin, hash, header, justification, body); + *self.importing_block.write() = None; + result + } + + fn execute_and_import_block( + &self, + origin: BlockOrigin, + hash: HeaderHash, + header: block::Header, + justification: bft::Justification, + body: Option, + ) -> error::Result { + match self.backend.blockchain().status(BlockId::Hash(hash))? { + blockchain::BlockStatus::InChain => return Ok(ImportResult::AlreadyInChain), + blockchain::BlockStatus::Unknown => {}, + } let mut transaction = self.backend.begin_operation(BlockId::Hash(header.parent_hash))?; let mut overlay = OverlayedChanges::default(); @@ -308,12 +331,10 @@ impl Client where )?; let is_new_best = header.number == self.backend.blockchain().info()?.best_number + 1; - let hash: block::HeaderHash = header.blake2_256().into(); trace!("Imported {}, (#{}), best={}, origin={:?}", hash, header.number, is_new_best, origin); transaction.set_block_data(header.clone(), body, Some(justification.uncheck().into()), is_new_best)?; transaction.update_storage(storage_update)?; self.backend.commit_operation(transaction)?; - if origin == BlockOrigin::NetworkBroadcast || origin == BlockOrigin::Own || origin == BlockOrigin::ConsensusBroadcast { let notification = BlockImportNotification { hash: hash, @@ -323,7 +344,6 @@ impl Client where }; self.import_notification_sinks.lock().retain(|sink| !sink.unbounded_send(notification.clone()).is_err()); } - Ok(ImportResult::Queued) } @@ -340,6 +360,11 @@ impl Client where /// Get block status. pub fn block_status(&self, id: &BlockId) -> error::Result { // TODO: more efficient implementation + if let BlockId::Hash(ref h) = id { + if self.importing_block.read().as_ref().map_or(false, |importing| h == importing) { + return Ok(BlockStatus::Queued); + } + } match self.backend.blockchain().header(*id).map_err(|e| error::Error::from_blockchain(Box::new(e)))?.is_some() { true => Ok(BlockStatus::InChain), false => Ok(BlockStatus::Unknown), diff --git a/substrate/network/src/consensus.rs b/substrate/network/src/consensus.rs index 5c4689384deb3..1d170ba9cb39f 100644 --- a/substrate/network/src/consensus.rs +++ b/substrate/network/src/consensus.rs @@ -22,12 +22,12 @@ use std::time::{Instant, Duration}; use io::SyncIo; use protocol::Protocol; use network::PeerId; -use primitives::{Hash, block::HeaderHash, block::Id as BlockId, block::Header}; +use primitives::{Hash, block::Id as BlockId, block::Header}; use message::{self, Message}; use runtime_support::Hashable; // TODO: Add additional spam/DoS attack protection. -const MESSAGE_LIFETIME_SECONDS: u64 = 600; +const MESSAGE_LIFETIME: Duration = Duration::from_secs(600); struct CandidateRequest { id: message::RequestId, @@ -48,12 +48,11 @@ pub struct Consensus { bft_message_sink: Option<(mpsc::UnboundedSender, Hash)>, messages: Vec<(Hash, Instant, message::Message)>, message_hashes: HashSet, - last_block_hash: HeaderHash, } impl Consensus { /// Create a new instance. - pub fn new(best_hash: HeaderHash) -> Consensus { + pub fn new() -> Consensus { Consensus { peers: HashMap::new(), our_candidate: None, @@ -61,7 +60,6 @@ impl Consensus { bft_message_sink: None, messages: Default::default(), message_hashes: Default::default(), - last_block_hash: best_hash, } } @@ -283,37 +281,25 @@ impl Consensus { self.peers.remove(&peer_id); } - pub fn collect_garbage(&mut self, best_hash_and_header: Option<(HeaderHash, &Header)>) { + pub fn collect_garbage(&mut self, best_header: Option<&Header>) { let hashes = &mut self.message_hashes; - let last_block_hash = &mut self.last_block_hash; let before = self.messages.len(); - let (best_hash, best_header) = best_hash_and_header.map(|(h, header)| (Some(h), Some(header))).unwrap_or((None, None)); - if best_header.as_ref().map_or(false, |header| header.parent_hash != *last_block_hash) { - trace!(target:"sync", "Clearing conensus message cache"); - self.messages.clear(); - hashes.clear(); - } else { - let expiration = Duration::from_secs(MESSAGE_LIFETIME_SECONDS); - let now = Instant::now(); - if let Some(hash) = best_hash { - *last_block_hash = hash; - } - self.messages.retain(|&(ref hash, timestamp, ref message)| { - timestamp < now + expiration || - best_header.map_or(true, |header| { - if match *message { - Message::BftMessage(ref msg) => msg.parent_hash != header.parent_hash, - Message::Statement(ref msg) => msg.parent_hash != header.parent_hash, - _ => true, - } { - hashes.remove(hash); - true - } else { - false - } + let now = Instant::now(); + self.messages.retain(|&(ref hash, timestamp, ref message)| { + if timestamp >= now - MESSAGE_LIFETIME && + best_header.map_or(true, |header| + match *message { + Message::BftMessage(ref msg) => msg.parent_hash != header.parent_hash, + Message::Statement(ref msg) => msg.parent_hash != header.parent_hash, + _ => true, }) - }); - } + { + true + } else { + hashes.remove(hash); + false + } + }); if self.messages.len() != before { trace!(target:"sync", "Cleaned up {} stale messages", before - self.messages.len()); } @@ -322,3 +308,68 @@ impl Consensus { } } } + +#[cfg(test)] +mod tests { + use primitives::Hash; + use primitives::bft::Justification; + use primitives::block::{HeaderHash, Header}; + use std::time::Instant; + use message::{self, Message}; + use super::{Consensus, MESSAGE_LIFETIME}; + + #[test] + fn collects_garbage() { + let prev_hash = HeaderHash::random(); + let best_hash = HeaderHash::random(); + let mut consensus = Consensus::new(); + let now = Instant::now(); + let m1_hash = Hash::random(); + let m2_hash = Hash::random(); + let m1 = Message::BftMessage(message::LocalizedBftMessage { + parent_hash: prev_hash, + message: message::BftMessage::Auxiliary(Justification { + round_number: 0, + hash: Default::default(), + signatures: Default::default(), + }), + }); + let m2 = Message::BftMessage(message::LocalizedBftMessage { + parent_hash: best_hash, + message: message::BftMessage::Auxiliary(Justification { + round_number: 0, + hash: Default::default(), + signatures: Default::default(), + }), + }); + consensus.messages.push((m1_hash, now, m1)); + consensus.messages.push((m2_hash, now, m2.clone())); + consensus.message_hashes.insert(m1_hash); + consensus.message_hashes.insert(m2_hash); + + // nothing to collect + consensus.collect_garbage(None); + assert_eq!(consensus.messages.len(), 2); + assert_eq!(consensus.message_hashes.len(), 2); + + // random header, nothing should be cleared + let mut header = Header::from_block_number(0); + consensus.collect_garbage(Some(&header)); + assert_eq!(consensus.messages.len(), 2); + assert_eq!(consensus.message_hashes.len(), 2); + + // header that matches one of the messages + header.parent_hash = prev_hash; + consensus.collect_garbage(Some(&header)); + assert_eq!(consensus.messages.len(), 1); + assert_eq!(consensus.message_hashes.len(), 1); + assert!(consensus.message_hashes.contains(&m2_hash)); + + // make timestamp expired + consensus.messages.clear(); + consensus.messages.push((m2_hash, now - MESSAGE_LIFETIME, m2)); + consensus.collect_garbage(None); + assert!(consensus.messages.is_empty()); + assert!(consensus.message_hashes.is_empty()); + } +} diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs index b2b87e80ff553..a727ffcc04625 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/protocol.rs @@ -114,13 +114,13 @@ impl Protocol { /// Create a new instance. pub fn new(config: ProtocolConfig, chain: Arc, transaction_pool: Arc) -> error::Result { let info = chain.info()?; - let best_hash = info.chain.best_hash; + let sync = ChainSync::new(&info); let protocol = Protocol { config: config, chain: chain, genesis_hash: info.chain.genesis_hash, - sync: RwLock::new(ChainSync::new(&info)), - consensus: Mutex::new(Consensus::new(best_hash)), + sync: RwLock::new(sync), + consensus: Mutex::new(Consensus::new()), peers: RwLock::new(HashMap::new()), handshaking_peers: RwLock::new(HashMap::new()), transaction_pool: transaction_pool, @@ -520,7 +520,7 @@ impl Protocol { } } - self.consensus.lock().collect_garbage(Some((hash, &header))); + self.consensus.lock().collect_garbage(Some(&header)); } pub fn transactions_stats(&self) -> BTreeMap {