Skip to content

Commit

Permalink
Apply paritytech#5956 manually
Browse files Browse the repository at this point in the history
  • Loading branch information
liuchengxu committed Nov 28, 2024
1 parent f5245c8 commit 18378bb
Show file tree
Hide file tree
Showing 7 changed files with 310 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions substrate/client/api/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ pub trait BlockImportOperation<Block: BlockT> {
/// Add a transaction index operation.
fn update_transaction_index(&mut self, index: Vec<IndexOperation>)
-> sp_blockchain::Result<()>;

/// Configure whether to commit the state changes to the underlying database.
fn set_commit_state(&mut self, commit: bool);
}

/// Interface for performing operations on the backend.
Expand Down Expand Up @@ -632,6 +635,27 @@ pub trait Backend<Block: BlockT>: AuxStore + Send + Sync {

/// Tells whether the backend requires full-sync mode.
fn requires_full_sync(&self) -> bool;

/// Import the state changes directly to the database.
///
/// # Arguments
///
/// - `at`: The block hash corresponding to the last available state before updating the trie
/// database.
/// - `storage`: The storage changes reflecting the transition from the last local state to the
/// target block's state being imported.
/// - `state_version`: The state version of the target block, which is resolved from the
/// provided `storage` data.
///
/// # Returns
///
/// Returns the state root after importing the state.
fn import_state(
&self,
at: Block::Hash,
storage: sp_runtime::Storage,
state_version: sp_runtime::StateVersion,
) -> sp_blockchain::Result<Block::Hash>;
}

/// Mark for all Backend implementations, that are making use of state data, stored locally.
Expand Down
11 changes: 11 additions & 0 deletions substrate/client/api/src/in_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,8 @@ impl<Block: BlockT> backend::BlockImportOperation<Block> for BlockImportOperatio
) -> sp_blockchain::Result<()> {
Ok(())
}

fn set_commit_state(&mut self, _commit: bool) {}
}

/// In-memory backend. Keeps all states and blocks in memory.
Expand Down Expand Up @@ -774,6 +776,15 @@ impl<Block: BlockT> backend::Backend<Block> for Backend<Block> {
let mut blocks = self.pinned_blocks.write();
blocks.entry(hash).and_modify(|counter| *counter -= 1).or_insert(-1);
}

fn import_state(
&self,
_at: Block::Hash,
_storage: sp_runtime::Storage,
_state_version: sp_runtime::StateVersion,
) -> sp_blockchain::Result<Block::Hash> {
unimplemented!("Not needed for in-mem backend")
}
}

impl<Block: BlockT> backend::LocalBackend<Block> for Backend<Block> {}
Expand Down
1 change: 1 addition & 0 deletions substrate/client/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ sp-runtime.workspace = true
sp-runtime.default-features = true
sp-state-machine.workspace = true
sp-state-machine.default-features = true
sp-storage = { workspace = true, default-features = true }
sp-trie.workspace = true
sp-trie.default-features = true

Expand Down
134 changes: 133 additions & 1 deletion substrate/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub mod offchain;
pub mod bench;

mod children;
mod state_importer;
mod parity_db;
mod pinned_blocks_cache;
mod record_stats_state;
Expand All @@ -55,6 +56,7 @@ use crate::{
pinned_blocks_cache::PinnedBlocksCache,
record_stats_state::RecordStatsState,
stats::StateUsageStats,
state_importer::StateImporter,
utils::{meta_keys, read_db, read_meta, DatabaseType, Meta},
};
use codec::{Decode, Encode};
Expand Down Expand Up @@ -90,7 +92,7 @@ use sp_state_machine::{
OffchainChangesCollection, StateMachineStats, StorageCollection, StorageIterator, StorageKey,
StorageValue, UsageInfo as StateUsageInfo,
};
use sp_trie::{cache::SharedTrieCache, prefixed_key, MemoryDB, MerkleValue, PrefixedMemoryDB};
use sp_trie::{cache::SharedTrieCache, prefixed_key, MemoryDB, MerkleValue, PrefixedMemoryDB, TrieError};

// Re-export the Database trait so that one can pass an implementation of it.
pub use sc_state_db::PruningMode;
Expand All @@ -113,6 +115,9 @@ const DB_HASH_LEN: usize = 32;
/// Hash type that this backend uses for the database.
pub type DbHash = sp_core::H256;

type LayoutV0<Block> = sp_trie::LayoutV0<HashingFor<Block>>;
type LayoutV1<Block> = sp_trie::LayoutV1<HashingFor<Block>>;

