Skip to content

Commit

Permalink
Syncing strategy refactoring (part 2) (#5666)
Browse files Browse the repository at this point in the history
# Description

Follow-up to #5469 and
mostly covering #5333.

The primary change here is that syncing strategy is no longer created
inside of syncing engine, instead syncing strategy is an argument of
syncing engine, more specifically it is an argument to `build_network`
that most downstream users will use. This also extracts addition of
request-response protocols outside of network construction, making sure
they are physically not present when they don't need to be (imagine
syncing strategy that uses none of Substrate's protocols in its
implementation for example).

This technically allows to completely replace syncing strategy with
whatever strategy chain might need.

There will be at least one follow-up PR that will simplify
`SyncingStrategy` trait and other public interfaces to remove mentions
of block/state/warp sync requests, replacing them with generic APIs,
such that strategies where warp sync is not applicable don't have to
provide dummy method implementations, etc.

## Integration

Downstream projects will have to write a bit of boilerplate calling
`build_polkadot_syncing_strategy` function to create previously default
syncing strategy.

## Review Notes

Please review PR through individual commits rather than the final diff,
it will be easier that way. The changes are mostly just moving code
around one step at a time.

# Checklist

* [x] My PR includes a detailed description as outlined in the
"Description" and its two subsections above.
* [x] My PR follows the [labeling requirements](

https://github.com/paritytech/polkadot-sdk/blob/master/docs/contributor/CONTRIBUTING.md#Process
) of this project (at minimum one label for `T` required)
* External contributors: ask maintainers to put the right label on your
PR.
* [x] I have made corresponding changes to the documentation (if
applicable)
  • Loading branch information
nazar-pc authored Sep 17, 2024
1 parent 9307d99 commit 43cd6fd
Show file tree
Hide file tree
Showing 16 changed files with 562 additions and 286 deletions.
23 changes: 18 additions & 5 deletions cumulus/client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ use sc_consensus::{
use sc_network::{config::SyncMode, service::traits::NetworkService, NetworkBackend};
use sc_network_sync::SyncingService;
use sc_network_transactions::TransactionsHandlerController;
use sc_service::{Configuration, NetworkStarter, SpawnTaskHandle, TaskManager, WarpSyncConfig};
use sc_service::{
build_polkadot_syncing_strategy, Configuration, NetworkStarter, SpawnTaskHandle, TaskManager,
WarpSyncConfig,
};
use sc_telemetry::{log, TelemetryWorkerHandle};
use sc_utils::mpsc::TracingUnboundedSender;
use sp_api::ProvideRuntimeApi;
Expand Down Expand Up @@ -425,7 +428,7 @@ pub struct BuildNetworkParams<
pub async fn build_network<'a, Block, Client, RCInterface, IQ, Network>(
BuildNetworkParams {
parachain_config,
net_config,
mut net_config,
client,
transaction_pool,
para_id,
Expand Down Expand Up @@ -462,7 +465,7 @@ where
IQ: ImportQueue<Block> + 'static,
Network: NetworkBackend<Block, <Block as BlockT>::Hash>,
{
let warp_sync_params = match parachain_config.network.sync_mode {
let warp_sync_config = match parachain_config.network.sync_mode {
SyncMode::Warp => {
log::debug!(target: LOG_TARGET_SYNC, "waiting for announce block...");

Expand Down Expand Up @@ -493,9 +496,19 @@ where
},
};
let metrics = Network::register_notification_metrics(
parachain_config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
);

let syncing_strategy = build_polkadot_syncing_strategy(
parachain_config.protocol_id(),
parachain_config.chain_spec.fork_id(),
&mut net_config,
warp_sync_config,
client.clone(),
&spawn_handle,
parachain_config.prometheus_config.as_ref().map(|config| &config.registry),
)?;

sc_service::build_network(sc_service::BuildNetworkParams {
config: parachain_config,
net_config,
Expand All @@ -504,7 +517,7 @@ where
spawn_handle,
import_queue,
block_announce_validator_builder: Some(Box::new(move |_| block_announce_validator)),
warp_sync_config: warp_sync_params,
syncing_strategy,
block_relay: None,
metrics,
})
Expand Down
14 changes: 12 additions & 2 deletions polkadot/node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration};
use prometheus_endpoint::Registry;
#[cfg(feature = "full-node")]
use sc_service::KeystoreContainer;
use sc_service::{RpcHandlers, SpawnTaskHandle};
use sc_service::{build_polkadot_syncing_strategy, RpcHandlers, SpawnTaskHandle};
use sc_telemetry::TelemetryWorker;
#[cfg(feature = "full-node")]
use sc_telemetry::{Telemetry, TelemetryWorkerHandle};
Expand Down Expand Up @@ -1028,6 +1028,16 @@ pub fn new_full<
})
};

