Skip to content

Commit

Permalink
Integrate litep2p into Polkadot SDK (paritytech#2944)
Browse files Browse the repository at this point in the history
[litep2p](https://github.com/altonen/litep2p) is a libp2p-compatible P2P
networking library. It supports all of the features of `rust-libp2p`
that are currently being utilized by Polkadot SDK.

Compared to `rust-libp2p`, `litep2p` has a quite different architecture
which is why the new `litep2p` network backend is only able to use a
little of the existing code in `sc-network`. The design has been mainly
influenced by how we'd wish to structure our networking-related code in
Polkadot SDK: independent higher-levels protocols directly communicating
with the network over links that support bidirectional backpressure. A
good example would be `NotificationHandle`/`RequestResponseHandle`
abstractions which allow, e.g., `SyncingEngine` to directly communicate
with peers to announce/request blocks.

I've tried running `polkadot --network-backend litep2p` with a few
different peer configurations and there is a noticeable reduction in
networking CPU usage. For high load (`--out-peers 200`), networking CPU
usage goes down from ~110% to ~30% (80 pp) and for normal load
(`--out-peers 40`), the usage goes down from ~55% to ~18% (37 pp).

These should not be taken as final numbers because:

a) there are still some low-hanging optimization fruits, such as
enabling [receive window
auto-tuning](libp2p/rust-yamux#176), integrating
`Peerset` more closely with `litep2p` or improving memory usage of the
WebSocket transport
b) fixing bugs/instabilities that incorrectly cause `litep2p` to do less
work will increase the networking CPU usage
c) verification in a more diverse set of tests/conditions is needed

Nevertheless, these numbers should give an early estimate for CPU usage
of the new networking backend.

This PR consists of three separate changes:
* introduce a generic `PeerId` (wrapper around `Multihash`) so that we
don't have use `NetworkService::PeerId` in every part of the code that
uses a `PeerId`
* introduce `NetworkBackend` trait, implement it for the libp2p network
stack and make Polkadot SDK generic over `NetworkBackend`
  * implement `NetworkBackend` for litep2p

The new library should be considered experimental which is why
`rust-libp2p` will remain as the default option for the time being. This
PR currently depends on the master branch of `litep2p` but I'll cut a
new release for the library once all review comments have been
addresses.

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Dmitry Markin <dmitry@markin.tech>
Co-authored-by: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>
Co-authored-by: Alexandru Vasile <alexandru.vasile@parity.io>
  • Loading branch information
4 people authored and dharjeezy committed Apr 9, 2024
1 parent 63a20c3 commit 83c11b8
Show file tree
Hide file tree
Showing 181 changed files with 11,052 additions and 1,859 deletions.
729 changes: 589 additions & 140 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,13 @@ members = [
"substrate/client/mixnet",
"substrate/client/network",
"substrate/client/network-gossip",
"substrate/client/network/bitswap",
"substrate/client/network/common",
"substrate/client/network/light",
"substrate/client/network/statement",
"substrate/client/network/sync",
"substrate/client/network/test",
"substrate/client/network/transactions",
"substrate/client/network/types",
"substrate/client/offchain",
"substrate/client/proposer-metrics",
"substrate/client/rpc",
Expand Down
82 changes: 56 additions & 26 deletions cumulus/client/relay-chain-minimal-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@ use polkadot_node_network_protocol::{
},
};

use polkadot_core_primitives::{Block as RelayBlock, Hash as RelayHash};
use polkadot_node_subsystem_util::metrics::prometheus::Registry;
use polkadot_primitives::CollatorPair;
use polkadot_service::{overseer::OverseerGenArgs, IsParachainNode};

use sc_authority_discovery::Service as AuthorityDiscoveryService;
use sc_network::{config::FullNetworkConfiguration, Event, NetworkEventStream, NetworkService};
use sc_network::{
config::FullNetworkConfiguration, service::traits::NetworkService, Event, NetworkBackend,
NetworkEventStream,
};
use sc_service::{config::PrometheusConfig, Configuration, TaskManager};
use sp_runtime::{app_crypto::Pair, traits::Block as BlockT};

