From 43cd6fd4370d3043272f64a79aeb9e6dc0edd13f Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 17 Sep 2024 12:08:50 +0300 Subject: [PATCH] Syncing strategy refactoring (part 2) (#5666) # Description Follow-up to https://github.com/paritytech/polkadot-sdk/pull/5469 and mostly covering https://github.com/paritytech/polkadot-sdk/issues/5333. The primary change here is that syncing strategy is no longer created inside of syncing engine, instead syncing strategy is an argument of syncing engine, more specifically it is an argument to `build_network` that most downstream users will use. This also extracts addition of request-response protocols outside of network construction, making sure they are physically not present when they don't need to be (imagine syncing strategy that uses none of Substrate's protocols in its implementation for example). This technically allows to completely replace syncing strategy with whatever strategy chain might need. There will be at least one follow-up PR that will simplify `SyncingStrategy` trait and other public interfaces to remove mentions of block/state/warp sync requests, replacing them with generic APIs, such that strategies where warp sync is not applicable don't have to provide dummy method implementations, etc. ## Integration Downstream projects will have to write a bit of boilerplate calling `build_polkadot_syncing_strategy` function to create previously default syncing strategy. ## Review Notes Please review PR through individual commits rather than the final diff, it will be easier that way. The changes are mostly just moving code around one step at a time. # Checklist * [x] My PR includes a detailed description as outlined in the "Description" and its two subsections above. * [x] My PR follows the [labeling requirements]( https://github.com/paritytech/polkadot-sdk/blob/master/docs/contributor/CONTRIBUTING.md#Process ) of this project (at minimum one label for `T` required) * External contributors: ask maintainers to put the right label on your PR. * [x] I have made corresponding changes to the documentation (if applicable) --- cumulus/client/service/src/lib.rs | 23 +- polkadot/node/service/src/lib.rs | 14 +- prdoc/pr_5666.prdoc | 19 ++ substrate/bin/node/cli/src/service.rs | 13 +- substrate/client/network/sync/src/engine.rs | 77 ++---- substrate/client/network/sync/src/strategy.rs | 54 +++- .../network/sync/src/strategy/chain_sync.rs | 12 +- .../sync/src/strategy/chain_sync/test.rs | 117 +++++++-- .../client/network/sync/src/strategy/state.rs | 91 +++++-- .../client/network/sync/src/strategy/warp.rs | 88 ++++--- substrate/client/network/test/src/lib.rs | 30 ++- substrate/client/network/test/src/service.rs | 17 +- substrate/client/service/src/builder.rs | 245 ++++++++++-------- substrate/client/service/src/lib.rs | 10 +- templates/minimal/node/src/service.rs | 21 +- templates/solochain/node/src/service.rs | 17 +- 16 files changed, 562 insertions(+), 286 deletions(-) create mode 100644 prdoc/pr_5666.prdoc diff --git a/cumulus/client/service/src/lib.rs b/cumulus/client/service/src/lib.rs index dd14ca514b3f..c95c72c370a1 100644 --- a/cumulus/client/service/src/lib.rs +++ b/cumulus/client/service/src/lib.rs @@ -40,7 +40,10 @@ use sc_consensus::{ use sc_network::{config::SyncMode, service::traits::NetworkService, NetworkBackend}; use sc_network_sync::SyncingService; use sc_network_transactions::TransactionsHandlerController; -use sc_service::{Configuration, NetworkStarter, SpawnTaskHandle, TaskManager, WarpSyncConfig}; +use sc_service::{ + build_polkadot_syncing_strategy, Configuration, NetworkStarter, SpawnTaskHandle, TaskManager, + WarpSyncConfig, +}; use sc_telemetry::{log, TelemetryWorkerHandle}; use sc_utils::mpsc::TracingUnboundedSender; use sp_api::ProvideRuntimeApi; @@ -425,7 +428,7 @@ pub struct BuildNetworkParams< pub async fn build_network<'a, Block, Client, RCInterface, IQ, Network>( BuildNetworkParams { parachain_config, - net_config, + mut net_config, client, transaction_pool, para_id, @@ -462,7 +465,7 @@ where IQ: ImportQueue + 'static, Network: NetworkBackend::Hash>, { - let warp_sync_params = match parachain_config.network.sync_mode { + let warp_sync_config = match parachain_config.network.sync_mode { SyncMode::Warp => { log::debug!(target: LOG_TARGET_SYNC, "waiting for announce block..."); @@ -493,9 +496,19 @@ where }, }; let metrics = Network::register_notification_metrics( - parachain_config.prometheus_config.as_ref().map(|cfg| &cfg.registry), + parachain_config.prometheus_config.as_ref().map(|config| &config.registry), ); + let syncing_strategy = build_polkadot_syncing_strategy( + parachain_config.protocol_id(), + parachain_config.chain_spec.fork_id(), + &mut net_config, + warp_sync_config, + client.clone(), + &spawn_handle, + parachain_config.prometheus_config.as_ref().map(|config| &config.registry), + )?; + sc_service::build_network(sc_service::BuildNetworkParams { config: parachain_config, net_config, @@ -504,7 +517,7 @@ where spawn_handle, import_queue, block_announce_validator_builder: Some(Box::new(move |_| block_announce_validator)), - warp_sync_config: warp_sync_params, + syncing_strategy, block_relay: None, metrics, }) diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index fe96d29c1ceb..dd35423e18e1 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -84,7 +84,7 @@ use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration}; use prometheus_endpoint::Registry; #[cfg(feature = "full-node")] use sc_service::KeystoreContainer; -use sc_service::{RpcHandlers, SpawnTaskHandle}; +use sc_service::{build_polkadot_syncing_strategy, RpcHandlers, SpawnTaskHandle}; use sc_telemetry::TelemetryWorker; #[cfg(feature = "full-node")] use sc_telemetry::{Telemetry, TelemetryWorkerHandle}; @@ -1028,6 +1028,16 @@ pub fn new_full< }) }; + let syncing_strategy = build_polkadot_syncing_strategy( + config.protocol_id(), + config.chain_spec.fork_id(), + &mut net_config, + Some(WarpSyncConfig::WithProvider(warp_sync)), + client.clone(), + &task_manager.spawn_handle(), + config.prometheus_config.as_ref().map(|config| &config.registry), + )?; + let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, @@ -1037,7 +1047,7 @@ pub fn new_full< spawn_handle: task_manager.spawn_handle(), import_queue, block_announce_validator_builder: None, - warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)), + syncing_strategy, block_relay: None, metrics, })?; diff --git a/prdoc/pr_5666.prdoc b/prdoc/pr_5666.prdoc new file mode 100644 index 000000000000..08bd9815cdd4 --- /dev/null +++ b/prdoc/pr_5666.prdoc @@ -0,0 +1,19 @@ +title: Make syncing strategy an argument of the syncing engine + +doc: + - audience: Node Dev + description: | + Syncing strategy is no longer implicitly created when building network, but needs to be instantiated explicitly. + Previously default implementation can be created with new function `build_polkadot_syncing_strategy` or custom + syncing strategy could be implemented and used instead if desired, providing greater flexibility for chain + developers. + +crates: + - name: cumulus-client-service + bump: patch + - name: polkadot-service + bump: patch + - name: sc-service + bump: major + - name: sc-network-sync + bump: major diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index 1b345a23f27e..69e953f54e42 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -32,6 +32,7 @@ use frame_system_rpc_runtime_api::AccountNonceApi; use futures::prelude::*; use kitchensink_runtime::RuntimeApi; use node_primitives::Block; +use polkadot_sdk::sc_service::build_polkadot_syncing_strategy; use sc_client_api::{Backend, BlockBackend}; use sc_consensus_babe::{self, SlotProportion}; use sc_network::{ @@ -506,6 +507,16 @@ pub fn new_full_base::Hash>>( Vec::default(), )); + let syncing_strategy = build_polkadot_syncing_strategy( + config.protocol_id(), + config.chain_spec.fork_id(), + &mut net_config, + Some(WarpSyncConfig::WithProvider(warp_sync)), + client.clone(), + &task_manager.spawn_handle(), + config.prometheus_config.as_ref().map(|config| &config.registry), + )?; + let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, @@ -515,7 +526,7 @@ pub fn new_full_base::Hash>>( spawn_handle: task_manager.spawn_handle(), import_queue, block_announce_validator_builder: None, - warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)), + syncing_strategy, block_relay: None, metrics, })?; diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 86c1a7abf744..aafbd950202d 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -24,7 +24,6 @@ use crate::{ BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream, }, block_relay_protocol::{BlockDownloader, BlockResponseError}, - block_request_handler::MAX_BLOCKS_IN_RESPONSE, pending_responses::{PendingResponses, ResponseEvent}, schema::v1::{StateRequest, StateResponse}, service::{ @@ -32,8 +31,8 @@ use crate::{ syncing_service::{SyncingService, ToServiceCommand}, }, strategy::{ - warp::{EncodedProof, WarpProofRequest, WarpSyncConfig}, - PolkadotSyncingStrategy, StrategyKey, SyncingAction, SyncingConfig, SyncingStrategy, + warp::{EncodedProof, WarpProofRequest}, + StrategyKey, SyncingAction, SyncingStrategy, }, types::{ BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent, @@ -189,7 +188,7 @@ pub struct Peer { pub struct SyncingEngine { /// Syncing strategy. - strategy: PolkadotSyncingStrategy, + strategy: Box>, /// Blockchain client. client: Arc, @@ -271,12 +270,6 @@ pub struct SyncingEngine { /// Block downloader block_downloader: Arc>, - /// Protocol name used to send out state requests - state_request_protocol_name: ProtocolName, - - /// Protocol name used to send out warp sync requests - warp_sync_protocol_name: Option, - /// Handle to import queue. import_queue: Box>, } @@ -301,35 +294,15 @@ where protocol_id: ProtocolId, fork_id: &Option, block_announce_validator: Box + Send>, - warp_sync_config: Option>, + syncing_strategy: Box>, network_service: service::network::NetworkServiceHandle, import_queue: Box>, block_downloader: Arc>, - state_request_protocol_name: ProtocolName, - warp_sync_protocol_name: Option, peer_store_handle: Arc, ) -> Result<(Self, SyncingService, N::NotificationProtocolConfig), ClientError> where N: NetworkBackend::Hash>, { - let mode = net_config.network_config.sync_mode; - let max_parallel_downloads = net_config.network_config.max_parallel_downloads; - let max_blocks_per_request = - if net_config.network_config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 { - log::info!( - target: LOG_TARGET, - "clamping maximum blocks per request to {MAX_BLOCKS_IN_RESPONSE}", - ); - MAX_BLOCKS_IN_RESPONSE as u32 - } else { - net_config.network_config.max_blocks_per_request - }; - let syncing_config = SyncingConfig { - mode, - max_parallel_downloads, - max_blocks_per_request, - metrics_registry: metrics_registry.cloned(), - }; let cache_capacity = (net_config.network_config.default_peers_set.in_peers + net_config.network_config.default_peers_set.out_peers) .max(1); @@ -388,10 +361,6 @@ where Arc::clone(&peer_store_handle), ); - // Initialize syncing strategy. - let strategy = - PolkadotSyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?; - let block_announce_protocol_name = block_announce_config.protocol_name().clone(); let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000); let num_connected = Arc::new(AtomicUsize::new(0)); @@ -413,7 +382,7 @@ where Self { roles, client, - strategy, + strategy: syncing_strategy, network_service, peers: HashMap::new(), block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)), @@ -450,8 +419,6 @@ where }, pending_responses: PendingResponses::new(), block_downloader, - state_request_protocol_name, - warp_sync_protocol_name, import_queue, }, SyncingService::new(tx, num_connected, is_major_syncing), @@ -652,16 +619,16 @@ where "Processed {action:?}, response removed: {removed}.", ); }, - SyncingAction::SendStateRequest { peer_id, key, request } => { - self.send_state_request(peer_id, key, request); + SyncingAction::SendStateRequest { peer_id, key, protocol_name, request } => { + self.send_state_request(peer_id, key, protocol_name, request); trace!( target: LOG_TARGET, "Processed `ChainSyncAction::SendStateRequest` to {peer_id}.", ); }, - SyncingAction::SendWarpProofRequest { peer_id, key, request } => { - self.send_warp_proof_request(peer_id, key, request.clone()); + SyncingAction::SendWarpProofRequest { peer_id, key, protocol_name, request } => { + self.send_warp_proof_request(peer_id, key, protocol_name, request.clone()); trace!( target: LOG_TARGET, @@ -1054,6 +1021,7 @@ where &mut self, peer_id: PeerId, key: StrategyKey, + protocol_name: ProtocolName, request: OpaqueStateRequest, ) { if !self.peers.contains_key(&peer_id) { @@ -1070,7 +1038,7 @@ where Ok(data) => { self.network_service.start_request( peer_id, - self.state_request_protocol_name.clone(), + protocol_name, data, tx, IfDisconnected::ImmediateError, @@ -1089,6 +1057,7 @@ where &mut self, peer_id: PeerId, key: StrategyKey, + protocol_name: ProtocolName, request: WarpProofRequest, ) { if !self.peers.contains_key(&peer_id) { @@ -1101,21 +1070,13 @@ where self.pending_responses.insert(peer_id, key, PeerRequest::WarpProof, rx.boxed()); - match &self.warp_sync_protocol_name { - Some(name) => self.network_service.start_request( - peer_id, - name.clone(), - request.encode(), - tx, - IfDisconnected::ImmediateError, - ), - None => { - log::warn!( - target: LOG_TARGET, - "Trying to send warp sync request when no protocol is configured {request:?}", - ); - }, - } + self.network_service.start_request( + peer_id, + protocol_name, + request.encode(), + tx, + IfDisconnected::ImmediateError, + ); } fn encode_state_request(request: &OpaqueStateRequest) -> Result, String> { diff --git a/substrate/client/network/sync/src/strategy.rs b/substrate/client/network/sync/src/strategy.rs index f8d6976bbaa0..81998b7576bb 100644 --- a/substrate/client/network/sync/src/strategy.rs +++ b/substrate/client/network/sync/src/strategy.rs @@ -26,6 +26,7 @@ pub mod state_sync; pub mod warp; use crate::{ + block_request_handler::MAX_BLOCKS_IN_RESPONSE, types::{BadPeer, OpaqueStateRequest, OpaqueStateResponse, SyncStatus}, LOG_TARGET, }; @@ -34,6 +35,7 @@ use log::{debug, error, info}; use prometheus_endpoint::Registry; use sc_client_api::{BlockBackend, ProofProvider}; use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock}; +use sc_network::ProtocolName; use sc_network_common::sync::{ message::{BlockAnnounce, BlockData, BlockRequest}, SyncMode, @@ -172,6 +174,8 @@ pub struct SyncingConfig { pub max_blocks_per_request: u32, /// Prometheus metrics registry. pub metrics_registry: Option, + /// Protocol name used to send out state requests + pub state_request_protocol_name: ProtocolName, } /// The key identifying a specific strategy for responses routing. @@ -190,9 +194,19 @@ pub enum SyncingAction { /// Send block request to peer. Always implies dropping a stale block request to the same peer. SendBlockRequest { peer_id: PeerId, key: StrategyKey, request: BlockRequest }, /// Send state request to peer. - SendStateRequest { peer_id: PeerId, key: StrategyKey, request: OpaqueStateRequest }, + SendStateRequest { + peer_id: PeerId, + key: StrategyKey, + protocol_name: ProtocolName, + request: OpaqueStateRequest, + }, /// Send warp proof request to peer. - SendWarpProofRequest { peer_id: PeerId, key: StrategyKey, request: WarpProofRequest }, + SendWarpProofRequest { + peer_id: PeerId, + key: StrategyKey, + protocol_name: ProtocolName, + request: WarpProofRequest, + }, /// Drop stale request. CancelRequest { peer_id: PeerId, key: StrategyKey }, /// Peer misbehaved. Disconnect, report it and cancel any requests to it. @@ -219,8 +233,13 @@ impl SyncingAction { impl From> for SyncingAction { fn from(action: WarpSyncAction) -> Self { match action { - WarpSyncAction::SendWarpProofRequest { peer_id, request } => - SyncingAction::SendWarpProofRequest { peer_id, key: StrategyKey::Warp, request }, + WarpSyncAction::SendWarpProofRequest { peer_id, protocol_name, request } => + SyncingAction::SendWarpProofRequest { + peer_id, + key: StrategyKey::Warp, + protocol_name, + request, + }, WarpSyncAction::SendBlockRequest { peer_id, request } => SyncingAction::SendBlockRequest { peer_id, key: StrategyKey::Warp, request }, WarpSyncAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer), @@ -232,8 +251,13 @@ impl From> for SyncingAction { impl From> for SyncingAction { fn from(action: StateStrategyAction) -> Self { match action { - StateStrategyAction::SendStateRequest { peer_id, request } => - SyncingAction::SendStateRequest { peer_id, key: StrategyKey::State, request }, + StateStrategyAction::SendStateRequest { peer_id, protocol_name, request } => + SyncingAction::SendStateRequest { + peer_id, + key: StrategyKey::State, + protocol_name, + request, + }, StateStrategyAction::DropPeer(bad_peer) => SyncingAction::DropPeer(bad_peer), StateStrategyAction::ImportBlocks { origin, blocks } => SyncingAction::ImportBlocks { origin, blocks }, @@ -509,14 +533,24 @@ where { /// Initialize a new syncing strategy. pub fn new( - config: SyncingConfig, + mut config: SyncingConfig, client: Arc, warp_sync_config: Option>, + warp_sync_protocol_name: Option, ) -> Result { + if config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 { + info!( + target: LOG_TARGET, + "clamping maximum blocks per request to {MAX_BLOCKS_IN_RESPONSE}", + ); + config.max_blocks_per_request = MAX_BLOCKS_IN_RESPONSE as u32; + } + if let SyncMode::Warp = config.mode { let warp_sync_config = warp_sync_config .expect("Warp sync configuration must be supplied in warp sync mode."); - let warp_sync = WarpSync::new(client.clone(), warp_sync_config); + let warp_sync = + WarpSync::new(client.clone(), warp_sync_config, warp_sync_protocol_name); Ok(Self { config, client, @@ -531,6 +565,7 @@ where client.clone(), config.max_parallel_downloads, config.max_blocks_per_request, + config.state_request_protocol_name.clone(), config.metrics_registry.as_ref(), std::iter::empty(), )?; @@ -564,6 +599,7 @@ where self.peer_best_blocks .iter() .map(|(peer_id, (_, best_number))| (*peer_id, *best_number)), + self.config.state_request_protocol_name.clone(), ); self.warp = None; @@ -580,6 +616,7 @@ where self.client.clone(), self.config.max_parallel_downloads, self.config.max_blocks_per_request, + self.config.state_request_protocol_name.clone(), self.config.metrics_registry.as_ref(), self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| { (*peer_id, *best_hash, *best_number) @@ -608,6 +645,7 @@ where self.client.clone(), self.config.max_parallel_downloads, self.config.max_blocks_per_request, + self.config.state_request_protocol_name.clone(), self.config.metrics_registry.as_ref(), self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| { (*peer_id, *best_hash, *best_number) diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index a8ba5558d1bc..fd0e3ea1a76c 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -47,6 +47,7 @@ use log::{debug, error, info, trace, warn}; use prometheus_endpoint::{register, Gauge, PrometheusError, Registry, U64}; use sc_client_api::{blockchain::BlockGap, BlockBackend, ProofProvider}; use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock}; +use sc_network::ProtocolName; use sc_network_common::sync::message::{ BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, Direction, FromBlock, }; @@ -318,6 +319,8 @@ pub struct ChainSync { max_parallel_downloads: u32, /// Maximum blocks per request. max_blocks_per_request: u32, + /// Protocol name used to send out state requests + state_request_protocol_name: ProtocolName, /// Total number of downloaded blocks. downloaded_blocks: usize, /// State sync in progress, if any. @@ -880,7 +883,12 @@ where self.actions.extend(justification_requests); let state_request = self.state_request().into_iter().map(|(peer_id, request)| { - SyncingAction::SendStateRequest { peer_id, key: StrategyKey::ChainSync, request } + SyncingAction::SendStateRequest { + peer_id, + key: StrategyKey::ChainSync, + protocol_name: self.state_request_protocol_name.clone(), + request, + } }); self.actions.extend(state_request); @@ -905,6 +913,7 @@ where client: Arc, max_parallel_downloads: u32, max_blocks_per_request: u32, + state_request_protocol_name: ProtocolName, metrics_registry: Option<&Registry>, initial_peers: impl Iterator)>, ) -> Result { @@ -923,6 +932,7 @@ where allowed_requests: Default::default(), max_parallel_downloads, max_blocks_per_request, + state_request_protocol_name, downloaded_blocks: 0, state_sync: None, import_existing: false, diff --git a/substrate/client/network/sync/src/strategy/chain_sync/test.rs b/substrate/client/network/sync/src/strategy/chain_sync/test.rs index 59436f387db6..d13f034e2e8d 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync/test.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync/test.rs @@ -38,9 +38,16 @@ fn processes_empty_response_on_justification_request_for_unknown_block() { let client = Arc::new(TestClientBuilder::new().build()); let peer_id = PeerId::random(); - let mut sync = - ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 64, None, std::iter::empty()) - .unwrap(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 1, + 64, + ProtocolName::Static(""), + None, + std::iter::empty(), + ) + .unwrap(); let (a1_hash, a1_number) = { let a1 = BlockBuilderBuilder::new(&*client) @@ -95,9 +102,16 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { // we request max 8 blocks to always initiate block requests to both peers for the test to be // deterministic - let mut sync = - ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 8, None, std::iter::empty()) - .unwrap(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 1, + 8, + ProtocolName::Static(""), + None, + std::iter::empty(), + ) + .unwrap(); let peer_id1 = PeerId::random(); let peer_id2 = PeerId::random(); @@ -291,9 +305,16 @@ fn do_ancestor_search_when_common_block_to_best_queued_gap_is_to_big() { let client = Arc::new(TestClientBuilder::new().build()); let info = client.info(); - let mut sync = - ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None, std::iter::empty()) - .unwrap(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 5, + 64, + ProtocolName::Static(""), + None, + std::iter::empty(), + ) + .unwrap(); let peer_id1 = PeerId::random(); let peer_id2 = PeerId::random(); @@ -438,9 +459,16 @@ fn can_sync_huge_fork() { let info = client.info(); - let mut sync = - ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None, std::iter::empty()) - .unwrap(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 5, + 64, + ProtocolName::Static(""), + None, + std::iter::empty(), + ) + .unwrap(); let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2 - 1].clone(); let just = (*b"TEST", Vec::new()); @@ -572,9 +600,16 @@ fn syncs_fork_without_duplicate_requests() { let info = client.info(); - let mut sync = - ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None, std::iter::empty()) - .unwrap(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 5, + 64, + ProtocolName::Static(""), + None, + std::iter::empty(), + ) + .unwrap(); let finalized_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize * 2 - 1].clone(); let just = (*b"TEST", Vec::new()); @@ -709,9 +744,16 @@ fn removes_target_fork_on_disconnect() { let client = Arc::new(TestClientBuilder::new().build()); let blocks = (0..3).map(|_| build_block(&client, None, false)).collect::>(); - let mut sync = - ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 64, None, std::iter::empty()) - .unwrap(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 1, + 64, + ProtocolName::Static(""), + None, + std::iter::empty(), + ) + .unwrap(); let peer_id1 = PeerId::random(); let common_block = blocks[1].clone(); @@ -736,9 +778,16 @@ fn can_import_response_with_missing_blocks() { let empty_client = Arc::new(TestClientBuilder::new().build()); - let mut sync = - ChainSync::new(ChainSyncMode::Full, empty_client.clone(), 1, 64, None, std::iter::empty()) - .unwrap(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + empty_client.clone(), + 1, + 64, + ProtocolName::Static(""), + None, + std::iter::empty(), + ) + .unwrap(); let peer_id1 = PeerId::random(); let best_block = blocks[3].clone(); @@ -769,9 +818,16 @@ fn ancestor_search_repeat() { #[test] fn sync_restart_removes_block_but_not_justification_requests() { let client = Arc::new(TestClientBuilder::new().build()); - let mut sync = - ChainSync::new(ChainSyncMode::Full, client.clone(), 1, 64, None, std::iter::empty()) - .unwrap(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 1, + 64, + ProtocolName::Static(""), + None, + std::iter::empty(), + ) + .unwrap(); let peers = vec![PeerId::random(), PeerId::random()]; @@ -913,9 +969,16 @@ fn request_across_forks() { fork_blocks }; - let mut sync = - ChainSync::new(ChainSyncMode::Full, client.clone(), 5, 64, None, std::iter::empty()) - .unwrap(); + let mut sync = ChainSync::new( + ChainSyncMode::Full, + client.clone(), + 5, + 64, + ProtocolName::Static(""), + None, + std::iter::empty(), + ) + .unwrap(); // Add the peers, all at the common ancestor 100. let common_block = blocks.last().unwrap(); diff --git a/substrate/client/network/sync/src/strategy/state.rs b/substrate/client/network/sync/src/strategy/state.rs index 6f06f238fe3a..a04ab8be4fea 100644 --- a/substrate/client/network/sync/src/strategy/state.rs +++ b/substrate/client/network/sync/src/strategy/state.rs @@ -30,6 +30,7 @@ use crate::{ use log::{debug, error, trace}; use sc_client_api::ProofProvider; use sc_consensus::{BlockImportError, BlockImportStatus, IncomingBlock}; +use sc_network::ProtocolName; use sc_network_common::sync::message::BlockAnnounce; use sc_network_types::PeerId; use sp_consensus::BlockOrigin; @@ -52,7 +53,7 @@ mod rep { /// Action that should be performed on [`StateStrategy`]'s behalf. pub enum StateStrategyAction { /// Send state request to peer. - SendStateRequest { peer_id: PeerId, request: OpaqueStateRequest }, + SendStateRequest { peer_id: PeerId, protocol_name: ProtocolName, request: OpaqueStateRequest }, /// Disconnect and report peer. DropPeer(BadPeer), /// Import blocks. @@ -83,6 +84,7 @@ pub struct StateStrategy { peers: HashMap>, disconnected_peers: DisconnectedPeers, actions: Vec>, + protocol_name: ProtocolName, succeeded: bool, } @@ -95,6 +97,7 @@ impl StateStrategy { target_justifications: Option, skip_proof: bool, initial_peers: impl Iterator)>, + protocol_name: ProtocolName, ) -> Self where Client: ProofProvider + Send + Sync + 'static, @@ -115,6 +118,7 @@ impl StateStrategy { peers, disconnected_peers: DisconnectedPeers::new(), actions: Vec::new(), + protocol_name, succeeded: false, } } @@ -125,6 +129,7 @@ impl StateStrategy { fn new_with_provider( state_sync_provider: Box>, initial_peers: impl Iterator)>, + protocol_name: ProtocolName, ) -> Self { Self { state_sync: state_sync_provider, @@ -135,6 +140,7 @@ impl StateStrategy { .collect(), disconnected_peers: DisconnectedPeers::new(), actions: Vec::new(), + protocol_name, succeeded: false, } } @@ -349,10 +355,13 @@ impl StateStrategy { /// Get actions that should be performed by the owner on [`WarpSync`]'s behalf #[must_use] pub fn actions(&mut self) -> impl Iterator> { - let state_request = self - .state_request() - .into_iter() - .map(|(peer_id, request)| StateStrategyAction::SendStateRequest { peer_id, request }); + let state_request = self.state_request().into_iter().map(|(peer_id, request)| { + StateStrategyAction::SendStateRequest { + peer_id, + protocol_name: self.protocol_name.clone(), + request, + } + }); self.actions.extend(state_request); std::mem::take(&mut self.actions).into_iter() @@ -409,8 +418,15 @@ mod test { .block; let target_header = target_block.header().clone(); - let mut state_strategy = - StateStrategy::new(client, target_header, None, None, false, std::iter::empty()); + let mut state_strategy = StateStrategy::new( + client, + target_header, + None, + None, + false, + std::iter::empty(), + ProtocolName::Static(""), + ); assert!(state_strategy .schedule_next_peer(PeerState::DownloadingState, Zero::zero()) @@ -442,6 +458,7 @@ mod test { None, false, initial_peers, + ProtocolName::Static(""), ); let peer_id = @@ -475,6 +492,7 @@ mod test { None, false, initial_peers, + ProtocolName::Static(""), ); let peer_id = state_strategy.schedule_next_peer(PeerState::DownloadingState, 10); @@ -508,6 +526,7 @@ mod test { None, false, initial_peers, + ProtocolName::Static(""), ); // Disconnecting a peer without an inflight request has no effect on persistent states. @@ -557,6 +576,7 @@ mod test { None, false, initial_peers, + ProtocolName::Static(""), ); let (_peer_id, mut opaque_request) = state_strategy.state_request().unwrap(); @@ -587,6 +607,7 @@ mod test { None, false, initial_peers, + ProtocolName::Static(""), ); // First request is sent. @@ -602,8 +623,11 @@ mod test { state_sync_provider.expect_import().return_once(|_| ImportResult::Continue); let peer_id = PeerId::random(); let initial_peers = std::iter::once((peer_id, 10)); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), initial_peers); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + initial_peers, + ProtocolName::Static(""), + ); // Manually set the peer's state. state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState; @@ -620,8 +644,11 @@ mod test { state_sync_provider.expect_import().return_once(|_| ImportResult::BadResponse); let peer_id = PeerId::random(); let initial_peers = std::iter::once((peer_id, 10)); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), initial_peers); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + initial_peers, + ProtocolName::Static(""), + ); // Manually set the peer's state. state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState; let dummy_response = OpaqueStateResponse(Box::new(StateResponse::default())); @@ -639,8 +666,11 @@ mod test { state_sync_provider.expect_import().return_once(|_| ImportResult::Continue); let peer_id = PeerId::random(); let initial_peers = std::iter::once((peer_id, 10)); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), initial_peers); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + initial_peers, + ProtocolName::Static(""), + ); // Manually set the peer's state . state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState; @@ -698,8 +728,11 @@ mod test { // Prepare `StateStrategy`. let peer_id = PeerId::random(); let initial_peers = std::iter::once((peer_id, 10)); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), initial_peers); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + initial_peers, + ProtocolName::Static(""), + ); // Manually set the peer's state . state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState; @@ -722,8 +755,11 @@ mod test { let mut state_sync_provider = MockStateSync::::new(); state_sync_provider.expect_target_hash().return_const(target_hash); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), std::iter::empty()); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + std::iter::empty(), + ProtocolName::Static(""), + ); // Unknown block imported. state_strategy.on_blocks_processed( @@ -745,8 +781,11 @@ mod test { let mut state_sync_provider = MockStateSync::::new(); state_sync_provider.expect_target_hash().return_const(target_hash); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), std::iter::empty()); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + std::iter::empty(), + ProtocolName::Static(""), + ); // Target block imported. state_strategy.on_blocks_processed( @@ -769,8 +808,11 @@ mod test { let mut state_sync_provider = MockStateSync::::new(); state_sync_provider.expect_target_hash().return_const(target_hash); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), std::iter::empty()); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + std::iter::empty(), + ProtocolName::Static(""), + ); // Target block import failed. state_strategy.on_blocks_processed( @@ -797,8 +839,11 @@ mod test { // Get enough peers for possible spurious requests. let initial_peers = (1..=10).map(|best_number| (PeerId::random(), best_number)); - let mut state_strategy = - StateStrategy::new_with_provider(Box::new(state_sync_provider), initial_peers); + let mut state_strategy = StateStrategy::new_with_provider( + Box::new(state_sync_provider), + initial_peers, + ProtocolName::Static(""), + ); state_strategy.on_blocks_processed( 1, diff --git a/substrate/client/network/sync/src/strategy/warp.rs b/substrate/client/network/sync/src/strategy/warp.rs index 99405c2e5f08..cce6a93caf43 100644 --- a/substrate/client/network/sync/src/strategy/warp.rs +++ b/substrate/client/network/sync/src/strategy/warp.rs @@ -26,7 +26,8 @@ use crate::{ LOG_TARGET, }; use codec::{Decode, Encode}; -use log::{debug, error, trace}; +use log::{debug, error, trace, warn}; +use sc_network::ProtocolName; use sc_network_common::sync::message::{ BlockAnnounce, BlockAttributes, BlockData, BlockRequest, Direction, FromBlock, }; @@ -188,7 +189,11 @@ struct Peer { /// Action that should be performed on [`WarpSync`]'s behalf. pub enum WarpSyncAction { /// Send warp proof request to peer. - SendWarpProofRequest { peer_id: PeerId, request: WarpProofRequest }, + SendWarpProofRequest { + peer_id: PeerId, + protocol_name: ProtocolName, + request: WarpProofRequest, + }, /// Send block request to peer. Always implies dropping a stale block request to the same peer. SendBlockRequest { peer_id: PeerId, request: BlockRequest }, /// Disconnect and report peer. @@ -211,6 +216,7 @@ pub struct WarpSync { total_state_bytes: u64, peers: HashMap>, disconnected_peers: DisconnectedPeers, + protocol_name: Option, actions: Vec>, result: Option>, } @@ -223,7 +229,11 @@ where /// Create a new instance. When passing a warp sync provider we will be checking for proof and /// authorities. Alternatively we can pass a target block when we want to skip downloading /// proofs, in this case we will continue polling until the target block is known. - pub fn new(client: Arc, warp_sync_config: WarpSyncConfig) -> Self { + pub fn new( + client: Arc, + warp_sync_config: WarpSyncConfig, + protocol_name: Option, + ) -> Self { if client.info().finalized_state.is_some() { error!( target: LOG_TARGET, @@ -236,6 +246,7 @@ where total_state_bytes: 0, peers: HashMap::new(), disconnected_peers: DisconnectedPeers::new(), + protocol_name, actions: vec![WarpSyncAction::Finished], result: None, } @@ -254,6 +265,7 @@ where total_state_bytes: 0, peers: HashMap::new(), disconnected_peers: DisconnectedPeers::new(), + protocol_name, actions: Vec::new(), result: None, } @@ -469,7 +481,7 @@ where } /// Produce warp proof request. - fn warp_proof_request(&mut self) -> Option<(PeerId, WarpProofRequest)> { + fn warp_proof_request(&mut self) -> Option<(PeerId, ProtocolName, WarpProofRequest)> { let Phase::WarpProof { last_hash, .. } = &self.phase else { return None }; // Copy `last_hash` early to cut the borrowing tie. @@ -487,7 +499,17 @@ where let peer_id = self.schedule_next_peer(PeerState::DownloadingProofs, None)?; trace!(target: LOG_TARGET, "New WarpProofRequest to {peer_id}, begin hash: {begin}."); - Some((peer_id, WarpProofRequest { begin })) + let request = WarpProofRequest { begin }; + + let Some(protocol_name) = self.protocol_name.clone() else { + warn!( + target: LOG_TARGET, + "Trying to send warp sync request when no protocol is configured {request:?}", + ); + return None; + }; + + Some((peer_id, protocol_name, request)) } /// Produce target block request. @@ -585,10 +607,10 @@ where /// Get actions that should be performed by the owner on [`WarpSync`]'s behalf #[must_use] pub fn actions(&mut self) -> impl Iterator> { - let warp_proof_request = self - .warp_proof_request() - .into_iter() - .map(|(peer_id, request)| WarpSyncAction::SendWarpProofRequest { peer_id, request }); + let warp_proof_request = + self.warp_proof_request().into_iter().map(|(peer_id, protocol_name, request)| { + WarpSyncAction::SendWarpProofRequest { peer_id, protocol_name, request } + }); self.actions.extend(warp_proof_request); let target_block_request = self @@ -694,7 +716,7 @@ mod test { let client = mock_client_with_state(); let provider = MockWarpSyncProvider::::new(); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, None); // Warp sync instantly finishes let actions = warp_sync.actions().collect::>(); @@ -715,7 +737,7 @@ mod test { Default::default(), Default::default(), )); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, None); // Warp sync instantly finishes let actions = warp_sync.actions().collect::>(); @@ -731,7 +753,7 @@ mod test { let client = mock_client_without_state(); let provider = MockWarpSyncProvider::::new(); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, None); // No actions are emitted. assert_eq!(warp_sync.actions().count(), 0) @@ -747,7 +769,7 @@ mod test { Default::default(), Default::default(), )); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, None); // No actions are emitted. assert_eq!(warp_sync.actions().count(), 0) @@ -762,7 +784,7 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, None); // Warp sync is not started when there is not enough peers. for _ in 0..(MIN_PEERS_TO_START_WARP_SYNC - 1) { @@ -780,7 +802,7 @@ mod test { let client = mock_client_without_state(); let provider = MockWarpSyncProvider::::new(); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, None); assert!(warp_sync.schedule_next_peer(PeerState::DownloadingProofs, None).is_none()); } @@ -804,7 +826,7 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, None); for best_number in 1..11 { warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); @@ -825,7 +847,7 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, None); for best_number in 1..11 { warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); @@ -845,7 +867,7 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, None); for best_number in 1..11 { warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); @@ -889,7 +911,7 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, Some(ProtocolName::Static(""))); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -918,7 +940,7 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, Some(ProtocolName::Static(""))); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -936,7 +958,7 @@ mod test { _ => panic!("Invalid phase."), } - let (_peer_id, request) = warp_sync.warp_proof_request().unwrap(); + let (_peer_id, _protocol_name, request) = warp_sync.warp_proof_request().unwrap(); assert_eq!(request.begin, known_last_hash); } @@ -949,7 +971,7 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, Some(ProtocolName::Static(""))); // Make sure we have enough peers to make requests. for best_number in 1..11 { @@ -976,7 +998,7 @@ mod test { Err(Box::new(std::io::Error::new(ErrorKind::Other, "test-verification-failure"))) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, Some(ProtocolName::Static(""))); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -1017,7 +1039,7 @@ mod test { Ok(VerificationResult::Partial(set_id, authorities, Hash::random())) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, Some(ProtocolName::Static(""))); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -1061,7 +1083,7 @@ mod test { Ok(VerificationResult::Complete(set_id, authorities, target_header)) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let mut warp_sync = WarpSync::new(client, config, Some(ProtocolName::Static(""))); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -1094,7 +1116,7 @@ mod test { .once() .return_const(AuthorityList::default()); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(Arc::new(client), config); + let mut warp_sync = WarpSync::new(Arc::new(client), config, None); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -1129,7 +1151,7 @@ mod test { Ok(VerificationResult::Complete(set_id, authorities, target_header)) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let mut warp_sync = WarpSync::new(client, config, None); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -1161,7 +1183,7 @@ mod test { .block; let target_header = target_block.header().clone(); let config = WarpSyncConfig::WithTarget(target_header); - let mut warp_sync = WarpSync::new(client, config); + let mut warp_sync = WarpSync::new(client, config, None); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -1201,7 +1223,7 @@ mod test { Ok(VerificationResult::Complete(set_id, authorities, target_header)) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let mut warp_sync = WarpSync::new(client, config, None); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -1239,7 +1261,7 @@ mod test { Ok(VerificationResult::Complete(set_id, authorities, target_header)) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let mut warp_sync = WarpSync::new(client, config, None); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -1293,7 +1315,7 @@ mod test { Ok(VerificationResult::Complete(set_id, authorities, target_header)) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let mut warp_sync = WarpSync::new(client, config, None); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -1370,7 +1392,7 @@ mod test { Ok(VerificationResult::Complete(set_id, authorities, target_header)) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let mut warp_sync = WarpSync::new(client, config, None); // Make sure we have enough peers to make a request. for best_number in 1..11 { @@ -1423,7 +1445,7 @@ mod test { Ok(VerificationResult::Complete(set_id, authorities, target_header)) }); let config = WarpSyncConfig::WithProvider(Arc::new(provider)); - let mut warp_sync = WarpSync::new(client, config); + let mut warp_sync = WarpSync::new(client, config, None); // Make sure we have enough peers to make a request. for best_number in 1..11 { diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index f84f353fb4a0..0f73e3194baa 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -66,8 +66,12 @@ use sc_network_sync::{ block_request_handler::BlockRequestHandler, service::{network::NetworkServiceProvider, syncing_service::SyncingService}, state_request_handler::StateRequestHandler, - strategy::warp::{ - AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncConfig, WarpSyncProvider, + strategy::{ + warp::{ + AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncConfig, + WarpSyncProvider, + }, + PolkadotSyncingStrategy, SyncingConfig, }, warp_request_handler, }; @@ -905,6 +909,24 @@ pub trait TestNetFactory: Default + Sized + Send { ::Hash, >>::register_notification_metrics(None); + let syncing_config = SyncingConfig { + mode: network_config.sync_mode, + max_parallel_downloads: network_config.max_parallel_downloads, + max_blocks_per_request: network_config.max_blocks_per_request, + metrics_registry: None, + state_request_protocol_name: state_request_protocol_config.name.clone(), + }; + // Initialize syncing strategy. + let syncing_strategy = Box::new( + PolkadotSyncingStrategy::new( + syncing_config, + client.clone(), + Some(warp_sync_config), + Some(warp_protocol_config.name.clone()), + ) + .unwrap(), + ); + let (engine, sync_service, block_announce_config) = sc_network_sync::engine::SyncingEngine::new( Roles::from(if config.is_authority { &Role::Authority } else { &Role::Full }), @@ -915,12 +937,10 @@ pub trait TestNetFactory: Default + Sized + Send { protocol_id.clone(), &fork_id, block_announce_validator, - Some(warp_sync_config), + syncing_strategy, chain_sync_network_handle, import_queue.service(), block_relay_params.downloader, - state_request_protocol_config.name.clone(), - Some(warp_protocol_config.name.clone()), peer_store_handle.clone(), ) .unwrap(); diff --git a/substrate/client/network/test/src/service.rs b/substrate/client/network/test/src/service.rs index a5cee97531ca..ad2d1d9ec24d 100644 --- a/substrate/client/network/test/src/service.rs +++ b/substrate/client/network/test/src/service.rs @@ -34,6 +34,7 @@ use sc_network_sync::{ engine::SyncingEngine, service::network::{NetworkServiceHandle, NetworkServiceProvider}, state_request_handler::StateRequestHandler, + strategy::{PolkadotSyncingStrategy, SyncingConfig}, }; use sp_blockchain::HeaderBackend; use sp_runtime::traits::{Block as BlockT, Zero}; @@ -202,6 +203,18 @@ impl TestNetworkBuilder { let peer_store_handle: Arc = Arc::new(peer_store.handle()); tokio::spawn(peer_store.run().boxed()); + let syncing_config = SyncingConfig { + mode: network_config.sync_mode, + max_parallel_downloads: network_config.max_parallel_downloads, + max_blocks_per_request: network_config.max_blocks_per_request, + metrics_registry: None, + state_request_protocol_name: state_request_protocol_config.name.clone(), + }; + // Initialize syncing strategy. + let syncing_strategy = Box::new( + PolkadotSyncingStrategy::new(syncing_config, client.clone(), None, None).unwrap(), + ); + let (engine, chain_sync_service, block_announce_config) = SyncingEngine::new( Roles::from(&config::Role::Full), client.clone(), @@ -211,12 +224,10 @@ impl TestNetworkBuilder { protocol_id.clone(), &None, Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator), - None, + syncing_strategy, chain_sync_network_handle, import_queue.service(), block_relay_params.downloader, - state_request_protocol_config.name.clone(), - None, Arc::clone(&peer_store_handle), ) .unwrap(); diff --git a/substrate/client/service/src/builder.rs b/substrate/client/service/src/builder.rs index 28a76847ac06..f27b7ec6fbad 100644 --- a/substrate/client/service/src/builder.rs +++ b/substrate/client/service/src/builder.rs @@ -42,7 +42,7 @@ use sc_executor::{ }; use sc_keystore::LocalKeystore; use sc_network::{ - config::{FullNetworkConfiguration, SyncMode}, + config::{FullNetworkConfiguration, ProtocolId, SyncMode}, multiaddr::Protocol, service::{ traits::{PeerStore, RequestResponseConfig}, @@ -53,10 +53,14 @@ use sc_network::{ use sc_network_common::role::Roles; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ - block_relay_protocol::BlockRelayParams, block_request_handler::BlockRequestHandler, - engine::SyncingEngine, service::network::NetworkServiceProvider, + block_relay_protocol::BlockRelayParams, + block_request_handler::BlockRequestHandler, + engine::SyncingEngine, + service::network::NetworkServiceProvider, state_request_handler::StateRequestHandler, - warp_request_handler::RequestHandler as WarpSyncRequestHandler, SyncingService, WarpSyncConfig, + strategy::{PolkadotSyncingStrategy, SyncingConfig, SyncingStrategy}, + warp_request_handler::RequestHandler as WarpSyncRequestHandler, + SyncingService, WarpSyncConfig, }; use sc_rpc::{ author::AuthorApiServer, @@ -777,65 +781,63 @@ where } /// Parameters to pass into `build_network`. -pub struct BuildNetworkParams< - 'a, - TBl: BlockT, - TNet: NetworkBackend::Hash>, - TExPool, - TImpQu, - TCl, -> { +pub struct BuildNetworkParams<'a, Block, Net, TxPool, IQ, Client> +where + Block: BlockT, + Net: NetworkBackend::Hash>, +{ /// The service configuration. pub config: &'a Configuration, /// Full network configuration. - pub net_config: FullNetworkConfiguration::Hash, TNet>, + pub net_config: FullNetworkConfiguration::Hash, Net>, /// A shared client returned by `new_full_parts`. - pub client: Arc, + pub client: Arc, /// A shared transaction pool. - pub transaction_pool: Arc, + pub transaction_pool: Arc, /// A handle for spawning tasks. pub spawn_handle: SpawnTaskHandle, /// An import queue. - pub import_queue: TImpQu, + pub import_queue: IQ, /// A block announce validator builder. - pub block_announce_validator_builder: - Option) -> Box + Send> + Send>>, - /// Optional warp sync config. - pub warp_sync_config: Option>, + pub block_announce_validator_builder: Option< + Box) -> Box + Send> + Send>, + >, + /// Syncing strategy to use in syncing engine. + pub syncing_strategy: Box>, /// User specified block relay params. If not specified, the default /// block request handler will be used. - pub block_relay: Option>, + pub block_relay: Option>, /// Metrics. pub metrics: NotificationMetrics, } /// Build the network service, the network status sinks and an RPC sender. -pub fn build_network( - params: BuildNetworkParams, +pub fn build_network( + params: BuildNetworkParams, ) -> Result< ( Arc, - TracingUnboundedSender>, - sc_network_transactions::TransactionsHandlerController<::Hash>, + TracingUnboundedSender>, + sc_network_transactions::TransactionsHandlerController<::Hash>, NetworkStarter, - Arc>, + Arc>, ), Error, > where - TBl: BlockT, - TCl: ProvideRuntimeApi - + HeaderMetadata - + Chain - + BlockBackend - + BlockIdTo - + ProofProvider - + HeaderBackend - + BlockchainEvents + Block: BlockT, + Client: ProvideRuntimeApi + + HeaderMetadata + + Chain + + BlockBackend + + BlockIdTo + + ProofProvider + + HeaderBackend + + BlockchainEvents + 'static, - TExPool: TransactionPool::Hash> + 'static, - TImpQu: ImportQueue + 'static, - TNet: NetworkBackend::Hash>, + TxPool: TransactionPool::Hash> + 'static, + IQ: ImportQueue + 'static, + Net: NetworkBackend::Hash>, { let BuildNetworkParams { config, @@ -845,30 +847,13 @@ where spawn_handle, import_queue, block_announce_validator_builder, - warp_sync_config, + syncing_strategy, block_relay, metrics, } = params; - if warp_sync_config.is_none() && config.network.sync_mode.is_warp() { - return Err("Warp sync enabled, but no warp sync provider configured.".into()) - } - - if client.requires_full_sync() { - match config.network.sync_mode { - SyncMode::LightState { .. } => - return Err("Fast sync doesn't work for archive nodes".into()), - SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()), - SyncMode::Full => {}, - } - } - let protocol_id = config.protocol_id(); - let genesis_hash = client - .block_hash(0u32.into()) - .ok() - .flatten() - .expect("Genesis block exists; qed"); + let genesis_hash = client.info().genesis_hash; let block_announce_validator = if let Some(f) = block_announce_validator_builder { f(client.clone()) @@ -882,7 +867,7 @@ where None => { // Custom protocol was not specified, use the default block handler. // Allow both outgoing and incoming requests. - let params = BlockRequestHandler::new::( + let params = BlockRequestHandler::new::( chain_sync_network_handle.clone(), &protocol_id, config.chain_spec.fork_id(), @@ -897,42 +882,9 @@ where block_server.run().await; }); - let (state_request_protocol_config, state_request_protocol_name) = { - let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize + - net_config.network_config.default_peers_set.reserved_nodes.len(); - // Allow both outgoing and incoming requests. - let (handler, protocol_config) = StateRequestHandler::new::( - &protocol_id, - config.chain_spec.fork_id(), - client.clone(), - num_peer_hint, - ); - let config_name = protocol_config.protocol_name().clone(); - - spawn_handle.spawn("state-request-handler", Some("networking"), handler.run()); - (protocol_config, config_name) - }; - - let (warp_sync_protocol_config, warp_request_protocol_name) = match warp_sync_config.as_ref() { - Some(WarpSyncConfig::WithProvider(warp_with_provider)) => { - // Allow both outgoing and incoming requests. - let (handler, protocol_config) = WarpSyncRequestHandler::new::<_, TNet>( - protocol_id.clone(), - genesis_hash, - config.chain_spec.fork_id(), - warp_with_provider.clone(), - ); - let config_name = protocol_config.protocol_name().clone(); - - spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run()); - (Some(protocol_config), Some(config_name)) - }, - _ => (None, None), - }; - let light_client_request_protocol_config = { // Allow both outgoing and incoming requests. - let (handler, protocol_config) = LightClientRequestHandler::new::( + let (handler, protocol_config) = LightClientRequestHandler::new::( &protocol_id, config.chain_spec.fork_id(), client.clone(), @@ -943,15 +895,10 @@ where // install request handlers to `FullNetworkConfiguration` net_config.add_request_response_protocol(block_request_protocol_config); - net_config.add_request_response_protocol(state_request_protocol_config); net_config.add_request_response_protocol(light_client_request_protocol_config); - if let Some(config) = warp_sync_protocol_config { - net_config.add_request_response_protocol(config); - } - let bitswap_config = config.network.ipfs_server.then(|| { - let (handler, config) = TNet::bitswap_server(client.clone()); + let (handler, config) = Net::bitswap_server(client.clone()); spawn_handle.spawn("bitswap-request-handler", Some("networking"), handler); config @@ -960,7 +907,7 @@ where // create transactions protocol and add it to the list of supported protocols of let peer_store_handle = net_config.peer_store_handle(); let (transactions_handler_proto, transactions_config) = - sc_network_transactions::TransactionsHandlerPrototype::new::<_, TBl, TNet>( + sc_network_transactions::TransactionsHandlerPrototype::new::<_, Block, Net>( protocol_id.clone(), genesis_hash, config.chain_spec.fork_id(), @@ -983,19 +930,16 @@ where protocol_id.clone(), &config.chain_spec.fork_id().map(ToOwned::to_owned), block_announce_validator, - warp_sync_config, + syncing_strategy, chain_sync_network_handle, import_queue.service(), block_downloader, - state_request_protocol_name, - warp_request_protocol_name, Arc::clone(&peer_store_handle), )?; let sync_service_import_queue = sync_service.clone(); let sync_service = Arc::new(sync_service); - let genesis_hash = client.hash(Zero::zero()).ok().flatten().expect("Genesis block exists; qed"); - let network_params = sc_network::config::Params::::Hash, TNet> { + let network_params = sc_network::config::Params::::Hash, Net> { role: config.role, executor: { let spawn_handle = Clone::clone(&spawn_handle); @@ -1005,7 +949,7 @@ where }, network_config: net_config, genesis_hash, - protocol_id: protocol_id.clone(), + protocol_id, fork_id: config.chain_spec.fork_id().map(ToOwned::to_owned), metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()), block_announce_config, @@ -1014,7 +958,7 @@ where }; let has_bootnodes = !network_params.network_config.network_config.boot_nodes.is_empty(); - let network_mut = TNet::new(network_params)?; + let network_mut = Net::new(network_params)?; let network = network_mut.network_service().clone(); let (tx_handler, tx_handler_controller) = transactions_handler_proto.build( @@ -1041,7 +985,7 @@ where spawn_handle.spawn( "system-rpc-handler", Some("networking"), - build_system_rpc_future::<_, _, ::Hash>( + build_system_rpc_future::<_, _, ::Hash>( config.role, network_mut.network_service(), sync_service.clone(), @@ -1051,7 +995,7 @@ where ), ); - let future = build_network_future::<_, _, ::Hash, _>( + let future = build_network_future::<_, _, ::Hash, _>( network_mut, client, sync_service.clone(), @@ -1103,6 +1047,91 @@ where )) } +/// Build standard polkadot syncing strategy +pub fn build_polkadot_syncing_strategy( + protocol_id: ProtocolId, + fork_id: Option<&str>, + net_config: &mut FullNetworkConfiguration::Hash, Net>, + warp_sync_config: Option>, + client: Arc, + spawn_handle: &SpawnTaskHandle, + metrics_registry: Option<&Registry>, +) -> Result>, Error> +where + Block: BlockT, + Client: HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, + + Net: NetworkBackend::Hash>, +{ + if warp_sync_config.is_none() && net_config.network_config.sync_mode.is_warp() { + return Err("Warp sync enabled, but no warp sync provider configured.".into()) + } + + if client.requires_full_sync() { + match net_config.network_config.sync_mode { + SyncMode::LightState { .. } => + return Err("Fast sync doesn't work for archive nodes".into()), + SyncMode::Warp => return Err("Warp sync doesn't work for archive nodes".into()), + SyncMode::Full => {}, + } + } + + let genesis_hash = client.info().genesis_hash; + + let (state_request_protocol_config, state_request_protocol_name) = { + let num_peer_hint = net_config.network_config.default_peers_set_num_full as usize + + net_config.network_config.default_peers_set.reserved_nodes.len(); + // Allow both outgoing and incoming requests. + let (handler, protocol_config) = + StateRequestHandler::new::(&protocol_id, fork_id, client.clone(), num_peer_hint); + let config_name = protocol_config.protocol_name().clone(); + + spawn_handle.spawn("state-request-handler", Some("networking"), handler.run()); + (protocol_config, config_name) + }; + net_config.add_request_response_protocol(state_request_protocol_config); + + let (warp_sync_protocol_config, warp_sync_protocol_name) = match warp_sync_config.as_ref() { + Some(WarpSyncConfig::WithProvider(warp_with_provider)) => { + // Allow both outgoing and incoming requests. + let (handler, protocol_config) = WarpSyncRequestHandler::new::<_, Net>( + protocol_id, + genesis_hash, + fork_id, + warp_with_provider.clone(), + ); + let config_name = protocol_config.protocol_name().clone(); + + spawn_handle.spawn("warp-sync-request-handler", Some("networking"), handler.run()); + (Some(protocol_config), Some(config_name)) + }, + _ => (None, None), + }; + if let Some(config) = warp_sync_protocol_config { + net_config.add_request_response_protocol(config); + } + + let syncing_config = SyncingConfig { + mode: net_config.network_config.sync_mode, + max_parallel_downloads: net_config.network_config.max_parallel_downloads, + max_blocks_per_request: net_config.network_config.max_blocks_per_request, + metrics_registry: metrics_registry.cloned(), + state_request_protocol_name, + }; + Ok(Box::new(PolkadotSyncingStrategy::new( + syncing_config, + client, + warp_sync_config, + warp_sync_protocol_name, + )?)) +} + /// Object used to start the network. #[must_use] pub struct NetworkStarter(oneshot::Sender<()>); diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs index babb76f022f0..b6acdb8ed002 100644 --- a/substrate/client/service/src/lib.rs +++ b/substrate/client/service/src/lib.rs @@ -59,11 +59,11 @@ use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; pub use self::{ builder::{ - build_network, gen_rpc_module, init_telemetry, new_client, new_db_backend, new_full_client, - new_full_parts, new_full_parts_record_import, new_full_parts_with_genesis_builder, - new_wasm_executor, propagate_transaction_notifications, spawn_tasks, BuildNetworkParams, - KeystoreContainer, NetworkStarter, SpawnTasksParams, TFullBackend, TFullCallExecutor, - TFullClient, + build_network, build_polkadot_syncing_strategy, gen_rpc_module, init_telemetry, new_client, + new_db_backend, new_full_client, new_full_parts, new_full_parts_record_import, + new_full_parts_with_genesis_builder, new_wasm_executor, + propagate_transaction_notifications, spawn_tasks, BuildNetworkParams, KeystoreContainer, + NetworkStarter, SpawnTasksParams, TFullBackend, TFullCallExecutor, TFullClient, }, client::{ClientConfig, LocalCallExecutor}, error::Error, diff --git a/templates/minimal/node/src/service.rs b/templates/minimal/node/src/service.rs index a42eb10ccec6..08cd345f1e3e 100644 --- a/templates/minimal/node/src/service.rs +++ b/templates/minimal/node/src/service.rs @@ -15,12 +15,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::cli::Consensus; use futures::FutureExt; use minimal_template_runtime::{interface::OpaqueBlock as Block, RuntimeApi}; use polkadot_sdk::{ sc_client_api::backend::Backend, sc_executor::WasmExecutor, - sc_service::{error::Error as ServiceError, Configuration, TaskManager}, + sc_service::{ + build_polkadot_syncing_strategy, error::Error as ServiceError, Configuration, TaskManager, + }, sc_telemetry::{Telemetry, TelemetryWorker}, sc_transaction_pool_api::OffchainTransactionPoolFactory, sp_runtime::traits::Block as BlockT, @@ -28,8 +31,6 @@ use polkadot_sdk::{ }; use std::sync::Arc; -use crate::cli::Consensus; - type HostFunctions = sp_io::SubstrateHostFunctions; #[docify::export] @@ -120,7 +121,7 @@ pub fn new_full::Ha other: mut telemetry, } = new_partial(&config)?; - let net_config = sc_network::config::FullNetworkConfiguration::< + let mut net_config = sc_network::config::FullNetworkConfiguration::< Block, ::Hash, Network, @@ -132,6 +133,16 @@ pub fn new_full::Ha config.prometheus_config.as_ref().map(|cfg| &cfg.registry), ); + let syncing_strategy = build_polkadot_syncing_strategy( + config.protocol_id(), + config.chain_spec.fork_id(), + &mut net_config, + None, + client.clone(), + &task_manager.spawn_handle(), + config.prometheus_config.as_ref().map(|config| &config.registry), + )?; + let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, @@ -141,7 +152,7 @@ pub fn new_full::Ha import_queue, net_config, block_announce_validator_builder: None, - warp_sync_config: None, + syncing_strategy, block_relay: None, metrics, })?; diff --git a/templates/solochain/node/src/service.rs b/templates/solochain/node/src/service.rs index 7d37c5ce87f8..2de543235ec8 100644 --- a/templates/solochain/node/src/service.rs +++ b/templates/solochain/node/src/service.rs @@ -4,7 +4,10 @@ use futures::FutureExt; use sc_client_api::{Backend, BlockBackend}; use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams}; use sc_consensus_grandpa::SharedVoterState; -use sc_service::{error::Error as ServiceError, Configuration, TaskManager, WarpSyncConfig}; +use sc_service::{ + build_polkadot_syncing_strategy, error::Error as ServiceError, Configuration, TaskManager, + WarpSyncConfig, +}; use sc_telemetry::{Telemetry, TelemetryWorker}; use sc_transaction_pool_api::OffchainTransactionPoolFactory; use solochain_template_runtime::{self, apis::RuntimeApi, opaque::Block}; @@ -166,6 +169,16 @@ pub fn new_full< Vec::default(), )); + let syncing_strategy = build_polkadot_syncing_strategy( + config.protocol_id(), + config.chain_spec.fork_id(), + &mut net_config, + Some(WarpSyncConfig::WithProvider(warp_sync)), + client.clone(), + &task_manager.spawn_handle(), + config.prometheus_config.as_ref().map(|config| &config.registry), + )?; + let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, @@ -175,7 +188,7 @@ pub fn new_full< spawn_handle: task_manager.spawn_handle(), import_queue, block_announce_validator_builder: None, - warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)), + syncing_strategy, block_relay: None, metrics, })?;