From 6be8bbf90034609884a338427aedf0354e982135 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 13 Nov 2023 09:18:51 +0200 Subject: [PATCH 1/6] Rename `service/chain_sync.rs` -> `service/engine.rs` --- substrate/client/network/sync/src/engine.rs | 2 +- substrate/client/network/sync/src/lib.rs | 2 +- .../network/sync/src/service/{chain_sync.rs => engine.rs} | 6 +++--- substrate/client/network/sync/src/service/mod.rs | 2 +- substrate/client/network/test/src/lib.rs | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) rename substrate/client/network/sync/src/service/{chain_sync.rs => engine.rs} (98%) diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 58a9fdc49f20..64bcc9cbfa7b 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -30,7 +30,7 @@ use crate::{ schema::v1::{StateRequest, StateResponse}, service::{ self, - chain_sync::{SyncingService, ToServiceCommand}, + engine::{SyncingService, ToServiceCommand}, }, types::{ BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent, diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index c42b0601e659..37be42e1082d 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -18,7 +18,7 @@ //! Blockchain syncing implementation in Substrate. -pub use service::chain_sync::SyncingService; +pub use service::engine::SyncingService; pub use types::{SyncEvent, SyncEventStream, SyncState, SyncStatus, SyncStatusProvider}; mod block_announce_validator; diff --git a/substrate/client/network/sync/src/service/chain_sync.rs b/substrate/client/network/sync/src/service/engine.rs similarity index 98% rename from substrate/client/network/sync/src/service/chain_sync.rs rename to substrate/client/network/sync/src/service/engine.rs index 3d11880c511c..92d649d65dc3 100644 --- a/substrate/client/network/sync/src/service/chain_sync.rs +++ b/substrate/client/network/sync/src/service/engine.rs @@ -34,7 +34,7 @@ use std::{ }, }; -/// Commands send to `ChainSync` +/// Commands send to `SyncingEngine` pub enum ToServiceCommand { SetSyncForkRequest(Vec, B::Hash, NumberFor), RequestJustification(B::Hash, NumberFor), @@ -63,7 +63,7 @@ pub enum ToServiceCommand { // }, } -/// Handle for communicating with `ChainSync` asynchronously +/// Handle for communicating with `SyncingEngine` asynchronously #[derive(Clone)] pub struct SyncingService { tx: TracingUnboundedSender>, @@ -148,7 +148,7 @@ impl SyncingService { /// Get sync status /// - /// Returns an error if `ChainSync` has terminated. + /// Returns an error if `SyncingEngine` has terminated. pub async fn status(&self) -> Result, ()> { let (tx, rx) = oneshot::channel(); let _ = self.tx.unbounded_send(ToServiceCommand::Status(tx)); diff --git a/substrate/client/network/sync/src/service/mod.rs b/substrate/client/network/sync/src/service/mod.rs index 18331d63ed29..d928785cce66 100644 --- a/substrate/client/network/sync/src/service/mod.rs +++ b/substrate/client/network/sync/src/service/mod.rs @@ -18,6 +18,6 @@ //! `ChainSync`-related service code -pub mod chain_sync; +pub mod engine; pub mod mock; pub mod network; diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index f869e3a171a3..03eff4686444 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -64,7 +64,7 @@ use sc_network_common::role::Roles; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ block_request_handler::BlockRequestHandler, - service::{chain_sync::SyncingService, network::NetworkServiceProvider}, + service::{engine::SyncingService, network::NetworkServiceProvider}, state_request_handler::StateRequestHandler, warp::{ AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncParams, WarpSyncProvider, From 5814187daf273efe5a4c9a875c3af008b5968f58 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 13 Nov 2023 09:56:00 +0200 Subject: [PATCH 2/6] Get rid of `ChainSync::is_peer_known` --- substrate/client/network/sync/src/chain_sync.rs | 5 ----- substrate/client/network/sync/src/engine.rs | 6 +++--- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/substrate/client/network/sync/src/chain_sync.rs b/substrate/client/network/sync/src/chain_sync.rs index 2adc6d423415..094d4ed9fd1b 100644 --- a/substrate/client/network/sync/src/chain_sync.rs +++ b/substrate/client/network/sync/src/chain_sync.rs @@ -1420,11 +1420,6 @@ where .any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash)) } - /// Check if the peer is known to the sync state machine. Used for sanity checks. - pub fn is_peer_known(&self, peer_id: &PeerId) -> bool { - self.peers.contains_key(peer_id) - } - /// Get the set of downloaded blocks that are ready to be queued for import. fn ready_blocks(&mut self) -> Vec> { self.blocks diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 64bcc9cbfa7b..3ff8df810d87 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -1123,7 +1123,7 @@ where } fn send_block_request(&mut self, peer_id: PeerId, request: BlockRequest) { - if !self.chain_sync.is_peer_known(&peer_id) { + if !self.peers.contains_key(&peer_id) { trace!(target: LOG_TARGET, "Cannot send block request to unknown peer {peer_id}"); debug_assert!(false); return @@ -1139,7 +1139,7 @@ where } fn send_state_request(&mut self, peer_id: PeerId, request: OpaqueStateRequest) { - if !self.chain_sync.is_peer_known(&peer_id) { + if !self.peers.contains_key(&peer_id) { trace!(target: LOG_TARGET, "Cannot send state request to unknown peer {peer_id}"); debug_assert!(false); return @@ -1169,7 +1169,7 @@ where } fn send_warp_sync_request(&mut self, peer_id: PeerId, request: WarpProofRequest) { - if !self.chain_sync.is_peer_known(&peer_id) { + if !self.peers.contains_key(&peer_id) { trace!(target: LOG_TARGET, "Cannot send warp proof request to unknown peer {peer_id}"); debug_assert!(false); return From 0582a05298562cff18730f3feadc99c06ebbe008 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 14 Nov 2023 12:44:19 +0200 Subject: [PATCH 3/6] Make `ChainSync` only return actions (no separate `..._requests` functions --- .../client/network/sync/src/chain_sync.rs | 38 +++++++++++++-- substrate/client/network/sync/src/engine.rs | 47 +++++++++---------- substrate/client/network/sync/src/warp.rs | 2 +- 3 files changed, 56 insertions(+), 31 deletions(-) diff --git a/substrate/client/network/sync/src/chain_sync.rs b/substrate/client/network/sync/src/chain_sync.rs index 094d4ed9fd1b..8cbcf94c376f 100644 --- a/substrate/client/network/sync/src/chain_sync.rs +++ b/substrate/client/network/sync/src/chain_sync.rs @@ -191,6 +191,10 @@ pub enum ChainSyncAction { SendBlockRequest { peer_id: PeerId, request: BlockRequest }, /// Drop stale block request. CancelBlockRequest { peer_id: PeerId }, + /// Send state request to peer. + SendStateRequest { peer_id: PeerId, request: OpaqueStateRequest }, + /// Send warp proof request to peer. + SendWarpProofRequest { peer_id: PeerId, request: WarpProofRequest }, /// Peer misbehaved. Disconnect, report it and cancel the block request to it. DropPeer(BadPeer), /// Import blocks. @@ -1532,7 +1536,7 @@ where } /// Get justification requests scheduled by sync to be sent out. - pub fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest)> { + fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest)> { let peers = &mut self.peers; let mut matcher = self.extra_justifications.matcher(); std::iter::from_fn(move || { @@ -1559,7 +1563,7 @@ where } /// Get block requests scheduled by sync to be sent out. - pub fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest)> { + fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest)> { if self.mode == SyncMode::Warp { return self .warp_target_block_request() @@ -1686,7 +1690,7 @@ where } /// Get a state request scheduled by sync to be sent out (if any). - pub fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> { + fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> { if self.allowed_requests.is_empty() { return None } @@ -1732,7 +1736,7 @@ where } /// Get a warp proof request scheduled by sync to be sent out (if any). - pub fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest)> { + fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest)> { if let Some(sync) = &self.warp_sync { if self.allowed_requests.is_empty() || sync.is_complete() || @@ -2020,7 +2024,31 @@ where /// Get pending actions to perform. #[must_use] - pub fn take_actions(&mut self) -> impl Iterator> { + pub fn actions(&mut self) -> impl Iterator> { + let block_requests = self + .block_requests() + .into_iter() + .map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request }); + self.actions.extend(block_requests); + + let justification_requests = self + .justification_requests() + .into_iter() + .map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request }); + self.actions.extend(justification_requests); + + let state_request = self + .state_request() + .into_iter() + .map(|(peer_id, request)| ChainSyncAction::SendStateRequest { peer_id, request }); + self.actions.extend(state_request); + + let warp_proof_request = self + .warp_sync_request() + .into_iter() + .map(|(peer_id, request)| ChainSyncAction::SendWarpProofRequest { peer_id, request }); + self.actions.extend(warp_proof_request); + std::mem::take(&mut self.actions).into_iter() } } diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 3ff8df810d87..399a4a57803d 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -713,16 +713,13 @@ where self.is_major_syncing .store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed); - // Process actions requested by `ChainSync` during `select!`. + // Process actions requested by `ChainSync`. self.process_chain_sync_actions(); - - // Send outbound requests on `ChanSync`'s behalf. - self.send_chain_sync_requests(); } } fn process_chain_sync_actions(&mut self) { - self.chain_sync.take_actions().for_each(|action| match action { + self.chain_sync.actions().for_each(|action| match action { ChainSyncAction::SendBlockRequest { peer_id, request } => { // Sending block request implies dropping obsolete pending response as we are not // interested in it anymore (see [`ChainSyncAction::SendBlockRequest`]). @@ -741,7 +738,25 @@ where ChainSyncAction::CancelBlockRequest { peer_id } => { let removed = self.pending_responses.remove(&peer_id); - trace!(target: LOG_TARGET, "Processed {action:?}., response removed: {removed}."); + trace!(target: LOG_TARGET, "Processed {action:?}, response removed: {removed}."); + }, + ChainSyncAction::SendStateRequest { peer_id, request } => { + self.send_state_request(peer_id, request); + + trace!( + target: LOG_TARGET, + "Processed `ChainSyncAction::SendBlockRequest` to {peer_id}.", + ); + }, + ChainSyncAction::SendWarpProofRequest { peer_id, request } => { + self.send_warp_proof_request(peer_id, request.clone()); + + trace!( + target: LOG_TARGET, + "Processed `ChainSyncAction::SendWarpProofRequest` to {}, request: {:?}.", + peer_id, + request, + ); }, ChainSyncAction::DropPeer(BadPeer(peer_id, rep)) => { self.pending_responses.remove(&peer_id); @@ -1104,24 +1119,6 @@ where Ok(()) } - fn send_chain_sync_requests(&mut self) { - for (peer_id, request) in self.chain_sync.block_requests() { - self.send_block_request(peer_id, request); - } - - if let Some((peer_id, request)) = self.chain_sync.state_request() { - self.send_state_request(peer_id, request); - } - - for (peer_id, request) in self.chain_sync.justification_requests() { - self.send_block_request(peer_id, request); - } - - if let Some((peer_id, request)) = self.chain_sync.warp_sync_request() { - self.send_warp_sync_request(peer_id, request); - } - } - fn send_block_request(&mut self, peer_id: PeerId, request: BlockRequest) { if !self.peers.contains_key(&peer_id) { trace!(target: LOG_TARGET, "Cannot send block request to unknown peer {peer_id}"); @@ -1168,7 +1165,7 @@ where } } - fn send_warp_sync_request(&mut self, peer_id: PeerId, request: WarpProofRequest) { + fn send_warp_proof_request(&mut self, peer_id: PeerId, request: WarpProofRequest) { if !self.peers.contains_key(&peer_id) { trace!(target: LOG_TARGET, "Cannot send warp proof request to unknown peer {peer_id}"); debug_assert!(false); diff --git a/substrate/client/network/sync/src/warp.rs b/substrate/client/network/sync/src/warp.rs index 2c0adc856c12..169b3de35aa1 100644 --- a/substrate/client/network/sync/src/warp.rs +++ b/substrate/client/network/sync/src/warp.rs @@ -42,7 +42,7 @@ const LOG_TARGET: &'static str = "sync"; pub struct EncodedProof(pub Vec); /// Warp sync request -#[derive(Encode, Decode, Debug)] +#[derive(Encode, Decode, Debug, Clone)] pub struct WarpProofRequest { /// Start collecting proofs from this block. pub begin: B::Hash, From 6b59d34de3d4fde5d8a1f591270b1c62a024b004 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 14 Nov 2023 16:08:09 +0200 Subject: [PATCH 4/6] minor: fix tests compilation --- substrate/client/network/sync/src/chain_sync.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/substrate/client/network/sync/src/chain_sync.rs b/substrate/client/network/sync/src/chain_sync.rs index 8cbcf94c376f..3825cfa33f73 100644 --- a/substrate/client/network/sync/src/chain_sync.rs +++ b/substrate/client/network/sync/src/chain_sync.rs @@ -2051,6 +2051,13 @@ where std::mem::take(&mut self.actions).into_iter() } + + /// A version of `actions()` that doesn't schedule extra requests. For testing only. + #[cfg(test)] + #[must_use] + fn take_actions(&mut self) -> impl Iterator> { + std::mem::take(&mut self.actions).into_iter() + } } // This is purely during a backwards compatible transitionary period and should be removed From 28d7bbe2f0af8400e5005638dfea40ad0c1222f0 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 14 Nov 2023 18:22:20 +0200 Subject: [PATCH 5/6] Apply suggestions from code review Co-authored-by: Sebastian Kunert --- substrate/client/network/sync/src/service/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/network/sync/src/service/mod.rs b/substrate/client/network/sync/src/service/mod.rs index d928785cce66..dbe1bd59bb81 100644 --- a/substrate/client/network/sync/src/service/mod.rs +++ b/substrate/client/network/sync/src/service/mod.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -//! `ChainSync`-related service code +//! `SyncingEngine`-related service code pub mod engine; pub mod mock; From 7c21d529a2bef4b07c4caf861c7cc4e6cec44153 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 14 Nov 2023 18:33:50 +0200 Subject: [PATCH 6/6] Rename `service/engine.rs` -> `service/syncing_service.rs` --- substrate/client/network/sync/src/engine.rs | 2 +- substrate/client/network/sync/src/lib.rs | 2 +- substrate/client/network/sync/src/service/mod.rs | 2 +- .../network/sync/src/service/{engine.rs => syncing_service.rs} | 0 substrate/client/network/test/src/lib.rs | 2 +- 5 files changed, 4 insertions(+), 4 deletions(-) rename substrate/client/network/sync/src/service/{engine.rs => syncing_service.rs} (100%) diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 399a4a57803d..2cb8eab22f7a 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -30,7 +30,7 @@ use crate::{ schema::v1::{StateRequest, StateResponse}, service::{ self, - engine::{SyncingService, ToServiceCommand}, + syncing_service::{SyncingService, ToServiceCommand}, }, types::{ BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent, diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index 37be42e1082d..1a7e773c95f7 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -18,7 +18,7 @@ //! Blockchain syncing implementation in Substrate. -pub use service::engine::SyncingService; +pub use service::syncing_service::SyncingService; pub use types::{SyncEvent, SyncEventStream, SyncState, SyncStatus, SyncStatusProvider}; mod block_announce_validator; diff --git a/substrate/client/network/sync/src/service/mod.rs b/substrate/client/network/sync/src/service/mod.rs index dbe1bd59bb81..d045af26e70d 100644 --- a/substrate/client/network/sync/src/service/mod.rs +++ b/substrate/client/network/sync/src/service/mod.rs @@ -18,6 +18,6 @@ //! `SyncingEngine`-related service code -pub mod engine; pub mod mock; pub mod network; +pub mod syncing_service; diff --git a/substrate/client/network/sync/src/service/engine.rs b/substrate/client/network/sync/src/service/syncing_service.rs similarity index 100% rename from substrate/client/network/sync/src/service/engine.rs rename to substrate/client/network/sync/src/service/syncing_service.rs diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index 03eff4686444..cfc3cb7af3fc 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -64,7 +64,7 @@ use sc_network_common::role::Roles; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ block_request_handler::BlockRequestHandler, - service::{engine::SyncingService, network::NetworkServiceProvider}, + service::{network::NetworkServiceProvider, syncing_service::SyncingService}, state_request_handler::StateRequestHandler, warp::{ AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncParams, WarpSyncProvider,