let syncing_strategy = build_polkadot_syncing_strategy(
config.protocol_id(),
config.chain_spec.fork_id(),
&mut net_config,
Some(WarpSyncConfig::WithProvider(warp_sync)),
client.clone(),
&task_manager.spawn_handle(),
config.prometheus_config.as_ref().map(|config| &config.registry),
)?;

let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
Expand All @@ -1037,7 +1047,7 @@ pub fn new_full<
spawn_handle: task_manager.spawn_handle(),
import_queue,
block_announce_validator_builder: None,
warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)),
syncing_strategy,
block_relay: None,
metrics,
})?;
Expand Down
19 changes: 19 additions & 0 deletions prdoc/pr_5666.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
title: Make syncing strategy an argument of the syncing engine

doc:
- audience: Node Dev
description: |
Syncing strategy is no longer implicitly created when building network, but needs to be instantiated explicitly.
Previously default implementation can be created with new function `build_polkadot_syncing_strategy` or custom
syncing strategy could be implemented and used instead if desired, providing greater flexibility for chain
developers.

crates:
- name: cumulus-client-service
bump: patch
- name: polkadot-service
bump: patch
- name: sc-service
bump: major
- name: sc-network-sync
bump: major
13 changes: 12 additions & 1 deletion substrate/bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use frame_system_rpc_runtime_api::AccountNonceApi;
use futures::prelude::*;
use kitchensink_runtime::RuntimeApi;
use node_primitives::Block;
use polkadot_sdk::sc_service::build_polkadot_syncing_strategy;
use sc_client_api::{Backend, BlockBackend};
use sc_consensus_babe::{self, SlotProportion};
use sc_network::{
Expand Down Expand Up @@ -506,6 +507,16 @@ pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
Vec::default(),
));

let syncing_strategy = build_polkadot_syncing_strategy(
config.protocol_id(),
config.chain_spec.fork_id(),
&mut net_config,
Some(WarpSyncConfig::WithProvider(warp_sync)),
client.clone(),
&task_manager.spawn_handle(),
config.prometheus_config.as_ref().map(|config| &config.registry),
)?;