Expand All @@ -51,7 +55,7 @@ fn build_authority_discovery_service<Block: BlockT>(
task_manager: &TaskManager,
client: Arc<BlockChainRpcClient>,
config: &Configuration,
network: Arc<NetworkService<Block, <Block as BlockT>::Hash>>,
network: Arc<dyn NetworkService>,
prometheus_registry: Option<Registry>,
) -> AuthorityDiscoveryService {
let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht;
Expand All @@ -72,7 +76,7 @@ fn build_authority_discovery_service<Block: BlockT>(
..Default::default()
},
client,
network,
Arc::new(network.clone()),
Box::pin(dht_event_stream),
authority_discovery_role,
prometheus_registry,
Expand All @@ -92,12 +96,22 @@ async fn build_interface(
client: RelayChainRpcClient,
) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option<CollatorPair>)> {
let collator_pair = CollatorPair::generate().0;
let collator_node = new_minimal_relay_chain(
polkadot_config,
collator_pair.clone(),
Arc::new(BlockChainRpcClient::new(client.clone())),
)
.await?;
let collator_node = match polkadot_config.network.network_backend {
sc_network::config::NetworkBackendType::Libp2p =>
new_minimal_relay_chain::<RelayBlock, sc_network::NetworkWorker<RelayBlock, RelayHash>>(
polkadot_config,
collator_pair.clone(),
Arc::new(BlockChainRpcClient::new(client.clone())),
)
.await?,
sc_network::config::NetworkBackendType::Litep2p =>
new_minimal_relay_chain::<RelayBlock, sc_network::Litep2pNetworkBackend>(
polkadot_config,
collator_pair.clone(),
Arc::new(BlockChainRpcClient::new(client.clone())),
)
.await?,
};
task_manager.add_child(collator_node.task_manager);
Ok((
Arc::new(RelayChainRpcInterface::new(client, collator_node.overseer_handle)),
Expand Down Expand Up @@ -143,6 +157,7 @@ pub async fn build_minimal_relay_chain_node_light_client(

build_interface(polkadot_config, task_manager, client).await
}

/// Builds a minimal relay chain node. Chain data is fetched
/// via [`BlockChainRpcClient`] and fed into the overseer and its subsystems.
///
Expand All @@ -155,13 +170,18 @@ pub async fn build_minimal_relay_chain_node_light_client(
/// - NetworkBridgeTx
/// - RuntimeApi
#[sc_tracing::logging::prefix_logs_with("Relaychain")]
async fn new_minimal_relay_chain(
async fn new_minimal_relay_chain<Block: BlockT, Network: NetworkBackend<RelayBlock, RelayHash>>(
config: Configuration,
collator_pair: CollatorPair,
relay_chain_rpc_client: Arc<BlockChainRpcClient>,
) -> Result<NewMinimalNode, RelayChainError> {
let role = config.role.clone();
let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network);
let mut net_config =
sc_network::config::FullNetworkConfiguration::<_, _, Network>::new(&config.network);
let metrics = Network::register_notification_metrics(
config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
);
let peer_store_handle = net_config.peer_store_handle();

let prometheus_registry = config.prometheus_registry();
let task_manager = TaskManager::new(config.tokio_handle.clone(), prometheus_registry)?;
Expand All @@ -178,13 +198,18 @@ async fn new_minimal_relay_chain(
let peerset_protocol_names =
PeerSetProtocolNames::new(genesis_hash, config.chain_spec.fork_id());
let is_authority = if role.is_authority() { IsAuthority::Yes } else { IsAuthority::No };
let notification_services = peer_sets_info(is_authority, &peerset_protocol_names)
.into_iter()
.map(|(config, (peerset, service))| {
net_config.add_notification_protocol(config);
(peerset, service)
})
.collect::<std::collections::HashMap<PeerSet, Box<dyn sc_network::NotificationService>>>();
let notification_services = peer_sets_info::<_, Network>(
is_authority,
&peerset_protocol_names,
metrics.clone(),
Arc::clone(&peer_store_handle),
)
.into_iter()
.map(|(config, (peerset, service))| {
net_config.add_notification_protocol(config);
(peerset, service)
})
.collect::<std::collections::HashMap<PeerSet, Box<dyn sc_network::NotificationService>>>();

let request_protocol_names = ReqProtocolNames::new(genesis_hash, config.chain_spec.fork_id());
let (collation_req_v1_receiver, collation_req_v2_receiver, available_data_req_receiver) =
Expand All @@ -194,16 +219,17 @@ async fn new_minimal_relay_chain(
.chain_get_header(None)
.await?
.ok_or_else(|| RelayChainError::RpcCallError("Unable to fetch best header".to_string()))?;
let (network, network_starter, sync_service) = build_collator_network(
let (network, network_starter, sync_service) = build_collator_network::<Network>(
&config,
net_config,
task_manager.spawn_handle(),
genesis_hash,
best_header,
metrics,
)
.map_err(|e| RelayChainError::Application(Box::new(e) as Box<_>))?;

let authority_discovery_service = build_authority_discovery_service(
let authority_discovery_service = build_authority_discovery_service::<Block>(
&task_manager,
relay_chain_rpc_client.clone(),
&config,
Expand Down Expand Up @@ -236,24 +262,28 @@ async fn new_minimal_relay_chain(
Ok(NewMinimalNode { task_manager, overseer_handle })
}

fn build_request_response_protocol_receivers(
fn build_request_response_protocol_receivers<
Block: BlockT,
Network: NetworkBackend<Block, <Block as BlockT>::Hash>,
>(
request_protocol_names: &ReqProtocolNames,
config: &mut FullNetworkConfiguration,
config: &mut FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Network>,
) -> (
IncomingRequestReceiver<v1::CollationFetchingRequest>,
IncomingRequestReceiver<v2::CollationFetchingRequest>,
IncomingRequestReceiver<v1::AvailableDataFetchingRequest>,
) {
let (collation_req_v1_receiver, cfg) =
IncomingRequest::get_config_receiver(request_protocol_names);
IncomingRequest::get_config_receiver::<_, Network>(request_protocol_names);
config.add_request_response_protocol(cfg);
let (collation_req_v2_receiver, cfg) =
IncomingRequest::get_config_receiver(request_protocol_names);
IncomingRequest::get_config_receiver::<_, Network>(request_protocol_names);
config.add_request_response_protocol(cfg);
let (available_data_req_receiver, cfg) =
IncomingRequest::get_config_receiver(request_protocol_names);
IncomingRequest::get_config_receiver::<_, Network>(request_protocol_names);
config.add_request_response_protocol(cfg);
let cfg = Protocol::ChunkFetchingV1.get_outbound_only_config(request_protocol_names);
let cfg =
Protocol::ChunkFetchingV1.get_outbound_only_config::<_, Network>(request_protocol_names);
config.add_request_response_protocol(cfg);
(collation_req_v1_receiver, collation_req_v2_receiver, available_data_req_receiver)
}
71 changes: 34 additions & 37 deletions cumulus/client/relay-chain-minimal-node/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,64 +15,56 @@
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.

use polkadot_core_primitives::{Block, Hash, Header};
use sp_runtime::traits::{Block as BlockT, NumberFor};
use sp_runtime::traits::NumberFor;

use sc_network::{
config::{
NetworkConfiguration, NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake,
ProtocolId, SetConfig,
NetworkConfiguration, NonReservedPeerMode, NotificationHandshake, PeerStore, ProtocolId,
SetConfig,
},
peer_store::PeerStore,
NetworkService,
peer_store::PeerStoreProvider,
service::traits::NetworkService,
NotificationMetrics,
};

use sc_network::{config::FullNetworkConfiguration, NotificationService};
use sc_network::{config::FullNetworkConfiguration, NetworkBackend, NotificationService};
use sc_network_common::{role::Roles, sync::message::BlockAnnouncesHandshake};
use sc_service::{error::Error, Configuration, NetworkStarter, SpawnTaskHandle};

use std::{iter, sync::Arc};

/// Build the network service, the network status sinks and an RPC sender.
pub(crate) fn build_collator_network(
pub(crate) fn build_collator_network<Network: NetworkBackend<Block, Hash>>(
config: &Configuration,
mut full_network_config: FullNetworkConfiguration,
mut network_config: FullNetworkConfiguration<Block, Hash, Network>,
spawn_handle: SpawnTaskHandle,
genesis_hash: Hash,
best_header: Header,
notification_metrics: NotificationMetrics,
) -> Result<
(
Arc<NetworkService<Block, Hash>>,
NetworkStarter,
Arc<dyn sp_consensus::SyncOracle + Send + Sync>,
),
(Arc<dyn NetworkService>, NetworkStarter, Arc<dyn sp_consensus::SyncOracle + Send + Sync>),
Error,
> {
let protocol_id = config.protocol_id();
let (block_announce_config, _notification_service) = get_block_announce_proto_config::<Block>(
let (block_announce_config, _notification_service) = get_block_announce_proto_config::<Network>(
protocol_id.clone(),
&None,
Roles::from(&config.role),
best_header.number,
best_header.hash(),
genesis_hash,
notification_metrics.clone(),
network_config.peer_store_handle(),
);

// Since this node has no syncing, we do not want light-clients to connect to it.
// Here we set any potential light-client slots to 0.
adjust_network_config_light_in_peers(&mut full_network_config.network_config);

let peer_store = PeerStore::new(
full_network_config
.network_config
.boot_nodes
.iter()
.map(|bootnode| bootnode.peer_id)
.collect(),
);
let peer_store_handle = peer_store.handle();
adjust_network_config_light_in_peers(&mut network_config.network_config);

let peer_store = network_config.take_peer_store();
spawn_handle.spawn("peer-store", Some("networking"), peer_store.run());

let network_params = sc_network::config::Params::<Block> {
let network_params = sc_network::config::Params::<Block, Hash, Network> {
role: config.role.clone(),
executor: {
let spawn_handle = Clone::clone(&spawn_handle);
Expand All @@ -81,16 +73,17 @@ pub(crate) fn build_collator_network(
})
},
fork_id: None,
network_config: full_network_config,
peer_store: peer_store_handle,
network_config,
genesis_hash,
protocol_id,
metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
block_announce_config,
bitswap_config: None,
notification_metrics,
};

let network_worker = sc_network::NetworkWorker::new(network_params)?;
let network_service = network_worker.service().clone();
let network_worker = Network::new(network_params)?;
let network_service = network_worker.network_service();

let (network_start_tx, network_start_rx) = futures::channel::oneshot::channel();

Expand Down Expand Up @@ -143,14 +136,16 @@ impl sp_consensus::SyncOracle for SyncOracle {
}
}

fn get_block_announce_proto_config<B: BlockT>(
fn get_block_announce_proto_config<Network: NetworkBackend<Block, Hash>>(
protocol_id: ProtocolId,
fork_id: &Option<String>,
roles: Roles,
best_number: NumberFor<B>,
best_hash: B::Hash,
genesis_hash: B::Hash,
) -> (NonDefaultSetConfig, Box<dyn NotificationService>) {
best_number: NumberFor<Block>,
best_hash: Hash,
genesis_hash: Hash,
metrics: NotificationMetrics,
peer_store_handle: Arc<dyn PeerStoreProvider>,
) -> (Network::NotificationProtocolConfig, Box<dyn NotificationService>) {
let block_announces_protocol = {
let genesis_hash = genesis_hash.as_ref();
if let Some(ref fork_id) = fork_id {
Expand All @@ -160,11 +155,11 @@ fn get_block_announce_proto_config<B: BlockT>(
}
};

NonDefaultSetConfig::new(
Network::notification_config(
block_announces_protocol.into(),
iter::once(format!("/{}/block-announces/1", protocol_id.as_ref()).into()).collect(),
1024 * 1024,
Some(NotificationHandshake::new(BlockAnnouncesHandshake::<B>::build(
Some(NotificationHandshake::new(BlockAnnouncesHandshake::<Block>::build(
roles,
best_number,
best_hash,
Expand All @@ -178,5 +173,7 @@ fn get_block_announce_proto_config<B: BlockT>(
reserved_nodes: Vec::new(),
non_reserved_mode: NonReservedPeerMode::Deny,
},
metrics,
peer_store_handle,
)
}
Loading

0 comments on commit 83c11b8

Please sign in to comment.