From 18165ebba88d41b4ba314685e5468ec0e809d799 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Wed, 15 Nov 2023 10:33:58 +0200 Subject: [PATCH] Unify `ChainSync` actions under one enum (follow-up) (#2317) Get rid of public `ChainSync::..._requests()` functions and return all requests as actions. --------- Co-authored-by: Sebastian Kunert --- .../client/network/sync/src/chain_sync.rs | 50 +++++++++++++---- substrate/client/network/sync/src/engine.rs | 55 +++++++++---------- substrate/client/network/sync/src/lib.rs | 2 +- .../client/network/sync/src/service/mod.rs | 4 +- .../{chain_sync.rs => syncing_service.rs} | 6 +- substrate/client/network/sync/src/warp.rs | 2 +- substrate/client/network/test/src/lib.rs | 2 +- 7 files changed, 74 insertions(+), 47 deletions(-) rename substrate/client/network/sync/src/service/{chain_sync.rs => syncing_service.rs} (98%) diff --git a/substrate/client/network/sync/src/chain_sync.rs b/substrate/client/network/sync/src/chain_sync.rs index 2adc6d423415..3825cfa33f73 100644 --- a/substrate/client/network/sync/src/chain_sync.rs +++ b/substrate/client/network/sync/src/chain_sync.rs @@ -191,6 +191,10 @@ pub enum ChainSyncAction { SendBlockRequest { peer_id: PeerId, request: BlockRequest }, /// Drop stale block request. CancelBlockRequest { peer_id: PeerId }, + /// Send state request to peer. + SendStateRequest { peer_id: PeerId, request: OpaqueStateRequest }, + /// Send warp proof request to peer. + SendWarpProofRequest { peer_id: PeerId, request: WarpProofRequest }, /// Peer misbehaved. Disconnect, report it and cancel the block request to it. DropPeer(BadPeer), /// Import blocks. @@ -1420,11 +1424,6 @@ where .any(|(_, p)| p.state == PeerSyncState::DownloadingStale(*hash)) } - /// Check if the peer is known to the sync state machine. Used for sanity checks. - pub fn is_peer_known(&self, peer_id: &PeerId) -> bool { - self.peers.contains_key(peer_id) - } - /// Get the set of downloaded blocks that are ready to be queued for import. fn ready_blocks(&mut self) -> Vec> { self.blocks @@ -1537,7 +1536,7 @@ where } /// Get justification requests scheduled by sync to be sent out. - pub fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest)> { + fn justification_requests(&mut self) -> Vec<(PeerId, BlockRequest)> { let peers = &mut self.peers; let mut matcher = self.extra_justifications.matcher(); std::iter::from_fn(move || { @@ -1564,7 +1563,7 @@ where } /// Get block requests scheduled by sync to be sent out. - pub fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest)> { + fn block_requests(&mut self) -> Vec<(PeerId, BlockRequest)> { if self.mode == SyncMode::Warp { return self .warp_target_block_request() @@ -1691,7 +1690,7 @@ where } /// Get a state request scheduled by sync to be sent out (if any). - pub fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> { + fn state_request(&mut self) -> Option<(PeerId, OpaqueStateRequest)> { if self.allowed_requests.is_empty() { return None } @@ -1737,7 +1736,7 @@ where } /// Get a warp proof request scheduled by sync to be sent out (if any). - pub fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest)> { + fn warp_sync_request(&mut self) -> Option<(PeerId, WarpProofRequest)> { if let Some(sync) = &self.warp_sync { if self.allowed_requests.is_empty() || sync.is_complete() || @@ -2025,7 +2024,38 @@ where /// Get pending actions to perform. #[must_use] - pub fn take_actions(&mut self) -> impl Iterator> { + pub fn actions(&mut self) -> impl Iterator> { + let block_requests = self + .block_requests() + .into_iter() + .map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request }); + self.actions.extend(block_requests); + + let justification_requests = self + .justification_requests() + .into_iter() + .map(|(peer_id, request)| ChainSyncAction::SendBlockRequest { peer_id, request }); + self.actions.extend(justification_requests); + + let state_request = self + .state_request() + .into_iter() + .map(|(peer_id, request)| ChainSyncAction::SendStateRequest { peer_id, request }); + self.actions.extend(state_request); + + let warp_proof_request = self + .warp_sync_request() + .into_iter() + .map(|(peer_id, request)| ChainSyncAction::SendWarpProofRequest { peer_id, request }); + self.actions.extend(warp_proof_request); + + std::mem::take(&mut self.actions).into_iter() + } + + /// A version of `actions()` that doesn't schedule extra requests. For testing only. + #[cfg(test)] + #[must_use] + fn take_actions(&mut self) -> impl Iterator> { std::mem::take(&mut self.actions).into_iter() } } diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 58a9fdc49f20..2cb8eab22f7a 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -30,7 +30,7 @@ use crate::{ schema::v1::{StateRequest, StateResponse}, service::{ self, - chain_sync::{SyncingService, ToServiceCommand}, + syncing_service::{SyncingService, ToServiceCommand}, }, types::{ BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent, @@ -713,16 +713,13 @@ where self.is_major_syncing .store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed); - // Process actions requested by `ChainSync` during `select!`. + // Process actions requested by `ChainSync`. self.process_chain_sync_actions(); - - // Send outbound requests on `ChanSync`'s behalf. - self.send_chain_sync_requests(); } } fn process_chain_sync_actions(&mut self) { - self.chain_sync.take_actions().for_each(|action| match action { + self.chain_sync.actions().for_each(|action| match action { ChainSyncAction::SendBlockRequest { peer_id, request } => { // Sending block request implies dropping obsolete pending response as we are not // interested in it anymore (see [`ChainSyncAction::SendBlockRequest`]). @@ -741,7 +738,25 @@ where ChainSyncAction::CancelBlockRequest { peer_id } => { let removed = self.pending_responses.remove(&peer_id); - trace!(target: LOG_TARGET, "Processed {action:?}., response removed: {removed}."); + trace!(target: LOG_TARGET, "Processed {action:?}, response removed: {removed}."); + }, + ChainSyncAction::SendStateRequest { peer_id, request } => { + self.send_state_request(peer_id, request); + + trace!( + target: LOG_TARGET, + "Processed `ChainSyncAction::SendBlockRequest` to {peer_id}.", + ); + }, + ChainSyncAction::SendWarpProofRequest { peer_id, request } => { + self.send_warp_proof_request(peer_id, request.clone()); + + trace!( + target: LOG_TARGET, + "Processed `ChainSyncAction::SendWarpProofRequest` to {}, request: {:?}.", + peer_id, + request, + ); }, ChainSyncAction::DropPeer(BadPeer(peer_id, rep)) => { self.pending_responses.remove(&peer_id); @@ -1104,26 +1119,8 @@ where Ok(()) } - fn send_chain_sync_requests(&mut self) { - for (peer_id, request) in self.chain_sync.block_requests() { - self.send_block_request(peer_id, request); - } - - if let Some((peer_id, request)) = self.chain_sync.state_request() { - self.send_state_request(peer_id, request); - } - - for (peer_id, request) in self.chain_sync.justification_requests() { - self.send_block_request(peer_id, request); - } - - if let Some((peer_id, request)) = self.chain_sync.warp_sync_request() { - self.send_warp_sync_request(peer_id, request); - } - } - fn send_block_request(&mut self, peer_id: PeerId, request: BlockRequest) { - if !self.chain_sync.is_peer_known(&peer_id) { + if !self.peers.contains_key(&peer_id) { trace!(target: LOG_TARGET, "Cannot send block request to unknown peer {peer_id}"); debug_assert!(false); return @@ -1139,7 +1136,7 @@ where } fn send_state_request(&mut self, peer_id: PeerId, request: OpaqueStateRequest) { - if !self.chain_sync.is_peer_known(&peer_id) { + if !self.peers.contains_key(&peer_id) { trace!(target: LOG_TARGET, "Cannot send state request to unknown peer {peer_id}"); debug_assert!(false); return @@ -1168,8 +1165,8 @@ where } } - fn send_warp_sync_request(&mut self, peer_id: PeerId, request: WarpProofRequest) { - if !self.chain_sync.is_peer_known(&peer_id) { + fn send_warp_proof_request(&mut self, peer_id: PeerId, request: WarpProofRequest) { + if !self.peers.contains_key(&peer_id) { trace!(target: LOG_TARGET, "Cannot send warp proof request to unknown peer {peer_id}"); debug_assert!(false); return diff --git a/substrate/client/network/sync/src/lib.rs b/substrate/client/network/sync/src/lib.rs index c42b0601e659..1a7e773c95f7 100644 --- a/substrate/client/network/sync/src/lib.rs +++ b/substrate/client/network/sync/src/lib.rs @@ -18,7 +18,7 @@ //! Blockchain syncing implementation in Substrate. -pub use service::chain_sync::SyncingService; +pub use service::syncing_service::SyncingService; pub use types::{SyncEvent, SyncEventStream, SyncState, SyncStatus, SyncStatusProvider}; mod block_announce_validator; diff --git a/substrate/client/network/sync/src/service/mod.rs b/substrate/client/network/sync/src/service/mod.rs index 18331d63ed29..d045af26e70d 100644 --- a/substrate/client/network/sync/src/service/mod.rs +++ b/substrate/client/network/sync/src/service/mod.rs @@ -16,8 +16,8 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -//! `ChainSync`-related service code +//! `SyncingEngine`-related service code -pub mod chain_sync; pub mod mock; pub mod network; +pub mod syncing_service; diff --git a/substrate/client/network/sync/src/service/chain_sync.rs b/substrate/client/network/sync/src/service/syncing_service.rs similarity index 98% rename from substrate/client/network/sync/src/service/chain_sync.rs rename to substrate/client/network/sync/src/service/syncing_service.rs index 3d11880c511c..92d649d65dc3 100644 --- a/substrate/client/network/sync/src/service/chain_sync.rs +++ b/substrate/client/network/sync/src/service/syncing_service.rs @@ -34,7 +34,7 @@ use std::{ }, }; -/// Commands send to `ChainSync` +/// Commands send to `SyncingEngine` pub enum ToServiceCommand { SetSyncForkRequest(Vec, B::Hash, NumberFor), RequestJustification(B::Hash, NumberFor), @@ -63,7 +63,7 @@ pub enum ToServiceCommand { // }, } -/// Handle for communicating with `ChainSync` asynchronously +/// Handle for communicating with `SyncingEngine` asynchronously #[derive(Clone)] pub struct SyncingService { tx: TracingUnboundedSender>, @@ -148,7 +148,7 @@ impl SyncingService { /// Get sync status /// - /// Returns an error if `ChainSync` has terminated. + /// Returns an error if `SyncingEngine` has terminated. pub async fn status(&self) -> Result, ()> { let (tx, rx) = oneshot::channel(); let _ = self.tx.unbounded_send(ToServiceCommand::Status(tx)); diff --git a/substrate/client/network/sync/src/warp.rs b/substrate/client/network/sync/src/warp.rs index 2c0adc856c12..169b3de35aa1 100644 --- a/substrate/client/network/sync/src/warp.rs +++ b/substrate/client/network/sync/src/warp.rs @@ -42,7 +42,7 @@ const LOG_TARGET: &'static str = "sync"; pub struct EncodedProof(pub Vec); /// Warp sync request -#[derive(Encode, Decode, Debug)] +#[derive(Encode, Decode, Debug, Clone)] pub struct WarpProofRequest { /// Start collecting proofs from this block. pub begin: B::Hash, diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index f869e3a171a3..cfc3cb7af3fc 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -64,7 +64,7 @@ use sc_network_common::role::Roles; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; use sc_network_sync::{ block_request_handler::BlockRequestHandler, - service::{chain_sync::SyncingService, network::NetworkServiceProvider}, + service::{network::NetworkServiceProvider, syncing_service::SyncingService}, state_request_handler::StateRequestHandler, warp::{ AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncParams, WarpSyncProvider,