From 00ac1d3e140bdf6f7f48ff0ae53c319c3c76c5e8 Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Wed, 20 Nov 2024 07:57:11 +0800 Subject: [PATCH] Pure state sync refactoring (part-2) (#6521) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR is the second part of the pure state sync refactoring, encapsulating `StateSyncMetadata` as a separate entity. Now it's pretty straightforward what changes are needed for the persistent state sync as observed in the struct `StateSync`: - `state`: redirect directly to the DB layer instead of being accumulated in the memory. - `metadata`: handle the state sync metadata on disk whenever the state is forwarded to the DB, resume an ongoing state sync on a restart, etc. --------- Co-authored-by: Bastian Köcher Co-authored-by: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> --- prdoc/pr_6521.prdoc | 10 ++ .../network/sync/src/strategy/state_sync.rs | 136 ++++++++++-------- 2 files changed, 90 insertions(+), 56 deletions(-) create mode 100644 prdoc/pr_6521.prdoc diff --git a/prdoc/pr_6521.prdoc b/prdoc/pr_6521.prdoc new file mode 100644 index 000000000000..6f4acf8d028b --- /dev/null +++ b/prdoc/pr_6521.prdoc @@ -0,0 +1,10 @@ +title: Pure state sync refactoring (part-2) + +doc: +- audience: Node Dev + description: | + This is the last part of the pure refactoring of state sync, focusing on encapsulating `StateSyncMetadata` as a separate entity. + +crates: +- name: sc-network-sync + bump: none diff --git a/substrate/client/network/sync/src/strategy/state_sync.rs b/substrate/client/network/sync/src/strategy/state_sync.rs index 7a0cc1191609..47d859a1b7c6 100644 --- a/substrate/client/network/sync/src/strategy/state_sync.rs +++ b/substrate/client/network/sync/src/strategy/state_sync.rs @@ -89,22 +89,62 @@ pub enum ImportResult { BadResponse, } -/// State sync state machine. Accumulates partial state data until it -/// is ready to be imported. -pub struct StateSync { - target_block: B::Hash, +struct StateSyncMetadata { + last_key: SmallVec<[Vec; 2]>, target_header: B::Header, - target_root: B::Hash, target_body: Option>, target_justifications: Option, - last_key: SmallVec<[Vec; 2]>, - state: HashMap, (Vec<(Vec, Vec)>, Vec>)>, complete: bool, - client: Arc, imported_bytes: u64, skip_proof: bool, } +impl StateSyncMetadata { + fn target_hash(&self) -> B::Hash { + self.target_header.hash() + } + + /// Returns target block number. + fn target_number(&self) -> NumberFor { + *self.target_header.number() + } + + fn target_root(&self) -> B::Hash { + *self.target_header.state_root() + } + + fn next_request(&self) -> StateRequest { + StateRequest { + block: self.target_hash().encode(), + start: self.last_key.clone().into_vec(), + no_proof: self.skip_proof, + } + } + + fn progress(&self) -> StateSyncProgress { + let cursor = *self.last_key.get(0).and_then(|last| last.get(0)).unwrap_or(&0u8); + let percent_done = cursor as u32 * 100 / 256; + StateSyncProgress { + percentage: percent_done, + size: self.imported_bytes, + phase: if self.complete { + StateSyncPhase::ImportingState + } else { + StateSyncPhase::DownloadingState + }, + } + } +} + +/// State sync state machine. +/// +/// Accumulates partial state data until it is ready to be imported. +pub struct StateSync { + metadata: StateSyncMetadata, + state: HashMap, (Vec<(Vec, Vec)>, Vec>)>, + client: Arc, +} + impl StateSync where B: BlockT, @@ -120,16 +160,16 @@ where ) -> Self { Self { client, - target_block: target_header.hash(), - target_root: *target_header.state_root(), - target_header, - target_body, - target_justifications, - last_key: SmallVec::default(), + metadata: StateSyncMetadata { + last_key: SmallVec::default(), + target_header, + target_body, + target_justifications, + complete: false, + imported_bytes: 0, + skip_proof, + }, state: HashMap::default(), - complete: false, - imported_bytes: 0, - skip_proof, } } @@ -155,7 +195,7 @@ where if is_top && well_known_keys::is_child_storage_key(key.as_slice()) { child_storage_roots.push((value, key)); } else { - self.imported_bytes += key.len() as u64; + self.metadata.imported_bytes += key.len() as u64; entry.0.push((key, value)); } } @@ -177,11 +217,11 @@ where // the parent cursor stays valid. // Empty parent trie content only happens when all the response content // is part of a single child trie. - if self.last_key.len() == 2 && response.entries[0].entries.is_empty() { + if self.metadata.last_key.len() == 2 && response.entries[0].entries.is_empty() { // Do not remove the parent trie position. - self.last_key.pop(); + self.metadata.last_key.pop(); } else { - self.last_key.clear(); + self.metadata.last_key.clear(); } for state in response.entries { debug!( @@ -193,7 +233,7 @@ where if !state.complete { if let Some(e) = state.entries.last() { - self.last_key.push(e.key.clone()); + self.metadata.last_key.push(e.key.clone()); } complete = false; } @@ -219,11 +259,11 @@ where debug!(target: LOG_TARGET, "Bad state response"); return ImportResult::BadResponse } - if !self.skip_proof && response.proof.is_empty() { + if !self.metadata.skip_proof && response.proof.is_empty() { debug!(target: LOG_TARGET, "Missing proof"); return ImportResult::BadResponse } - let complete = if !self.skip_proof { + let complete = if !self.metadata.skip_proof { debug!(target: LOG_TARGET, "Importing state from {} trie nodes", response.proof.len()); let proof_size = response.proof.len() as u64; let proof = match CompactProof::decode(&mut response.proof.as_ref()) { @@ -234,9 +274,9 @@ where }, }; let (values, completed) = match self.client.verify_range_proof( - self.target_root, + self.metadata.target_root(), proof, - self.last_key.as_slice(), + self.metadata.last_key.as_slice(), ) { Err(e) => { debug!( @@ -251,27 +291,25 @@ where debug!(target: LOG_TARGET, "Imported with {} keys", values.len()); let complete = completed == 0; - if !complete && !values.update_last_key(completed, &mut self.last_key) { + if !complete && !values.update_last_key(completed, &mut self.metadata.last_key) { debug!(target: LOG_TARGET, "Error updating key cursor, depth: {}", completed); }; self.process_state_verified(values); - self.imported_bytes += proof_size; + self.metadata.imported_bytes += proof_size; complete } else { self.process_state_unverified(response) }; if complete { - self.complete = true; + self.metadata.complete = true; + let target_hash = self.metadata.target_hash(); ImportResult::Import( - self.target_block, - self.target_header.clone(), - ImportedState { - block: self.target_block, - state: std::mem::take(&mut self.state).into(), - }, - self.target_body.clone(), - self.target_justifications.clone(), + target_hash, + self.metadata.target_header.clone(), + ImportedState { block: target_hash, state: std::mem::take(&mut self.state).into() }, + self.metadata.target_body.clone(), + self.metadata.target_justifications.clone(), ) } else { ImportResult::Continue @@ -280,40 +318,26 @@ where /// Produce next state request. fn next_request(&self) -> StateRequest { - StateRequest { - block: self.target_block.encode(), - start: self.last_key.clone().into_vec(), - no_proof: self.skip_proof, - } + self.metadata.next_request() } /// Check if the state is complete. fn is_complete(&self) -> bool { - self.complete + self.metadata.complete } /// Returns target block number. fn target_number(&self) -> NumberFor { - *self.target_header.number() + self.metadata.target_number() } /// Returns target block hash. fn target_hash(&self) -> B::Hash { - self.target_block + self.metadata.target_hash() } /// Returns state sync estimated progress. fn progress(&self) -> StateSyncProgress { - let cursor = *self.last_key.get(0).and_then(|last| last.get(0)).unwrap_or(&0u8); - let percent_done = cursor as u32 * 100 / 256; - StateSyncProgress { - percentage: percent_done, - size: self.imported_bytes, - phase: if self.complete { - StateSyncPhase::ImportingState - } else { - StateSyncPhase::DownloadingState - }, - } + self.metadata.progress() } }