let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
sc_service::build_network(sc_service::BuildNetworkParams {
config: &config,
Expand All @@ -515,7 +526,7 @@ pub fn new_full_base<N: NetworkBackend<Block, <Block as BlockT>::Hash>>(
spawn_handle: task_manager.spawn_handle(),
import_queue,
block_announce_validator_builder: None,
warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)),
syncing_strategy,
block_relay: None,
metrics,
})?;
Expand Down
77 changes: 19 additions & 58 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,15 @@ use crate::{
BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream,
},
block_relay_protocol::{BlockDownloader, BlockResponseError},
block_request_handler::MAX_BLOCKS_IN_RESPONSE,
pending_responses::{PendingResponses, ResponseEvent},
schema::v1::{StateRequest, StateResponse},
service::{
self,
syncing_service::{SyncingService, ToServiceCommand},
},
strategy::{
warp::{EncodedProof, WarpProofRequest, WarpSyncConfig},
PolkadotSyncingStrategy, StrategyKey, SyncingAction, SyncingConfig, SyncingStrategy,
warp::{EncodedProof, WarpProofRequest},
StrategyKey, SyncingAction, SyncingStrategy,
},
types::{
BadPeer, ExtendedPeerInfo, OpaqueStateRequest, OpaqueStateResponse, PeerRequest, SyncEvent,
Expand Down Expand Up @@ -189,7 +188,7 @@ pub struct Peer<B: BlockT> {

pub struct SyncingEngine<B: BlockT, Client> {
/// Syncing strategy.
strategy: PolkadotSyncingStrategy<B, Client>,
strategy: Box<dyn SyncingStrategy<B>>,

/// Blockchain client.
client: Arc<Client>,
Expand Down Expand Up @@ -271,12 +270,6 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// Block downloader
block_downloader: Arc<dyn BlockDownloader<B>>,

/// Protocol name used to send out state requests
state_request_protocol_name: ProtocolName,

/// Protocol name used to send out warp sync requests
warp_sync_protocol_name: Option<ProtocolName>,

/// Handle to import queue.
import_queue: Box<dyn ImportQueueService<B>>,
}
Expand All @@ -301,35 +294,15 @@ where
protocol_id: ProtocolId,
fork_id: &Option<String>,
block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
warp_sync_config: Option<WarpSyncConfig<B>>,
syncing_strategy: Box<dyn SyncingStrategy<B>>,
network_service: service::network::NetworkServiceHandle,
import_queue: Box<dyn ImportQueueService<B>>,
block_downloader: Arc<dyn BlockDownloader<B>>,
state_request_protocol_name: ProtocolName,
warp_sync_protocol_name: Option<ProtocolName>,
peer_store_handle: Arc<dyn PeerStoreProvider>,
) -> Result<(Self, SyncingService<B>, N::NotificationProtocolConfig), ClientError>
where
N: NetworkBackend<B, <B as BlockT>::Hash>,
{
let mode = net_config.network_config.sync_mode;
let max_parallel_downloads = net_config.network_config.max_parallel_downloads;
let max_blocks_per_request =
if net_config.network_config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 {
log::info!(
target: LOG_TARGET,
"clamping maximum blocks per request to {MAX_BLOCKS_IN_RESPONSE}",
);
MAX_BLOCKS_IN_RESPONSE as u32
} else {
net_config.network_config.max_blocks_per_request
};
let syncing_config = SyncingConfig {
mode,
max_parallel_downloads,
max_blocks_per_request,
metrics_registry: metrics_registry.cloned(),
};
let cache_capacity = (net_config.network_config.default_peers_set.in_peers +
net_config.network_config.default_peers_set.out_peers)
.max(1);
Expand Down Expand Up @@ -388,10 +361,6 @@ where
Arc::clone(&peer_store_handle),
);

// Initialize syncing strategy.
let strategy =
PolkadotSyncingStrategy::new(syncing_config, client.clone(), warp_sync_config)?;

let block_announce_protocol_name = block_announce_config.protocol_name().clone();
let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
let num_connected = Arc::new(AtomicUsize::new(0));
Expand All @@ -413,7 +382,7 @@ where
Self {
roles,
client,
strategy,
strategy: syncing_strategy,
network_service,
peers: HashMap::new(),
block_announce_data_cache: LruMap::new(ByLength::new(cache_capacity)),
Expand Down Expand Up @@ -450,8 +419,6 @@ where
},
pending_responses: PendingResponses::new(),
block_downloader,
state_request_protocol_name,
warp_sync_protocol_name,
import_queue,
},
SyncingService::new(tx, num_connected, is_major_syncing),
Expand Down Expand Up @@ -652,16 +619,16 @@ where
"Processed {action:?}, response removed: {removed}.",
);
},
SyncingAction::SendStateRequest { peer_id, key, request } => {
self.send_state_request(peer_id, key, request);
SyncingAction::SendStateRequest { peer_id, key, protocol_name, request } => {
self.send_state_request(peer_id, key, protocol_name, request);

trace!(
target: LOG_TARGET,
"Processed `ChainSyncAction::SendStateRequest` to {peer_id}.",
);
},
SyncingAction::SendWarpProofRequest { peer_id, key, request } => {
self.send_warp_proof_request(peer_id, key, request.clone());
SyncingAction::SendWarpProofRequest { peer_id, key, protocol_name, request } => {
self.send_warp_proof_request(peer_id, key, protocol_name, request.clone());

trace!(
target: LOG_TARGET,
Expand Down Expand Up @@ -1054,6 +1021,7 @@ where
&mut self,
peer_id: PeerId,
key: StrategyKey,
protocol_name: ProtocolName,
request: OpaqueStateRequest,
) {
if !self.peers.contains_key(&peer_id) {
Expand All @@ -1070,7 +1038,7 @@ where
Ok(data) => {
self.network_service.start_request(
peer_id,
self.state_request_protocol_name.clone(),
protocol_name,
data,
tx,
IfDisconnected::ImmediateError,
Expand All @@ -1089,6 +1057,7 @@ where
&mut self,
peer_id: PeerId,
key: StrategyKey,
protocol_name: ProtocolName,
request: WarpProofRequest<B>,
) {
if !self.peers.contains_key(&peer_id) {
Expand All @@ -1101,21 +1070,13 @@ where

self.pending_responses.insert(peer_id, key, PeerRequest::WarpProof, rx.boxed());

match &self.warp_sync_protocol_name {
Some(name) => self.network_service.start_request(
peer_id,
name.clone(),
request.encode(),
tx,
IfDisconnected::ImmediateError,
),
None => {
log::warn!(
target: LOG_TARGET,
"Trying to send warp sync request when no protocol is configured {request:?}",
);
},
}
self.network_service.start_request(
peer_id,
protocol_name,
request.encode(),
tx,
IfDisconnected::ImmediateError,
);
}

fn encode_state_request(request: &OpaqueStateRequest) -> Result<Vec<u8>, String> {
Expand Down
Loading

0 comments on commit 43cd6fd

Please sign in to comment.