/// An extrinsic entry in the database.
#[derive(Debug, Encode, Decode)]
enum DbExtrinsic<B: BlockT> {
Expand Down Expand Up @@ -995,6 +1000,10 @@ impl<Block: BlockT> sc_client_api::backend::BlockImportOperation<Block>
self.index_ops = index_ops;
Ok(())
}

fn set_commit_state(&mut self, commit: bool) {
self.commit_state = commit;
}
}

struct StorageDb<Block: BlockT> {
Expand Down Expand Up @@ -2455,6 +2464,129 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
}
}

fn import_state(
&self,
at: Block::Hash,
storage: sp_runtime::Storage,
state_version: sp_runtime::StateVersion,
) -> sp_blockchain::Result<Block::Hash> {
let root = self.blockchain.header_metadata(at).map(|header| header.state_root)?;

let storage_db: Arc<dyn sp_state_machine::Storage<HashingFor<Block>>> =
self.storage.clone();
let mut state_importer = StateImporter::new(&storage_db, self.storage.db.clone());

let trie_err =
|err: Box<TrieError<LayoutV0<Block>>>| sp_blockchain::Error::Application(err);

let child_deltas = storage.children_default.values().map(|child_content| {
(
&child_content.child_info,
child_content.data.iter().map(|(k, v)| (&k[..], Some(&v[..]))),
)
});

let mut child_roots = Vec::new();

// child first
for (child_info, child_delta) in child_deltas {
let default_root = match child_info.child_type() {
sp_storage::ChildType::ParentKeyId =>
sp_trie::empty_child_trie_root::<LayoutV1<Block>>(),
};

let new_child_root = match state_version {
StateVersion::V0 => {
let child_root = match crate::state_importer::read_child_root::<
_,
_,
LayoutV0<Block>,
>(&state_importer, &root, &child_info)
{
Ok(Some(hash)) => hash,
Ok(None) => default_root,
Err(e) => {
warn!(target: "trie", "Failed to read child storage root: {}", e);
default_root
},
};

sp_trie::child_delta_trie_root::<LayoutV0<Block>, _, _, _, _, _, _>(
child_info.keyspace(),
&mut state_importer,
child_root,
child_delta,
None,
None,
)
.map_err(trie_err)?
},
StateVersion::V1 => {
let child_root = match crate::state_importer::read_child_root::<
_,
_,
LayoutV1<Block>,
>(&state_importer, &root, &child_info)
{
Ok(Some(hash)) => hash,
Ok(None) => default_root,
Err(e) => {
warn!(target: "trie", "Failed to read child storage root: {}", e);
default_root
},
};

sp_trie::child_delta_trie_root::<LayoutV1<Block>, _, _, _, _, _, _>(
child_info.keyspace(),
&mut state_importer,
child_root,
child_delta,
None,
None,
)
.map_err(trie_err)?
},
};

let is_default = new_child_root == default_root;

let prefixed_storage_key = child_info.prefixed_storage_key().into_inner();

if is_default {
child_roots.push((prefixed_storage_key, None));
} else {
child_roots.push((prefixed_storage_key, Some(new_child_root.encode())));
}
}

let delta = storage
.top
.into_iter()
.map(|(k, v)| (k, Some(v)))
.chain(child_roots.into_iter());

let state_root = match state_version {
StateVersion::V0 => sp_trie::delta_trie_root::<LayoutV0<Block>, _, _, _, _, _>(
&mut state_importer,
root,
delta,
None,
None,
)
.map_err(trie_err)?,
StateVersion::V1 => sp_trie::delta_trie_root::<LayoutV1<Block>, _, _, _, _, _>(
&mut state_importer,
root,
delta,
None,
None,
)
.map_err(trie_err)?,
};

Ok(state_root)
}

