From 38108909a7f1c2e83555181223b6ff2d784c2567 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Thu, 19 Jan 2023 16:13:16 +0100 Subject: [PATCH] Notification-based block pinning (#13157) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Worker * Reorganize and unpin onnotification drop * Pin in state-db, pass block number * Pin blocks in blockchain db * Switch to reference counted LRU * Disable pinning when we keep all blocks * Fix pinning hint for state-db * Remove pinning from backend layer * Improve readability * Add justifications to test * Fix justification behaviour * Remove debug prints * Convert channels to tracing_unbounded * Add comments to the test * Documentation and Cleanup * Move task start to client * Simplify cache * Improve test, remove unwanted log * Add tracing logs, remove expect for block number * Cleanup * Add conversion method for unpin handle to Finalitynotification * Revert unwanted changes * Improve naming * Make clippy happy * Fix docs Co-authored-by: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> * Use `NumberFor` instead of u64 in API * Hand over weak reference to unpin worker task * Unwanted * &Hash -> Hash * Remove number from interface, rename `_unpin_handle`, LOG_TARGET * Move RwLock one layer up * Apply code style suggestions * Improve comments * Replace lru crate by schnellru * Only insert values for pinned items + better docs * Apply suggestions from code review Co-authored-by: Bastian Köcher * Improve comments, log target and test Co-authored-by: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Co-authored-by: Bastian Köcher --- Cargo.lock | 52 +- client/api/src/backend.rs | 22 +- client/api/src/client.rs | 109 +++- client/api/src/in_mem.rs | 6 + client/db/Cargo.toml | 1 + client/db/src/lib.rs | 531 ++++++++++++++++-- client/db/src/pinned_blocks_cache.rs | 231 ++++++++ client/finality-grandpa/src/until_imported.rs | 13 +- client/service/src/builder.rs | 4 +- client/service/src/client/client.rs | 77 ++- test-utils/client/src/lib.rs | 7 +- 11 files changed, 954 insertions(+), 99 deletions(-) create mode 100644 client/db/src/pinned_blocks_cache.rs diff --git a/Cargo.lock b/Cargo.lock index 446638277202a..100e0d2152096 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -137,6 +137,18 @@ dependencies = [ "version_check", ] +[[package]] +name = "ahash" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf6ccdb167abbf410dcb915cabd428929d7f6a04980b54a11f26a39f1c7f7107" +dependencies = [ + "cfg-if", + "getrandom 0.2.8", + "once_cell", + "version_check", +] + [[package]] name = "aho-corasick" version = "0.7.20" @@ -1836,7 +1848,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c24f403d068ad0b359e577a77f92392118be3f3c927538f2bb544a5ecd828c6" dependencies = [ "curve25519-dalek 3.2.0", - "hashbrown", + "hashbrown 0.12.3", "hex", "rand_core 0.6.4", "sha2 0.9.9", @@ -2806,9 +2818,15 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" dependencies = [ - "ahash", + "ahash 0.7.6", ] +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" + [[package]] name = "heck" version = "0.4.0" @@ -3122,7 +3140,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.12.3", "serde", ] @@ -4172,7 +4190,7 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6e8aaa3f231bb4bd57b84b2d5dc3ae7f350265df8aa96492e0bc394a1571909" dependencies = [ - "hashbrown", + "hashbrown 0.12.3", ] [[package]] @@ -4310,7 +4328,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e0c7cba9ce19ac7ffd2053ac9f49843bbd3f4318feedfd74e85c19d5fb0ba66" dependencies = [ "hash-db", - "hashbrown", + "hashbrown 0.12.3", ] [[package]] @@ -5085,7 +5103,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21158b2c33aa6d4561f1c0a6ea283ca92bc54802a93b263e910746d679a7eb53" dependencies = [ "crc32fast", - "hashbrown", + "hashbrown 0.12.3", "indexmap", "memchr", ] @@ -7991,6 +8009,7 @@ dependencies = [ "rand 0.8.5", "sc-client-api", "sc-state-db", + "schnellru", "sp-arithmetic", "sp-blockchain", "sp-core", @@ -8334,7 +8353,7 @@ dependencies = [ name = "sc-finality-grandpa" version = "0.10.0-dev" dependencies = [ - "ahash", + "ahash 0.7.6", "array-bytes", "assert_matches", "async-trait", @@ -8534,7 +8553,7 @@ dependencies = [ name = "sc-network-gossip" version = "0.10.0-dev" dependencies = [ - "ahash", + "ahash 0.7.6", "futures", "futures-timer", "libp2p", @@ -9132,6 +9151,17 @@ dependencies = [ "windows-sys 0.36.1", ] +[[package]] +name = "schnellru" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "772575a524feeb803e5b0fcbc6dd9f367e579488197c94c6e4023aad2305774d" +dependencies = [ + "ahash 0.8.2", + "cfg-if", + "hashbrown 0.13.2", +] + [[package]] name = "schnorrkel" version = "0.9.1" @@ -10275,11 +10305,11 @@ dependencies = [ name = "sp-trie" version = "7.0.0" dependencies = [ - "ahash", + "ahash 0.7.6", "array-bytes", "criterion", "hash-db", - "hashbrown", + "hashbrown 0.12.3", "lazy_static", "lru", "memory-db", @@ -11279,7 +11309,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "004e1e8f92535694b4cb1444dc5a8073ecf0815e3357f729638b9f8fc4062908" dependencies = [ "hash-db", - "hashbrown", + "hashbrown 0.12.3", "log", "rustc-hex", "smallvec", diff --git a/client/api/src/backend.rs b/client/api/src/backend.rs index 21d213ffb15cf..4ef609bdd4525 100644 --- a/client/api/src/backend.rs +++ b/client/api/src/backend.rs @@ -436,12 +436,24 @@ pub trait StorageProvider> { /// /// Manages the data layer. /// -/// Note on state pruning: while an object from `state_at` is alive, the state +/// # State Pruning +/// +/// While an object from `state_at` is alive, the state /// should not be pruned. The backend should internally reference-count /// its state objects. /// /// The same applies for live `BlockImportOperation`s: while an import operation building on a /// parent `P` is alive, the state for `P` should not be pruned. +/// +/// # Block Pruning +/// +/// Users can pin blocks in memory by calling `pin_block`. When +/// a block would be pruned, its value is kept in an in-memory cache +/// until it is unpinned via `unpin_block`. +/// +/// While a block is pinned, its state is also preserved. +/// +/// The backend should internally reference count the number of pin / unpin calls. pub trait Backend: AuxStore + Send + Sync { /// Associated block insertion operation type. type BlockImportOperation: BlockImportOperation; @@ -502,6 +514,14 @@ pub trait Backend: AuxStore + Send + Sync { /// Returns a handle to offchain storage. fn offchain_storage(&self) -> Option; + /// Pin the block to keep body, justification and state available after pruning. + /// Number of pins are reference counted. Users need to make sure to perform + /// one call to [`Self::unpin_block`] per call to [`Self::pin_block`]. + fn pin_block(&self, hash: Block::Hash) -> sp_blockchain::Result<()>; + + /// Unpin the block to allow pruning. + fn unpin_block(&self, hash: Block::Hash); + /// Returns true if state for given block is available. fn have_state_at(&self, hash: Block::Hash, _number: NumberFor) -> bool { self.state_at(hash).is_ok() diff --git a/client/api/src/client.rs b/client/api/src/client.rs index 0d00257fa7b06..8e7ceb68704b2 100644 --- a/client/api/src/client.rs +++ b/client/api/src/client.rs @@ -30,7 +30,7 @@ use std::{collections::HashSet, fmt, sync::Arc}; use crate::{blockchain::Info, notifications::StorageEventStream, FinalizeSummary, ImportSummary}; use sc_transaction_pool_api::ChainEvent; -use sc_utils::mpsc::TracingUnboundedReceiver; +use sc_utils::mpsc::{TracingUnboundedReceiver, TracingUnboundedSender}; use sp_blockchain; /// Type that implements `futures::Stream` of block import events. @@ -264,6 +264,53 @@ impl fmt::Display for UsageInfo { } } +/// Sends a message to the pinning-worker once dropped to unpin a block in the backend. +#[derive(Debug)] +pub struct UnpinHandleInner { + /// Hash of the block pinned by this handle + hash: Block::Hash, + unpin_worker_sender: TracingUnboundedSender, +} + +impl UnpinHandleInner { + /// Create a new [`UnpinHandleInner`] + pub fn new( + hash: Block::Hash, + unpin_worker_sender: TracingUnboundedSender, + ) -> Self { + Self { hash, unpin_worker_sender } + } +} + +impl Drop for UnpinHandleInner { + fn drop(&mut self) { + if let Err(err) = self.unpin_worker_sender.unbounded_send(self.hash) { + log::debug!(target: "db", "Unable to unpin block with hash: {}, error: {:?}", self.hash, err); + }; + } +} + +/// Keeps a specific block pinned while the handle is alive. +/// Once the last handle instance for a given block is dropped, the +/// block is unpinned in the [`Backend`](crate::backend::Backend::unpin_block). +#[derive(Debug, Clone)] +pub struct UnpinHandle(Arc>); + +impl UnpinHandle { + /// Create a new [`UnpinHandle`] + pub fn new( + hash: Block::Hash, + unpin_worker_sender: TracingUnboundedSender, + ) -> UnpinHandle { + UnpinHandle(Arc::new(UnpinHandleInner::new(hash, unpin_worker_sender))) + } + + /// Hash of the block this handle is unpinning on drop + pub fn hash(&self) -> Block::Hash { + self.0.hash + } +} + /// Summary of an imported block #[derive(Clone, Debug)] pub struct BlockImportNotification { @@ -279,6 +326,36 @@ pub struct BlockImportNotification { /// /// If `None`, there was no re-org while importing. pub tree_route: Option>>, + /// Handle to unpin the block this notification is for + unpin_handle: UnpinHandle, +} + +impl BlockImportNotification { + /// Create new notification + pub fn new( + hash: Block::Hash, + origin: BlockOrigin, + header: Block::Header, + is_new_best: bool, + tree_route: Option>>, + unpin_worker_sender: TracingUnboundedSender, + ) -> Self { + Self { + hash, + origin, + header, + is_new_best, + tree_route, + unpin_handle: UnpinHandle::new(hash, unpin_worker_sender), + } + } + + /// Consume this notification and extract the unpin handle. + /// + /// Note: Only use this if you want to keep the block pinned in the backend. + pub fn into_unpin_handle(self) -> UnpinHandle { + self.unpin_handle + } } /// Summary of a finalized block. @@ -294,6 +371,8 @@ pub struct FinalityNotification { pub tree_route: Arc<[Block::Hash]>, /// Stale branches heads. pub stale_heads: Arc<[Block::Hash]>, + /// Handle to unpin the block this notification is for + unpin_handle: UnpinHandle, } impl TryFrom> for ChainEvent { @@ -314,26 +393,44 @@ impl From> for ChainEvent { } } -impl From> for FinalityNotification { - fn from(mut summary: FinalizeSummary) -> Self { +impl FinalityNotification { + /// Create finality notification from finality summary. + pub fn from_summary( + mut summary: FinalizeSummary, + unpin_worker_sender: TracingUnboundedSender, + ) -> FinalityNotification { let hash = summary.finalized.pop().unwrap_or_default(); FinalityNotification { hash, header: summary.header, tree_route: Arc::from(summary.finalized), stale_heads: Arc::from(summary.stale_heads), + unpin_handle: UnpinHandle::new(hash, unpin_worker_sender), } } + + /// Consume this notification and extract the unpin handle. + /// + /// Note: Only use this if you want to keep the block pinned in the backend. + pub fn into_unpin_handle(self) -> UnpinHandle { + self.unpin_handle + } } -impl From> for BlockImportNotification { - fn from(summary: ImportSummary) -> Self { +impl BlockImportNotification { + /// Create finality notification from finality summary. + pub fn from_summary( + summary: ImportSummary, + unpin_worker_sender: TracingUnboundedSender, + ) -> BlockImportNotification { + let hash = summary.hash; BlockImportNotification { - hash: summary.hash, + hash, origin: summary.origin, header: summary.header, is_new_best: summary.is_new_best, tree_route: summary.tree_route.map(Arc::new), + unpin_handle: UnpinHandle::new(hash, unpin_worker_sender), } } } diff --git a/client/api/src/in_mem.rs b/client/api/src/in_mem.rs index 144aa352f5533..5e82757e7d9cd 100644 --- a/client/api/src/in_mem.rs +++ b/client/api/src/in_mem.rs @@ -788,6 +788,12 @@ where fn requires_full_sync(&self) -> bool { false } + + fn pin_block(&self, _: ::Hash) -> blockchain::Result<()> { + Ok(()) + } + + fn unpin_block(&self, _: ::Hash) {} } impl backend::LocalBackend for Backend where Block::Hash: Ord {} diff --git a/client/db/Cargo.toml b/client/db/Cargo.toml index 562a94f1900d6..2d7291a014d6e 100644 --- a/client/db/Cargo.toml +++ b/client/db/Cargo.toml @@ -26,6 +26,7 @@ parity-db = "0.4.2" parking_lot = "0.12.1" sc-client-api = { version = "4.0.0-dev", path = "../api" } sc-state-db = { version = "0.10.0-dev", path = "../state-db" } +schnellru = "0.2.1" sp-arithmetic = { version = "6.0.0", path = "../../primitives/arithmetic" } sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" } sp-core = { version = "7.0.0", path = "../../primitives/core" } diff --git a/client/db/src/lib.rs b/client/db/src/lib.rs index 99fa786e04036..09ccfef1cc28b 100644 --- a/client/db/src/lib.rs +++ b/client/db/src/lib.rs @@ -34,6 +34,7 @@ pub mod bench; mod children; mod parity_db; +mod pinned_blocks_cache; mod record_stats_state; mod stats; #[cfg(any(feature = "rocksdb", test))] @@ -51,6 +52,7 @@ use std::{ }; use crate::{ + pinned_blocks_cache::PinnedBlocksCache, record_stats_state::RecordStatsState, stats::StateUsageStats, utils::{meta_keys, read_db, read_meta, DatabaseType, Meta}, @@ -481,6 +483,7 @@ pub struct BlockchainDb { leaves: RwLock>>, header_metadata_cache: Arc>, header_cache: Mutex>>, + pinned_blocks_cache: Arc>>, } impl BlockchainDb { @@ -493,6 +496,7 @@ impl BlockchainDb { meta: Arc::new(RwLock::new(meta)), header_metadata_cache: Arc::new(HeaderMetadataCache::default()), header_cache: Default::default(), + pinned_blocks_cache: Arc::new(RwLock::new(PinnedBlocksCache::new())), }) } @@ -521,62 +525,83 @@ impl BlockchainDb { let mut meta = self.meta.write(); meta.block_gap = gap; } -} -impl sc_client_api::blockchain::HeaderBackend for BlockchainDb { - fn header(&self, hash: Block::Hash) -> ClientResult> { - let mut cache = self.header_cache.lock(); - if let Some(result) = cache.get_refresh(&hash) { - return Ok(result.clone()) + /// Empty the cache of pinned items. + fn clear_pinning_cache(&self) { + self.pinned_blocks_cache.write().clear(); + } + + /// Load a justification into the cache of pinned items. + /// Reference count of the item will not be increased. Use this + /// to load values for items into the cache which have already been pinned. + fn insert_justifications_if_pinned(&self, hash: Block::Hash, justification: Justification) { + let mut cache = self.pinned_blocks_cache.write(); + if !cache.contains(hash) { + return } - let header = utils::read_header( - &*self.db, - columns::KEY_LOOKUP, - columns::HEADER, - BlockId::::Hash(hash), - )?; - cache_header(&mut cache, hash, header.clone()); - Ok(header) + + let justifications = Justifications::from(justification); + cache.insert_justifications(hash, Some(justifications)); } - fn info(&self) -> sc_client_api::blockchain::Info { - let meta = self.meta.read(); - sc_client_api::blockchain::Info { - best_hash: meta.best_hash, - best_number: meta.best_number, - genesis_hash: meta.genesis_hash, - finalized_hash: meta.finalized_hash, - finalized_number: meta.finalized_number, - finalized_state: meta.finalized_state, - number_leaves: self.leaves.read().count(), - block_gap: meta.block_gap, + /// Load a justification from the db into the cache of pinned items. + /// Reference count of the item will not be increased. Use this + /// to load values for items into the cache which have already been pinned. + fn insert_persisted_justifications_if_pinned(&self, hash: Block::Hash) -> ClientResult<()> { + let mut cache = self.pinned_blocks_cache.write(); + if !cache.contains(hash) { + return Ok(()) } + + let justifications = self.justifications_uncached(hash)?; + cache.insert_justifications(hash, justifications); + Ok(()) } - fn status(&self, hash: Block::Hash) -> ClientResult { - match self.header(hash)?.is_some() { - true => Ok(sc_client_api::blockchain::BlockStatus::InChain), - false => Ok(sc_client_api::blockchain::BlockStatus::Unknown), + /// Load a block body from the db into the cache of pinned items. + /// Reference count of the item will not be increased. Use this + /// to load values for items items into the cache which have already been pinned. + fn insert_persisted_body_if_pinned(&self, hash: Block::Hash) -> ClientResult<()> { + let mut cache = self.pinned_blocks_cache.write(); + if !cache.contains(hash) { + return Ok(()) } + + let body = self.body_uncached(hash)?; + cache.insert_body(hash, body); + Ok(()) } - fn number(&self, hash: Block::Hash) -> ClientResult>> { - Ok(self.header_metadata(hash).ok().map(|header_metadata| header_metadata.number)) + /// Bump reference count for pinned item. + fn bump_ref(&self, hash: Block::Hash) { + self.pinned_blocks_cache.write().pin(hash); } - fn hash(&self, number: NumberFor) -> ClientResult> { - Ok(utils::read_header::( + /// Decrease reference count for pinned item and remove if reference count is 0. + fn unpin(&self, hash: Block::Hash) { + self.pinned_blocks_cache.write().unpin(hash); + } + + fn justifications_uncached(&self, hash: Block::Hash) -> ClientResult> { + match read_db( &*self.db, columns::KEY_LOOKUP, - columns::HEADER, - BlockId::Number(number), - )? - .map(|header| header.hash())) + columns::JUSTIFICATIONS, + BlockId::::Hash(hash), + )? { + Some(justifications) => match Decode::decode(&mut &justifications[..]) { + Ok(justifications) => Ok(Some(justifications)), + Err(err) => + return Err(sp_blockchain::Error::Backend(format!( + "Error decoding justifications: {}", + err + ))), + }, + None => Ok(None), + } } -} -impl sc_client_api::blockchain::Backend for BlockchainDb { - fn body(&self, hash: Block::Hash) -> ClientResult>> { + fn body_uncached(&self, hash: Block::Hash) -> ClientResult>> { if let Some(body) = read_db(&*self.db, columns::KEY_LOOKUP, columns::BODY, BlockId::Hash::(hash))? { @@ -640,24 +665,77 @@ impl sc_client_api::blockchain::Backend for BlockchainDb ClientResult> { - match read_db( +impl sc_client_api::blockchain::HeaderBackend for BlockchainDb { + fn header(&self, hash: Block::Hash) -> ClientResult> { + let mut cache = self.header_cache.lock(); + if let Some(result) = cache.get_refresh(&hash) { + return Ok(result.clone()) + } + let header = utils::read_header( &*self.db, columns::KEY_LOOKUP, - columns::JUSTIFICATIONS, + columns::HEADER, BlockId::::Hash(hash), - )? { - Some(justifications) => match Decode::decode(&mut &justifications[..]) { - Ok(justifications) => Ok(Some(justifications)), - Err(err) => - return Err(sp_blockchain::Error::Backend(format!( - "Error decoding justifications: {}", - err - ))), - }, - None => Ok(None), + )?; + cache_header(&mut cache, hash, header.clone()); + Ok(header) + } + + fn info(&self) -> sc_client_api::blockchain::Info { + let meta = self.meta.read(); + sc_client_api::blockchain::Info { + best_hash: meta.best_hash, + best_number: meta.best_number, + genesis_hash: meta.genesis_hash, + finalized_hash: meta.finalized_hash, + finalized_number: meta.finalized_number, + finalized_state: meta.finalized_state, + number_leaves: self.leaves.read().count(), + block_gap: meta.block_gap, + } + } + + fn status(&self, hash: Block::Hash) -> ClientResult { + match self.header(hash)?.is_some() { + true => Ok(sc_client_api::blockchain::BlockStatus::InChain), + false => Ok(sc_client_api::blockchain::BlockStatus::Unknown), + } + } + + fn number(&self, hash: Block::Hash) -> ClientResult>> { + Ok(self.header_metadata(hash).ok().map(|header_metadata| header_metadata.number)) + } + + fn hash(&self, number: NumberFor) -> ClientResult> { + Ok(utils::read_header::( + &*self.db, + columns::KEY_LOOKUP, + columns::HEADER, + BlockId::Number(number), + )? + .map(|header| header.hash())) + } +} + +impl sc_client_api::blockchain::Backend for BlockchainDb { + fn body(&self, hash: Block::Hash) -> ClientResult>> { + let cache = self.pinned_blocks_cache.read(); + if let Some(result) = cache.body(&hash) { + return Ok(result.clone()) + } + + self.body_uncached(hash) + } + + fn justifications(&self, hash: Block::Hash) -> ClientResult> { + let cache = self.pinned_blocks_cache.read(); + if let Some(result) = cache.justifications(&hash) { + return Ok(result.clone()) } + + self.justifications_uncached(hash) } fn last_finalized(&self) -> ClientResult { @@ -1291,20 +1369,28 @@ impl Backend { header: &Block::Header, last_finalized: Option, justification: Option, + current_transaction_justifications: &mut HashMap, ) -> ClientResult> { // TODO: ensure best chain contains this block. let number = *header.number(); self.ensure_sequential_finalization(header, last_finalized)?; let with_state = sc_client_api::Backend::have_state_at(self, hash, number); - self.note_finalized(transaction, header, hash, with_state)?; + self.note_finalized( + transaction, + header, + hash, + with_state, + current_transaction_justifications, + )?; if let Some(justification) = justification { transaction.set_from_vec( columns::JUSTIFICATIONS, &utils::number_and_hash_to_lookup_key(number, hash)?, - Justifications::from(justification).encode(), + Justifications::from(justification.clone()).encode(), ); + current_transaction_justifications.insert(hash, justification); } Ok(MetaUpdate { hash, number, is_best: false, is_finalized: true, with_state }) } @@ -1371,6 +1457,8 @@ impl Backend { (meta.best_number, meta.finalized_hash, meta.finalized_number, meta.block_gap) }; + let mut current_transaction_justifications: HashMap = + HashMap::new(); for (block_hash, justification) in operation.finalized_blocks { let block_header = self.blockchain.expect_header(block_hash)?; meta_updates.push(self.finalize_block_with_transaction( @@ -1379,6 +1467,7 @@ impl Backend { &block_header, Some(last_finalized_hash), justification, + &mut current_transaction_justifications, )?); last_finalized_hash = block_hash; last_finalized_num = *block_header.number(); @@ -1551,7 +1640,14 @@ impl Backend { if finalized { // TODO: ensure best chain contains this block. self.ensure_sequential_finalization(header, Some(last_finalized_hash))?; - self.note_finalized(&mut transaction, header, hash, operation.commit_state)?; + let mut current_transaction_justifications = HashMap::new(); + self.note_finalized( + &mut transaction, + header, + hash, + operation.commit_state, + &mut current_transaction_justifications, + )?; } else { // canonicalize blocks which are old enough, regardless of finality. self.force_delayed_canonicalize(&mut transaction)? @@ -1684,6 +1780,7 @@ impl Backend { f_header: &Block::Header, f_hash: Block::Hash, with_state: bool, + current_transaction_justifications: &mut HashMap, ) -> ClientResult<()> { let f_num = *f_header.number(); @@ -1709,7 +1806,7 @@ impl Backend { } let new_displaced = self.blockchain.leaves.write().finalize_height(f_num); - self.prune_blocks(transaction, f_num, &new_displaced)?; + self.prune_blocks(transaction, f_num, &new_displaced, current_transaction_justifications)?; Ok(()) } @@ -1717,22 +1814,39 @@ impl Backend { fn prune_blocks( &self, transaction: &mut Transaction, - finalized: NumberFor, + finalized_number: NumberFor, displaced: &FinalizationOutcome>, + current_transaction_justifications: &mut HashMap, ) -> ClientResult<()> { match self.blocks_pruning { BlocksPruning::KeepAll => {}, BlocksPruning::Some(blocks_pruning) => { // Always keep the last finalized block let keep = std::cmp::max(blocks_pruning, 1); - if finalized >= keep.into() { - let number = finalized.saturating_sub(keep.into()); + if finalized_number >= keep.into() { + let number = finalized_number.saturating_sub(keep.into()); + + // Before we prune a block, check if it is pinned + if let Some(hash) = self.blockchain.hash(number)? { + self.blockchain.insert_persisted_body_if_pinned(hash)?; + + // If the block was finalized in this transaction, it will not be in the db + // yet. + if let Some(justification) = + current_transaction_justifications.remove(&hash) + { + self.blockchain.insert_justifications_if_pinned(hash, justification); + } else { + self.blockchain.insert_persisted_justifications_if_pinned(hash)?; + } + }; + self.prune_block(transaction, BlockId::::number(number))?; } - self.prune_displaced_branches(transaction, finalized, displaced)?; + self.prune_displaced_branches(transaction, finalized_number, displaced)?; }, BlocksPruning::KeepFinalized => { - self.prune_displaced_branches(transaction, finalized, displaced)?; + self.prune_displaced_branches(transaction, finalized_number, displaced)?; }, } Ok(()) @@ -1755,6 +1869,8 @@ impl Backend { while self.blockchain.hash(number)? != Some(hash) { match self.blockchain.header(hash)? { Some(header) => { + self.blockchain.insert_persisted_body_if_pinned(hash)?; + self.prune_block(transaction, BlockId::::hash(hash))?; number = header.number().saturating_sub(One::one()); hash = *header.parent_hash(); @@ -1985,6 +2101,7 @@ impl sc_client_api::backend::Backend for Backend { .state_db .reset(state_meta_db) .map_err(sp_blockchain::Error::from_state_db)?; + self.blockchain.clear_pinning_cache(); Err(e) } else { self.storage.state_db.sync(); @@ -2000,12 +2117,14 @@ impl sc_client_api::backend::Backend for Backend { let mut transaction = Transaction::new(); let header = self.blockchain.expect_header(hash)?; + let mut current_transaction_justifications = HashMap::new(); let m = self.finalize_block_with_transaction( &mut transaction, hash, &header, None, justification, + &mut current_transaction_justifications, )?; self.storage.db.commit(transaction)?; @@ -2382,6 +2501,49 @@ impl sc_client_api::backend::Backend for Backend { PruningMode::ArchiveAll | PruningMode::ArchiveCanonical ) } + + fn pin_block(&self, hash: ::Hash) -> sp_blockchain::Result<()> { + let hint = || { + let header_metadata = self.blockchain.header_metadata(hash); + header_metadata + .map(|hdr| { + sc_state_db::NodeDb::get(self.storage.as_ref(), hdr.state_root.as_ref()) + .unwrap_or(None) + .is_some() + }) + .unwrap_or(false) + }; + + if let Some(number) = self.blockchain.number(hash)? { + self.storage.state_db.pin(&hash, number.saturated_into::(), hint).map_err( + |_| { + sp_blockchain::Error::UnknownBlock(format!( + "State already discarded for `{:?}`", + hash + )) + }, + )?; + } else { + return Err(ClientError::UnknownBlock(format!( + "Can not pin block with hash `{:?}`. Block not found.", + hash + ))) + } + + if self.blocks_pruning != BlocksPruning::KeepAll { + // Only increase reference count for this hash. Value is loaded once we prune. + self.blockchain.bump_ref(hash); + } + Ok(()) + } + + fn unpin_block(&self, hash: ::Hash) { + self.storage.state_db.unpin(&hash); + + if self.blocks_pruning != BlocksPruning::KeepAll { + self.blockchain.unpin(hash); + } + } } impl sc_client_api::backend::LocalBackend for Backend {} @@ -4009,4 +4171,249 @@ pub(crate) mod tests { assert_eq!(block4, backend.blockchain().hash(4).unwrap().unwrap()); } } + + #[test] + fn test_pinned_blocks_on_finalize() { + let backend = Backend::::new_test_with_tx_storage(BlocksPruning::Some(1), 10); + let mut blocks = Vec::new(); + let mut prev_hash = Default::default(); + + let build_justification = |i: u64| ([0, 0, 0, 0], vec![i.try_into().unwrap()]); + // Block tree: + // 0 -> 1 -> 2 -> 3 -> 4 + for i in 0..5 { + let hash = insert_block( + &backend, + i, + prev_hash, + None, + Default::default(), + vec![i.into()], + None, + ) + .unwrap(); + blocks.push(hash); + // Avoid block pruning. + backend.pin_block(blocks[i as usize]).unwrap(); + + prev_hash = hash; + } + + let bc = backend.blockchain(); + + // Check that we can properly access values when there is reference count + // but no value. + assert_eq!(Some(vec![1.into()]), bc.body(blocks[1]).unwrap()); + + // Block 1 gets pinned three times + backend.pin_block(blocks[1]).unwrap(); + backend.pin_block(blocks[1]).unwrap(); + + // Finalize all blocks. This will trigger pruning. + let mut op = backend.begin_operation().unwrap(); + backend.begin_state_operation(&mut op, blocks[4]).unwrap(); + for i in 1..5 { + op.mark_finalized(blocks[i], Some(build_justification(i.try_into().unwrap()))) + .unwrap(); + } + backend.commit_operation(op).unwrap(); + + // Block 0, 1, 2, 3 are pinned, so all values should be cached. + // Block 4 is inside the pruning window, its value is in db. + assert_eq!(Some(vec![0.into()]), bc.body(blocks[0]).unwrap()); + + assert_eq!(Some(vec![1.into()]), bc.body(blocks[1]).unwrap()); + assert_eq!( + Some(Justifications::from(build_justification(1))), + bc.justifications(blocks[1]).unwrap() + ); + + assert_eq!(Some(vec![2.into()]), bc.body(blocks[2]).unwrap()); + assert_eq!( + Some(Justifications::from(build_justification(2))), + bc.justifications(blocks[2]).unwrap() + ); + + assert_eq!(Some(vec![3.into()]), bc.body(blocks[3]).unwrap()); + assert_eq!( + Some(Justifications::from(build_justification(3))), + bc.justifications(blocks[3]).unwrap() + ); + + assert_eq!(Some(vec![4.into()]), bc.body(blocks[4]).unwrap()); + assert_eq!( + Some(Justifications::from(build_justification(4))), + bc.justifications(blocks[4]).unwrap() + ); + + // Unpin all blocks. Values should be removed from cache. + for block in &blocks { + backend.unpin_block(*block); + } + + assert!(bc.body(blocks[0]).unwrap().is_none()); + // Block 1 was pinned twice, we expect it to be still cached + assert!(bc.body(blocks[1]).unwrap().is_some()); + assert!(bc.justifications(blocks[1]).unwrap().is_some()); + // Headers should also be available while pinned + assert!(bc.header(blocks[1]).ok().flatten().is_some()); + assert!(bc.body(blocks[2]).unwrap().is_none()); + assert!(bc.justifications(blocks[2]).unwrap().is_none()); + assert!(bc.body(blocks[3]).unwrap().is_none()); + assert!(bc.justifications(blocks[3]).unwrap().is_none()); + + // After these unpins, block 1 should also be removed + backend.unpin_block(blocks[1]); + assert!(bc.body(blocks[1]).unwrap().is_some()); + assert!(bc.justifications(blocks[1]).unwrap().is_some()); + backend.unpin_block(blocks[1]); + assert!(bc.body(blocks[1]).unwrap().is_none()); + assert!(bc.justifications(blocks[1]).unwrap().is_none()); + + // Block 4 is inside the pruning window and still kept + assert_eq!(Some(vec![4.into()]), bc.body(blocks[4]).unwrap()); + assert_eq!( + Some(Justifications::from(build_justification(4))), + bc.justifications(blocks[4]).unwrap() + ); + + // Block tree: + // 0 -> 1 -> 2 -> 3 -> 4 -> 5 + let hash = + insert_block(&backend, 5, prev_hash, None, Default::default(), vec![5.into()], None) + .unwrap(); + blocks.push(hash); + + backend.pin_block(blocks[4]).unwrap(); + // Mark block 5 as finalized. + let mut op = backend.begin_operation().unwrap(); + backend.begin_state_operation(&mut op, blocks[5]).unwrap(); + op.mark_finalized(blocks[5], Some(build_justification(5))).unwrap(); + backend.commit_operation(op).unwrap(); + + assert!(bc.body(blocks[0]).unwrap().is_none()); + assert!(bc.body(blocks[1]).unwrap().is_none()); + assert!(bc.body(blocks[2]).unwrap().is_none()); + assert!(bc.body(blocks[3]).unwrap().is_none()); + + assert_eq!(Some(vec![4.into()]), bc.body(blocks[4]).unwrap()); + assert_eq!( + Some(Justifications::from(build_justification(4))), + bc.justifications(blocks[4]).unwrap() + ); + assert_eq!(Some(vec![5.into()]), bc.body(blocks[5]).unwrap()); + assert!(bc.header(blocks[5]).ok().flatten().is_some()); + + backend.unpin_block(blocks[4]); + assert!(bc.body(blocks[4]).unwrap().is_none()); + assert!(bc.justifications(blocks[4]).unwrap().is_none()); + + // Append a justification to block 5. + backend.append_justification(blocks[5], ([0, 0, 0, 1], vec![42])).unwrap(); + + let hash = + insert_block(&backend, 6, blocks[5], None, Default::default(), vec![6.into()], None) + .unwrap(); + blocks.push(hash); + + // Pin block 5 so it gets loaded into the cache on prune + backend.pin_block(blocks[5]).unwrap(); + + // Finalize block 6 so block 5 gets pruned. Since it is pinned both justifications should be + // in memory. + let mut op = backend.begin_operation().unwrap(); + backend.begin_state_operation(&mut op, blocks[6]).unwrap(); + op.mark_finalized(blocks[6], None).unwrap(); + backend.commit_operation(op).unwrap(); + + assert_eq!(Some(vec![5.into()]), bc.body(blocks[5]).unwrap()); + assert!(bc.header(blocks[5]).ok().flatten().is_some()); + let mut expected = Justifications::from(build_justification(5)); + expected.append(([0, 0, 0, 1], vec![42])); + assert_eq!(Some(expected), bc.justifications(blocks[5]).unwrap()); + } + + #[test] + fn test_pinned_blocks_on_finalize_with_fork() { + let backend = Backend::::new_test_with_tx_storage(BlocksPruning::Some(1), 10); + let mut blocks = Vec::new(); + let mut prev_hash = Default::default(); + + // Block tree: + // 0 -> 1 -> 2 -> 3 -> 4 + for i in 0..5 { + let hash = insert_block( + &backend, + i, + prev_hash, + None, + Default::default(), + vec![i.into()], + None, + ) + .unwrap(); + blocks.push(hash); + + // Avoid block pruning. + backend.pin_block(blocks[i as usize]).unwrap(); + + prev_hash = hash; + } + + // Insert a fork at the second block. + // Block tree: + // 0 -> 1 -> 2 -> 3 -> 4 + // \ -> 2 -> 3 + let fork_hash_root = + insert_block(&backend, 2, blocks[1], None, H256::random(), vec![2.into()], None) + .unwrap(); + let fork_hash_3 = insert_block( + &backend, + 3, + fork_hash_root, + None, + H256::random(), + vec![3.into(), 11.into()], + None, + ) + .unwrap(); + + // Do not prune the fork hash. + backend.pin_block(fork_hash_3).unwrap(); + + let mut op = backend.begin_operation().unwrap(); + backend.begin_state_operation(&mut op, blocks[4]).unwrap(); + op.mark_head(blocks[4]).unwrap(); + backend.commit_operation(op).unwrap(); + + for i in 1..5 { + let mut op = backend.begin_operation().unwrap(); + backend.begin_state_operation(&mut op, blocks[4]).unwrap(); + op.mark_finalized(blocks[i], None).unwrap(); + backend.commit_operation(op).unwrap(); + } + + let bc = backend.blockchain(); + assert_eq!(Some(vec![0.into()]), bc.body(blocks[0]).unwrap()); + assert_eq!(Some(vec![1.into()]), bc.body(blocks[1]).unwrap()); + assert_eq!(Some(vec![2.into()]), bc.body(blocks[2]).unwrap()); + assert_eq!(Some(vec![3.into()]), bc.body(blocks[3]).unwrap()); + assert_eq!(Some(vec![4.into()]), bc.body(blocks[4]).unwrap()); + // Check the fork hashes. + assert_eq!(None, bc.body(fork_hash_root).unwrap()); + assert_eq!(Some(vec![3.into(), 11.into()]), bc.body(fork_hash_3).unwrap()); + + // Unpin all blocks, except the forked one. + for block in &blocks { + backend.unpin_block(*block); + } + assert!(bc.body(blocks[0]).unwrap().is_none()); + assert!(bc.body(blocks[1]).unwrap().is_none()); + assert!(bc.body(blocks[2]).unwrap().is_none()); + assert!(bc.body(blocks[3]).unwrap().is_none()); + + assert!(bc.body(fork_hash_3).unwrap().is_some()); + backend.unpin_block(fork_hash_3); + assert!(bc.body(fork_hash_3).unwrap().is_none()); + } } diff --git a/client/db/src/pinned_blocks_cache.rs b/client/db/src/pinned_blocks_cache.rs new file mode 100644 index 0000000000000..39ff1c5277871 --- /dev/null +++ b/client/db/src/pinned_blocks_cache.rs @@ -0,0 +1,231 @@ +// This file is part of Substrate. + +// Copyright (C) 2017-2023 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use schnellru::{Limiter, LruMap}; +use sp_runtime::{traits::Block as BlockT, Justifications}; + +const LOG_TARGET: &str = "db::pin"; +const PINNING_CACHE_SIZE: usize = 1024; + +/// Entry for pinned blocks cache. +struct PinnedBlockCacheEntry { + /// How many times this item has been pinned + ref_count: u32, + + /// Cached justifications for this block + pub justifications: Option>, + + /// Cached body for this block + pub body: Option>>, +} + +impl Default for PinnedBlockCacheEntry { + fn default() -> Self { + Self { ref_count: 0, justifications: None, body: None } + } +} + +impl PinnedBlockCacheEntry { + pub fn decrease_ref(&mut self) { + self.ref_count = self.ref_count.saturating_sub(1); + } + + pub fn increase_ref(&mut self) { + self.ref_count = self.ref_count.saturating_add(1); + } + + pub fn has_no_references(&self) -> bool { + self.ref_count == 0 + } +} + +/// A limiter for a map which is limited by the number of elements. +#[derive(Copy, Clone, Debug)] +struct LoggingByLengthLimiter { + max_length: usize, +} + +impl LoggingByLengthLimiter { + /// Creates a new length limiter with a given `max_length`. + pub const fn new(max_length: usize) -> LoggingByLengthLimiter { + LoggingByLengthLimiter { max_length } + } +} + +impl Limiter> for LoggingByLengthLimiter { + type KeyToInsert<'a> = Block::Hash; + type LinkType = usize; + + fn is_over_the_limit(&self, length: usize) -> bool { + length > self.max_length + } + + fn on_insert( + &mut self, + _length: usize, + key: Self::KeyToInsert<'_>, + value: PinnedBlockCacheEntry, + ) -> Option<(Block::Hash, PinnedBlockCacheEntry)> { + if self.max_length > 0 { + Some((key, value)) + } else { + None + } + } + + fn on_replace( + &mut self, + _length: usize, + _old_key: &mut Block::Hash, + _new_key: Block::Hash, + _old_value: &mut PinnedBlockCacheEntry, + _new_value: &mut PinnedBlockCacheEntry, + ) -> bool { + true + } + + fn on_removed(&mut self, key: &mut Block::Hash, value: &mut PinnedBlockCacheEntry) { + // If reference count was larger than 0 on removal, + // the item was removed due to capacity limitations. + // Since the cache should be large enough for pinned items, + // we want to know about these evictions. + if value.ref_count > 0 { + log::warn!( + target: LOG_TARGET, + "Pinned block cache limit reached. Evicting value. hash = {}", + key + ); + } else { + log::trace!( + target: LOG_TARGET, + "Evicting value from pinned block cache. hash = {}", + key + ) + } + } + + fn on_cleared(&mut self) {} + + fn on_grow(&mut self, _new_memory_usage: usize) -> bool { + true + } +} + +/// Reference counted cache for pinned block bodies and justifications. +pub struct PinnedBlocksCache { + cache: LruMap, LoggingByLengthLimiter>, +} + +impl PinnedBlocksCache { + pub fn new() -> Self { + Self { cache: LruMap::new(LoggingByLengthLimiter::new(PINNING_CACHE_SIZE)) } + } + + /// Increase reference count of an item. + /// Create an entry with empty value in the cache if necessary. + pub fn pin(&mut self, hash: Block::Hash) { + match self.cache.get_or_insert(hash, Default::default) { + Some(entry) => { + entry.increase_ref(); + log::trace!( + target: LOG_TARGET, + "Bumped cache refcount. hash = {}, num_entries = {}", + hash, + self.cache.len() + ); + }, + None => + log::warn!(target: LOG_TARGET, "Unable to bump reference count. hash = {}", hash), + }; + } + + /// Clear the cache + pub fn clear(&mut self) { + self.cache.clear(); + } + + /// Check if item is contained in the cache + pub fn contains(&self, hash: Block::Hash) -> bool { + self.cache.peek(&hash).is_some() + } + + /// Attach body to an existing cache item + pub fn insert_body(&mut self, hash: Block::Hash, extrinsics: Option>) { + match self.cache.peek_mut(&hash) { + Some(mut entry) => { + entry.body = Some(extrinsics); + log::trace!( + target: LOG_TARGET, + "Cached body. hash = {}, num_entries = {}", + hash, + self.cache.len() + ); + }, + None => log::warn!( + target: LOG_TARGET, + "Unable to insert body for uncached item. hash = {}", + hash + ), + } + } + + /// Attach justification to an existing cache item + pub fn insert_justifications( + &mut self, + hash: Block::Hash, + justifications: Option, + ) { + match self.cache.peek_mut(&hash) { + Some(mut entry) => { + entry.justifications = Some(justifications); + log::trace!( + target: LOG_TARGET, + "Cached justification. hash = {}, num_entries = {}", + hash, + self.cache.len() + ); + }, + None => log::warn!( + target: LOG_TARGET, + "Unable to insert justifications for uncached item. hash = {}", + hash + ), + } + } + + /// Decreases reference count of an item. + /// If the count hits 0, the item is removed. + pub fn unpin(&mut self, hash: Block::Hash) { + if let Some(entry) = self.cache.peek_mut(&hash) { + entry.decrease_ref(); + if entry.has_no_references() { + self.cache.remove(&hash); + } + } + } + + /// Get justifications for cached block + pub fn justifications(&self, hash: &Block::Hash) -> Option<&Option> { + self.cache.peek(hash).and_then(|entry| entry.justifications.as_ref()) + } + + /// Get body for cached block + pub fn body(&self, hash: &Block::Hash) -> Option<&Option>> { + self.cache.peek(hash).and_then(|entry| entry.body.as_ref()) + } +} diff --git a/client/finality-grandpa/src/until_imported.rs b/client/finality-grandpa/src/until_imported.rs index 776411f8fb493..3bca77ae8393f 100644 --- a/client/finality-grandpa/src/until_imported.rs +++ b/client/finality-grandpa/src/until_imported.rs @@ -593,16 +593,17 @@ mod tests { fn import_header(&self, header: Header) { let hash = header.hash(); let number = *header.number(); - + let (tx, _rx) = tracing_unbounded("unpin-worker-channel", 10_000); self.known_blocks.lock().insert(hash, number); self.sender - .unbounded_send(BlockImportNotification { + .unbounded_send(BlockImportNotification::::new( hash, - origin: BlockOrigin::File, + BlockOrigin::File, header, - is_new_best: false, - tree_route: None, - }) + false, + None, + tx, + )) .unwrap(); } } diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 1f94f96fae89e..0b09f550ce338 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -329,13 +329,15 @@ where let executor = crate::client::LocalCallExecutor::new( backend.clone(), executor, - spawn_handle, + spawn_handle.clone(), config.clone(), execution_extensions, )?; + crate::client::Client::new( backend, executor, + spawn_handle, genesis_block_builder, fork_blocks, bad_blocks, diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index 8e10a7b2eda0a..d32baa671f54c 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -22,7 +22,8 @@ use super::{ block_rules::{BlockRules, LookupResult as BlockLookupResult}, genesis::BuildGenesisBlock, }; -use log::{info, trace, warn}; +use futures::{FutureExt, StreamExt}; +use log::{error, info, trace, warn}; use parking_lot::{Mutex, RwLock}; use prometheus_endpoint::Registry; use rand::Rng; @@ -58,9 +59,12 @@ use sp_blockchain::{ use sp_consensus::{BlockOrigin, BlockStatus, Error as ConsensusError}; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedSender}; -use sp_core::storage::{ - well_known_keys, ChildInfo, ChildType, PrefixedStorageKey, Storage, StorageChild, StorageData, - StorageKey, +use sp_core::{ + storage::{ + well_known_keys, ChildInfo, ChildType, PrefixedStorageKey, Storage, StorageChild, + StorageData, StorageKey, + }, + traits::SpawnNamed, }; #[cfg(feature = "test-helpers")] use sp_keystore::SyncCryptoStorePtr; @@ -88,9 +92,7 @@ use std::{ #[cfg(feature = "test-helpers")] use { - super::call_executor::LocalCallExecutor, - sc_client_api::in_mem, - sp_core::traits::{CodeExecutor, SpawnNamed}, + super::call_executor::LocalCallExecutor, sc_client_api::in_mem, sp_core::traits::CodeExecutor, }; type NotificationSinks = Mutex>>; @@ -116,6 +118,7 @@ where block_rules: BlockRules, config: ClientConfig, telemetry: Option, + unpin_worker_sender: TracingUnboundedSender, _phantom: PhantomData, } @@ -246,7 +249,7 @@ where let call_executor = LocalCallExecutor::new( backend.clone(), executor, - spawn_handle, + spawn_handle.clone(), config.clone(), extensions, )?; @@ -254,6 +257,7 @@ where Client::new( backend, call_executor, + spawn_handle, genesis_block_builder, Default::default(), Default::default(), @@ -296,11 +300,20 @@ where let ClientImportOperation { mut op, notify_imported, notify_finalized } = op; - let finality_notification = notify_finalized.map(|summary| summary.into()); + let finality_notification = notify_finalized.map(|summary| { + FinalityNotification::from_summary(summary, self.unpin_worker_sender.clone()) + }); + let (import_notification, storage_changes) = match notify_imported { Some(mut summary) => { let storage_changes = summary.storage_changes.take(); - (Some(summary.into()), storage_changes) + ( + Some(BlockImportNotification::from_summary( + summary, + self.unpin_worker_sender.clone(), + )), + storage_changes, + ) }, None => (None, None), }; @@ -318,6 +331,27 @@ where self.backend.commit_operation(op)?; + // We need to pin the block in the backend once + // for each notification. Once all notifications are + // dropped, the block will be unpinned automatically. + if let Some(ref notification) = finality_notification { + if let Err(err) = self.backend.pin_block(notification.hash) { + error!( + "Unable to pin block for finality notification. hash: {}, Error: {}", + notification.hash, err + ); + }; + } + + if let Some(ref notification) = import_notification { + if let Err(err) = self.backend.pin_block(notification.hash) { + error!( + "Unable to pin block for import notification. hash: {}, Error: {}", + notification.hash, err + ); + }; + } + self.notify_finalized(finality_notification)?; self.notify_imported(import_notification, storage_changes)?; @@ -357,6 +391,7 @@ where pub fn new( backend: Arc, executor: E, + spawn_handle: Box, genesis_block_builder: G, fork_blocks: ForkBlocks, bad_blocks: BadBlocks, @@ -369,6 +404,7 @@ where Block, BlockImportOperation = >::BlockImportOperation, >, + B: 'static, { let info = backend.blockchain().info(); if info.finalized_state.is_none() { @@ -390,6 +426,26 @@ where backend.commit_operation(op)?; } + let (unpin_worker_sender, mut rx) = + tracing_unbounded::("unpin-worker-channel", 10_000); + let task_backend = Arc::downgrade(&backend); + spawn_handle.spawn( + "unpin-worker", + None, + async move { + while let Some(message) = rx.next().await { + if let Some(backend) = task_backend.upgrade() { + backend.unpin_block(message); + } else { + log::debug!("Terminating unpin-worker, backend reference was dropped."); + return + } + } + log::debug!("Terminating unpin-worker, stream terminated.") + } + .boxed(), + ); + Ok(Client { backend, executor, @@ -402,6 +458,7 @@ where block_rules: BlockRules::new(fork_blocks, bad_blocks), config, telemetry, + unpin_worker_sender, _phantom: Default::default(), }) } diff --git a/test-utils/client/src/lib.rs b/test-utils/client/src/lib.rs index 5dc93da13fe74..ff744b80cbff4 100644 --- a/test-utils/client/src/lib.rs +++ b/test-utils/client/src/lib.rs @@ -41,7 +41,7 @@ use futures::{future::Future, stream::StreamExt}; use sc_client_api::BlockchainEvents; use sc_service::client::{ClientConfig, LocalCallExecutor}; use serde::Deserialize; -use sp_core::storage::ChildInfo; +use sp_core::{storage::ChildInfo, testing::TaskExecutor}; use sp_runtime::{codec::Encode, traits::Block as BlockT, OpaqueExtrinsic}; use std::{ collections::{HashMap, HashSet}, @@ -62,7 +62,7 @@ impl GenesisInit for () { } /// A builder for creating a test client instance. -pub struct TestClientBuilder { +pub struct TestClientBuilder { execution_strategies: ExecutionStrategies, genesis_init: G, /// The key is an unprefixed storage key, this only contains @@ -237,9 +237,12 @@ impl ) .expect("Creates genesis block builder"); + let spawn_handle = Box::new(TaskExecutor::new()); + let client = client::Client::new( self.backend.clone(), executor, + spawn_handle, genesis_block_builder, self.fork_blocks, self.bad_blocks,