diff --git a/Cargo.lock b/Cargo.lock index 3d22e0c0b6bd9..11359c078140b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1714,7 +1714,7 @@ dependencies = [ "num-traits", "parity-scale-codec", "parking_lot 0.11.1", - "rand 0.8.3", + "rand 0.8.4", ] [[package]] @@ -1724,7 +1724,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfcf0ed7fe52a17a03854ec54a9f76d6d84508d1c0e66bc1793301c73fc8493c" dependencies = [ "byteorder", - "rand 0.8.3", + "rand 0.8.4", "rustc-hex", "static_assertions", ] @@ -3384,7 +3384,7 @@ dependencies = [ "libp2p-core", "libp2p-swarm", "log", - "rand 0.8.3", + "rand 0.8.4", "smallvec 1.6.1", "socket2 0.4.0", "void", @@ -4056,7 +4056,7 @@ dependencies = [ "num-complex", "num-rational 0.4.0", "num-traits", - "rand 0.8.3", + "rand 0.8.4", "rand_distr", "simba", "typenum", @@ -4216,7 +4216,6 @@ dependencies = [ "sc-consensus-slots", "sc-consensus-uncles", "sc-finality-grandpa", - "sc-finality-grandpa-warp-sync", "sc-keystore", "sc-network", "sc-offchain", @@ -4903,7 +4902,7 @@ dependencies = [ "paste 1.0.4", "pretty_assertions 0.7.2", "pwasm-utils", - "rand 0.8.3", + "rand 0.8.4", "rand_pcg 0.3.0", "serde", "smallvec 1.6.1", @@ -5753,7 +5752,7 @@ dependencies = [ "log", "memmap2", "parking_lot 0.11.1", - "rand 0.8.3", + "rand 0.8.4", ] [[package]] @@ -6482,7 +6481,7 @@ checksum = "588f6378e4dd99458b60ec275b4477add41ce4fa9f64dcba6f15adccb19b50d6" dependencies = [ "env_logger 0.8.3", "log", - "rand 0.8.3", + "rand 0.8.4", ] [[package]] @@ -6550,9 +6549,9 @@ dependencies = [ [[package]] name = "rand" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e" +checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8" dependencies = [ "libc", "rand_chacha 0.3.0", @@ -6620,7 +6619,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "051b398806e42b9cd04ad9ec8f81e355d0a382c543ac6672c62f5a5b452ef142" dependencies = [ "num-traits", - "rand 0.8.3", + "rand 0.8.4", ] [[package]] @@ -7638,7 +7637,7 @@ dependencies = [ "parity-scale-codec", "parking_lot 0.11.1", "pin-project 1.0.5", - "rand 0.7.3", + "rand 0.8.4", "sc-block-builder", "sc-client-api", "sc-consensus", @@ -7700,33 +7699,6 @@ dependencies = [ "substrate-test-runtime-client", ] -[[package]] -name = "sc-finality-grandpa-warp-sync" -version = "0.10.0-dev" -dependencies = [ - "derive_more", - "finality-grandpa", - "futures 0.3.15", - "log", - "num-traits", - "parity-scale-codec", - "parking_lot 0.11.1", - "prost", - "rand 0.8.3", - "sc-block-builder", - "sc-client-api", - "sc-consensus", - "sc-finality-grandpa", - "sc-network", - "sc-service", - "sp-blockchain", - "sp-consensus", - "sp-finality-grandpa", - "sp-keyring", - "sp-runtime", - "substrate-test-runtime-client", -] - [[package]] name = "sc-informant" version = "0.10.0-dev" @@ -7827,6 +7799,7 @@ dependencies = [ "sp-blockchain", "sp-consensus", "sp-core", + "sp-finality-grandpa", "sp-keyring", "sp-runtime", "sp-test-primitives", @@ -8710,7 +8683,7 @@ dependencies = [ "futures 0.3.15", "httparse", "log", - "rand 0.8.3", + "rand 0.8.4", "sha-1 0.9.4", ] @@ -9550,7 +9523,7 @@ dependencies = [ "lazy_static", "nalgebra", "num-traits", - "rand 0.8.3", + "rand 0.8.4", ] [[package]] @@ -9966,7 +9939,7 @@ checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" dependencies = [ "cfg-if 1.0.0", "libc", - "rand 0.8.3", + "rand 0.8.4", 
"redox_syscall 0.2.5", "remove_dir_all", "winapi 0.3.9", @@ -10012,6 +9985,7 @@ dependencies = [ "sp-consensus-babe", "sp-core", "sp-externalities", + "sp-finality-grandpa", "sp-inherents", "sp-keyring", "sp-keystore", @@ -10638,7 +10612,7 @@ dependencies = [ "ipnet", "lazy_static", "log", - "rand 0.8.3", + "rand 0.8.4", "smallvec 1.6.1", "thiserror", "tinyvec", @@ -11287,7 +11261,7 @@ dependencies = [ "mach", "memoffset 0.6.1", "more-asserts", - "rand 0.8.3", + "rand 0.8.4", "region", "thiserror", "wasmtime-environ", @@ -11455,7 +11429,7 @@ dependencies = [ "log", "nohash-hasher", "parking_lot 0.11.1", - "rand 0.8.3", + "rand 0.8.4", "static_assertions", ] diff --git a/Cargo.toml b/Cargo.toml index 03115fe5593f1..2834344153a8c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,7 +40,6 @@ members = [ "client/executor/wasmi", "client/executor/wasmtime", "client/finality-grandpa", - "client/finality-grandpa-warp-sync", "client/informant", "client/keystore", "client/light", diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index dbdb3074d6863..9eba1d0e9e05f 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -162,6 +162,10 @@ pub fn new_full(mut config: Configuration) -> Result } config.network.extra_sets.push(sc_finality_grandpa::grandpa_peers_set_config()); + let warp_sync = Arc::new(sc_finality_grandpa::warp_proof::NetworkProvider::new( + backend.clone(), + grandpa_link.shared_authority_set().clone(), + )); let (network, system_rpc_tx, network_starter) = sc_service::build_network(sc_service::BuildNetworkParams { @@ -172,6 +176,7 @@ pub fn new_full(mut config: Configuration) -> Result import_queue, on_demand: None, block_announce_validator_builder: None, + warp_sync: Some(warp_sync), })?; if config.offchain_worker.enabled { @@ -380,6 +385,11 @@ pub fn new_light(mut config: Configuration) -> Result telemetry: telemetry.as_ref().map(|x| x.handle()), })?; + let warp_sync = Arc::new(sc_finality_grandpa::warp_proof::NetworkProvider::new( + backend.clone(), + grandpa_link.shared_authority_set().clone(), + )); + let (network, system_rpc_tx, network_starter) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, @@ -389,6 +399,7 @@ pub fn new_light(mut config: Configuration) -> Result import_queue, on_demand: Some(on_demand.clone()), block_announce_validator_builder: None, + warp_sync: Some(warp_sync), })?; if config.offchain_worker.enabled { diff --git a/bin/node-template/runtime/src/lib.rs b/bin/node-template/runtime/src/lib.rs index 63da72102df3d..908c5ea455cc7 100644 --- a/bin/node-template/runtime/src/lib.rs +++ b/bin/node-template/runtime/src/lib.rs @@ -402,6 +402,10 @@ impl_runtime_apis! { Grandpa::grandpa_authorities() } + fn current_set_id() -> fg_primitives::SetId { + Grandpa::current_set_id() + } + fn submit_report_equivocation_unsigned_extrinsic( _equivocation_proof: fg_primitives::EquivocationProof< ::Hash, diff --git a/bin/node/cli/Cargo.toml b/bin/node/cli/Cargo.toml index 7c8c2d0e3d863..12a76cf323e4b 100644 --- a/bin/node/cli/Cargo.toml +++ b/bin/node/cli/Cargo.toml @@ -77,7 +77,6 @@ sc-service = { version = "0.10.0-dev", default-features = false, path = "../../. 
sc-tracing = { version = "4.0.0-dev", path = "../../../client/tracing" } sc-telemetry = { version = "4.0.0-dev", path = "../../../client/telemetry" } sc-authority-discovery = { version = "0.10.0-dev", path = "../../../client/authority-discovery" } -sc-finality-grandpa-warp-sync = { version = "0.10.0-dev", path = "../../../client/finality-grandpa-warp-sync", optional = true } # frame dependencies pallet-indices = { version = "4.0.0-dev", path = "../../../frame/indices" } @@ -161,7 +160,6 @@ cli = [ "frame-benchmarking-cli", "substrate-frame-cli", "sc-service/db", - "sc-finality-grandpa-warp-sync", "structopt", "substrate-build-script-utils", "try-runtime-cli", diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index e7181d3caec38..301df01c55f80 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -228,16 +228,10 @@ pub fn new_full_base( let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht; config.network.extra_sets.push(grandpa::grandpa_peers_set_config()); - - #[cfg(feature = "cli")] - config.network.request_response_protocols.push( - sc_finality_grandpa_warp_sync::request_response_config_for_chain( - &config, - task_manager.spawn_handle(), - backend.clone(), - import_setup.1.shared_authority_set().clone(), - ), - ); + let warp_sync = Arc::new(grandpa::warp_proof::NetworkProvider::new( + backend.clone(), + import_setup.1.shared_authority_set().clone(), + )); let (network, system_rpc_tx, network_starter) = sc_service::build_network(sc_service::BuildNetworkParams { @@ -248,6 +242,7 @@ pub fn new_full_base( import_queue, on_demand: None, block_announce_validator_builder: None, + warp_sync: Some(warp_sync), })?; if config.offchain_worker.enabled { @@ -512,6 +507,11 @@ pub fn new_light_base( telemetry.as_ref().map(|x| x.handle()), )?; + let warp_sync = Arc::new(grandpa::warp_proof::NetworkProvider::new( + backend.clone(), + grandpa_link.shared_authority_set().clone(), + )); + let (network, system_rpc_tx, network_starter) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, @@ -521,6 +521,7 @@ pub fn new_light_base( import_queue, on_demand: Some(on_demand.clone()), block_announce_validator_builder: None, + warp_sync: Some(warp_sync), })?; let enable_grandpa = !config.disable_grandpa; diff --git a/bin/node/runtime/src/lib.rs b/bin/node/runtime/src/lib.rs index 181f5fd423767..37b4b24fa6a2a 100644 --- a/bin/node/runtime/src/lib.rs +++ b/bin/node/runtime/src/lib.rs @@ -1336,6 +1336,10 @@ impl_runtime_apis! { Grandpa::grandpa_authorities() } + fn current_set_id() -> fg_primitives::SetId { + Grandpa::current_set_id() + } + fn submit_report_equivocation_unsigned_extrinsic( equivocation_proof: fg_primitives::EquivocationProof< ::Hash, diff --git a/client/cli/src/arg_enums.rs b/client/cli/src/arg_enums.rs index 83b1c57e071a4..72741d7bea2bb 100644 --- a/client/cli/src/arg_enums.rs +++ b/client/cli/src/arg_enums.rs @@ -242,6 +242,8 @@ arg_enum! { Fast, // Download blocks without executing them. Download latest state without proofs. FastUnsafe, + // Prove finality and download the latest state. 
+ Warp, } } @@ -253,6 +255,7 @@ impl Into for SyncMode { sc_network::config::SyncMode::Fast { skip_proofs: false, storage_chain_mode: false }, SyncMode::FastUnsafe => sc_network::config::SyncMode::Fast { skip_proofs: true, storage_chain_mode: false }, + SyncMode::Warp => sc_network::config::SyncMode::Warp, } } } diff --git a/client/consensus/aura/src/import_queue.rs b/client/consensus/aura/src/import_queue.rs index 96045fde43a9f..a8b0462709767 100644 --- a/client/consensus/aura/src/import_queue.rs +++ b/client/consensus/aura/src/import_queue.rs @@ -35,7 +35,7 @@ use sp_blockchain::{ well_known_cache_keys::{self, Id as CacheKeyId}, HeaderBackend, ProvideCache, }; -use sp_consensus::{BlockOrigin, CanAuthorWith, Error as ConsensusError}; +use sp_consensus::{CanAuthorWith, Error as ConsensusError}; use sp_consensus_aura::{ digests::CompatibleDigestItem, inherents::AuraInherentData, AuraApi, ConsensusLog, AURA_ENGINE_ID, @@ -46,7 +46,6 @@ use sp_inherents::{CreateInherentDataProviders, InherentDataProvider as _}; use sp_runtime::{ generic::{BlockId, OpaqueDigestItemId}, traits::{Block as BlockT, DigestItemFor, Header}, - Justifications, }; use std::{fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc}; @@ -206,13 +205,10 @@ where { async fn verify( &mut self, - origin: BlockOrigin, - header: B::Header, - justifications: Option, - mut body: Option>, + mut block: BlockImportParams, ) -> Result<(BlockImportParams, Option)>>), String> { - let hash = header.hash(); - let parent_hash = *header.parent_hash(); + let hash = block.header.hash(); + let parent_hash = *block.header.parent_hash(); let authorities = authorities(self.client.as_ref(), &BlockId::Hash(parent_hash)) .map_err(|e| format!("Could not fetch authorities at {:?}: {:?}", parent_hash, e))?; @@ -234,7 +230,7 @@ where let checked_header = check_header::( &self.client, slot_now + 1, - header, + block.header, hash, &authorities[..], self.check_for_equivocation, @@ -245,8 +241,8 @@ where // if the body is passed through, we need to use the runtime // to check that the internally-set timestamp in the inherents // actually matches the slot set in the seal. - if let Some(inner_body) = body.take() { - let block = B::new(pre_header.clone(), inner_body); + if let Some(inner_body) = block.body.take() { + let new_block = B::new(pre_header.clone(), inner_body); inherent_data.aura_replace_inherent_data(slot); @@ -261,7 +257,7 @@ where .map_err(|e| format!("{:?}", e))? 
{ self.check_inherents( - block.clone(), + new_block.clone(), BlockId::Hash(parent_hash), inherent_data, create_inherent_data_providers, @@ -270,8 +266,8 @@ where .map_err(|e| e.to_string())?; } - let (_, inner_body) = block.deconstruct(); - body = Some(inner_body); + let (_, inner_body) = new_block.deconstruct(); + block.body = Some(inner_body); } trace!(target: "aura", "Checked {:?}; importing.", pre_header); @@ -298,14 +294,12 @@ where _ => None, }); - let mut import_block = BlockImportParams::new(origin, pre_header); - import_block.post_digests.push(seal); - import_block.body = body; - import_block.justifications = justifications; - import_block.fork_choice = Some(ForkChoiceStrategy::LongestChain); - import_block.post_hash = Some(hash); + block.header = pre_header; + block.post_digests.push(seal); + block.fork_choice = Some(ForkChoiceStrategy::LongestChain); + block.post_hash = Some(hash); - Ok((import_block, maybe_keys)) + Ok((block, maybe_keys)) }, CheckedHeader::Deferred(a, b) => { debug!(target: "aura", "Checking {:?} failed; {:?}, {:?}.", hash, a, b); diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs index b09cd6ad86b82..172bad669daa8 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -118,7 +118,6 @@ use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr}; use sp_runtime::{ generic::{BlockId, OpaqueDigestItemId}, traits::{Block as BlockT, DigestItemFor, Header, Zero}, - Justifications, }; pub use sc_consensus_slots::SlotProportion; @@ -187,6 +186,19 @@ impl EpochT for Epoch { } } +impl From for Epoch { + fn from(epoch: sp_consensus_babe::Epoch) -> Self { + Epoch { + epoch_index: epoch.epoch_index, + start_slot: epoch.start_slot, + duration: epoch.duration, + authorities: epoch.authorities, + randomness: epoch.randomness, + config: epoch.config, + } + } +} + impl Epoch { /// Create the genesis epoch (epoch #0). This is defined to start at the slot of /// the first block, so that has to be provided. @@ -1128,24 +1140,29 @@ where { async fn verify( &mut self, - origin: BlockOrigin, - header: Block::Header, - justifications: Option, - mut body: Option>, + mut block: BlockImportParams, ) -> BlockVerificationResult { trace!( target: "babe", "Verifying origin: {:?} header: {:?} justification(s): {:?} body: {:?}", - origin, - header, - justifications, - body, + block.origin, + block.header, + block.justifications, + block.body, ); - let hash = header.hash(); - let parent_hash = *header.parent_hash(); + let hash = block.header.hash(); + let parent_hash = *block.header.parent_hash(); + + if block.with_state() { + // When importing whole state we don't calculate epoch descriptor, but rather + // read it from the state after import. We also skip all verifications + // because there's no parent state and we trust the sync module to verify + // that the state is correct and finalized. 
+ return Ok((block, Default::default())) + } - debug!(target: "babe", "We have {:?} logs in this header", header.digest().logs().len()); + debug!(target: "babe", "We have {:?} logs in this header", block.header.digest().logs().len()); let create_inherent_data_providers = self .create_inherent_data_providers @@ -1160,7 +1177,7 @@ where .header_metadata(parent_hash) .map_err(Error::::FetchParentHeader)?; - let pre_digest = find_pre_digest::(&header)?; + let pre_digest = find_pre_digest::(&block.header)?; let (check_header, epoch_descriptor) = { let epoch_changes = self.epoch_changes.shared_data(); let epoch_descriptor = epoch_changes @@ -1179,7 +1196,7 @@ where // We add one to the current slot to allow for some small drift. // FIXME #1019 in the future, alter this queue to allow deferring of headers let v_params = verification::VerificationParams { - header: header.clone(), + header: block.header.clone(), pre_digest: Some(pre_digest), slot_now: slot_now + 1, epoch: viable_epoch.as_ref(), @@ -1203,9 +1220,9 @@ where .check_and_report_equivocation( slot_now, slot, - &header, + &block.header, &verified_info.author, - &origin, + &block.origin, ) .await { @@ -1215,23 +1232,23 @@ where // if the body is passed through, we need to use the runtime // to check that the internally-set timestamp in the inherents // actually matches the slot set in the seal. - if let Some(inner_body) = body.take() { + if let Some(inner_body) = block.body { let mut inherent_data = create_inherent_data_providers .create_inherent_data() .map_err(Error::::CreateInherents)?; inherent_data.babe_replace_inherent_data(slot); - let block = Block::new(pre_header.clone(), inner_body); + let new_block = Block::new(pre_header.clone(), inner_body); self.check_inherents( - block.clone(), + new_block.clone(), BlockId::Hash(parent_hash), inherent_data, create_inherent_data_providers, ) .await?; - let (_, inner_body) = block.deconstruct(); - body = Some(inner_body); + let (_, inner_body) = new_block.deconstruct(); + block.body = Some(inner_body); } trace!(target: "babe", "Checked {:?}; importing.", pre_header); @@ -1242,17 +1259,15 @@ where "pre_header" => ?pre_header, ); - let mut import_block = BlockImportParams::new(origin, pre_header); - import_block.post_digests.push(verified_info.seal); - import_block.body = body; - import_block.justifications = justifications; - import_block.intermediates.insert( + block.header = pre_header; + block.post_digests.push(verified_info.seal); + block.intermediates.insert( Cow::from(INTERMEDIATE_KEY), Box::new(BabeIntermediate:: { epoch_descriptor }) as Box<_>, ); - import_block.post_hash = Some(hash); + block.post_hash = Some(hash); - Ok((import_block, Default::default())) + Ok((block, Default::default())) }, CheckedHeader::Deferred(a, b) => { debug!(target: "babe", "Checking {:?} failed; {:?}, {:?}.", hash, a, b); @@ -1305,6 +1320,72 @@ impl BabeBlockImport { } } +impl BabeBlockImport +where + Block: BlockT, + Inner: BlockImport> + Send + Sync, + Inner::Error: Into, + Client: HeaderBackend + + HeaderMetadata + + AuxStore + + ProvideRuntimeApi + + ProvideCache + + Send + + Sync, + Client::Api: BabeApi + ApiExt, +{ + /// Import whole state after warp sync. + // This function makes multiple transactions to the DB. If one of them fails we may + // end up in an inconsistent state and have to resync. 
+ async fn import_state( + &mut self, + mut block: BlockImportParams>, + new_cache: HashMap>, + ) -> Result { + let hash = block.post_hash(); + let parent_hash = *block.header.parent_hash(); + let number = *block.header.number(); + + block.fork_choice = Some(ForkChoiceStrategy::Custom(true)); + // Reset block weight. + aux_schema::write_block_weight(hash, 0, |values| { + block + .auxiliary + .extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec())))) + }); + + // First make the client import the state. + let import_result = self.inner.import_block(block, new_cache).await; + let aux = match import_result { + Ok(ImportResult::Imported(aux)) => aux, + Ok(r) => + return Err(ConsensusError::ClientImport(format!( + "Unexpected import result: {:?}", + r + ))), + Err(r) => return Err(r.into()), + }; + + // Read epoch info from the imported state. + let block_id = BlockId::hash(hash); + let current_epoch = self.client.runtime_api().current_epoch(&block_id).map_err(|e| { + ConsensusError::ClientImport(babe_err::(Error::RuntimeApi(e)).into()) + })?; + let next_epoch = self.client.runtime_api().next_epoch(&block_id).map_err(|e| { + ConsensusError::ClientImport(babe_err::(Error::RuntimeApi(e)).into()) + })?; + + let mut epoch_changes = self.epoch_changes.shared_data_locked(); + epoch_changes.reset(parent_hash, hash, number, current_epoch.into(), next_epoch.into()); + aux_schema::write_epoch_changes::(&*epoch_changes, |insert| { + self.client.insert_aux(insert, []) + }) + .map_err(|e| ConsensusError::ClientImport(e.to_string()))?; + + Ok(ImportResult::Imported(aux)) + } +} + #[async_trait::async_trait] impl BlockImport for BabeBlockImport where @@ -1336,7 +1417,7 @@ where match self.client.status(BlockId::Hash(hash)) { Ok(sp_blockchain::BlockStatus::InChain) => { // When re-importing existing block strip away intermediates. - let _ = block.take_intermediate::>(INTERMEDIATE_KEY)?; + let _ = block.take_intermediate::>(INTERMEDIATE_KEY); block.fork_choice = Some(ForkChoiceStrategy::Custom(false)); return self.inner.import_block(block, new_cache).await.map_err(Into::into) }, @@ -1344,6 +1425,10 @@ where Err(e) => return Err(ConsensusError::ClientImport(e.to_string())), } + if block.with_state() { + return self.import_state(block, new_cache).await + } + let pre_digest = find_pre_digest::(&block.header).expect( "valid babe headers must contain a predigest; header has been already verified; qed", ); diff --git a/client/consensus/babe/src/tests.rs b/client/consensus/babe/src/tests.rs index d21911a7fe50b..4b4e0a9d0f3d3 100644 --- a/client/consensus/babe/src/tests.rs +++ b/client/consensus/babe/src/tests.rs @@ -228,7 +228,6 @@ pub struct BabeTestNet { } type TestHeader = ::Header; -type TestExtrinsic = ::Extrinsic; type TestSelectChain = substrate_test_runtime_client::LongestChain; @@ -257,14 +256,11 @@ impl Verifier for TestVerifier { /// presented to the User in the logs. async fn verify( &mut self, - origin: BlockOrigin, - mut header: TestHeader, - justifications: Option, - body: Option>, + mut block: BlockImportParams, ) -> Result<(BlockImportParams, Option)>>), String> { // apply post-sealing mutations (i.e. stripping seal, if desired). 
- (self.mutator)(&mut header, Stage::PostSeal); - self.inner.verify(origin, header, justifications, body).await + (self.mutator)(&mut block.header, Stage::PostSeal); + self.inner.verify(block).await } } diff --git a/client/consensus/common/src/block_import.rs b/client/consensus/common/src/block_import.rs index 616378fc9b184..83fb11834dae6 100644 --- a/client/consensus/common/src/block_import.rs +++ b/client/consensus/common/src/block_import.rs @@ -112,6 +112,8 @@ pub struct BlockCheckParams { pub parent_hash: Block::Hash, /// Allow importing the block skipping state verification if parent state is missing. pub allow_missing_state: bool, + /// Allow importing the block if parent block is missing. + pub allow_missing_parent: bool, /// Re-validate existing block. pub import_existing: bool, } @@ -306,6 +308,11 @@ impl BlockImportParams { .downcast_mut::() .ok_or(Error::InvalidIntermediate) } + + /// Check if this block contains state import action + pub fn with_state(&self) -> bool { + matches!(self.state_action, StateAction::ApplyChanges(StorageChanges::Import(_))) + } } /// Block import trait. diff --git a/client/consensus/common/src/import_queue.rs b/client/consensus/common/src/import_queue.rs index b1a24e5620d3f..57d80cd41c649 100644 --- a/client/consensus/common/src/import_queue.rs +++ b/client/consensus/common/src/import_queue.rs @@ -26,7 +26,7 @@ //! instantiated. The `BasicQueue` and `BasicVerifier` traits allow serial //! queues to be instantiated simply. -use std::collections::HashMap; +use std::{collections::HashMap, iter::FromIterator}; use log::{debug, trace}; use sp_runtime::{ @@ -97,10 +97,7 @@ pub trait Verifier: Send + Sync { /// presented to the User in the logs. async fn verify( &mut self, - origin: BlockOrigin, - header: B::Header, - justifications: Option, - body: Option>, + block: BlockImportParams, ) -> Result<(BlockImportParams, Option)>>), String>; } @@ -222,7 +219,7 @@ pub(crate) async fn import_single_block_metered< trace!(target: "sync", "Header {} has {:?} logs", block.hash, header.digest().logs().len()); let number = header.number().clone(); - let hash = header.hash(); + let hash = block.hash; let parent_hash = header.parent_hash().clone(); let import_handler = |import| match import { @@ -260,6 +257,7 @@ pub(crate) async fn import_single_block_metered< parent_hash, allow_missing_state: block.allow_missing_state, import_existing: block.import_existing, + allow_missing_parent: block.state.is_some(), }) .await, )? 
{ @@ -268,32 +266,14 @@ pub(crate) async fn import_single_block_metered< } let started = wasm_timer::Instant::now(); - let (mut import_block, maybe_keys) = verifier - .verify(block_origin, header, justifications, block.body) - .await - .map_err(|msg| { - if let Some(ref peer) = peer { - trace!(target: "sync", "Verifying {}({}) from {} failed: {}", number, hash, peer, msg); - } else { - trace!(target: "sync", "Verifying {}({}) failed: {}", number, hash, msg); - } - if let Some(metrics) = metrics.as_ref() { - metrics.report_verification(false, started.elapsed()); - } - BlockImportError::VerificationFailed(peer.clone(), msg) - })?; - - if let Some(metrics) = metrics.as_ref() { - metrics.report_verification(true, started.elapsed()); - } - let mut cache = HashMap::new(); - if let Some(keys) = maybe_keys { - cache.extend(keys.into_iter()); - } + let mut import_block = BlockImportParams::new(block_origin, header); + import_block.body = block.body; + import_block.justifications = justifications; + import_block.post_hash = Some(hash); import_block.import_existing = block.import_existing; import_block.indexed_body = block.indexed_body; - let mut import_block = import_block.clear_storage_changes_and_mutate(); + if let Some(state) = block.state { let changes = crate::block_import::StorageChanges::Import(state); import_block.state_action = StateAction::ApplyChanges(changes); @@ -303,6 +283,24 @@ pub(crate) async fn import_single_block_metered< import_block.state_action = StateAction::ExecuteIfPossible; } + let (import_block, maybe_keys) = verifier.verify(import_block).await.map_err(|msg| { + if let Some(ref peer) = peer { + trace!(target: "sync", "Verifying {}({}) from {} failed: {}", number, hash, peer, msg); + } else { + trace!(target: "sync", "Verifying {}({}) failed: {}", number, hash, msg); + } + if let Some(metrics) = metrics.as_ref() { + metrics.report_verification(false, started.elapsed()); + } + BlockImportError::VerificationFailed(peer.clone(), msg) + })?; + + if let Some(metrics) = metrics.as_ref() { + metrics.report_verification(true, started.elapsed()); + } + + let cache = HashMap::from_iter(maybe_keys.unwrap_or_default()); + let import_block = import_block.clear_storage_changes_and_mutate(); let imported = import_handle.import_block(import_block, cache).await; if let Some(metrics) = metrics.as_ref() { metrics.report_verification_and_import(started.elapsed()); diff --git a/client/consensus/common/src/import_queue/basic_queue.rs b/client/consensus/common/src/import_queue/basic_queue.rs index 2de5f578a7a66..dbf779c074f27 100644 --- a/client/consensus/common/src/import_queue/basic_queue.rs +++ b/client/consensus/common/src/import_queue/basic_queue.rs @@ -455,12 +455,9 @@ mod tests { impl Verifier for () { async fn verify( &mut self, - origin: BlockOrigin, - header: Header, - _justifications: Option, - _body: Option>, + block: BlockImportParams, ) -> Result<(BlockImportParams, Option)>>), String> { - Ok((BlockImportParams::new(origin, header), None)) + Ok((BlockImportParams::new(block.origin, block.header), None)) } } diff --git a/client/consensus/epochs/src/lib.rs b/client/consensus/epochs/src/lib.rs index e93724e5895f2..52327dbbf60e6 100644 --- a/client/consensus/epochs/src/lib.rs +++ b/client/consensus/epochs/src/lib.rs @@ -28,7 +28,7 @@ use sp_runtime::traits::{Block as BlockT, NumberFor, One, Zero}; use std::{ borrow::{Borrow, BorrowMut}, collections::BTreeMap, - ops::Add, + ops::{Add, Sub}, }; /// A builder for `is_descendent_of` functions. 
@@ -228,7 +228,7 @@ impl ViableEpochDescriptor { } /// Persisted epoch stored in EpochChanges. -#[derive(Clone, Encode, Decode, Debug)] +#[derive(Clone, Encode, Decode)] pub enum PersistedEpoch { /// Genesis persisted epoch data. epoch_0, epoch_1. Genesis(E, E), @@ -322,7 +322,7 @@ where impl EpochChanges where Hash: PartialEq + Ord + AsRef<[u8]> + AsMut<[u8]> + Copy, - Number: Ord + One + Zero + Add + Copy, + Number: Ord + One + Zero + Add + Sub + Copy, { /// Create a new epoch change. pub fn new() -> Self { @@ -614,6 +614,25 @@ where pub fn tree(&self) -> &ForkTree> { &self.inner } + + /// Reset to a specified pair of epochs, as if they were announced at blocks `parent_hash` and `hash`. + pub fn reset(&mut self, parent_hash: Hash, hash: Hash, number: Number, current: E, next: E) { + self.inner = ForkTree::new(); + self.epochs.clear(); + let persisted = PersistedEpoch::Regular(current); + let header = PersistedEpochHeader::from(&persisted); + let _res = self.inner.import(parent_hash, number - One::one(), header, &|_, _| { + Ok(false) as Result> + }); + self.epochs.insert((parent_hash, number - One::one()), persisted); + + let persisted = PersistedEpoch::Regular(next); + let header = PersistedEpochHeader::from(&persisted); + let _res = self.inner.import(hash, number, header, &|_, _| { + Ok(true) as Result> + }); + self.epochs.insert((hash, number), persisted); + } } /// Type alias to produce the epoch-changes tree from a block type. @@ -694,6 +713,7 @@ mod tests { #[test] fn genesis_epoch_is_created_but_not_imported() { + // // A - B // \ // — C @@ -735,6 +755,7 @@ mod tests { #[test] fn epoch_changes_between_blocks() { + // // A - B // \ // — C diff --git a/client/consensus/manual-seal/src/consensus/babe.rs b/client/consensus/manual-seal/src/consensus/babe.rs index 9edcb8fd13a17..d18170e9a0d6f 100644 --- a/client/consensus/manual-seal/src/consensus/babe.rs +++ b/client/consensus/manual-seal/src/consensus/babe.rs @@ -39,7 +39,7 @@ use std::{ use sc_consensus::{BlockImportParams, ForkChoiceStrategy, Verifier}; use sp_api::{ProvideRuntimeApi, TransactionFor}; use sp_blockchain::{HeaderBackend, HeaderMetadata}; -use sp_consensus::{BlockOrigin, CacheKeyId}; +use sp_consensus::CacheKeyId; use sp_consensus_babe::{ digests::{NextEpochDescriptor, PreDigest, SecondaryPlainPreDigest}, inherents::BabeInherentData, @@ -50,7 +50,6 @@ use sp_inherents::{InherentData, InherentDataProvider, InherentIdentifier}; use sp_runtime::{ generic::{BlockId, Digest}, traits::{Block as BlockT, DigestFor, DigestItemFor, Header, Zero}, - Justifications, }; use sp_timestamp::{InherentType, TimestampInherentData, INHERENT_IDENTIFIER}; @@ -98,20 +97,14 @@ where { async fn verify( &mut self, - origin: BlockOrigin, - header: B::Header, - justifications: Option, - body: Option>, + mut import_params: BlockImportParams, ) -> Result<(BlockImportParams, Option)>>), String> { - let mut import_params = BlockImportParams::new(origin, header.clone()); - import_params.justifications = justifications; - import_params.body = body; import_params.finalized = false; import_params.fork_choice = Some(ForkChoiceStrategy::LongestChain); - let pre_digest = find_pre_digest::(&header)?; + let pre_digest = find_pre_digest::(&import_params.header)?; - let parent_hash = header.parent_hash(); + let parent_hash = import_params.header.parent_hash(); let parent = self .client .header(BlockId::Hash(*parent_hash)) diff --git a/client/consensus/manual-seal/src/lib.rs b/client/consensus/manual-seal/src/lib.rs index 7d4dfefe50c66..4f23bdcf65925 100644 --- 
a/client/consensus/manual-seal/src/lib.rs +++ b/client/consensus/manual-seal/src/lib.rs @@ -27,9 +27,9 @@ use sc_consensus::{ import_queue::{BasicQueue, BoxBlockImport, Verifier}, }; use sp_blockchain::HeaderBackend; -use sp_consensus::{BlockOrigin, CacheKeyId, Environment, Proposer, SelectChain}; +use sp_consensus::{CacheKeyId, Environment, Proposer, SelectChain}; use sp_inherents::CreateInherentDataProviders; -use sp_runtime::{traits::Block as BlockT, ConsensusEngineId, Justifications}; +use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; use std::{marker::PhantomData, sync::Arc}; mod error; @@ -59,18 +59,11 @@ struct ManualSealVerifier; impl Verifier for ManualSealVerifier { async fn verify( &mut self, - origin: BlockOrigin, - header: B::Header, - justifications: Option, - body: Option>, + mut block: BlockImportParams, ) -> Result<(BlockImportParams, Option)>>), String> { - let mut import_params = BlockImportParams::new(origin, header); - import_params.justifications = justifications; - import_params.body = body; - import_params.finalized = false; - import_params.fork_choice = Some(ForkChoiceStrategy::LongestChain); - - Ok((import_params, None)) + block.finalized = false; + block.fork_choice = Some(ForkChoiceStrategy::LongestChain); + Ok((block, None)) } } diff --git a/client/consensus/pow/src/lib.rs b/client/consensus/pow/src/lib.rs index 85a37e73535ad..17bd02f6a5651 100644 --- a/client/consensus/pow/src/lib.rs +++ b/client/consensus/pow/src/lib.rs @@ -58,15 +58,14 @@ use sp_api::ProvideRuntimeApi; use sp_block_builder::BlockBuilder as BlockBuilderApi; use sp_blockchain::{well_known_cache_keys::Id as CacheKeyId, HeaderBackend, ProvideCache}; use sp_consensus::{ - BlockOrigin, CanAuthorWith, Environment, Error as ConsensusError, Proposer, SelectChain, - SyncOracle, + CanAuthorWith, Environment, Error as ConsensusError, Proposer, SelectChain, SyncOracle, }; use sp_consensus_pow::{Seal, TotalDifficulty, POW_ENGINE_ID}; use sp_inherents::{CreateInherentDataProviders, InherentDataProvider}; use sp_runtime::{ generic::{BlockId, Digest, DigestItem}, traits::{Block as BlockT, Header as HeaderT}, - Justifications, RuntimeString, + RuntimeString, }; use std::{ borrow::Cow, cmp::Ordering, collections::HashMap, marker::PhantomData, sync::Arc, @@ -461,26 +460,20 @@ where { async fn verify( &mut self, - origin: BlockOrigin, - header: B::Header, - justifications: Option, - body: Option>, + mut block: BlockImportParams, ) -> Result<(BlockImportParams, Option)>>), String> { - let hash = header.hash(); - let (checked_header, seal) = self.check_header(header)?; + let hash = block.header.hash(); + let (checked_header, seal) = self.check_header(block.header)?; let intermediate = PowIntermediate:: { difficulty: None }; - - let mut import_block = BlockImportParams::new(origin, checked_header); - import_block.post_digests.push(seal); - import_block.body = body; - import_block.justifications = justifications; - import_block + block.header = checked_header; + block.post_digests.push(seal); + block .intermediates .insert(Cow::from(INTERMEDIATE_KEY), Box::new(intermediate) as Box<_>); - import_block.post_hash = Some(hash); + block.post_hash = Some(hash); - Ok((import_block, None)) + Ok((block, None)) } } diff --git a/client/db/src/cache/list_cache.rs b/client/db/src/cache/list_cache.rs index 9499ae2a89f45..1808d431dd056 100644 --- a/client/db/src/cache/list_cache.rs +++ b/client/db/src/cache/list_cache.rs @@ -302,6 +302,7 @@ impl> ListCache let prev_operation = operations.operations.last(); 
debug_assert!( entry_type != EntryType::Final || + self.unfinalized.is_empty() || self.best_finalized_block.hash == parent.hash || match prev_operation { Some(&CommitOperation::BlockFinalized(ref best_finalized_block, _, _)) => diff --git a/client/db/src/changes_tries_storage.rs b/client/db/src/changes_tries_storage.rs index a02e1cf7add98..c0649853160f6 100644 --- a/client/db/src/changes_tries_storage.rs +++ b/client/db/src/changes_tries_storage.rs @@ -358,18 +358,23 @@ impl DbChangesTrieStorage { let next_config = match cache_tx { Some(cache_tx) if config_for_new_block && cache_tx.new_config.is_some() => { let config = cache_tx.new_config.clone().expect("guarded by is_some(); qed"); - ChangesTrieConfigurationRange { + Ok(ChangesTrieConfigurationRange { zero: (block_num, block_hash), end: None, config, - } + }) }, _ if config_for_new_block => self.configuration_at(&BlockId::Hash( *new_header .expect("config_for_new_block is only true when new_header is passed; qed") .parent_hash(), - ))?, - _ => self.configuration_at(&BlockId::Hash(next_digest_range_start_hash))?, + )), + _ => self.configuration_at(&BlockId::Hash(next_digest_range_start_hash)), }; + let next_config = match next_config { + Ok(next_config) => next_config, + Err(ClientError::UnknownBlock(_)) => break, // No block means nothing to prune. + Err(e) => return Err(e), }; if let Some(config) = next_config.config { let mut oldest_digest_range = config diff --git a/client/db/src/lib.rs b/client/db/src/lib.rs index 455ec1ef6b9d2..dda469f4fd336 100644 --- a/client/db/src/lib.rs +++ b/client/db/src/lib.rs @@ -692,7 +692,10 @@ impl HeaderMetadata for BlockchainDb { header_metadata }) .ok_or_else(|| { - ClientError::UnknownBlock(format!("header not found in db: {}", hash)) + ClientError::UnknownBlock(format!( + "Header was not found in the database: {:?}", + hash + )) }) }, Ok, @@ -1210,8 +1213,11 @@ impl Backend { return Err(sp_blockchain::Error::SetHeadTooOld.into()) } - // cannot find tree route with empty DB. - if meta.best_hash != Default::default() { + let parent_exists = + self.blockchain.status(BlockId::Hash(route_to))? == sp_blockchain::BlockStatus::InChain; + + // Cannot find tree route with an empty DB or when a detached block was imported.
+ if meta.best_hash != Default::default() && parent_exists { let tree_route = sp_blockchain::tree_route(&self.blockchain, meta.best_hash, route_to)?; // uncanonicalize: check safety violations and ensure the numbers no longer @@ -1261,8 +1267,10 @@ impl Backend { ) -> ClientResult<()> { let last_finalized = last_finalized.unwrap_or_else(|| self.blockchain.meta.read().finalized_hash); - if *header.parent_hash() != last_finalized { - return Err(::sp_blockchain::Error::NonSequentialFinalization(format!( + if last_finalized != self.blockchain.meta.read().genesis_hash && + *header.parent_hash() != last_finalized + { + return Err(sp_blockchain::Error::NonSequentialFinalization(format!( "Last finalized {:?} not parent of {:?}", last_finalized, header.hash() @@ -1588,7 +1596,7 @@ impl Backend { columns::META, meta_keys::LEAF_PREFIX, ); - }; + } let mut children = children::read_children( &*self.storage.db, @@ -1598,14 +1606,14 @@ impl Backend { )?; if !children.contains(&hash) { children.push(hash); + children::write_children( + &mut transaction, + columns::META, + meta_keys::CHILDREN_PREFIX, + parent_hash, + children, + ); } - children::write_children( - &mut transaction, - columns::META, - meta_keys::CHILDREN_PREFIX, - parent_hash, - children, - ); } meta_updates.push(MetaUpdate { @@ -1615,7 +1623,6 @@ impl Backend { is_finalized: finalized, with_state: operation.commit_state, }); - Some((pending_block.header, number, hash, enacted, retracted, is_best, cache)) } else { None diff --git a/client/finality-grandpa/Cargo.toml b/client/finality-grandpa/Cargo.toml index 706538e807243..66432a7aa51c8 100644 --- a/client/finality-grandpa/Cargo.toml +++ b/client/finality-grandpa/Cargo.toml @@ -22,7 +22,7 @@ futures = "0.3.9" futures-timer = "3.0.1" log = "0.4.8" parking_lot = "0.11.1" -rand = "0.7.2" +rand = "0.8.4" parity-scale-codec = { version = "2.0.0", features = ["derive"] } sp-application-crypto = { version = "4.0.0-dev", path = "../../primitives/application-crypto" } sp-arithmetic = { version = "4.0.0-dev", path = "../../primitives/arithmetic" } diff --git a/client/finality-grandpa/src/environment.rs b/client/finality-grandpa/src/environment.rs index 9cfd49eeb796c..f27a530ed2f40 100644 --- a/client/finality-grandpa/src/environment.rs +++ b/client/finality-grandpa/src/environment.rs @@ -1087,7 +1087,7 @@ where // random between `[0, 2 * gossip_duration]` seconds. 
let delay: u64 = - thread_rng().gen_range(0, 2 * self.config.gossip_duration.as_millis() as u64); + thread_rng().gen_range(0..2 * self.config.gossip_duration.as_millis() as u64); Box::pin(Delay::new(Duration::from_millis(delay)).map(Ok)) } diff --git a/client/finality-grandpa/src/import.rs b/client/finality-grandpa/src/import.rs index 84e6fa9e1fba5..a86421b4a0ef0 100644 --- a/client/finality-grandpa/src/import.rs +++ b/client/finality-grandpa/src/import.rs @@ -19,7 +19,7 @@ use std::{collections::HashMap, marker::PhantomData, sync::Arc}; use log::debug; -use parity_scale_codec::Encode; +use parity_scale_codec::{Decode, Encode}; use sc_client_api::{backend::Backend, utils::is_descendent_of}; use sc_consensus::{ @@ -27,10 +27,11 @@ use sc_consensus::{ BlockCheckParams, BlockImport, BlockImportParams, ImportResult, JustificationImport, }; use sc_telemetry::TelemetryHandle; -use sp_api::TransactionFor; +use sp_api::{Core, RuntimeApiInfo, TransactionFor}; use sp_blockchain::{well_known_cache_keys, BlockStatus}; use sp_consensus::{BlockOrigin, Error as ConsensusError, SelectChain}; -use sp_finality_grandpa::{ConsensusLog, ScheduledChange, SetId, GRANDPA_ENGINE_ID}; +use sp_core::hashing::twox_128; +use sp_finality_grandpa::{ConsensusLog, GrandpaApi, ScheduledChange, SetId, GRANDPA_ENGINE_ID}; use sp_runtime::{ generic::{BlockId, OpaqueDigestItemId}, traits::{Block as BlockT, DigestFor, Header as HeaderT, NumberFor, Zero}, @@ -43,7 +44,7 @@ use crate::{ environment::finalize_block, justification::GrandpaJustification, notification::GrandpaJustificationSender, - ClientForGrandpa, CommandOrError, Error, NewAuthoritySet, VoterCommand, + AuthoritySetChanges, ClientForGrandpa, CommandOrError, Error, NewAuthoritySet, VoterCommand, }; /// A block-import handler for GRANDPA. @@ -230,6 +231,10 @@ where DigestFor: Encode, BE: Backend, Client: ClientForGrandpa, + Client::Api: GrandpaApi, + for<'a> &'a Client: + BlockImport>, + TransactionFor: 'static, { // check for a new authority set change. fn check_new_change( @@ -418,6 +423,91 @@ where Ok(PendingSetChanges { just_in_case, applied_changes, do_pause }) } + + /// Read the current set id from a given state. + fn current_set_id(&self, id: &BlockId) -> Result { + let runtime_version = self.inner.runtime_api().version(id).map_err(|e| { + ConsensusError::ClientImport(format!( + "Unable to retrieve current runtime version. {}", + e + )) + })?; + if runtime_version + .api_version(&>::ID) + .map_or(false, |v| v < 3) + { + // The new API is not supported in this runtime. Try reading directly from storage. + // This code may be removed once warp sync to an old runtime is no longer needed. + for prefix in ["GrandpaFinality", "Grandpa"] { + let k = [twox_128(prefix.as_bytes()), twox_128(b"CurrentSetId")].concat(); + if let Ok(Some(id)) = + self.inner.storage(&id, &sc_client_api::StorageKey(k.to_vec())) + { + if let Ok(id) = SetId::decode(&mut id.0.as_ref()) { + return Ok(id) + } + } + } + Err(ConsensusError::ClientImport("Unable to retrieve current set id.".into())) + } else { + self.inner + .runtime_api() + .current_set_id(&id) + .map_err(|e| ConsensusError::ClientImport(e.to_string())) + } + } + + /// Import a whole new state and reset the authority set. + async fn import_state( + &mut self, + mut block: BlockImportParams>, + new_cache: HashMap>, + ) -> Result { + let hash = block.post_hash(); + let number = *block.header.number(); + // Force imported state finality.
+ block.finalized = true; + let import_result = (&*self.inner).import_block(block, new_cache).await; + match import_result { + Ok(ImportResult::Imported(aux)) => { + // We've just imported a new state. We trust the sync module has verified + // finality proofs and that the state is correct and final. + // So we can read the authority list and set id from the state. + self.authority_set_hard_forks.clear(); + let block_id = BlockId::hash(hash); + let authorities = self + .inner + .runtime_api() + .grandpa_authorities(&block_id) + .map_err(|e| ConsensusError::ClientImport(e.to_string()))?; + let set_id = self.current_set_id(&block_id)?; + let authority_set = AuthoritySet::new( + authorities.clone(), + set_id, + fork_tree::ForkTree::new(), + Vec::new(), + AuthoritySetChanges::empty(), + ) + .ok_or_else(|| ConsensusError::ClientImport("Invalid authority list".into()))?; + *self.authority_set.inner_locked() = authority_set.clone(); + + crate::aux_schema::update_authority_set::( + &authority_set, + None, + |insert| self.inner.insert_aux(insert, []), + ) + .map_err(|e| ConsensusError::ClientImport(e.to_string()))?; + let new_set = + NewAuthoritySet { canon_number: number, canon_hash: hash, set_id, authorities }; + let _ = self + .send_voter_commands + .unbounded_send(VoterCommand::ChangeAuthorities(new_set)); + Ok(ImportResult::Imported(aux)) + }, + Ok(r) => Ok(r), + Err(e) => Err(ConsensusError::ClientImport(e.to_string())), + } + } } #[async_trait::async_trait] @@ -427,6 +517,7 @@ where DigestFor: Encode, BE: Backend, Client: ClientForGrandpa, + Client::Api: GrandpaApi, for<'a> &'a Client: BlockImport>, TransactionFor: 'static, @@ -455,6 +546,10 @@ where Err(e) => return Err(ConsensusError::ClientImport(e.to_string())), } + if block.with_state() { + return self.import_state(block, new_cache).await + } + // on initial sync we will restrict logging under info to avoid spam. 
let initial_sync = block.origin == BlockOrigin::NetworkInitialSync; diff --git a/client/finality-grandpa/src/lib.rs b/client/finality-grandpa/src/lib.rs index 8f8ce25b60a5f..2a10dfc0d50d8 100644 --- a/client/finality-grandpa/src/lib.rs +++ b/client/finality-grandpa/src/lib.rs @@ -64,7 +64,7 @@ use prometheus_endpoint::{PrometheusError, Registry}; use sc_client_api::{ backend::{AuxStore, Backend}, BlockchainEvents, CallExecutor, ExecutionStrategy, ExecutorProvider, Finalizer, LockImportRun, - TransactionFor, + StorageProvider, TransactionFor, }; use sc_consensus::BlockImport; use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO}; @@ -119,6 +119,7 @@ mod notification; mod observer; mod until_imported; mod voting_rule; +pub mod warp_proof; pub use authorities::{AuthoritySet, AuthoritySetChanges, SharedAuthoritySet}; pub use aux_schema::best_justification; @@ -335,6 +336,7 @@ pub trait ClientForGrandpa: + ProvideRuntimeApi + ExecutorProvider + BlockImport, Error = sp_consensus::Error> + + StorageProvider where BE: Backend, Block: BlockT, @@ -353,7 +355,8 @@ where + BlockchainEvents + ProvideRuntimeApi + ExecutorProvider - + BlockImport, Error = sp_consensus::Error>, + + BlockImport, Error = sp_consensus::Error> + + StorageProvider, { } diff --git a/client/finality-grandpa/src/observer.rs b/client/finality-grandpa/src/observer.rs index cbea6c138c90f..dd120fdd1450f 100644 --- a/client/finality-grandpa/src/observer.rs +++ b/client/finality-grandpa/src/observer.rs @@ -177,12 +177,11 @@ where { let LinkHalf { client, - select_chain: _, persistent_data, voter_commands_rx, justification_sender, - justification_stream: _, telemetry, + .. } = link; let network = NetworkBridge::new( diff --git a/client/finality-grandpa/src/tests.rs b/client/finality-grandpa/src/tests.rs index bf9faec707533..6b151f314b5c5 100644 --- a/client/finality-grandpa/src/tests.rs +++ b/client/finality-grandpa/src/tests.rs @@ -203,6 +203,10 @@ sp_api::mock_impl_runtime_apis! { self.inner.genesis_authorities.clone() } + fn current_set_id(&self) -> SetId { + 0 + } + fn submit_report_equivocation_unsigned_extrinsic( _equivocation_proof: EquivocationProof, _key_owner_proof: OpaqueKeyOwnershipProof, diff --git a/client/finality-grandpa-warp-sync/src/proof.rs b/client/finality-grandpa/src/warp_proof.rs similarity index 68% rename from client/finality-grandpa-warp-sync/src/proof.rs rename to client/finality-grandpa/src/warp_proof.rs index d2484a800e63b..86b57c78a43e5 100644 --- a/client/finality-grandpa-warp-sync/src/proof.rs +++ b/client/finality-grandpa/src/warp_proof.rs @@ -14,12 +14,16 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use codec::{Decode, Encode}; +//! Utilities for generating and verifying GRANDPA warp sync proofs. 
-use sc_client_api::Backend as ClientBackend; -use sc_finality_grandpa::{ - find_scheduled_change, AuthoritySetChanges, BlockNumberOps, GrandpaJustification, +use sp_runtime::codec::{self, Decode, Encode}; + +use crate::{ + best_justification, find_scheduled_change, AuthoritySetChanges, BlockNumberOps, + GrandpaJustification, SharedAuthoritySet, }; +use sc_client_api::Backend as ClientBackend; +use sc_network::warp_request_handler::{EncodedProof, VerificationResult, WarpSyncProvider}; use sp_blockchain::{Backend as BlockchainBackend, HeaderBackend}; use sp_finality_grandpa::{AuthorityList, SetId, GRANDPA_ENGINE_ID}; use sp_runtime::{ @@ -27,13 +31,34 @@ use sp_runtime::{ traits::{Block as BlockT, Header as HeaderT, NumberFor, One}, }; -use crate::HandleRequestError; +use std::sync::Arc; + +/// Warp proof processing error. +#[derive(Debug, derive_more::Display, derive_more::From)] +pub enum Error { + /// Decoding error. + #[display(fmt = "Failed to decode block hash: {}.", _0)] + DecodeScale(codec::Error), + /// Client backend error. + Client(sp_blockchain::Error), + /// Invalid request data. + #[from(ignore)] + InvalidRequest(String), + /// Invalid warp proof. + #[from(ignore)] + InvalidProof(String), + /// Missing header or authority set change data. + #[display(fmt = "Missing required data to be able to answer request.")] + MissingData, +} + +impl std::error::Error for Error {} /// The maximum size in bytes of the `WarpSyncProof`. -pub(super) const MAX_WARP_SYNC_PROOF_SIZE: usize = 16 * 1024 * 1024; +pub(super) const MAX_WARP_SYNC_PROOF_SIZE: usize = 8 * 1024 * 1024; /// A proof of an authority set change. -#[derive(Decode, Encode)] +#[derive(Decode, Encode, Debug)] pub struct WarpSyncFragment { /// The last block that the given authority set finalized. This block should contain a digest /// signaling an authority set change from which we can fetch the next authority set. @@ -54,11 +79,11 @@ impl WarpSyncProof { /// Generates a warp sync proof starting at the given block. It will generate authority set /// change proofs for all changes that happened from `begin` until the current authority set /// (capped by MAX_WARP_SYNC_PROOF_SIZE). - pub fn generate( + fn generate( backend: &Backend, begin: Block::Hash, set_changes: &AuthoritySetChanges>, - ) -> Result, HandleRequestError> + ) -> Result, Error> where Backend: ClientBackend, { @@ -67,12 +92,10 @@ impl WarpSyncProof { let begin_number = blockchain .block_number_from_id(&BlockId::Hash(begin))? 
- .ok_or_else(|| HandleRequestError::InvalidRequest("Missing start block".to_string()))?; + .ok_or_else(|| Error::InvalidRequest("Missing start block".to_string()))?; if begin_number > blockchain.info().finalized_number { - return Err(HandleRequestError::InvalidRequest( - "Start block is not finalized".to_string(), - )) + return Err(Error::InvalidRequest("Start block is not finalized".to_string())) } let canon_hash = blockchain.hash(begin_number)?.expect( @@ -82,7 +105,7 @@ ); if canon_hash != begin { - return Err(HandleRequestError::InvalidRequest( + return Err(Error::InvalidRequest( "Start block is not in the finalized chain".to_string(), )) } @@ -91,8 +114,7 @@ let mut proofs_encoded_len = 0; let mut proof_limit_reached = false; - let set_changes = - set_changes.iter_from(begin_number).ok_or(HandleRequestError::MissingData)?; + let set_changes = set_changes.iter_from(begin_number).ok_or(Error::MissingData)?; for (_, last_block) in set_changes { let header = blockchain.header(BlockId::Number(*last_block))?.expect( @@ -137,19 +159,18 @@ let is_finished = if proof_limit_reached { false } else { - let latest_justification = - sc_finality_grandpa::best_justification(backend)?.filter(|justification| { - // the existing best justification must be for a block higher than the - // last authority set change. if we didn't prove any authority set - // change then we fallback to make sure it's higher or equal to the - // initial warp sync block. - let limit = proofs - .last() - .map(|proof| proof.justification.target().0 + One::one()) - .unwrap_or(begin_number); - - justification.target().0 >= limit - }); + let latest_justification = best_justification(backend)?.filter(|justification| { + // the existing best justification must be for a block higher than the + // last authority set change. if we didn't prove any authority set + // change then we fallback to make sure it's higher or equal to the + // initial warp sync block. + let limit = proofs + .last() + .map(|proof| proof.justification.target().0 + One::one()) + .unwrap_or(begin_number); + + justification.target().0 >= limit + }); if let Some(latest_justification) = latest_justification { let header = blockchain.header(BlockId::Hash(latest_justification.target().1))? @@ -167,12 +188,13 @@ } /// Verifies the warp sync proof starting at the given set id and with the given authorities. + /// Verification stops when either the proof is exhausted or finality for the target header can be proven. /// If the proof is valid, the new set id and authorities are returned.
- pub fn verify( + fn verify( &self, set_id: SetId, authorities: AuthorityList, - ) -> Result<(SetId, AuthorityList), HandleRequestError> + ) -> Result<(SetId, AuthorityList), Error> where NumberFor: BlockNumberOps, { @@ -183,37 +205,107 @@ proof .justification .verify(current_set_id, &current_authorities) - .map_err(|err| HandleRequestError::InvalidProof(err.to_string()))?; + .map_err(|err| Error::InvalidProof(err.to_string()))?; if proof.justification.target().1 != proof.header.hash() { - return Err(HandleRequestError::InvalidProof( - "mismatch between header and justification".to_owned(), + return Err(Error::InvalidProof( + "Mismatch between header and justification".to_owned(), )) } if let Some(scheduled_change) = find_scheduled_change::(&proof.header) { current_authorities = scheduled_change.next_authorities; current_set_id += 1; - } else if fragment_num != self.proofs.len() - 1 { - // Only the last fragment of the proof is allowed to be missing the authority - // set change. - return Err(HandleRequestError::InvalidProof( + } else if fragment_num != self.proofs.len() - 1 || !self.is_finished { + // Only the last fragment of the last proof message is allowed to be missing + // the authority set change. + return Err(Error::InvalidProof( + "Header is missing authority set change digest".to_string(), )) } } - Ok((current_set_id, current_authorities)) } } +/// Implements network API for warp sync. +pub struct NetworkProvider> +where + NumberFor: BlockNumberOps, +{ + backend: Arc, + authority_set: SharedAuthoritySet>, +} + +impl> NetworkProvider +where + NumberFor: BlockNumberOps, +{ + /// Create a new instance for a given backend and authority set. + pub fn new( + backend: Arc, + authority_set: SharedAuthoritySet>, + ) -> Self { + NetworkProvider { backend, authority_set } + } +} + +impl> WarpSyncProvider + for NetworkProvider +where + NumberFor: BlockNumberOps, +{ + fn generate( + &self, + start: Block::Hash, + ) -> Result> { + let proof = WarpSyncProof::::generate( + &*self.backend, + start, + &self.authority_set.authority_set_changes(), + ) + .map_err(Box::new)?; + Ok(EncodedProof(proof.encode())) + } + + fn verify( + &self, + proof: &EncodedProof, + set_id: SetId, + authorities: AuthorityList, + ) -> Result, Box> { + let EncodedProof(proof) = proof; + let proof = WarpSyncProof::::decode(&mut proof.as_slice()) + .map_err(|e| format!("Proof decoding error: {:?}", e))?; + let last_header = proof + .proofs + .last() + .map(|p| p.header.clone()) + .ok_or_else(|| "Empty proof".to_string())?; + let (next_set_id, next_authorities) = + proof.verify(set_id, authorities).map_err(Box::new)?; + if proof.is_finished { + Ok(VerificationResult::::Complete(next_set_id, next_authorities, last_header)) + } else { + Ok(VerificationResult::::Partial( + next_set_id, + next_authorities, + last_header.hash(), + )) + } + } + + fn current_authorities(&self) -> AuthorityList { + self.authority_set.inner().current_authorities.clone() + } +} + #[cfg(test)] mod tests { - use crate::WarpSyncProof; - use codec::Encode; + use super::{codec::Encode, WarpSyncProof}; + use crate::{AuthoritySetChanges, GrandpaJustification}; use rand::prelude::*; use sc_block_builder::BlockBuilderProvider; - use sc_finality_grandpa::{AuthoritySetChanges, GrandpaJustification}; use sp_blockchain::HeaderBackend; use sp_consensus::BlockOrigin; use sp_finality_grandpa::GRANDPA_ENGINE_ID; diff --git a/client/informant/src/display.rs b/client/informant/src/display.rs index 4e91c22f9efd7..76e21215c2457 100644 ---
a/client/informant/src/display.rs +++ b/client/informant/src/display.rs @@ -40,6 +40,7 @@ use wasm_timer::Instant; /// /// Call `InformantDisplay::new` to initialize the state, then regularly call `display` with the /// information to display. +/// pub struct InformantDisplay { /// Head of chain block number from the last time `display` has been called. /// `None` if `display` has never been called. @@ -91,23 +92,36 @@ impl InformantDisplay { (diff_bytes_inbound, diff_bytes_outbound) }; - let (level, status, target) = - match (net_status.sync_state, net_status.best_seen_block, net_status.state_sync) { - (_, _, Some(state)) => ( - "⚙️ ", - "Downloading state".into(), - format!( - ", {}%, ({:.2}) Mib", - state.percentage, - (state.size as f32) / (1024f32 * 1024f32) - ), + let (level, status, target) = match ( + net_status.sync_state, + net_status.best_seen_block, + net_status.state_sync, + net_status.warp_sync, + ) { + (_, _, _, Some(warp)) => ( + "⏩", + "Warping".into(), + format!( + ", {}, ({:.2}) Mib", + warp.phase, + (warp.total_bytes as f32) / (1024f32 * 1024f32) + ), + ), + (_, _, Some(state), _) => ( + "⚙️ ", + "Downloading state".into(), + format!( + ", {}%, ({:.2}) Mib", + state.percentage, + (state.size as f32) / (1024f32 * 1024f32) ), - (SyncState::Idle, _, _) => ("💤", "Idle".into(), "".into()), - (SyncState::Downloading, None, _) => - ("⚙️ ", format!("Preparing{}", speed), "".into()), - (SyncState::Downloading, Some(n), None) => - ("⚙️ ", format!("Syncing{}", speed), format!(", target=#{}", n)), - }; + ), + (SyncState::Idle, _, _, _) => ("💤", "Idle".into(), "".into()), + (SyncState::Downloading, None, _, _) => + ("⚙️ ", format!("Preparing{}", speed), "".into()), + (SyncState::Downloading, Some(n), None, _) => + ("⚙️ ", format!("Syncing{}", speed), format!(", target=#{}", n)), + }; if self.format.enable_color { info!( diff --git a/client/informant/src/lib.rs b/client/informant/src/lib.rs index 6a91f583cd3df..c7c90a626a34a 100644 --- a/client/informant/src/lib.rs +++ b/client/informant/src/lib.rs @@ -21,7 +21,7 @@ use ansi_term::Colour; use futures::prelude::*; use futures_timer::Delay; -use log::{info, trace, warn}; +use log::{debug, info, trace}; use parity_util_mem::MallocSizeOf; use sc_client_api::{BlockchainEvents, UsageProvider}; use sc_network::NetworkService; @@ -143,7 +143,7 @@ where ancestor.hash, ), Ok(_) => {}, - Err(e) => warn!("Error computing tree route: {}", e), + Err(e) => debug!("Error computing tree route: {}", e), } } } diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index 9c6b580fb9c66..a24b8fe5310a1 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -57,6 +57,7 @@ sc-consensus = { version = "0.10.0-dev", path = "../consensus/common" } sp-core = { version = "4.0.0-dev", path = "../../primitives/core" } sp-runtime = { version = "4.0.0-dev", path = "../../primitives/runtime" } sp-utils = { version = "4.0.0-dev", path = "../../primitives/utils" } +sp-finality-grandpa = { version = "4.0.0-dev", path = "../../primitives/finality-grandpa" } thiserror = "1" unsigned-varint = { version = "0.6.0", features = ["futures", "asynchronous_codec"] } void = "1.0.2" diff --git a/client/network/README.md b/client/network/README.md index 914720f53e2a9..c361bc9249f71 100644 --- a/client/network/README.md +++ b/client/network/README.md @@ -203,6 +203,69 @@ integer representing the role of the node: In the future, though, these restrictions will be removed. +# Sync + +The crate implements a number of syncing algorithms. 
The main purpose of the syncing algorithm is to +get the chain to the latest state and keep it synced with the rest of the network by downloading and +importing new data as soon as it becomes available. Once the node starts, it catches up with the network +using one of the initial sync methods listed below, and once that is complete it uses a keep-up sync to +download new blocks. + +## Full and light sync + +This is the default syncing method for both the initial and the keep-up sync. The algorithm starts with the +current best block and downloads block data progressively from multiple peers if available. Once +there's a sequence of blocks ready to be imported they are fed to the import queue. Full nodes download +and execute full blocks, while light nodes only download and import headers. This continues until each peer +has no more new blocks to give. + +For each peer the sync maintains the number of the best block we have in common with that peer. This number is updated +whenever peers announce new blocks or our best block advances. This allows us to keep track of peers that have new +block data and to request new information as soon as it is announced. In keep-up mode, we also track peers that +announce blocks on all branches, not just the best branch. The sync algorithm tries to be greedy and download +all data that's announced. + +## Fast sync + +In this mode the initial sync downloads and verifies the full header history. This allows the node to validate +authority set transitions and arrive at a recent header. After the header chain is verified and imported, +the node starts downloading a state snapshot using the state request protocol. Each `StateRequest` +contains a starting storage key, which is empty for the first request. +A `StateResponse` contains a storage proof for a sequence of keys and values in the storage, +starting from (but not including) the key given in the request. After iterating the proof trie against +the storage root in the target header, the node issues the next `StateRequest` with the starting +key set to the last key from the previous response. This continues until trie iteration reaches the end. +The state is then imported into the database and the keep-up sync starts in normal full/light sync mode. + +## Warp sync + +This is similar to fast sync, but instead of downloading and verifying the full header chain, the algorithm +only downloads finalized authority set changes. + +### GRANDPA warp sync + +GRANDPA keeps justifications for each finalized authority set change. Each change is signed by the +authorities from the previous set. By downloading and verifying these signed hand-offs starting from genesis, +we arrive at a recent header much faster than by downloading the full header chain. Each `WarpSyncRequest` contains a block +hash to start collecting proofs from. A `WarpSyncResponse` contains a sequence of block headers and +justifications. The proof downloader checks the justifications and continues requesting proofs from the last +header hash, until it arrives at some recent header. + +Once the finality chain is proved for a header, the state matching that header is downloaded much like during +fast sync. The state is verified to match the header's storage root. After the state is imported into the +database, it is queried for the information that allows GRANDPA and BABE to continue operating from that state. +This includes the BABE epoch information and the GRANDPA authority set id.
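The hand-off walk just described is what the new `WarpSyncProvider` trait in `client/network/src/warp_request_handler.rs` abstracts over; the `NetworkProvider` added to `sc-finality-grandpa` implements it on top of the GRANDPA authority set. A condensed sketch of the client-side loop follows, with a `fetch` closure standing in for the real request/response machinery (illustrative only, not part of this patch):

```rust
// Sketch of the warp proof download loop in terms of the `WarpSyncProvider`
// trait. `fetch` is a placeholder for the actual network request plumbing.
use sc_network::warp_request_handler::{EncodedProof, VerificationResult, WarpSyncProvider};
use sp_runtime::traits::Block as BlockT;

fn warp_to_recent_header<B: BlockT>(
	provider: &dyn WarpSyncProvider<B>,
	genesis_hash: B::Hash,
	fetch: impl Fn(B::Hash) -> Result<EncodedProof, String>,
) -> Result<B::Header, String> {
	// Start from set 0 and the genesis authorities, as `WarpSync::new` does.
	let mut set_id = 0;
	let mut authorities = provider.current_authorities();
	let mut begin = genesis_hash;
	loop {
		let proof = fetch(begin)?;
		match provider.verify(&proof, set_id, authorities).map_err(|e| e.to_string())? {
			// More hand-offs remain: continue requesting from the last verified header.
			VerificationResult::Partial(next_set_id, next_authorities, last_hash) => {
				set_id = next_set_id;
				authorities = next_authorities;
				begin = last_hash;
			},
			// Finality is proved up to a recent header; state download starts there.
			VerificationResult::Complete(_, _, header) => return Ok(header),
		}
	}
}
```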
+ +### Background block download + +After the latest state has been imported the node is fully operational, but is still missing historical block +data, i.e. it is unable to serve block bodies and headers other than the most recent one. To make sure all +nodes have block history available, a background sync process is started that downloads all the missing blocks. +It runs in parallel with the keep-up sync and does not interfere with the downloading of recent blocks. +During this download we also import GRANDPA justifications for blocks with authority set changes, so that +the warp-synced node has all the data needed to serve other nodes that might want to sync from it with +any method. + # Usage Using the `sc-network` crate is done through the [`NetworkWorker`] struct. Create this diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 73d5ec357b2c7..c181ee4e63396 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -26,6 +26,7 @@ use crate::{ }; use bytes::Bytes; +use codec::Encode; use futures::{channel::oneshot, stream::StreamExt}; use libp2p::{ core::{Multiaddr, PeerId, PublicKey}, @@ -87,6 +88,11 @@ pub struct Behaviour { /// [`request_responses::RequestResponsesBehaviour`]. #[behaviour(ignore)] state_request_protocol_name: String, + + /// Protocol name used to send out warp sync requests via + /// [`request_responses::RequestResponsesBehaviour`]. + #[behaviour(ignore)] + warp_sync_protocol_name: Option, } /// Event generated by `Behaviour`. @@ -195,6 +201,7 @@ impl Behaviour { disco_config: DiscoveryConfig, block_request_protocol_config: request_responses::ProtocolConfig, state_request_protocol_config: request_responses::ProtocolConfig, + warp_sync_protocol_config: Option, bitswap: Option>, light_client_request_protocol_config: request_responses::ProtocolConfig, // All remaining request protocol configs. @@ -203,9 +210,16 @@ impl Behaviour { // Extract protocol name and add to `request_response_protocols`.
let block_request_protocol_name = block_request_protocol_config.name.to_string(); let state_request_protocol_name = state_request_protocol_config.name.to_string(); + let warp_sync_protocol_name = match warp_sync_protocol_config { + Some(config) => { + let name = config.name.to_string(); + request_response_protocols.push(config); + Some(name) + }, + None => None, + }; request_response_protocols.push(block_request_protocol_config); request_response_protocols.push(state_request_protocol_config); - request_response_protocols.push(light_client_request_protocol_config); Ok(Behaviour { @@ -220,6 +234,7 @@ impl Behaviour { events: VecDeque::new(), block_request_protocol_name, state_request_protocol_name, + warp_sync_protocol_name, }) } @@ -368,6 +383,24 @@ impl NetworkBehaviourEventProcess> for Behavi IfDisconnected::ImmediateError, ); }, + CustomMessageOutcome::WarpSyncRequest { target, request, pending_response } => + match &self.warp_sync_protocol_name { + Some(name) => self.request_responses.send_request( + &target, + name, + request.encode(), + pending_response, + IfDisconnected::ImmediateError, + ), + None => { + log::warn!( + target: "sync", + "Trying to send warp sync request when no protocol is configured {:?}", + request, + ); + return + }, + }, CustomMessageOutcome::NotificationStreamOpened { remote, protocol, diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 2581a08d42460..dd60f329128fb 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -27,6 +27,7 @@ pub use crate::{ request_responses::{ IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig, }, + warp_request_handler::WarpSyncProvider, }; pub use libp2p::{build_multiaddr, core::PublicKey, identity, wasm_ext::ExtTransport}; @@ -137,6 +138,9 @@ pub struct Params { /// [`crate::state_request_handler::StateRequestHandler::new`] allowing /// both outgoing and incoming requests. pub state_request_protocol_config: RequestResponseConfig, + + /// Optional warp sync protocol support. Include protocol config and sync provider. + pub warp_sync: Option<(Arc>, RequestResponseConfig)>, } /// Role of the local node. @@ -268,6 +272,7 @@ impl fmt::Debug for ProtocolId { /// assert_eq!(peer_id, "QmSk5HQbn6LhUwDiNMseVUjuRYhEtYj4aUZ6WfWoGURpdV".parse::().unwrap()); /// assert_eq!(addr, "/ip4/198.51.100.19/tcp/30333".parse::().unwrap()); /// ``` +/// pub fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), ParseErr> { let addr: Multiaddr = addr_str.parse()?; parse_addr(addr) @@ -391,6 +396,8 @@ pub enum SyncMode { /// Download indexed transactions for recent blocks. storage_chain_mode: bool, }, + /// Warp sync - verify authority set transitions and the latest state. 
+ Warp, } impl Default for SyncMode { diff --git a/client/network/src/gossip/tests.rs b/client/network/src/gossip/tests.rs index f4f96b863d624..88c4160bc5066 100644 --- a/client/network/src/gossip/tests.rs +++ b/client/network/src/gossip/tests.rs @@ -53,10 +53,7 @@ fn build_test_full_node(network_config: config::NetworkConfiguration) impl sc_consensus::Verifier for PassThroughVerifier { async fn verify( &mut self, - origin: sp_consensus::BlockOrigin, - header: B::Header, - justifications: Option, - body: Option>, + mut block: sp_consensus::BlockImportParams, ) -> Result< ( sc_consensus::BlockImportParams, @@ -64,7 +61,7 @@ fn build_test_full_node(network_config: config::NetworkConfiguration) ), String, > { - let maybe_keys = header + let maybe_keys = block.header .digest() .log(|l| { l.try_as_raw(sp_runtime::generic::OpaqueDigestItemId::Consensus(b"aura")) @@ -79,12 +76,9 @@ fn build_test_full_node(network_config: config::NetworkConfiguration) )] }); - let mut import = sc_consensus::BlockImportParams::new(origin, header); - import.body = body; - import.finalized = self.0; - import.justifications = justifications; - import.fork_choice = Some(sc_consensus::ForkChoiceStrategy::LongestChain); - Ok((import, maybe_keys)) + block.finalized = self.0; + block.fork_choice = Some(sc_consensus::ForkChoiceStrategy::LongestChain); + Ok((block, maybe_keys)) } } @@ -144,6 +138,7 @@ fn build_test_full_node(network_config: config::NetworkConfiguration) block_request_protocol_config, state_request_protocol_config, light_client_request_protocol_config, + warp_sync: None, }) .unwrap(); diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index c812390ec6a65..633baaca47aab 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -243,6 +243,7 @@ //! - Calling `trigger_repropagate` when a transaction is added to the pool. //! //! More precise usage details are still being worked on and will likely change in the future. +//! mod behaviour; mod chain; @@ -264,12 +265,13 @@ pub mod light_client_requests; pub mod network_state; pub mod state_request_handler; pub mod transactions; +pub mod warp_request_handler; #[doc(inline)] pub use libp2p::{multiaddr, Multiaddr, PeerId}; pub use protocol::{ event::{DhtEvent, Event, ObservedRole}, - sync::{StateDownloadProgress, SyncState}, + sync::{StateDownloadProgress, SyncState, WarpSyncPhase, WarpSyncProgress}, PeerInfo, }; pub use service::{ @@ -326,4 +328,6 @@ pub struct NetworkStatus { pub total_bytes_outbound: u64, /// State sync in progress. pub state_sync: Option, + /// Warp sync in progress. 
+ pub warp_sync: Option, } diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 2af33cd1c5a15..a5675dbdc34d6 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -18,11 +18,12 @@ use crate::{ chain::Client, - config::{self, ProtocolId}, + config::{self, ProtocolId, WarpSyncProvider}, error, request_responses::RequestFailure, schema::v1::StateResponse, utils::{interval, LruHashSet}, + warp_request_handler::EncodedProof, }; use bytes::Bytes; @@ -196,6 +197,7 @@ pub struct Protocol { enum PeerRequest { Block(message::BlockRequest), State, + WarpProof, } /// Peer information @@ -239,6 +241,7 @@ impl ProtocolConfig { config::SyncMode::Full => sync::SyncMode::Full, config::SyncMode::Fast { skip_proofs, storage_chain_mode } => sync::SyncMode::LightState { skip_proofs, storage_chain_mode }, + config::SyncMode::Warp => sync::SyncMode::Warp, } } } @@ -293,6 +296,7 @@ impl Protocol { notifications_protocols_handshakes: Vec>, block_announce_validator: Box + Send>, metrics_registry: Option<&Registry>, + warp_sync_provider: Option>>, ) -> error::Result<(Protocol, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> { let info = chain.info(); let sync = ChainSync::new( @@ -300,6 +304,7 @@ impl Protocol { chain.clone(), block_announce_validator, config.max_parallel_downloads, + warp_sync_provider, ) .map_err(Box::new)?; @@ -724,6 +729,26 @@ impl Protocol { } } + /// Must be called in response to a [`CustomMessageOutcome::WarpSyncRequest`] being emitted. + /// Must contain the same `PeerId` and request that have been emitted. + pub fn on_warp_sync_response( + &mut self, + peer_id: PeerId, + response: crate::warp_request_handler::EncodedProof, + ) -> CustomMessageOutcome { + match self.sync.on_warp_sync_data(&peer_id, response) { + Ok(sync::OnWarpSyncData::WarpProofRequest(peer, req)) => + prepare_warp_sync_request::(&mut self.peers, peer, req), + Ok(sync::OnWarpSyncData::StateRequest(peer, req)) => + prepare_state_request::(&mut self.peers, peer, req), + Err(sync::BadPeer(id, repu)) => { + self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); + self.peerset_handle.report_peer(id, repu); + CustomMessageOutcome::None + }, + } + } + /// Perform time based maintenance. /// /// > **Note**: This method normally doesn't have to be called except for testing purposes. @@ -1248,6 +1273,19 @@ fn prepare_state_request( CustomMessageOutcome::StateRequest { target: who, request, pending_response: tx } } +fn prepare_warp_sync_request( + peers: &mut HashMap>, + who: PeerId, + request: crate::warp_request_handler::Request, +) -> CustomMessageOutcome { + let (tx, rx) = oneshot::channel(); + + if let Some(ref mut peer) = peers.get_mut(&who) { + peer.request = Some((PeerRequest::WarpProof, rx)); + } + CustomMessageOutcome::WarpSyncRequest { target: who, request, pending_response: tx } +} + /// Outcome of an incoming custom message. #[derive(Debug)] #[must_use] pub enum CustomMessageOutcome { @@ -1291,6 +1329,12 @@ request: crate::schema::v1::StateRequest, pending_response: oneshot::Sender, RequestFailure>>, }, + /// A new warp sync request must be emitted. + WarpSyncRequest { + target: PeerId, + request: crate::warp_request_handler::Request, + pending_response: oneshot::Sender, RequestFailure>>, + }, /// Peer has reported a new head of chain. PeerNewBest(PeerId, NumberFor), /// Now connected to a new peer for syncing purposes. @@ -1364,6 +1408,7 @@ impl NetworkBehaviour for Protocol { // Check for finished outgoing requests.
let mut finished_block_requests = Vec::new(); let mut finished_state_requests = Vec::new(); + let mut finished_warp_sync_requests = Vec::new(); for (id, peer) in self.peers.iter_mut() { if let Peer { request: Some((_, pending_response)), .. } = peer { match pending_response.poll_unpin(cx) { @@ -1412,6 +1457,9 @@ impl NetworkBehaviour for Protocol { finished_state_requests.push((id.clone(), protobuf_response)); }, + PeerRequest::WarpProof => { + finished_warp_sync_requests.push((id.clone(), resp)); + }, } }, Poll::Ready(Ok(Err(e))) => { @@ -1474,6 +1522,10 @@ impl NetworkBehaviour for Protocol { let ev = self.on_state_response(id, protobuf_response); self.pending_messages.push_back(ev); } + for (id, response) in finished_warp_sync_requests { + let ev = self.on_warp_sync_response(id, EncodedProof(response)); + self.pending_messages.push_back(ev); + } while let Poll::Ready(Some(())) = self.tick_timeout.poll_next_unpin(cx) { self.tick(); @@ -1491,6 +1543,10 @@ impl NetworkBehaviour for Protocol { let event = prepare_block_request(&mut self.peers, id, request); self.pending_messages.push_back(event); } + if let Some((id, request)) = self.sync.warp_sync_request() { + let event = prepare_warp_sync_request(&mut self.peers, id, request); + self.pending_messages.push_back(event); + } // Check if there is any block announcement validation finished. while let Poll::Ready(result) = self.sync.poll_block_announce_validation(cx) { diff --git a/client/network/src/protocol/sync.rs b/client/network/src/protocol/sync.rs index 8918d7adde097..e9bf14a623b60 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/src/protocol/sync.rs @@ -27,6 +27,7 @@ //! The `ChainSync` struct maintains the state of the block requests. Whenever something happens on //! the network, or whenever a block has been successfully verified, call the appropriate method in //! order to update it. +//! use crate::{ protocol::message::{self, BlockAnnounce, BlockAttributes, BlockRequest, BlockResponse}, @@ -62,10 +63,12 @@ use std::{ pin::Pin, sync::Arc, }; +use warp::{WarpProofRequest, WarpSync, WarpSyncProvider}; mod blocks; mod extra_requests; mod state; +mod warp; /// Maximum blocks to request in a single packet. const MAX_BLOCKS_TO_REQUEST: usize = 128; @@ -101,6 +104,9 @@ const STATE_SYNC_FINALITY_THRESHOLD: u32 = 8; /// so far behind. const MAJOR_SYNC_BLOCKS: u8 = 5; +/// Number of peers that need to be connected before warp sync is started. +const MIN_PEERS_TO_START_WARP_SYNC: usize = 3; + mod rep { use sc_peerset::ReputationChange as Rep; /// Reputation change when a peer sent us a message that led to a @@ -217,6 +223,10 @@ pub struct ChainSync { block_announce_validation_per_peer_stats: HashMap, /// State sync in progress, if any. state_sync: Option>, + /// Warp sync in progress, if any. + warp_sync: Option>, + /// Warp sync provider. + warp_sync_provider: Option>>, /// Enable importing existing blocks. This is used after the state download to /// catch up to the latest state while re-importing blocks. import_existing: bool, @@ -290,6 +300,8 @@ pub enum PeerSyncState { DownloadingJustification(B::Hash), /// Downloading state. DownloadingState, + /// Downloading warp proof. + DownloadingWarpProof, } impl PeerSyncState { @@ -316,6 +328,39 @@ pub struct StateDownloadProgress { pub size: u64, } +/// Reported warp sync phase. +#[derive(Clone, Eq, PartialEq, Debug)] +pub enum WarpSyncPhase { + /// Waiting for peers to connect. + AwaitingPeers, + /// Downloading and verifying grandpa warp proofs.
+ DownloadingWarpProofs, + /// Downloading state data. + DownloadingState, + /// Importing state. + ImportingState, +} + +impl fmt::Display for WarpSyncPhase { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + WarpSyncPhase::AwaitingPeers => write!(f, "Waiting for peers"), + WarpSyncPhase::DownloadingWarpProofs => write!(f, "Downloading finality proofs"), + WarpSyncPhase::DownloadingState => write!(f, "Downloading state"), + WarpSyncPhase::ImportingState => write!(f, "Importing state"), + } + } +} + +/// Reported warp sync progress. +#[derive(Clone, Eq, PartialEq, Debug)] +pub struct WarpSyncProgress { + /// Current warp sync phase. + pub phase: WarpSyncPhase, + /// Total bytes downloaded so far. + pub total_bytes: u64, +} + /// Syncing status and statistics. #[derive(Clone)] pub struct Status { @@ -329,6 +374,8 @@ pub struct Status { pub queued_blocks: u32, /// State sync status in progress, if any. pub state_sync: Option, + /// Warp sync in progress, if any. + pub warp_sync: Option, } /// A peer did not behave as expected and should be reported. @@ -373,6 +420,15 @@ pub enum OnStateData { Request(PeerId, StateRequest), } +/// Result of [`ChainSync::on_warp_sync_data`]. +#[derive(Debug)] +pub enum OnWarpSyncData { + /// Warp proof request is issued. + WarpProofRequest(PeerId, warp::WarpProofRequest), + /// A new state request needs to be made to the given peer. + StateRequest(PeerId, StateRequest), +} + /// Result of [`ChainSync::poll_block_announce_validation`]. #[derive(Debug, Clone, PartialEq, Eq)] pub enum PollBlockAnnounceValidation { @@ -460,6 +516,8 @@ pub enum SyncMode { Full, // Sync headers and the last finalized state LightState { storage_chain_mode: bool, skip_proofs: bool }, + // Warp sync mode. + Warp, } /// Result of [`ChainSync::has_slot_for_block_announce_validation`]. @@ -479,6 +537,7 @@ impl ChainSync { client: Arc>, block_announce_validator: Box + Send>, max_parallel_downloads: u32, + warp_sync_provider: Option>>, ) -> Result { let mut sync = ChainSync { client, @@ -497,6 +556,8 @@ impl ChainSync { block_announce_validation: Default::default(), block_announce_validation_per_peer_stats: Default::default(), state_sync: None, + warp_sync: None, + warp_sync_provider, import_existing: false, }; sync.reset_sync_start_point()?; @@ -508,7 +569,7 @@ impl ChainSync { SyncMode::Full => BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY, SyncMode::Light => BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION, - SyncMode::LightState { storage_chain_mode: false, .. } => + SyncMode::LightState { storage_chain_mode: false, .. } | SyncMode::Warp => BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION | BlockAttributes::BODY, SyncMode::LightState { storage_chain_mode: true, .. } => BlockAttributes::HEADER | @@ -522,6 +583,7 @@ impl ChainSync { SyncMode::Full => false, SyncMode::Light => true, SyncMode::LightState { ..
} => true, + SyncMode::Warp => true, } } @@ -550,12 +612,20 @@ impl ChainSync { SyncState::Idle }; + let warp_sync_progress = match (&self.warp_sync, &self.mode) { + (None, SyncMode::Warp) => + Some(WarpSyncProgress { phase: WarpSyncPhase::AwaitingPeers, total_bytes: 0 }), + (Some(sync), _) => Some(sync.progress()), + _ => None, + }; + Status { state: sync_state, best_seen_block: best_seen, num_peers: self.peers.len() as u32, queued_blocks: self.queue_blocks.len() as u32, state_sync: self.state_sync.as_ref().map(|s| s.progress()), + warp_sync: warp_sync_progress, } } @@ -620,6 +690,17 @@ impl ChainSync { return Ok(None) } + if let SyncMode::Warp = &self.mode { + if self.peers.len() >= MIN_PEERS_TO_START_WARP_SYNC && self.warp_sync.is_none() + { + log::debug!(target: "sync", "Starting warp state sync."); + if let Some(provider) = &self.warp_sync_provider { + self.warp_sync = + Some(WarpSync::new(self.client.clone(), provider.clone())); + } + } + } + // If we are at genesis, just start downloading. let (state, req) = if self.best_queued_number.is_zero() { debug!( @@ -792,7 +873,8 @@ impl ChainSync { /// Get an iterator over all block requests of all peers. pub fn block_requests(&mut self) -> impl Iterator)> + '_ { - if self.pending_requests.is_empty() || self.state_sync.is_some() { + if self.pending_requests.is_empty() || self.state_sync.is_some() || self.warp_sync.is_some() + { return Either::Left(std::iter::empty()) } if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS { @@ -876,16 +958,16 @@ impl ChainSync { Either::Right(iter) } - /// Get a state request, if any + /// Get a state request, if any. pub fn state_request(&mut self) -> Option<(PeerId, StateRequest)> { + if self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState) { + // Only one pending state request is allowed. + return None + } if let Some(sync) = &self.state_sync { if sync.is_complete() { return None } - if self.peers.iter().any(|(_, peer)| peer.state == PeerSyncState::DownloadingState) { + // Only one pending state request is allowed. - return None - } for (id, peer) in self.peers.iter_mut() { if peer.state.is_available() && peer.common_number >= sync.target_block_num() { trace!(target: "sync", "New StateRequest for {}", id); @@ -895,6 +977,55 @@ impl ChainSync { } } } + if let Some(sync) = &self.warp_sync { + if sync.is_complete() { + return None + } + if let (Some(request), Some(target)) = + (sync.next_state_request(), sync.target_block_number()) + { + for (id, peer) in self.peers.iter_mut() { + if peer.state.is_available() && peer.best_number >= target { + trace!(target: "sync", "New StateRequest for {}", id); + peer.state = PeerSyncState::DownloadingState; + return Some((id.clone(), request)) + } + } + } + } + None + } + + /// Get a warp sync request, if any. + pub fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest)> { + if self + .peers + .iter() + .any(|(_, peer)| peer.state == PeerSyncState::DownloadingWarpProof) + { + // Only one pending warp proof request is allowed. + return None + } + if let Some(sync) = &self.warp_sync { + if sync.is_complete() { + return None + } + if let Some(request) = sync.next_warp_proof_request() { + let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect(); + if !targets.is_empty() { + targets.sort(); + let median = targets[targets.len() / 2]; + // Find a random peer that is synced at least as well as the peer majority.
+ for (id, peer) in self.peers.iter_mut() { + if peer.state.is_available() && peer.best_number >= median { + trace!(target: "sync", "New WarpProofRequest for {}", id); + peer.state = PeerSyncState::DownloadingWarpProof; + return Some((id.clone(), request)) + } + } + } + } + } None } @@ -1055,7 +1186,8 @@ impl ChainSync { }, PeerSyncState::Available | PeerSyncState::DownloadingJustification(..) | - PeerSyncState::DownloadingState => Vec::new(), + PeerSyncState::DownloadingState | + PeerSyncState::DownloadingWarpProof => Vec::new(), } } else { // When request.is_none() this is a block announcement. Just accept blocks. @@ -1105,6 +1237,15 @@ impl ChainSync { response.proof.len(), ); sync.import(response) + } else if let Some(sync) = &mut self.warp_sync { + debug!( + target: "sync", + "Importing state data from {} with {} keys, {} proof nodes.", + who, + response.entries.len(), + response.proof.len(), + ); + sync.import_state(response) } else { debug!(target: "sync", "Ignored obsolete state response from {}", who); return Err(BadPeer(who.clone(), rep::NOT_REQUESTED)) @@ -1112,12 +1253,7 @@ impl ChainSync { match import_result { state::ImportResult::Import(hash, header, state) => { - let origin = if self.status().state != SyncState::Downloading { - BlockOrigin::NetworkBroadcast - } else { - BlockOrigin::NetworkInitialSync - }; - + let origin = BlockOrigin::NetworkInitialSync; let block = IncomingBlock { hash, header: Some(header), @@ -1142,6 +1278,39 @@ impl ChainSync { } } + /// Handle a response from the remote to a warp proof request that we made. + /// + /// Returns next request. + pub fn on_warp_sync_data( + &mut self, + who: &PeerId, + response: warp::EncodedProof, + ) -> Result, BadPeer> { + let import_result = if let Some(sync) = &mut self.warp_sync { + debug!( + target: "sync", + "Importing warp proof data from {}, {} bytes.", + who, + response.0.len(), + ); + sync.import_warp_proof(response) + } else { + debug!(target: "sync", "Ignored obsolete warp sync response from {}", who); + return Err(BadPeer(who.clone(), rep::NOT_REQUESTED)) + }; + + match import_result { + warp::WarpProofImportResult::StateRequest(request) => + Ok(OnWarpSyncData::StateRequest(who.clone(), request)), + warp::WarpProofImportResult::WarpProofRequest(request) => + Ok(OnWarpSyncData::WarpProofRequest(who.clone(), request)), + warp::WarpProofImportResult::BadResponse => { + debug!(target: "sync", "Bad proof data received from {}", who); + Err(BadPeer(who.clone(), rep::BAD_BLOCK)) + }, + } + } + fn validate_and_queue_blocks( &mut self, mut new_blocks: Vec>, @@ -1308,6 +1477,20 @@ impl ChainSync { self.mode = SyncMode::Full; output.extend(self.restart()); } + let warp_sync_complete = self + .warp_sync + .as_ref() + .map_or(false, |s| s.target_block_hash() == Some(hash)); + if warp_sync_complete { + info!( + target: "sync", + "Warp sync is complete ({} MiB), restarting block sync.", + self.warp_sync.as_ref().map_or(0, |s| s.progress().total_bytes / (1024 * 1024)), + ); + self.warp_sync = None; + self.mode = SyncMode::Full; + output.extend(self.restart()); + } }, Err(BlockImportError::IncompleteHeader(who)) => if let Some(peer) = who { @@ -1349,6 +1532,7 @@ impl ChainSync { e @ Err(BlockImportError::UnknownParent) | e @ Err(BlockImportError::Other(_)) => { warn!(target: "sync", "💔 Error importing block {:?}: {:?}", hash, e); self.state_sync = None; + self.warp_sync = None; output.extend(self.restart()); }, Err(BlockImportError::Cancelled) => {}, @@ -1828,6 +2012,13 @@ impl ChainSync { ); self.mode = SyncMode::Full; } + 
if matches!(self.mode, SyncMode::Warp) && info.finalized_state.is_some() { + log::warn!( + target: "sync", + "Can't use warp sync mode with a partially synced database. Reverting to full sync mode." + ); + self.mode = SyncMode::Full; + } self.import_existing = false; self.best_queued_hash = info.best_hash; self.best_queued_number = info.best_number; @@ -2253,7 +2444,8 @@ mod test { let peer_id = PeerId::random(); let mut sync = - ChainSync::new(SyncMode::Full, client.clone(), block_announce_validator, 1).unwrap(); + ChainSync::new(SyncMode::Full, client.clone(), block_announce_validator, 1, None) + .unwrap(); let (a1_hash, a1_number) = { let a1 = client.new_block(Default::default()).unwrap().build().unwrap().block; @@ -2307,6 +2499,7 @@ mod test { client.clone(), Box::new(DefaultBlockAnnounceValidator), 1, + None, ) .unwrap(); @@ -2470,6 +2663,7 @@ mod test { client.clone(), Box::new(DefaultBlockAnnounceValidator), 5, + None, ) .unwrap(); @@ -2584,6 +2778,7 @@ mod test { client.clone(), Box::new(DefaultBlockAnnounceValidator), 5, + None, ) .unwrap(); @@ -2707,6 +2902,7 @@ mod test { client.clone(), Box::new(DefaultBlockAnnounceValidator), 5, + None, ) .unwrap(); @@ -2814,6 +3010,7 @@ mod test { client.clone(), Box::new(DefaultBlockAnnounceValidator), 1, + None, ) .unwrap(); diff --git a/client/network/src/protocol/sync/warp.rs b/client/network/src/protocol/sync/warp.rs new file mode 100644 index 0000000000000..fae0e2f5452a7 --- /dev/null +++ b/client/network/src/protocol/sync/warp.rs @@ -0,0 +1,181 @@ +// This file is part of Substrate. + +// Copyright (C) 2021 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Warp sync support. +pub use super::state::ImportResult; +use super::state::StateSync; +pub use crate::warp_request_handler::{ +	EncodedProof, Request as WarpProofRequest, VerificationResult, WarpSyncProvider, +}; +use crate::{ + chain::Client, + schema::v1::{StateRequest, StateResponse}, + WarpSyncPhase, WarpSyncProgress, +}; +use sp_finality_grandpa::{AuthorityList, SetId}; +use sp_runtime::traits::{Block as BlockT, NumberFor, Zero}; +use std::sync::Arc; + +enum Phase { + WarpProof { set_id: SetId, authorities: AuthorityList, last_hash: B::Hash }, + State(StateSync), +} + +/// Import warp proof result. +pub enum WarpProofImportResult { + /// Start downloading state data. + StateRequest(StateRequest), + /// Continue downloading warp sync proofs. + WarpProofRequest(WarpProofRequest), + /// Bad proof. + BadResponse, +} + +/// Warp sync state machine. Accumulates warp proofs and state. +pub struct WarpSync { + phase: Phase, + client: Arc>, + warp_sync_provider: Arc>, + total_proof_bytes: u64, +} + +impl WarpSync { + /// Create a new instance.
+ pub fn new( + client: Arc>, + warp_sync_provider: Arc>, + ) -> Self { + let last_hash = client.hash(Zero::zero()).unwrap().expect("Genesis header always exists"); + let phase = Phase::WarpProof { + set_id: 0, + authorities: warp_sync_provider.current_authorities(), + last_hash, + }; + WarpSync { client, warp_sync_provider, phase, total_proof_bytes: 0 } + } + + /// Validate and import a state response. + pub fn import_state(&mut self, response: StateResponse) -> ImportResult { + match &mut self.phase { + Phase::WarpProof { .. } => { + log::debug!(target: "sync", "Unexpected state response"); + return ImportResult::BadResponse + }, + Phase::State(sync) => sync.import(response), + } + } + + /// Validate and import a warp proof response. + pub fn import_warp_proof(&mut self, response: EncodedProof) -> WarpProofImportResult { + match &mut self.phase { + Phase::State(_) => { + log::debug!(target: "sync", "Unexpected warp proof response"); + WarpProofImportResult::BadResponse + }, + Phase::WarpProof { set_id, authorities, last_hash } => { + match self.warp_sync_provider.verify( + &response, + *set_id, + std::mem::take(authorities), + ) { + Err(e) => { + log::debug!(target: "sync", "Bad warp proof response: {:?}", e); + return WarpProofImportResult::BadResponse + }, + Ok(VerificationResult::Partial(new_set_id, new_authorities, new_last_hash)) => { + log::debug!(target: "sync", "Verified partial proof, set_id={:?}", new_set_id); + *set_id = new_set_id; + *authorities = new_authorities; + *last_hash = new_last_hash.clone(); + self.total_proof_bytes += response.0.len() as u64; + WarpProofImportResult::WarpProofRequest(WarpProofRequest { + begin: new_last_hash, + }) + }, + Ok(VerificationResult::Complete(new_set_id, _, header)) => { + log::debug!(target: "sync", "Verified complete proof, set_id={:?}", new_set_id); + self.total_proof_bytes += response.0.len() as u64; + let state_sync = StateSync::new(self.client.clone(), header, false); + let request = state_sync.next_request(); + self.phase = Phase::State(state_sync); + WarpProofImportResult::StateRequest(request) + }, + } + }, + } + } + + /// Produce next state request. + pub fn next_state_request(&self) -> Option { + match &self.phase { + Phase::WarpProof { .. } => None, + Phase::State(sync) => Some(sync.next_request()), + } + } + + /// Produce next warp proof request. + pub fn next_warp_proof_request(&self) -> Option> { + match &self.phase { + Phase::State(_) => None, + Phase::WarpProof { last_hash, .. } => + Some(WarpProofRequest { begin: last_hash.clone() }), + } + } + + /// Return target block hash if it is known. + pub fn target_block_hash(&self) -> Option { + match &self.phase { + Phase::State(s) => Some(s.target()), + Phase::WarpProof { .. } => None, + } + } + + /// Return target block number if it is known. + pub fn target_block_number(&self) -> Option> { + match &self.phase { + Phase::State(s) => Some(s.target_block_num()), + Phase::WarpProof { .. } => None, + } + } + + /// Check if the state is complete. + pub fn is_complete(&self) -> bool { + match &self.phase { + Phase::WarpProof { .. } => false, + Phase::State(sync) => sync.is_complete(), + } + } + + /// Returns warp sync estimated progress (phase, total bytes downloaded). + pub fn progress(&self) -> WarpSyncProgress { + match &self.phase { + Phase::WarpProof { ..
} => WarpSyncProgress { + phase: WarpSyncPhase::DownloadingWarpProofs, + total_bytes: self.total_proof_bytes, + }, + Phase::State(sync) => WarpSyncProgress { + phase: if self.is_complete() { + WarpSyncPhase::ImportingState + } else { + WarpSyncPhase::DownloadingState + }, + total_bytes: self.total_proof_bytes + sync.progress().size, + }, + } + } +} diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 83cf2d675823a..31d4488bc9aac 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -186,6 +186,12 @@ impl NetworkWorker { ); let default_notif_handshake_message = Roles::from(¶ms.role).encode(); + + let (warp_sync_provider, warp_sync_protocol_config) = match params.warp_sync { + Some((p, c)) => (Some(p), Some(c)), + None => (None, None), + }; + let (protocol, peerset_handle, mut known_addresses) = Protocol::new( protocol::ProtocolConfig { roles: From::from(¶ms.role), @@ -203,6 +209,7 @@ impl NetworkWorker { .collect(), params.block_announce_validator, params.metrics_registry.as_ref(), + warp_sync_provider, )?; // List of multiaddresses that we know in the network. @@ -346,6 +353,7 @@ impl NetworkWorker { discovery_config, params.block_request_protocol_config, params.state_request_protocol_config, + warp_sync_protocol_config, bitswap, params.light_client_request_protocol_config, params.network_config.request_response_protocols, @@ -461,6 +469,7 @@ impl NetworkWorker { total_bytes_inbound: self.total_bytes_inbound(), total_bytes_outbound: self.total_bytes_outbound(), state_sync: status.state_sync, + warp_sync: status.warp_sync, } } diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs index a149b09a22ddc..8cad044636c2c 100644 --- a/client/network/src/service/tests.rs +++ b/client/network/src/service/tests.rs @@ -50,10 +50,7 @@ fn build_test_full_node( impl sc_consensus::Verifier for PassThroughVerifier { async fn verify( &mut self, - origin: sp_consensus::BlockOrigin, - header: B::Header, - justifications: Option, - body: Option>, + mut block: sc_consensus::BlockImportParams, ) -> Result< ( sc_consensus::BlockImportParams, @@ -61,7 +58,8 @@ fn build_test_full_node( ), String, > { - let maybe_keys = header + let maybe_keys = block + .header .digest() .log(|l| { l.try_as_raw(sp_runtime::generic::OpaqueDigestItemId::Consensus(b"aura")) @@ -75,12 +73,9 @@ fn build_test_full_node( vec![(sp_blockchain::well_known_cache_keys::AUTHORITIES, blob.to_vec())] }); - let mut import = sc_consensus::BlockImportParams::new(origin, header); - import.body = body; - import.finalized = self.0; - import.justifications = justifications; - import.fork_choice = Some(sc_consensus::ForkChoiceStrategy::LongestChain); - Ok((import, maybe_keys)) + block.finalized = self.0; + block.fork_choice = Some(sc_consensus::ForkChoiceStrategy::LongestChain); + Ok((block, maybe_keys)) } } @@ -132,6 +127,7 @@ fn build_test_full_node( block_request_protocol_config, state_request_protocol_config, light_client_request_protocol_config, + warp_sync: None, }) .unwrap(); diff --git a/client/finality-grandpa-warp-sync/src/lib.rs b/client/network/src/warp_request_handler.rs similarity index 51% rename from client/finality-grandpa-warp-sync/src/lib.rs rename to client/network/src/warp_request_handler.rs index c74c4d15f9f45..beb9d1ce528a8 100644 --- a/client/finality-grandpa-warp-sync/src/lib.rs +++ b/client/network/src/warp_request_handler.rs @@ -16,58 +16,61 @@ //! Helper for handling (i.e. answering) grandpa warp sync requests from a remote peer. 
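A note on the wire format defined in the renamed module below: `Request` derives `Encode`/`Decode` and carries a single `begin` field, so the request payload on the wire is just the SCALE encoding of a block hash, which is why the protocol config can use a `max_request_size` of only 32 bytes. A minimal sketch of the round trip, with `H256` as a stand-in for the concrete block hash type:

```rust
// Round trip of the warp sync request payload (sketch; `H256` stands in for
// the chain's block hash type).
use codec::{Decode, Encode};
use sp_core::H256;

#[derive(Encode, Decode, Debug, PartialEq)]
struct Request {
	/// Start collecting proofs from this block.
	begin: H256,
}

fn main() {
	let request = Request { begin: H256::repeat_byte(0xab) };
	let payload = request.encode();
	// A 32-byte hash SCALE-encodes to exactly 32 bytes, matching the
	// protocol's `max_request_size` of 32.
	assert_eq!(payload.len(), 32);
	assert_eq!(Request::decode(&mut &payload[..]).unwrap(), request);
}
```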
+use crate::config::{IncomingRequest, OutgoingResponse, ProtocolId, RequestResponseConfig}; use codec::{Decode, Encode}; use futures::{ channel::{mpsc, oneshot}, stream::StreamExt, }; use log::debug; -use sc_client_api::Backend; -use sc_finality_grandpa::SharedAuthoritySet; -use sc_network::config::{IncomingRequest, OutgoingResponse, ProtocolId, RequestResponseConfig}; -use sc_service::{ - config::{Configuration, Role}, - SpawnTaskHandle, -}; -use sp_runtime::traits::{Block as BlockT, NumberFor}; +use sp_finality_grandpa::{AuthorityList, SetId}; +use sp_runtime::traits::Block as BlockT; use std::{sync::Arc, time::Duration}; -mod proof; - -pub use proof::{WarpSyncFragment, WarpSyncProof}; - -/// Generates the appropriate [`RequestResponseConfig`] for a given chain configuration. -pub fn request_response_config_for_chain + 'static>( - config: &Configuration, - spawn_handle: SpawnTaskHandle, - backend: Arc, - authority_set: SharedAuthoritySet>, -) -> RequestResponseConfig -where - NumberFor: sc_finality_grandpa::BlockNumberOps, -{ - let protocol_id = config.protocol_id(); - - if matches!(config.role, Role::Light) { - // Allow outgoing requests but deny incoming requests. - generate_request_response_config(protocol_id.clone()) - } else { - // Allow both outgoing and incoming requests. - let (handler, request_response_config) = - GrandpaWarpSyncRequestHandler::new(protocol_id.clone(), backend.clone(), authority_set); - spawn_handle.spawn("grandpa-warp-sync", handler.run()); - request_response_config - } +/// SCALE-encoded warp sync proof response. +pub struct EncodedProof(pub Vec); + +/// Warp sync request. +#[derive(Encode, Decode, Debug)] +pub struct Request { + /// Start collecting proofs from this block. + pub begin: B::Hash, +} + +const MAX_RESPONSE_SIZE: u64 = 16 * 1024 * 1024; + +/// Proof verification result. +pub enum VerificationResult { + /// Proof is valid, but the target was not reached. + Partial(SetId, AuthorityList, Block::Hash), + /// Target finality is proved. + Complete(SetId, AuthorityList, Block::Header), } -const LOG_TARGET: &str = "finality-grandpa-warp-sync-request-handler"; +/// Warp sync backend. Handles retrieving and verifying warp sync proofs. +pub trait WarpSyncProvider: Send + Sync { + /// Generate proof starting at the given block hash. The proof is accumulated until the maximum proof size is reached. + fn generate( + &self, + start: B::Hash, + ) -> Result>; + /// Verify warp proof against the current set of authorities. + fn verify( + &self, + proof: &EncodedProof, + set_id: SetId, + authorities: AuthorityList, + ) -> Result, Box>; + /// Get the current list of authorities. This is supposed to be the genesis authorities when starting sync. + fn current_authorities(&self) -> AuthorityList; +} /// Generates a [`RequestResponseConfig`] for the grandpa warp sync request protocol, refusing incoming requests. pub fn generate_request_response_config(protocol_id: ProtocolId) -> RequestResponseConfig { RequestResponseConfig { name: generate_protocol_name(protocol_id).into(), max_request_size: 32, - max_response_size: proof::MAX_WARP_SYNC_PROOF_SIZE as u64, + max_response_size: MAX_RESPONSE_SIZE, request_timeout: Duration::from_secs(10), inbound_queue: None, } @@ -82,76 +85,59 @@ fn generate_protocol_name(protocol_id: ProtocolId) -> String { s } -#[derive(Decode)] -struct Request { - begin: B::Hash, -} /// Handler for incoming grandpa warp sync requests from a remote peer.
-pub struct GrandpaWarpSyncRequestHandler { - backend: Arc, - authority_set: SharedAuthoritySet>, +pub struct RequestHandler { + backend: Arc>, request_receiver: mpsc::Receiver, - _phantom: std::marker::PhantomData, } -impl> GrandpaWarpSyncRequestHandler { - /// Create a new [`GrandpaWarpSyncRequestHandler`]. +impl RequestHandler { + /// Create a new [`RequestHandler`]. pub fn new( protocol_id: ProtocolId, - backend: Arc, - authority_set: SharedAuthoritySet>, + backend: Arc>, ) -> (Self, RequestResponseConfig) { let (tx, request_receiver) = mpsc::channel(20); let mut request_response_config = generate_request_response_config(protocol_id); request_response_config.inbound_queue = Some(tx); - ( - Self { backend, request_receiver, _phantom: std::marker::PhantomData, authority_set }, - request_response_config, - ) + (Self { backend, request_receiver }, request_response_config) } fn handle_request( &self, payload: Vec, pending_response: oneshot::Sender, - ) -> Result<(), HandleRequestError> - where - NumberFor: sc_finality_grandpa::BlockNumberOps, - { + ) -> Result<(), HandleRequestError> { let request = Request::::decode(&mut &payload[..])?; - let proof = WarpSyncProof::generate( - &*self.backend, - request.begin, - &self.authority_set.authority_set_changes(), - )?; + let EncodedProof(proof) = self + .backend + .generate(request.begin) + .map_err(HandleRequestError::InvalidRequest)?; pending_response .send(OutgoingResponse { - result: Ok(proof.encode()), + result: Ok(proof), reputation_changes: Vec::new(), sent_feedback: None, }) .map_err(|_| HandleRequestError::SendResponse) } - /// Run [`GrandpaWarpSyncRequestHandler`]. - pub async fn run(mut self) - where - NumberFor: sc_finality_grandpa::BlockNumberOps, - { + /// Run [`RequestHandler`]. + pub async fn run(mut self) { while let Some(request) = self.request_receiver.next().await { let IncomingRequest { peer, payload, pending_response } = request; match self.handle_request(payload, pending_response) { Ok(()) => - debug!(target: LOG_TARGET, "Handled grandpa warp sync request from {}.", peer), + debug!(target: "sync", "Handled grandpa warp sync request from {}.", peer), Err(e) => debug!( - target: LOG_TARGET, - "Failed to handle grandpa warp sync request from {}: {}", peer, e, + target: "sync", + "Failed to handle grandpa warp sync request from {}: {}", + peer, e, ), } } @@ -159,7 +145,7 @@ impl> GrandpaWarpSyncRequestHandler), #[display(fmt = "Failed to send response.")] SendResponse, - #[display(fmt = "Missing required data to be able to answer request.")] - MissingData, } diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 553353d77ac36..7668aa8fd56e3 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -108,25 +108,19 @@ impl PassThroughVerifier { impl Verifier for PassThroughVerifier { async fn verify( &mut self, - origin: BlockOrigin, - header: B::Header, - justifications: Option, - body: Option>, + mut block: BlockImportParams, ) -> Result<(BlockImportParams, Option)>>), String> { - let maybe_keys = header + let maybe_keys = block + .header .digest() .log(|l| { l.try_as_raw(OpaqueDigestItemId::Consensus(b"aura")) .or_else(|| l.try_as_raw(OpaqueDigestItemId::Consensus(b"babe"))) }) .map(|blob| vec![(well_known_cache_keys::AUTHORITIES, blob.to_vec())]); - let mut import = BlockImportParams::new(origin, header); - import.body = body; - import.finalized = self.finalized; - import.justifications = justifications; - import.fork_choice = Some(self.fork_choice.clone()); - - Ok((import, 
maybe_keys)) + block.finalized = self.finalized; + block.fork_choice = Some(self.fork_choice.clone()); + Ok((block, maybe_keys)) } } @@ -389,13 +383,10 @@ where block.header.parent_hash, ); let header = block.header.clone(); - let (import_block, cache) = futures::executor::block_on(self.verifier.verify( - origin, - header.clone(), - None, - if headers_only { None } else { Some(block.extrinsics) }, - )) - .unwrap(); + let mut import_block = BlockImportParams::new(origin, header.clone()); + import_block.body = if headers_only { None } else { Some(block.extrinsics) }; + let (import_block, cache) = + futures::executor::block_on(self.verifier.verify(import_block)).unwrap(); let cache = if let Some(cache) = cache { cache.into_iter().collect() } else { @@ -631,21 +622,13 @@ struct VerifierAdapter { impl Verifier for VerifierAdapter { async fn verify( &mut self, - origin: BlockOrigin, - header: B::Header, - justifications: Option, - body: Option>, + block: BlockImportParams, ) -> Result<(BlockImportParams, Option)>>), String> { - let hash = header.hash(); - self.verifier - .lock() - .await - .verify(origin, header, justifications, body) - .await - .map_err(|e| { - self.failed_verifications.lock().insert(hash, e.clone()); - e - }) + let hash = block.header.hash(); + self.verifier.lock().await.verify(block).await.map_err(|e| { + self.failed_verifications.lock().insert(hash, e.clone()); + e + }) } } @@ -850,6 +833,7 @@ where block_request_protocol_config, state_request_protocol_config, light_client_request_protocol_config, + warp_sync: None, }) .unwrap(); @@ -939,6 +923,7 @@ where block_request_protocol_config, state_request_protocol_config, light_client_request_protocol_config, + warp_sync: None, }) .unwrap(); diff --git a/client/rpc/src/state/tests.rs b/client/rpc/src/state/tests.rs index 3990d6ea8ad3a..6754a68296a64 100644 --- a/client/rpc/src/state/tests.rs +++ b/client/rpc/src/state/tests.rs @@ -343,7 +343,10 @@ fn should_query_storage() { Err(Error::InvalidBlockRange { from: format!("{:?}", genesis_hash), to: format!("{:?}", Some(random_hash1)), - details: format!("UnknownBlock: header not found in db: {}", random_hash1), + details: format!( + "UnknownBlock: Header was not found in the database: {:?}", + random_hash1 + ), }) .map_err(|e| e.to_string()) ); @@ -356,7 +359,10 @@ fn should_query_storage() { Err(Error::InvalidBlockRange { from: format!("{:?}", random_hash1), to: format!("{:?}", Some(genesis_hash)), - details: format!("UnknownBlock: header not found in db: {}", random_hash1), + details: format!( + "UnknownBlock: Header was not found in the database: {:?}", + random_hash1 + ), }) .map_err(|e| e.to_string()), ); @@ -369,7 +375,10 @@ fn should_query_storage() { Err(Error::InvalidBlockRange { from: format!("{:?}", random_hash1), to: format!("{:?}", Some(block2_hash)), // Best block hash. - details: format!("UnknownBlock: header not found in db: {}", random_hash1), + details: format!( + "UnknownBlock: Header was not found in the database: {:?}", + random_hash1 + ), }) .map_err(|e| e.to_string()), ); @@ -382,7 +391,10 @@ fn should_query_storage() { Err(Error::InvalidBlockRange { from: format!("{:?}", random_hash1), // First hash not found. 
to: format!("{:?}", Some(random_hash2)), - details: format!("UnknownBlock: header not found in db: {}", random_hash1), + details: format!( + "UnknownBlock: Header was not found in the database: {:?}", + random_hash1 + ), }) .map_err(|e| e.to_string()), ); diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 1f54850059fb5..fb24a890133c6 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -44,6 +44,7 @@ use sc_network::{ config::{OnDemand, Role, SyncMode}, light_client_requests::{self, handler::LightClientRequestHandler}, state_request_handler::{self, StateRequestHandler}, + warp_request_handler::{self, RequestHandler as WarpSyncRequestHandler, WarpSyncProvider}, NetworkService, }; use sc_telemetry::{telemetry, ConnectionMessage, Telemetry, TelemetryHandle, SUBSTRATE_INFO}; @@ -354,7 +355,7 @@ where wasm_runtime_overrides: config.wasm_runtime_overrides.clone(), no_genesis: matches!( config.network.sync_mode, - sc_network::config::SyncMode::Fast { .. } + sc_network::config::SyncMode::Fast { .. } | sc_network::config::SyncMode::Warp ), wasm_runtime_substitutes, }, @@ -843,6 +844,8 @@ pub struct BuildNetworkParams<'a, TBl: BlockT, TExPool, TImpQu, TCl> { /// A block announce validator builder. pub block_announce_validator_builder: Option) -> Box + Send> + Send>>, + /// An optional warp sync provider. + pub warp_sync: Option>>, } /// Build the network service, the network status sinks and an RPC sender. @@ -878,6 +881,7 @@ where import_queue, on_demand, block_announce_validator_builder, + warp_sync, } = params; let transaction_pool_adapter = Arc::new(TransactionPoolAdapter { @@ -928,6 +932,20 @@ where } }; + let warp_sync_params = warp_sync.map(|provider| { + let protocol_config = if matches!(config.role, Role::Light) { + // Allow outgoing requests but deny incoming requests. + warp_request_handler::generate_request_response_config(protocol_id.clone()) + } else { + // Allow both outgoing and incoming requests. + let (handler, protocol_config) = + WarpSyncRequestHandler::new(protocol_id.clone(), provider.clone()); + spawn_handle.spawn("warp_sync_request_handler", handler.run()); + protocol_config + }; + (provider, protocol_config) + }); + let light_client_request_protocol_config = { if matches!(config.role, Role::Light) { // Allow outgoing requests but deny incoming requests. @@ -965,6 +983,7 @@ where metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()), block_request_protocol_config, state_request_protocol_config, + warp_sync: warp_sync_params, light_client_request_protocol_config, }; diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index 553584b15c029..01688f0c8e702 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -769,6 +769,8 @@ where { let parent_hash = import_headers.post().parent_hash().clone(); let status = self.backend.blockchain().status(BlockId::Hash(hash))?; + let parent_exists = self.backend.blockchain().status(BlockId::Hash(parent_hash))? 
== + blockchain::BlockStatus::InChain; match (import_existing, status) { (false, blockchain::BlockStatus::InChain) => return Ok(ImportResult::AlreadyInChain), (false, blockchain::BlockStatus::Unknown) => {}, @@ -815,7 +817,6 @@ where if let Some(changes_trie_transaction) = changes_trie_tx { operation.op.update_changes_trie(changes_trie_transaction)?; } - Some((main_sc, child_sc)) }, sc_consensus::StorageChanges::Import(changes) => { @@ -834,10 +835,10 @@ where None }, }; - - // ensure parent block is finalized to maintain invariant that - // finality is called sequentially. - if finalized { + // Ensure parent chain is finalized to maintain invariant that + // finality is called sequentially. This will also send finality + // notifications for top 250 newly finalized blocks. + if finalized && parent_exists { self.apply_finality_with_block_hash( operation, parent_hash, @@ -868,7 +869,7 @@ where NewBlockState::Normal }; - let tree_route = if is_new_best && info.best_hash != parent_hash { + let tree_route = if is_new_best && info.best_hash != parent_hash && parent_exists { let route_from_best = sp_blockchain::tree_route(self.backend.blockchain(), info.best_hash, parent_hash)?; Some(route_from_best) @@ -932,21 +933,21 @@ where let at = BlockId::Hash(*parent_hash); let state_action = std::mem::replace(&mut import_block.state_action, StateAction::Skip); let (enact_state, storage_changes) = match (self.block_status(&at)?, state_action) { - (BlockStatus::Unknown, _) => - return Ok(PrepareStorageChangesResult::Discard(ImportResult::UnknownParent)), (BlockStatus::KnownBad, _) => return Ok(PrepareStorageChangesResult::Discard(ImportResult::KnownBad)), - (_, StateAction::Skip) => (false, None), ( BlockStatus::InChainPruned, StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(_)), ) => return Ok(PrepareStorageChangesResult::Discard(ImportResult::MissingState)), + (_, StateAction::ApplyChanges(changes)) => (true, Some(changes)), + (BlockStatus::Unknown, _) => + return Ok(PrepareStorageChangesResult::Discard(ImportResult::UnknownParent)), + (_, StateAction::Skip) => (false, None), (BlockStatus::InChainPruned, StateAction::Execute) => return Ok(PrepareStorageChangesResult::Discard(ImportResult::MissingState)), (BlockStatus::InChainPruned, StateAction::ExecuteIfPossible) => (false, None), (_, StateAction::Execute) => (true, None), (_, StateAction::ExecuteIfPossible) => (true, None), - (_, StateAction::ApplyChanges(changes)) => (true, Some(changes)), }; let storage_changes = match (enact_state, storage_changes, &import_block.body) { @@ -1912,8 +1913,14 @@ where &mut self, block: BlockCheckParams, ) -> Result { - let BlockCheckParams { hash, number, parent_hash, allow_missing_state, import_existing } = - block; + let BlockCheckParams { + hash, + number, + parent_hash, + allow_missing_state, + import_existing, + allow_missing_parent, + } = block; // Check the block against white and black lists if any are defined // (i.e. fork blocks and bad blocks respectively) @@ -1955,6 +1962,7 @@ where .map_err(|e| ConsensusError::ClientImport(e.to_string()))? 
{ BlockStatus::InChainWithState | BlockStatus::Queued => {}, + BlockStatus::Unknown if allow_missing_parent => {}, BlockStatus::Unknown => return Ok(ImportResult::UnknownParent), BlockStatus::InChainPruned if allow_missing_state => {}, BlockStatus::InChainPruned => return Ok(ImportResult::MissingState), diff --git a/client/service/test/src/client/mod.rs b/client/service/test/src/client/mod.rs index dd0a33b7e8584..6ac149677bc11 100644 --- a/client/service/test/src/client/mod.rs +++ b/client/service/test/src/client/mod.rs @@ -1209,6 +1209,7 @@ fn import_with_justification() { .unwrap() .block; block_on(client.import(BlockOrigin::Own, a2.clone())).unwrap(); + client.finalize_block(BlockId::hash(a2.hash()), None).unwrap(); // A2 -> A3 let justification = Justifications::from((TEST_ENGINE_ID, vec![1, 2, 3])); @@ -1555,6 +1556,7 @@ fn respects_block_rules() { number: 0, parent_hash: block_ok.header().parent_hash().clone(), allow_missing_state: false, + allow_missing_parent: false, import_existing: false, }; assert_eq!(block_on(client.check_block(params)).unwrap(), ImportResult::imported(false)); @@ -1570,6 +1572,7 @@ fn respects_block_rules() { number: 0, parent_hash: block_not_ok.header().parent_hash().clone(), allow_missing_state: false, + allow_missing_parent: false, import_existing: false, }; if record_only { @@ -1592,6 +1595,7 @@ fn respects_block_rules() { number: 1, parent_hash: block_ok.header().parent_hash().clone(), allow_missing_state: false, + allow_missing_parent: false, import_existing: false, }; if record_only { @@ -1610,6 +1614,7 @@ fn respects_block_rules() { number: 1, parent_hash: block_not_ok.header().parent_hash().clone(), allow_missing_state: false, + allow_missing_parent: false, import_existing: false, }; @@ -1676,6 +1681,7 @@ fn returns_status_for_pruned_blocks() { number: 0, parent_hash: a1.header().parent_hash().clone(), allow_missing_state: false, + allow_missing_parent: false, import_existing: false, }; @@ -1712,6 +1718,7 @@ fn returns_status_for_pruned_blocks() { number: 1, parent_hash: a1.header().parent_hash().clone(), allow_missing_state: false, + allow_missing_parent: false, import_existing: false, }; @@ -1745,6 +1752,7 @@ fn returns_status_for_pruned_blocks() { number: 2, parent_hash: a2.header().parent_hash().clone(), allow_missing_state: false, + allow_missing_parent: false, import_existing: false, }; @@ -1779,6 +1787,7 @@ fn returns_status_for_pruned_blocks() { number: 0, parent_hash: b1.header().parent_hash().clone(), allow_missing_state: false, + allow_missing_parent: false, import_existing: false, }; assert_eq!( diff --git a/primitives/finality-grandpa/src/lib.rs b/primitives/finality-grandpa/src/lib.rs index a083796d659c8..353a3cd07822b 100644 --- a/primitives/finality-grandpa/src/lib.rs +++ b/primitives/finality-grandpa/src/lib.rs @@ -492,7 +492,7 @@ sp_api::decl_runtime_apis! { /// applied in the runtime after those N blocks have passed. /// /// The consensus protocol will coordinate the handoff externally. - #[api_version(2)] + #[api_version(3)] pub trait GrandpaApi { /// Get the current GRANDPA authorities and weights. This should not change except /// for when changes are scheduled and the corresponding delay has passed. @@ -530,5 +530,8 @@ sp_api::decl_runtime_apis! { set_id: SetId, authority_id: AuthorityId, ) -> Option; + + /// Get current GRANDPA authority set id. 
+ fn current_set_id() -> SetId; } } diff --git a/test-utils/runtime/src/lib.rs b/test-utils/runtime/src/lib.rs index bdf45ceae88b4..a148ce5cb75a0 100644 --- a/test-utils/runtime/src/lib.rs +++ b/test-utils/runtime/src/lib.rs @@ -883,6 +883,10 @@ cfg_if! { Vec::new() } + fn current_set_id() -> sp_finality_grandpa::SetId { + 0 + } + fn submit_report_equivocation_unsigned_extrinsic( _equivocation_proof: sp_finality_grandpa::EquivocationProof< ::Hash, diff --git a/test-utils/test-runner/Cargo.toml b/test-utils/test-runner/Cargo.toml index 0eb02d941712f..06454ee24eaed 100644 --- a/test-utils/test-runner/Cargo.toml +++ b/test-utils/test-runner/Cargo.toml @@ -16,6 +16,7 @@ sc-basic-authorship = { path = "../../client/basic-authorship" } sc-rpc = { path = "../../client/rpc" } sc-transaction-pool = { path = "../../client/transaction-pool" } grandpa = { package = "sc-finality-grandpa", path = "../../client/finality-grandpa" } +sp-finality-grandpa = { path = "../../primitives/finality-grandpa" } sp-consensus-babe = { path = "../../primitives/consensus/babe" } sc-consensus-babe = { path = "../../client/consensus/babe" } sc-consensus = { path = "../../client/consensus/common" } diff --git a/test-utils/test-runner/src/client.rs b/test-utils/test-runner/src/client.rs index 71a156b8bc0d9..d130993bff4c7 100644 --- a/test-utils/test-runner/src/client.rs +++ b/test-utils/test-runner/src/client.rs @@ -35,6 +35,7 @@ use sc_transaction_pool_api::TransactionPool; use sp_api::{ApiExt, ConstructRuntimeApi, Core, Metadata}; use sp_block_builder::BlockBuilder; use sp_consensus_babe::BabeApi; +use sp_finality_grandpa::GrandpaApi; use sp_keyring::sr25519::Keyring::Alice; use sp_offchain::OffchainWorkerApi; use sp_runtime::traits::{Block as BlockT, Header}; @@ -90,7 +91,8 @@ where + TaggedTransactionQueue + BlockBuilder + BabeApi - + ApiExt as Backend>::State>, + + ApiExt as Backend>::State> + + GrandpaApi, ::Call: From>, <::Block as BlockT>::Hash: FromStr, <<::Block as BlockT>::Header as Header>::Number: @@ -151,6 +153,7 @@ where import_queue, on_demand: None, block_announce_validator_builder: None, + warp_sync: None, }; build_network(params)? };
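The test clients above all pass `warp_sync: None`. For a sense of the trait surface, a hypothetical test-only provider might look like the sketch below; it treats the proof blob as a bare SCALE-encoded header and performs no justification verification, so nothing like it belongs outside tests (production nodes use the GRANDPA-backed `NetworkProvider` from `sc-finality-grandpa` instead):

```rust
// Hypothetical test-only `WarpSyncProvider` (not part of this patch): the
// "proof" is a SCALE-encoded header that `verify` accepts without checking
// any justifications.
use codec::{Decode, Encode};
use sc_network::warp_request_handler::{EncodedProof, VerificationResult, WarpSyncProvider};
use sp_finality_grandpa::{AuthorityList, SetId};
use sp_runtime::traits::Block as BlockT;

struct TrustingProvider<B: BlockT> {
	/// Header served by `generate` and accepted unverified by `verify`.
	header: B::Header,
	/// Authority list reported as the starting (genesis) authorities.
	authorities: AuthorityList,
}

impl<B: BlockT> WarpSyncProvider<B> for TrustingProvider<B> {
	fn generate(
		&self,
		_start: B::Hash,
	) -> Result<EncodedProof, Box<dyn std::error::Error + Send + Sync>> {
		Ok(EncodedProof(self.header.encode()))
	}

	fn verify(
		&self,
		proof: &EncodedProof,
		set_id: SetId,
		authorities: AuthorityList,
	) -> Result<VerificationResult<B>, Box<dyn std::error::Error + Send + Sync>> {
		// A real provider would validate the justifications in the proof here.
		let header = B::Header::decode(&mut &proof.0[..])
			.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
		Ok(VerificationResult::Complete(set_id, authorities, header))
	}

	fn current_authorities(&self) -> AuthorityList {
		self.authorities.clone()
	}
}
```

Wrapped in an `Arc`, such a provider slots into the `warp_sync` field of `BuildNetworkParams`, and `build_network` derives the matching request-response protocol config from it.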