fn have_state_at(&self, hash: Block::Hash, number: NumberFor<Block>) -> bool {
if self.is_archive {
match self.blockchain.header_metadata(hash) {
Expand Down
135 changes: 135 additions & 0 deletions substrate/client/db/src/state_importer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use crate::{columns, DbHash};
use hash_db::{AsHashDB, HashDB, HashDBRef, Hasher, Prefix};
use sp_database::{Change, Database, Transaction};
use sp_state_machine::TrieBackendStorage;
use sp_storage::ChildInfo;
use sp_trie::{DBValue, TrieError, TrieHash, TrieLayout};
use std::{marker::PhantomData, sync::Arc};

/// [`StateImporter`] is responsible for importing the state changes
/// directly into the database, bypassing the in-memory intermediate storage
/// (`PrefixedMemoryDB`).
///
/// This approach avoids potential OOM issues that can arise when dealing with
/// large state imports, especially when importing the state downloaded from
/// fast sync or warp sync.
pub(crate) struct StateImporter<'a, S: 'a + TrieBackendStorage<H>, H: 'a + Hasher> {
/// Old state storage backend.
storage: &'a S,
/// Handle to the trie database where changes will be committed.
trie_database: Arc<dyn Database<DbHash>>,
/// Default child storage root.
default_child_root: H::Out,
_phantom: PhantomData<H>,
}

impl<'a, S: TrieBackendStorage<H>, H: Hasher> StateImporter<'a, S, H> {
pub fn new(storage: &'a S, trie_database: Arc<dyn Database<DbHash>>) -> Self {
let default_child_root = sp_trie::empty_child_trie_root::<sp_trie::LayoutV1<H>>();
Self { storage, trie_database, default_child_root, _phantom: Default::default() }
}
}

pub(crate) fn read_child_root<'a, S, H, L>(
state_importer: &StateImporter<'a, S, H>,
root: &TrieHash<L>,
child_info: &ChildInfo,
) -> Result<Option<H::Out>, Box<TrieError<L>>>
where
S: 'a + TrieBackendStorage<H>,
H: Hasher,
L: TrieLayout,
StateImporter<'a, S, H>: HashDBRef<<L as TrieLayout>::Hash, Vec<u8>>,
{
let key = child_info.prefixed_storage_key();
Ok(sp_trie::read_trie_value::<L, _>(state_importer, root, key.as_slice(), None, None)?.map(
|r| {
let mut hash = H::Out::default();

// root is fetched from DB, not writable by runtime, so it's always valid.
hash.as_mut().copy_from_slice(&r[..]);

hash
},
))
}

impl<'a, S: 'a + TrieBackendStorage<H>, H: Hasher> hash_db::HashDB<H, DBValue>
for StateImporter<'a, S, H>
{
fn get(&self, key: &H::Out, prefix: Prefix) -> Option<DBValue> {
// TODO: we'll run into IncompleteDatabase error without this special handling.
// Double check and provide an explanation.
if *key == self.default_child_root {
return Some([0u8].to_vec());
}

let db_key = sp_trie::prefixed_key::<H>(key, prefix);

let res = self.trie_database.get(columns::STATE, &db_key).or_else(|| {
self.storage.get(key, prefix).unwrap_or_else(|e| {
log::warn!(target: "trie", "Failed to read from DB: {}", e);
None
})
});

// TODO: we'll run into IncompleteDatabase error without this special handling.
// Double check and provide an explanation.
if prefix == sp_trie::EMPTY_PREFIX && res.is_none() {
Some([0u8].to_vec())
} else {
res
}
}

fn contains(&self, key: &H::Out, prefix: Prefix) -> bool {
HashDB::get(self, key, prefix).is_some()
}

fn insert(&mut self, prefix: Prefix, value: &[u8]) -> H::Out {
let key = H::hash(value);
self.emplace(key, prefix, value.to_vec());
key
}

fn emplace(&mut self, key: H::Out, prefix: Prefix, value: DBValue) {
let key = sp_trie::prefixed_key::<H>(&key, prefix);
let tx = Transaction(vec![Change::Set(columns::STATE, key, value)]);
// TODO: better error handling?
self.trie_database
.commit(tx)
.unwrap_or_else(|err| panic!("Failed to put value into the state database: {err:?}"))
}

fn remove(&mut self, key: &H::Out, prefix: Prefix) {
let key = sp_trie::prefixed_key::<H>(&key, prefix);
let tx = Transaction(vec![Change::Remove(columns::STATE, key)]);
// TODO: better error handling?
self.trie_database
.commit(tx)
.unwrap_or_else(|err| panic!("Failed to remove value in the state database: {err:?}"))
}
}

impl<'a, S: 'a + TrieBackendStorage<H>, H: Hasher> HashDBRef<H, DBValue>
for StateImporter<'a, S, H>
{
fn get(&self, key: &H::Out, prefix: Prefix) -> Option<DBValue> {
HashDB::get(self, key, prefix)
}

fn contains(&self, key: &H::Out, prefix: Prefix) -> bool {
HashDB::contains(self, key, prefix)
}
}

impl<'a, S: 'a + TrieBackendStorage<H>, H: 'a + Hasher> AsHashDB<H, DBValue>
for StateImporter<'a, S, H>
{
fn as_hash_db<'b>(&'b self) -> &'b (dyn HashDB<H, DBValue> + 'b) {
self
}
fn as_hash_db_mut<'b>(&'b mut self) -> &'b mut (dyn HashDB<H, DBValue> + 'b) {
self
}
}
Loading

0 comments on commit 18378bb

Please sign in to comment.