diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index 6298f194e4f25..5b2f09dc9c035 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -6,6 +6,7 @@ use sc_client::LongestChain; use node_template_runtime::{self, GenesisConfig, opaque::Block, RuntimeApi}; use sc_service::{error::{Error as ServiceError}, AbstractService, Configuration, ServiceBuilder}; use sp_inherents::InherentDataProviders; +use sc_network::{construct_simple_protocol}; use sc_executor::native_executor_instance; pub use sc_executor::NativeExecutor; use sp_consensus_aura::sr25519::{AuthorityPair as AuraPair}; @@ -18,6 +19,11 @@ native_executor_instance!( node_template_runtime::native_version, ); +construct_simple_protocol! { + /// Demo protocol attachment for substrate. + pub struct NodeProtocol where Block = Block { } +} + /// Starts a `ServiceBuilder` for a full service. /// /// Use this macro if you don't actually need the full service, but just the builder in order to @@ -88,7 +94,7 @@ pub fn new_full(config: Configuration) import_setup.take() .expect("Link Half and Block Import are present for Full Services or setup failed before. qed"); - let service = builder + let service = builder.with_network_protocol(|_| Ok(NodeProtocol::new()))? .with_finality_proof_provider(|client, backend| Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, client)) as _) )? @@ -220,6 +226,7 @@ pub fn new_light(config: Configuration) Ok((import_queue, finality_proof_request_builder)) })? + .with_network_protocol(|_| Ok(NodeProtocol::new()))? .with_finality_proof_provider(|client, backend| Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, client)) as _) )? diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 70dd0521dece3..1dc5a061a3726 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -30,6 +30,7 @@ use sc_service::{ AbstractService, ServiceBuilder, config::Configuration, error::{Error as ServiceError}, }; use sp_inherents::InherentDataProviders; +use sc_network::construct_simple_protocol; use sc_service::{Service, NetworkStatus}; use sc_client::{Client, LocalCallExecutor}; @@ -39,6 +40,11 @@ use node_executor::NativeExecutor; use sc_network::NetworkService; use sc_offchain::OffchainWorkers; +construct_simple_protocol! { + /// Demo protocol attachment for substrate. + pub struct NodeProtocol where Block = Block { } +} + /// Starts a `ServiceBuilder` for a full service. /// /// Use this macro if you don't actually need the full service, but just the builder in order to @@ -138,7 +144,7 @@ macro_rules! new_full { let (builder, mut import_setup, inherent_data_providers) = new_full_start!($config); - let service = builder + let service = builder.with_network_protocol(|_| Ok(crate::service::NodeProtocol::new()))? .with_finality_proof_provider(|client, backend| Ok(Arc::new(grandpa::FinalityProofProvider::new(backend, client)) as _) )? @@ -277,7 +283,7 @@ pub fn new_full(config: NodeConfiguration) ConcreteClient, LongestChain, NetworkStatus, - NetworkService::Hash>, + NetworkService::Hash>, ConcreteTransactionPool, OffchainWorkers< ConcreteClient, @@ -342,6 +348,7 @@ pub fn new_light(config: NodeConfiguration) Ok((import_queue, finality_proof_request_builder)) })? + .with_network_protocol(|_| Ok(NodeProtocol::new()))? .with_finality_proof_provider(|client, backend| Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, client)) as _) )? diff --git a/client/authority-discovery/src/lib.rs b/client/authority-discovery/src/lib.rs index 92dcc264502f0..6260ac9a85b12 100644 --- a/client/authority-discovery/src/lib.rs +++ b/client/authority-discovery/src/lib.rs @@ -60,6 +60,7 @@ use libp2p::Multiaddr; use log::{debug, error, log_enabled, warn}; use prost::Message; use sc_client_api::blockchain::HeaderBackend; +use sc_network::specialization::NetworkSpecialization; use sc_network::{DhtEvent, ExHashT, NetworkStateInfo}; use sp_authority_discovery::{AuthorityDiscoveryApi, AuthorityId, AuthoritySignature, AuthorityPair}; use sp_core::crypto::{key_types, Pair}; @@ -476,9 +477,10 @@ pub trait NetworkProvider: NetworkStateInfo { fn get_value(&self, key: &libp2p::kad::record::Key); } -impl NetworkProvider for sc_network::NetworkService +impl NetworkProvider for sc_network::NetworkService where B: BlockT + 'static, + S: NetworkSpecialization, H: ExHashT, { fn set_priority_group( diff --git a/client/consensus/aura/src/lib.rs b/client/consensus/aura/src/lib.rs index 355c058645097..750e380a1a70e 100644 --- a/client/consensus/aura/src/lib.rs +++ b/client/consensus/aura/src/lib.rs @@ -886,10 +886,11 @@ mod tests { const SLOT_DURATION: u64 = 1000; pub struct AuraTestNet { - peers: Vec>, + peers: Vec>, } impl TestNetFactory for AuraTestNet { + type Specialization = DummySpecialization; type Verifier = AuraVerifier; type PeerData = (); @@ -923,15 +924,15 @@ mod tests { } } - fn peer(&mut self, i: usize) -> &mut Peer { + fn peer(&mut self, i: usize) -> &mut Peer { &mut self.peers[i] } - fn peers(&self) -> &Vec> { + fn peers(&self) -> &Vec> { &self.peers } - fn mut_peers>)>(&mut self, closure: F) { + fn mut_peers>)>(&mut self, closure: F) { closure(&mut self.peers); } } diff --git a/client/consensus/babe/src/tests.rs b/client/consensus/babe/src/tests.rs index 4045e18b5c3fb..d8696d59442cc 100644 --- a/client/consensus/babe/src/tests.rs +++ b/client/consensus/babe/src/tests.rs @@ -199,7 +199,7 @@ impl> BlockImport for PanickingBlockImport< } pub struct BabeTestNet { - peers: Vec>>, + peers: Vec, DummySpecialization>>, } type TestHeader = ::Header; @@ -236,6 +236,7 @@ pub struct PeerData { } impl TestNetFactory for BabeTestNet { + type Specialization = DummySpecialization; type Verifier = TestVerifier; type PeerData = Option; @@ -306,17 +307,17 @@ impl TestNetFactory for BabeTestNet { } } - fn peer(&mut self, i: usize) -> &mut Peer { + fn peer(&mut self, i: usize) -> &mut Peer { trace!(target: "babe", "Retrieving a peer"); &mut self.peers[i] } - fn peers(&self) -> &Vec> { + fn peers(&self) -> &Vec> { trace!(target: "babe", "Retrieving peers"); &self.peers } - fn mut_peers>)>( + fn mut_peers>)>( &mut self, closure: F, ) { diff --git a/client/finality-grandpa/src/communication/mod.rs b/client/finality-grandpa/src/communication/mod.rs index 1f4e22359712f..b5600c1c0d897 100644 --- a/client/finality-grandpa/src/communication/mod.rs +++ b/client/finality-grandpa/src/communication/mod.rs @@ -120,8 +120,9 @@ pub trait Network: GossipNetwork + Clone + Send + 'static fn set_sync_fork_request(&self, peers: Vec, hash: Block::Hash, number: NumberFor); } -impl Network for Arc> where +impl Network for Arc> where B: BlockT, + S: sc_network::specialization::NetworkSpecialization, H: sc_network::ExHashT, { fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { diff --git a/client/finality-grandpa/src/tests.rs b/client/finality-grandpa/src/tests.rs index 16b50ccb9f226..5d01b257b6deb 100644 --- a/client/finality-grandpa/src/tests.rs +++ b/client/finality-grandpa/src/tests.rs @@ -19,7 +19,7 @@ use super::*; use environment::HasVoted; use sc_network_test::{ - Block, Hash, TestNetFactory, BlockImportAdapter, Peer, + Block, DummySpecialization, Hash, TestNetFactory, BlockImportAdapter, Peer, PeersClient, PassThroughVerifier, }; use sc_network::config::{ProtocolConfig, Roles, BoxFinalityProofRequestBuilder}; @@ -68,7 +68,7 @@ type PeerData = > > >; -type GrandpaPeer = Peer; +type GrandpaPeer = Peer; struct GrandpaTestNet { peers: Vec, @@ -90,6 +90,7 @@ impl GrandpaTestNet { } impl TestNetFactory for GrandpaTestNet { + type Specialization = DummySpecialization; type Verifier = PassThroughVerifier; type PeerData = PeerData; diff --git a/client/network-gossip/src/lib.rs b/client/network-gossip/src/lib.rs index 4e4d32366f29d..abb3f32972b0d 100644 --- a/client/network-gossip/src/lib.rs +++ b/client/network-gossip/src/lib.rs @@ -59,7 +59,7 @@ pub use self::state_machine::TopicNotification; pub use self::validator::{DiscardAll, MessageIntent, Validator, ValidatorContext, ValidationResult}; use futures::prelude::*; -use sc_network::{Event, ExHashT, NetworkService, PeerId, ReputationChange}; +use sc_network::{specialization::NetworkSpecialization, Event, ExHashT, NetworkService, PeerId, ReputationChange}; use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; use std::{borrow::Cow, pin::Pin, sync::Arc}; @@ -97,7 +97,7 @@ pub trait Network { fn announce(&self, block: B::Hash, associated_data: Vec); } -impl Network for Arc> { +impl, H: ExHashT> Network for Arc> { fn event_stream(&self) -> Pin + Send>> { Box::pin(NetworkService::event_stream(self)) } diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index a03a6caa2f5ee..c8c5e59fe62cc 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -16,8 +16,9 @@ use crate::{ debug_info, discovery::DiscoveryBehaviour, discovery::DiscoveryOut, DiscoveryNetBehaviour, - Event, protocol::event::DhtEvent, ExHashT, + Event, protocol::event::DhtEvent }; +use crate::{ExHashT, specialization::NetworkSpecialization}; use crate::protocol::{self, light_client_handler, CustomMessageOutcome, Protocol}; use libp2p::NetworkBehaviour; use libp2p::core::{Multiaddr, PeerId, PublicKey}; @@ -32,9 +33,9 @@ use void; /// General behaviour of the network. Combines all protocols together. #[derive(NetworkBehaviour)] #[behaviour(out_event = "BehaviourOut", poll_method = "poll")] -pub struct Behaviour { +pub struct Behaviour, H: ExHashT> { /// All the substrate-specific protocols. - substrate: Protocol, + substrate: Protocol, /// Periodically pings and identifies the nodes we are connected to, and store information in a /// cache. debug_info: debug_info::DebugInfoBehaviour, @@ -57,10 +58,10 @@ pub enum BehaviourOut { Event(Event), } -impl Behaviour { +impl, H: ExHashT> Behaviour { /// Builds a new `Behaviour`. pub async fn new( - substrate: Protocol, + substrate: Protocol, user_agent: String, local_public_key: PublicKey, known_addresses: Vec<(PeerId, Multiaddr)>, @@ -106,12 +107,12 @@ impl Behaviour { } /// Returns a shared reference to the user protocol. - pub fn user_protocol(&self) -> &Protocol { + pub fn user_protocol(&self) -> &Protocol { &self.substrate } /// Returns a mutable reference to the user protocol. - pub fn user_protocol_mut(&mut self) -> &mut Protocol { + pub fn user_protocol_mut(&mut self) -> &mut Protocol { &mut self.substrate } @@ -132,15 +133,15 @@ impl Behaviour { } } -impl NetworkBehaviourEventProcess for -Behaviour { +impl, H: ExHashT> NetworkBehaviourEventProcess for +Behaviour { fn inject_event(&mut self, event: void::Void) { void::unreachable(event) } } -impl NetworkBehaviourEventProcess> for -Behaviour { +impl, H: ExHashT> NetworkBehaviourEventProcess> for +Behaviour { fn inject_event(&mut self, event: CustomMessageOutcome) { match event { CustomMessageOutcome::BlockImport(origin, blocks) => @@ -173,8 +174,8 @@ Behaviour { } } -impl NetworkBehaviourEventProcess - for Behaviour { +impl, H: ExHashT> NetworkBehaviourEventProcess + for Behaviour { fn inject_event(&mut self, event: debug_info::DebugInfoEvent) { let debug_info::DebugInfoEvent::Identified { peer_id, mut info } = event; if info.listen_addrs.len() > 30 { @@ -191,8 +192,8 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess - for Behaviour { +impl, H: ExHashT> NetworkBehaviourEventProcess + for Behaviour { fn inject_event(&mut self, out: DiscoveryOut) { match out { DiscoveryOut::UnroutablePeer(_peer_id) => { @@ -220,7 +221,7 @@ impl NetworkBehaviourEventProcess } } -impl Behaviour { +impl, H: ExHashT> Behaviour { fn poll(&mut self, _: &mut Context, _: &mut impl PollParameters) -> Poll>> { if !self.events.is_empty() { return Poll::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))) diff --git a/client/network/src/config.rs b/client/network/src/config.rs index f5cad5977fc47..f6a3db4afe88e 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -43,7 +43,7 @@ use std::{error::Error, fs, io::{self, Write}, net::Ipv4Addr, path::{Path, PathB use zeroize::Zeroize; /// Network initialization parameters. -pub struct Params { +pub struct Params { /// Assigned roles for our node (full, light, ...). pub roles: Roles, @@ -88,6 +88,9 @@ pub struct Params { /// valid. pub import_queue: Box>, + /// Customization of the network. Use this to plug additional networking capabilities. + pub specialization: S, + /// Type to check incoming block announcements. pub block_announce_validator: Box + Send>, } diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index a5397a4e3e626..5da26b3346cd1 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -136,6 +136,10 @@ //! - Light-client requests. When a light client requires information, a random node we have a //! substream open with is chosen, and the information is requested from it. //! - Gossiping. Used for example by grandpa. +//! - Network specialization. The network protocol can be specialized through a template parameter +//! of the network service. This specialization is free to send and receive messages with the +//! remote. This is meant to be used by the chain that is being built on top of Substrate +//! (eg. Polkadot). //! //! It is intended that in the future each of these components gets more isolated, so that they //! are free to open and close their own substreams, and so that syncing and light client requests @@ -176,7 +180,7 @@ pub mod error; pub mod network_state; pub use service::{NetworkService, NetworkStateInfo, NetworkWorker, ExHashT, ReportHandle}; -pub use protocol::PeerInfo; +pub use protocol::{PeerInfo, Context, specialization}; pub use protocol::event::{Event, DhtEvent}; pub use protocol::sync::SyncState; pub use libp2p::{Multiaddr, PeerId}; @@ -192,6 +196,10 @@ pub use protocol::message::Status as StatusMessage; pub use sc_peerset::ReputationChange; +// Used by the `construct_simple_protocol!` macro. +#[doc(hidden)] +pub use sp_runtime::traits::Block as BlockT; + /// Extension trait for `NetworkBehaviour` that also accepts discovering nodes. trait DiscoveryNetBehaviour { /// Notify the protocol that we have learned about the existence of nodes. diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index d344321e68dd0..16b9327792b79 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -38,6 +38,7 @@ use sp_arithmetic::traits::SaturatedConversion; use message::{BlockAnnounce, BlockAttributes, Direction, FromBlock, Message, RequestId}; use message::generic::Message as GenericMessage; use light_dispatch::{LightDispatch, LightDispatchNetwork, RequestData}; +use specialization::NetworkSpecialization; use sync::{ChainSync, SyncState}; use crate::service::{TransactionPool, ExHashT}; use crate::config::{BoxFinalityProofRequestBuilder, Roles}; @@ -72,6 +73,7 @@ pub mod message; pub mod event; pub mod light_client_handler; pub mod light_dispatch; +pub mod specialization; pub mod sync; pub use block_requests::BlockRequests; @@ -136,7 +138,7 @@ mod rep { } // Lock must always be taken in order declared here. -pub struct Protocol { +pub struct Protocol, H: ExHashT> { /// Interval at which we call `tick`. tick_timeout: Pin + Send>>, /// Interval at which we call `propagate_extrinsics`. @@ -146,6 +148,7 @@ pub struct Protocol { light_dispatch: LightDispatch, genesis_hash: B::Hash, sync: ChainSync, + specialization: S, context_data: ContextData, /// List of nodes for which we perform additional logging because they are important for the /// user. @@ -334,6 +337,55 @@ impl<'a, B: BlockT> LightDispatchNetwork for LightDispatchIn<'a> { } } +/// Context for a network-specific handler. +pub trait Context { + /// Adjusts the reputation of the peer. Use this to point out that a peer has been malign or + /// irresponsible or appeared lazy. + fn report_peer(&mut self, who: PeerId, reputation: sc_peerset::ReputationChange); + + /// Force disconnecting from a peer. Use this when a peer misbehaved. + fn disconnect_peer(&mut self, who: PeerId); + + /// Send a chain-specific message to a peer. + fn send_chain_specific(&mut self, who: PeerId, message: Vec); +} + +/// Protocol context. +struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> { + behaviour: &'a mut GenericProto, + context_data: &'a mut ContextData, + peerset_handle: &'a sc_peerset::PeersetHandle, +} + +impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> { + fn new( + context_data: &'a mut ContextData, + behaviour: &'a mut GenericProto, + peerset_handle: &'a sc_peerset::PeersetHandle, + ) -> Self { + ProtocolContext { context_data, peerset_handle, behaviour } + } +} + +impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context for ProtocolContext<'a, B, H> { + fn report_peer(&mut self, who: PeerId, reputation: sc_peerset::ReputationChange) { + self.peerset_handle.report_peer(who, reputation) + } + + fn disconnect_peer(&mut self, who: PeerId) { + self.behaviour.disconnect_peer(&who) + } + + fn send_chain_specific(&mut self, who: PeerId, message: Vec) { + send_message:: ( + self.behaviour, + &mut self.context_data.stats, + &who, + GenericMessage::ChainSpecific(message) + ) + } +} + /// Data necessary to create a context. struct ContextData { // All connected peers @@ -360,19 +412,20 @@ impl Default for ProtocolConfig { } } -impl Protocol { +impl, H: ExHashT> Protocol { /// Create a new instance. pub fn new( config: ProtocolConfig, chain: Arc>, checker: Arc>, + specialization: S, transaction_pool: Arc>, finality_proof_provider: Option>>, finality_proof_request_builder: Option>, protocol_id: ProtocolId, peerset_config: sc_peerset::PeersetConfig, block_announce_validator: Box + Send> - ) -> error::Result<(Protocol, sc_peerset::PeersetHandle)> { + ) -> error::Result<(Protocol, sc_peerset::PeersetHandle)> { let info = chain.info(); let sync = ChainSync::new( config.roles, @@ -408,6 +461,7 @@ impl Protocol { light_dispatch: LightDispatch::new(checker), genesis_hash: info.genesis_hash, sync, + specialization, handshaking_peers: HashMap::new(), important_peers, transaction_pool, @@ -629,6 +683,11 @@ impl Protocol { CustomMessageOutcome::None }; }, + GenericMessage::ChainSpecific(msg) => self.specialization.on_message( + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), + who, + msg, + ), } CustomMessageOutcome::None @@ -653,6 +712,14 @@ impl Protocol { ); } + /// Locks `self` and returns a context plus the network specialization. + pub fn specialization_lock<'a>( + &'a mut self, + ) -> (impl Context + 'a, &'a mut S) { + let context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); + (context, &mut self.specialization) + } + /// Called when a new peer is connected pub fn on_peer_connected(&mut self, who: PeerId) { trace!(target: "sync", "Connecting {}", who); @@ -674,7 +741,9 @@ impl Protocol { self.context_data.peers.remove(&peer) }; if let Some(_peer_data) = removed { + let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); self.sync.peer_disconnected(peer.clone()); + self.specialization.on_disconnect(&mut context, peer.clone()); self.light_dispatch.on_disconnect(LightDispatchIn { behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), @@ -894,6 +963,9 @@ impl Protocol { } } + self.specialization.maintain_peers( + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle) + ); for p in aborting { self.behaviour.disconnect_peer(&p); self.peerset_handle.report_peer(p, rep::TIMEOUT); @@ -1009,6 +1081,9 @@ impl Protocol { } } + let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); + self.specialization.on_connect(&mut context, who.clone(), status); + // Notify all the notification protocols as open. CustomMessageOutcome::NotificationStreamOpened { remote: who, @@ -1240,7 +1315,7 @@ impl Protocol { roles: self.config.roles.into(), best_number: info.best_number, best_hash: info.best_hash, - chain_status: Vec::new(), // TODO: find a way to make this backwards-compatible + chain_status: self.specialization.status(), }; self.send_message(&who, GenericMessage::Status(status)) @@ -1309,10 +1384,15 @@ impl Protocol { /// Call this when a block has been imported in the import queue and we should announce it on /// the network. - pub fn on_block_imported(&mut self, header: &B::Header, data: Vec, is_best: bool) { + pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header, data: Vec, is_best: bool) { if is_best { self.sync.update_chain_info(header); } + self.specialization.on_block_imported( + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), + hash.clone(), + header, + ); // blocks are not announced by light clients if self.config.roles.is_light() { @@ -1815,7 +1895,8 @@ fn send_message( behaviour.send_packet(who, encoded); } -impl NetworkBehaviour for Protocol { +impl, H: ExHashT> NetworkBehaviour for +Protocol { type ProtocolsHandler = ::ProtocolsHandler; type OutEvent = CustomMessageOutcome; @@ -1972,13 +2053,13 @@ impl NetworkBehaviour for Protocol { } } -impl DiscoveryNetBehaviour for Protocol { +impl, H: ExHashT> DiscoveryNetBehaviour for Protocol { fn add_discovered_nodes(&mut self, peer_ids: impl Iterator) { self.behaviour.add_discovered_nodes(peer_ids) } } -impl Drop for Protocol { +impl, H: ExHashT> Drop for Protocol { fn drop(&mut self) { debug!(target: "sync", "Network stats:\n{}", self.format_stats()); } diff --git a/client/network/src/protocol/generic_proto/tests.rs b/client/network/src/protocol/generic_proto/tests.rs index b8436e2c7f704..b331b3c2378c3 100644 --- a/client/network/src/protocol/generic_proto/tests.rs +++ b/client/network/src/protocol/generic_proto/tests.rs @@ -25,8 +25,7 @@ use libp2p::swarm::{PollParameters, NetworkBehaviour, NetworkBehaviourAction}; use libp2p::{PeerId, Multiaddr, Transport}; use rand::seq::SliceRandom; use std::{error, io, task::Context, task::Poll, time::Duration}; -use std::collections::HashSet; -use crate::message::{generic::BlockResponse, Message}; +use crate::message::Message; use crate::protocol::generic_proto::{GenericProto, GenericProtoOut}; use sp_test_primitives::Block; @@ -228,10 +227,7 @@ fn two_nodes_transfer_lots_of_packets() { for n in 0 .. NUM_PACKETS { service1.send_packet( &peer_id, - Message::::BlockResponse(BlockResponse { - id: n as _, - blocks: Vec::new(), - }).encode() + Message::::ChainSpecific(vec![(n % 256) as u8]).encode() ); } }, @@ -247,8 +243,8 @@ fn two_nodes_transfer_lots_of_packets() { Some(GenericProtoOut::CustomProtocolOpen { .. }) => {}, Some(GenericProtoOut::CustomMessage { message, .. }) => { match Message::::decode(&mut &message[..]).unwrap() { - Message::::BlockResponse(BlockResponse { id: _, blocks }) => { - assert!(blocks.is_empty()); + Message::::ChainSpecific(message) => { + assert_eq!(message.len(), 1); packet_counter += 1; if packet_counter == NUM_PACKETS { return Poll::Ready(()) @@ -274,21 +270,9 @@ fn basic_two_nodes_requests_in_parallel() { // Generate random messages with or without a request id. let mut to_send = { let mut to_send = Vec::new(); - let mut existing_ids = HashSet::new(); for _ in 0..200 { // Note: don't make that number too high or the CPU usage will explode. - let req_id = loop { - let req_id = rand::random::(); - - // ensure uniqueness - odds of randomly sampling collisions - // is unlikely, but possible to cause spurious test failures. - if existing_ids.insert(req_id) { - break req_id; - } - }; - - to_send.push(Message::::BlockResponse( - BlockResponse { id: req_id, blocks: Vec::new() } - )); + let msg = (0..10).map(|_| rand::random::()).collect::>(); + to_send.push(Message::::ChainSpecific(msg)); } to_send }; diff --git a/client/network/src/protocol/message.rs b/client/network/src/protocol/message.rs index a12c26da2e47e..a2261b2059110 100644 --- a/client/network/src/protocol/message.rs +++ b/client/network/src/protocol/message.rs @@ -219,6 +219,9 @@ pub mod generic { FinalityProofResponse(FinalityProofResponse), /// Batch of consensus protocol messages. ConsensusBatch(Vec), + /// Chain-specific message. + #[codec(index = "255")] + ChainSpecific(Vec), } impl Message { @@ -243,6 +246,7 @@ pub mod generic { Message::FinalityProofRequest(_) => "FinalityProofRequest", Message::FinalityProofResponse(_) => "FinalityProofResponse", Message::ConsensusBatch(_) => "ConsensusBatch", + Message::ChainSpecific(_) => "ChainSpecific", } } } diff --git a/client/network/src/protocol/specialization.rs b/client/network/src/protocol/specialization.rs new file mode 100644 index 0000000000000..b410959509dc8 --- /dev/null +++ b/client/network/src/protocol/specialization.rs @@ -0,0 +1,171 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Specializations of the substrate network protocol to allow more complex forms of communication. + +pub use crate::protocol::event::{DhtEvent, Event}; + +use crate::protocol::Context; +use libp2p::PeerId; +use sp_runtime::traits::Block as BlockT; + +/// A specialization of the substrate network protocol. Handles events and sends messages. +pub trait NetworkSpecialization: Send + Sync + 'static { + /// Get the current specialization-status. + fn status(&self) -> Vec; + + /// Called when a peer successfully handshakes. + fn on_connect(&mut self, ctx: &mut dyn Context, who: PeerId, status: crate::protocol::message::Status); + + /// Called when a peer is disconnected. If the peer ID is unknown, it should be ignored. + fn on_disconnect(&mut self, ctx: &mut dyn Context, who: PeerId); + + /// Called when a network-specific message arrives. + fn on_message( + &mut self, + ctx: &mut dyn Context, + who: PeerId, + message: Vec + ); + + /// Called periodically to maintain peers and handle timeouts. + fn maintain_peers(&mut self, _ctx: &mut dyn Context) { } + + /// Called when a block is _imported_ at the head of the chain (not during major sync). + /// Not guaranteed to be called for every block, but will be most of the after major sync. + fn on_block_imported(&mut self, _ctx: &mut dyn Context, _hash: B::Hash, _header: &B::Header) { } +} + +/// A specialization that does nothing. +#[derive(Clone)] +pub struct DummySpecialization; + +impl NetworkSpecialization for DummySpecialization { + fn status(&self) -> Vec { + vec![] + } + + fn on_connect( + &mut self, + _ctx: &mut dyn Context, + _peer_id: PeerId, + _status: crate::protocol::message::Status + ) {} + + fn on_disconnect(&mut self, _ctx: &mut dyn Context, _peer_id: PeerId) {} + + fn on_message( + &mut self, + _ctx: &mut dyn Context, + _peer_id: PeerId, + _message: Vec, + ) {} +} + +/// Construct a simple protocol that is composed of several sub protocols. +/// Each "sub protocol" needs to implement `Specialization` and needs to provide a `new()` function. +/// For more fine grained implementations, this macro is not usable. +/// +/// # Example +/// +/// ```nocompile +/// construct_simple_protocol! { +/// pub struct MyProtocol where Block = MyBlock { +/// consensus_gossip: ConsensusGossip, +/// other_protocol: MyCoolStuff, +/// } +/// } +/// ``` +/// +/// You can also provide an optional parameter after `where Block = MyBlock`, so it looks like +/// `where Block = MyBlock, Status = consensus_gossip`. This will instruct the implementation to +/// use the `status()` function from the `ConsensusGossip` protocol. By default, `status()` returns +/// an empty vector. +#[macro_export] +macro_rules! construct_simple_protocol { + ( + $( #[ $attr:meta ] )* + pub struct $protocol:ident where + Block = $block:ident + $( , Status = $status_protocol_name:ident )* + { + $( $sub_protocol_name:ident : $sub_protocol:ident $( <$protocol_block:ty> )*, )* + } + ) => { + $( #[$attr] )* + pub struct $protocol { + $( $sub_protocol_name: $sub_protocol $( <$protocol_block> )*, )* + } + + impl $protocol { + /// Instantiate a node protocol handler. + pub fn new() -> Self { + Self { + $( $sub_protocol_name: $sub_protocol::new(), )* + } + } + } + + impl $crate::specialization::NetworkSpecialization<$block> for $protocol { + fn status(&self) -> Vec { + $( + let status = self.$status_protocol_name.status(); + + if !status.is_empty() { + return status; + } + )* + + Vec::new() + } + + fn on_connect( + &mut self, + _ctx: &mut $crate::Context<$block>, + _who: $crate::PeerId, + _status: $crate::StatusMessage<$block> + ) { + $( self.$sub_protocol_name.on_connect(_ctx, _who, _status); )* + } + + fn on_disconnect(&mut self, _ctx: &mut $crate::Context<$block>, _who: $crate::PeerId) { + $( self.$sub_protocol_name.on_disconnect(_ctx, _who); )* + } + + fn on_message( + &mut self, + _ctx: &mut $crate::Context<$block>, + _who: $crate::PeerId, + _message: Vec, + ) { + $( self.$sub_protocol_name.on_message(_ctx, _who, _message); )* + } + + fn maintain_peers(&mut self, _ctx: &mut $crate::Context<$block>) { + $( self.$sub_protocol_name.maintain_peers(_ctx); )* + } + + fn on_block_imported( + &mut self, + _ctx: &mut $crate::Context<$block>, + _hash: <$block as $crate::BlockT>::Hash, + _header: &<$block as $crate::BlockT>::Header + ) { + $( self.$sub_protocol_name.on_block_imported(_ctx, _hash, _header); )* + } + } + } +} diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 288289d95c81e..26facd98af996 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -45,8 +45,9 @@ use crate::{transport, config::NonReservedPeerMode, ReputationChange}; use crate::config::{Params, TransportConfig}; use crate::error::Error; use crate::network_state::{NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer}; -use crate::protocol::{self, Protocol, PeerInfo}; +use crate::protocol::{self, Protocol, Context, PeerInfo}; use crate::protocol::{event::Event, light_dispatch::{AlwaysBadChecker, RequestData}}; +use crate::protocol::specialization::NetworkSpecialization; use crate::protocol::sync::SyncState; /// Minimum Requirements for a Hash within Networking @@ -100,7 +101,7 @@ impl ReportHandle { } /// Substrate network service. Handles network IO and manages connectivity. -pub struct NetworkService { +pub struct NetworkService, H: ExHashT> { /// Number of peers we're connected to. num_connected: Arc, /// The local external addresses. @@ -115,19 +116,19 @@ pub struct NetworkService { /// nodes it should be connected to or not. peerset: PeersetHandle, /// Channel that sends messages to the actual worker. - to_worker: mpsc::UnboundedSender>, + to_worker: mpsc::UnboundedSender>, /// Marker to pin the `H` generic. Serves no purpose except to not break backwards /// compatibility. _marker: PhantomData, } -impl NetworkWorker { +impl, H: ExHashT> NetworkWorker { /// Creates the network service. /// /// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order /// for the network processing to advance. From it, you can extract a `NetworkService` using /// `worker.service()`. The `NetworkService` can be shared through the codebase. - pub fn new(params: Params) -> Result, Error> { + pub fn new(params: Params) -> Result, Error> { let (to_worker, from_worker) = mpsc::unbounded(); if let Some(ref path) = params.network_config.net_config_path { @@ -204,6 +205,7 @@ impl NetworkWorker { }, params.chain.clone(), checker.clone(), + params.specialization, params.transaction_pool, params.finality_proof_provider.clone(), params.finality_proof_request_builder, @@ -213,7 +215,7 @@ impl NetworkWorker { )?; // Build the swarm. - let (mut swarm, bandwidth): (Swarm::, _) = { + let (mut swarm, bandwidth): (Swarm::, _) = { let user_agent = format!( "{} ({})", params.network_config.client_version, @@ -261,14 +263,14 @@ impl NetworkWorker { // Listen on multiaddresses. for addr in ¶ms.network_config.listen_addresses { - if let Err(err) = Swarm::::listen_on(&mut swarm, addr.clone()) { + if let Err(err) = Swarm::::listen_on(&mut swarm, addr.clone()) { warn!(target: "sub-libp2p", "Can't listen on {} because: {:?}", addr, err) } } // Add external addresses. for addr in ¶ms.network_config.public_addresses { - Swarm::::add_external_address(&mut swarm, addr.clone()); + Swarm::::add_external_address(&mut swarm, addr.clone()); } let external_addresses = Arc::new(Mutex::new(Vec::new())); @@ -349,13 +351,13 @@ impl NetworkWorker { /// Return a `NetworkService` that can be shared through the code base and can be used to /// manipulate the worker. - pub fn service(&self) -> &Arc> { + pub fn service(&self) -> &Arc> { &self.service } /// You must call this when a new block is imported by the client. - pub fn on_block_imported(&mut self, header: B::Header, data: Vec, is_best: bool) { - self.network_service.user_protocol_mut().on_block_imported(&header, data, is_best); + pub fn on_block_imported(&mut self, hash: B::Hash, header: B::Header, data: Vec, is_best: bool) { + self.network_service.user_protocol_mut().on_block_imported(hash, &header, data, is_best); } /// You must call this when a new block is finalized by the client. @@ -413,9 +415,9 @@ impl NetworkWorker { }; NetworkState { - peer_id: Swarm::::local_peer_id(&swarm).to_base58(), - listened_addresses: Swarm::::listeners(&swarm).cloned().collect(), - external_addresses: Swarm::::external_addresses(&swarm).cloned().collect(), + peer_id: Swarm::::local_peer_id(&swarm).to_base58(), + listened_addresses: Swarm::::listeners(&swarm).cloned().collect(), + external_addresses: Swarm::::external_addresses(&swarm).cloned().collect(), average_download_per_sec: self.service.bandwidth.average_download_per_sec(), average_upload_per_sec: self.service.bandwidth.average_upload_per_sec(), connected_peers, @@ -444,7 +446,7 @@ impl NetworkWorker { } } -impl NetworkService { +impl, H: ExHashT> NetworkService { /// Writes a message on an open notifications channel. Has no effect if the notifications /// channel with this protocol name is closed. /// @@ -543,6 +545,15 @@ impl NetworkService { .unbounded_send(ServiceToWorkerMsg::RequestJustification(hash.clone(), number)); } + /// Execute a closure with the chain-specific network specialization. + pub fn with_spec(&self, f: F) + where F: FnOnce(&mut S, &mut dyn Context) + Send + 'static + { + let _ = self + .to_worker + .unbounded_send(ServiceToWorkerMsg::ExecuteWithSpec(Box::new(f))); + } + /// Are we in the process of downloading the chain? pub fn is_major_syncing(&self) -> bool { self.is_major_syncing.load(Ordering::Relaxed) @@ -630,8 +641,8 @@ impl NetworkService { } } -impl sp_consensus::SyncOracle - for NetworkService +impl, H: ExHashT> sp_consensus::SyncOracle + for NetworkService { fn is_major_syncing(&mut self) -> bool { NetworkService::is_major_syncing(self) @@ -642,8 +653,8 @@ impl sp_consensus::SyncOracle } } -impl<'a, B: BlockT + 'static, H: ExHashT> sp_consensus::SyncOracle - for &'a NetworkService +impl<'a, B: BlockT + 'static, S: NetworkSpecialization, H: ExHashT> sp_consensus::SyncOracle + for &'a NetworkService { fn is_major_syncing(&mut self) -> bool { NetworkService::is_major_syncing(self) @@ -663,9 +674,10 @@ pub trait NetworkStateInfo { fn local_peer_id(&self) -> PeerId; } -impl NetworkStateInfo for NetworkService +impl NetworkStateInfo for NetworkService where B: sp_runtime::traits::Block, + S: NetworkSpecialization, H: ExHashT, { /// Returns the local external addresses. @@ -682,11 +694,12 @@ impl NetworkStateInfo for NetworkService /// Messages sent from the `NetworkService` to the `NetworkWorker`. /// /// Each entry corresponds to a method of `NetworkService`. -enum ServiceToWorkerMsg { +enum ServiceToWorkerMsg> { PropagateExtrinsic(H), PropagateExtrinsics, RequestJustification(B::Hash, NumberFor), AnnounceBlock(B::Hash, Vec), + ExecuteWithSpec(Box) + Send>), GetValue(record::Key), PutValue(record::Key, Vec), AddKnownAddress(PeerId, Multiaddr), @@ -708,7 +721,7 @@ enum ServiceToWorkerMsg { /// /// You are encouraged to poll this in a separate background thread or task. #[must_use = "The NetworkWorker must be polled in order for the network to work"] -pub struct NetworkWorker { +pub struct NetworkWorker, H: ExHashT> { /// Updated by the `NetworkWorker` and loaded by the `NetworkService`. external_addresses: Arc>>, /// Updated by the `NetworkWorker` and loaded by the `NetworkService`. @@ -716,20 +729,20 @@ pub struct NetworkWorker { /// Updated by the `NetworkWorker` and loaded by the `NetworkService`. is_major_syncing: Arc, /// The network service that can be extracted and shared through the codebase. - service: Arc>, + service: Arc>, /// The *actual* network. - network_service: Swarm, + network_service: Swarm, /// The import queue that was passed as initialization. import_queue: Box>, /// Messages from the `NetworkService` and that must be processed. - from_worker: mpsc::UnboundedReceiver>, + from_worker: mpsc::UnboundedReceiver>, /// Receiver for queries from the light client that must be processed. light_client_rqs: Option>>, /// Senders for events that happen on the network. event_streams: Vec>, } -impl Future for NetworkWorker { +impl, H: ExHashT> Future for NetworkWorker { type Output = Result<(), io::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll { @@ -756,6 +769,11 @@ impl Future for NetworkWorker { }; match msg { + ServiceToWorkerMsg::ExecuteWithSpec(task) => { + let protocol = this.network_service.user_protocol_mut(); + let (mut context, spec) = protocol.specialization_lock(); + task(spec, &mut context); + }, ServiceToWorkerMsg::AnnounceBlock(hash, data) => this.network_service.user_protocol_mut().announce_block(hash, data), ServiceToWorkerMsg::RequestJustification(hash, number) => @@ -821,7 +839,7 @@ impl Future for NetworkWorker { // Update the variables shared with the `NetworkService`. this.num_connected.store(this.network_service.user_protocol_mut().num_connected_peers(), Ordering::Relaxed); { - let external_addresses = Swarm::::external_addresses(&this.network_service).cloned().collect(); + let external_addresses = Swarm::::external_addresses(&this.network_service).cloned().collect(); *this.external_addresses.lock() = external_addresses; } this.is_major_syncing.store(match this.network_service.user_protocol_mut().sync_state() { @@ -833,18 +851,20 @@ impl Future for NetworkWorker { } } -impl Unpin for NetworkWorker { +impl, H: ExHashT> Unpin for NetworkWorker { } /// The libp2p swarm, customized for our needs. -type Swarm = libp2p::swarm::Swarm>; +type Swarm = libp2p::swarm::Swarm< + Behaviour +>; // Implementation of `import_queue::Link` trait using the available local variables. -struct NetworkLink<'a, B: BlockT, H: ExHashT> { - protocol: &'a mut Swarm, +struct NetworkLink<'a, B: BlockT, S: NetworkSpecialization, H: ExHashT> { + protocol: &'a mut Swarm, } -impl<'a, B: BlockT, H: ExHashT> Link for NetworkLink<'a, B, H> { +impl<'a, B: BlockT, S: NetworkSpecialization, H: ExHashT> Link for NetworkLink<'a, B, S, H> { fn blocks_processed( &mut self, imported: usize, diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index d09897e853ec2..ecbd810abc8ea 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -52,14 +52,17 @@ use sc_network::config::{NetworkConfiguration, TransportConfig, BoxFinalityProof use libp2p::PeerId; use parking_lot::Mutex; use sp_core::H256; -use sc_network::config::{ProtocolConfig, TransactionPool}; +use sc_network::config::ProtocolConfig; use sp_runtime::generic::{BlockId, OpaqueDigestItemId}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; use sp_runtime::Justification; +use sc_network::config::TransactionPool; +use sc_network::specialization::NetworkSpecialization; use substrate_test_runtime_client::{self, AccountKeyring}; pub use substrate_test_runtime_client::runtime::{Block, Extrinsic, Hash, Transfer}; pub use substrate_test_runtime_client::{TestClient, TestClientBuilder, TestClientBuilderExt}; +pub use sc_network::specialization::DummySpecialization; type AuthorityId = sp_consensus_babe::AuthorityId; @@ -175,7 +178,7 @@ impl PeersClient { } } -pub struct Peer { +pub struct Peer> { pub data: D, client: PeersClient, /// We keep a copy of the verifier so that we can invoke it for locally-generated blocks, @@ -186,12 +189,12 @@ pub struct Peer { block_import: BlockImportAdapter<()>, select_chain: Option>, backend: Option>, - network: NetworkWorker::Hash>, + network: NetworkWorker::Hash>, imported_blocks_stream: Box, Error = ()> + Send>, finality_notification_stream: Box, Error = ()> + Send>, } -impl Peer { +impl> Peer { /// Get this peer ID. pub fn id(&self) -> PeerId { self.network.service().local_peer_id() @@ -281,7 +284,7 @@ impl Peer { Default::default() }; self.block_import.import_block(import_block, cache).expect("block_import failed"); - self.network.on_block_imported(header, Vec::new(), true); + self.network.on_block_imported(hash, header, Vec::new(), true); at = hash; } @@ -352,7 +355,7 @@ impl Peer { } /// Get a reference to the network service. - pub fn network_service(&self) -> &Arc::Hash>> { + pub fn network_service(&self) -> &Arc::Hash>> { &self.network.service() } @@ -408,6 +411,16 @@ impl TransactionPool for EmptyTransactionPool { fn transaction(&self, _h: &Hash) -> Option { None } } +pub trait SpecializationFactory { + fn create() -> Self; +} + +impl SpecializationFactory for DummySpecialization { + fn create() -> DummySpecialization { + DummySpecialization + } +} + /// Implements `BlockImport` for any `Transaction`. Internally the transaction is /// "converted", aka the field is set to `None`. /// @@ -528,6 +541,7 @@ impl VerifierAdapter { } pub trait TestNetFactory: Sized { + type Specialization: NetworkSpecialization + SpecializationFactory; type Verifier: 'static + Verifier; type PeerData: Default; @@ -541,9 +555,9 @@ pub trait TestNetFactory: Sized { ) -> Self::Verifier; /// Get reference to peer. - fn peer(&mut self, i: usize) -> &mut Peer; - fn peers(&self) -> &Vec>; - fn mut_peers>)>( + fn peer(&mut self, i: usize) -> &mut Peer; + fn peers(&self) -> &Vec>; + fn mut_peers>)>( &mut self, closure: F, ); @@ -641,6 +655,7 @@ pub trait TestNetFactory: Sized { transaction_pool: Arc::new(EmptyTransactionPool), protocol_id: ProtocolId::from(&b"test-protocol-name"[..]), import_queue, + specialization: self::SpecializationFactory::create(), block_announce_validator: Box::new(DefaultBlockAnnounceValidator::new(client.clone())) }).unwrap(); @@ -716,6 +731,7 @@ pub trait TestNetFactory: Sized { transaction_pool: Arc::new(EmptyTransactionPool), protocol_id: ProtocolId::from(&b"test-protocol-name"[..]), import_queue, + specialization: self::SpecializationFactory::create(), block_announce_validator: Box::new(DefaultBlockAnnounceValidator::new(client.clone())) }).unwrap(); @@ -811,6 +827,7 @@ pub trait TestNetFactory: Sized { // We poll `imported_blocks_stream`. while let Ok(Async::Ready(Some(notification))) = peer.imported_blocks_stream.poll() { peer.network.on_block_imported( + notification.hash, notification.header, Vec::new(), true, @@ -831,10 +848,11 @@ pub trait TestNetFactory: Sized { } pub struct TestNet { - peers: Vec>, + peers: Vec>, } impl TestNetFactory for TestNet { + type Specialization = DummySpecialization; type Verifier = PassThroughVerifier; type PeerData = (); @@ -851,15 +869,15 @@ impl TestNetFactory for TestNet { PassThroughVerifier(false) } - fn peer(&mut self, i: usize) -> &mut Peer<()> { + fn peer(&mut self, i: usize) -> &mut Peer<(), Self::Specialization> { &mut self.peers[i] } - fn peers(&self) -> &Vec> { + fn peers(&self) -> &Vec> { &self.peers } - fn mut_peers>)>(&mut self, closure: F) { + fn mut_peers>)>(&mut self, closure: F) { closure(&mut self.peers); } } @@ -883,6 +901,7 @@ impl JustificationImport for ForceFinalized { pub struct JustificationTestNet(TestNet); impl TestNetFactory for JustificationTestNet { + type Specialization = DummySpecialization; type Verifier = PassThroughVerifier; type PeerData = (); @@ -894,16 +913,17 @@ impl TestNetFactory for JustificationTestNet { self.0.make_verifier(client, config, peer_data) } - fn peer(&mut self, i: usize) -> &mut Peer { + fn peer(&mut self, i: usize) -> &mut Peer { self.0.peer(i) } - fn peers(&self) -> &Vec> { + fn peers(&self) -> &Vec> { self.0.peers() } fn mut_peers>, + &mut Vec>, )>(&mut self, closure: F) { self.0.mut_peers(closure) } diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 7159e532c4259..20078f2d4ff75 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -35,7 +35,7 @@ use futures::{ use sc_keystore::{Store as Keystore}; use log::{info, warn, error}; use sc_network::config::{FinalityProofProvider, OnDemand, BoxFinalityProofRequestBuilder}; -use sc_network::{NetworkService, NetworkStateInfo}; +use sc_network::{NetworkService, NetworkStateInfo, specialization::NetworkSpecialization}; use parking_lot::{Mutex, RwLock}; use sp_runtime::generic::BlockId; use sp_runtime::traits::{ @@ -102,6 +102,7 @@ pub type BackgroundTask = Pin + Send>>; /// /// - [`with_select_chain`](ServiceBuilder::with_select_chain) /// - [`with_import_queue`](ServiceBuilder::with_import_queue) +/// - [`with_network_protocol`](ServiceBuilder::with_network_protocol) /// - [`with_finality_proof_provider`](ServiceBuilder::with_finality_proof_provider) /// - [`with_transaction_pool`](ServiceBuilder::with_transaction_pool) /// @@ -111,7 +112,7 @@ pub type BackgroundTask = Pin + Send>>; /// generics is done when you call `build`. /// pub struct ServiceBuilder + TNetP, TExPool, TRpc, Backend> { config: Configuration, pub (crate) client: Arc, @@ -122,6 +123,7 @@ pub struct ServiceBuilder, finality_proof_provider: Option, + network_protocol: TNetP, transaction_pool: Arc, rpc_extensions: TRpc, remote_backend: Option>>, @@ -264,7 +266,7 @@ fn new_full_parts( Ok((client, backend, keystore)) } -impl ServiceBuilder<(), (), TGen, TCSExt, (), (), (), (), (), (), (), (), ()> +impl ServiceBuilder<(), (), TGen, TCSExt, (), (), (), (), (), (), (), (), (), ()> where TGen: RuntimeGenesis, TCSExt: Extension { /// Start the service builder with a configuration. pub fn new_full( @@ -282,6 +284,7 @@ where TGen: RuntimeGenesis, TCSExt: Extension { Arc>, (), (), + (), TFullBackend, >, Error> { let (client, backend, keystore) = new_full_parts(&config)?; @@ -298,6 +301,7 @@ where TGen: RuntimeGenesis, TCSExt: Extension { import_queue: (), finality_proof_request_builder: None, finality_proof_provider: None, + network_protocol: (), transaction_pool: Arc::new(()), rpc_extensions: Default::default(), remote_backend: None, @@ -323,6 +327,7 @@ where TGen: RuntimeGenesis, TCSExt: Extension { Arc>, (), (), + (), TLightBackend, >, Error> { let keystore = match &config.keystore { @@ -383,6 +388,7 @@ where TGen: RuntimeGenesis, TCSExt: Extension { import_queue: (), finality_proof_request_builder: None, finality_proof_provider: None, + network_protocol: (), transaction_pool: Arc::new(()), rpc_extensions: Default::default(), remote_backend: Some(remote_blockchain), @@ -393,9 +399,9 @@ where TGen: RuntimeGenesis, TCSExt: Extension { } } -impl +impl ServiceBuilder { + TNetP, TExPool, TRpc, Backend> { /// Returns a reference to the client that was stored in this builder. pub fn client(&self) -> &Arc { @@ -443,7 +449,7 @@ impl, &Arc ) -> Result, Error> ) -> Result, Error> { + TNetP, TExPool, TRpc, Backend>, Error> { let select_chain = select_chain_builder(&self.config, &self.backend)?; Ok(ServiceBuilder { @@ -456,6 +462,7 @@ impl, &Arc) -> Result ) -> Result, Error> { + TNetP, TExPool, TRpc, Backend>, Error> { self.with_opt_select_chain(|cfg, b| builder(cfg, b).map(Option::Some)) } @@ -480,7 +487,7 @@ impl, Arc, Option, Arc) -> Result ) -> Result, Error> + TNetP, TExPool, TRpc, Backend>, Error> where TSc: Clone { let import_queue = builder( &self.config, @@ -499,6 +506,35 @@ impl( + self, + network_protocol_builder: impl FnOnce(&Configuration) -> Result + ) -> Result, Error> { + let network_protocol = network_protocol_builder(&self.config)?; + + Ok(ServiceBuilder { + config: self.config, + client: self.client, + backend: self.backend, + keystore: self.keystore, + fetcher: self.fetcher, + select_chain: self.select_chain, + import_queue: self.import_queue, + finality_proof_request_builder: self.finality_proof_request_builder, + finality_proof_provider: self.finality_proof_provider, + network_protocol, transaction_pool: self.transaction_pool, rpc_extensions: self.rpc_extensions, remote_backend: self.remote_backend, @@ -523,6 +559,7 @@ impl>, + TNetP, TExPool, TRpc, Backend, @@ -539,6 +576,7 @@ impl>, + TNetP, TExPool, TRpc, Backend, @@ -582,7 +621,7 @@ impl, ) -> Result<(UImpQu, Option), Error> ) -> Result, Error> + TNetP, TExPool, TRpc, Backend>, Error> where TSc: Clone, TFchr: Clone { let (import_queue, fprb) = builder( &self.config, @@ -603,6 +642,7 @@ impl, ) -> Result<(UImpQu, UFprb), Error> ) -> Result, Error> + TNetP, TExPool, TRpc, Backend>, Error> where TSc: Clone, TFchr: Clone { self.with_import_queue_and_opt_fprb(|cfg, cl, b, f, sc, tx| builder(cfg, cl, b, f, sc, tx) @@ -641,7 +681,7 @@ impl, ) -> Result<(UExPool, Option), Error> ) -> Result, Error> + TNetP, UExPool, TRpc, Backend>, Error> where TSc: Clone, TFchr: Clone { let (transaction_pool, background_task) = transaction_pool_builder( self.config.transaction_pool.clone(), @@ -663,6 +703,7 @@ impl Result, ) -> Result, Error> + TNetP, TExPool, URpc, Backend>, Error> where TSc: Clone, TFchr: Clone { let rpc_extensions = rpc_ext_builder(&self)?; @@ -691,6 +732,7 @@ impl Pin> + Send>>; } -impl +impl ServiceBuilder< TBl, TRtApi, @@ -770,6 +813,7 @@ ServiceBuilder< TImpQu, BoxFinalityProofRequestBuilder, Arc>, + TNetP, TExPool, TRpc, TBackend, @@ -790,6 +834,7 @@ ServiceBuilder< TExec: 'static + sc_client::CallExecutor + Send + Sync + Clone, TSc: Clone, TImpQu: 'static + ImportQueue, + TNetP: NetworkSpecialization, TExPool: MaintainedTransactionPool::Hash> + MallocSizeOfWasm + 'static, TRpc: sc_rpc::RpcExtension + Clone, { @@ -806,7 +851,7 @@ ServiceBuilder< Client, TSc, NetworkStatus, - NetworkService::Hash>, + NetworkService::Hash>, TExPool, sc_offchain::OffchainWorkers< Client, @@ -825,6 +870,7 @@ ServiceBuilder< import_queue, finality_proof_request_builder, finality_proof_provider, + network_protocol, transaction_pool, rpc_extensions, remote_backend, @@ -905,6 +951,7 @@ ServiceBuilder< transaction_pool: transaction_pool_adapter.clone() as _, import_queue, protocol_id, + specialization: network_protocol, block_announce_validator, }; diff --git a/client/service/src/chain_ops.rs b/client/service/src/chain_ops.rs index a0724f3e1decc..f415ea213a40f 100644 --- a/client/service/src/chain_ops.rs +++ b/client/service/src/chain_ops.rs @@ -46,12 +46,12 @@ pub fn build_spec(spec: ChainSpec, raw: bool) -> error::Result ServiceBuilderCommand for ServiceBuilder< TBl, TRtApi, TGen, TCSExt, Client>, TBl, TRtApi>, - TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend + TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, Backend > where TBl: BlockT, TBackend: 'static + sc_client_api::backend::Backend + Send, diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 99b45453411d3..8c5d5deccacc4 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -45,7 +45,10 @@ use futures::{ sink::SinkExt, task::{Spawn, FutureObj, SpawnError}, }; -use sc_network::{NetworkService, network_state::NetworkState, PeerId, ReportHandle}; +use sc_network::{ + NetworkService, network_state::NetworkState, specialization::NetworkSpecialization, + PeerId, ReportHandle, +}; use log::{log, warn, debug, error, Level}; use codec::{Encode, Decode}; use sp_runtime::generic::BlockId; @@ -173,6 +176,8 @@ pub trait AbstractService: 'static + Future> + type SelectChain: sp_consensus::SelectChain; /// Transaction pool. type TransactionPool: TransactionPool + MallocSizeOfWasm; + /// Network specialization. + type NetworkSpecialization: NetworkSpecialization; /// Get event stream for telemetry connection established events. fn telemetry_on_connect_stream(&self) -> futures::channel::mpsc::UnboundedReceiver<()>; @@ -213,7 +218,7 @@ pub trait AbstractService: 'static + Future> + /// Get shared network instance. fn network(&self) - -> Arc::Hash>>; + -> Arc::Hash>>; /// Returns a receiver that periodically receives a status of the network. fn network_status(&self, interval: Duration) -> mpsc::UnboundedReceiver<(NetworkStatus, NetworkState)>; @@ -225,9 +230,9 @@ pub trait AbstractService: 'static + Future> + fn on_exit(&self) -> ::exit_future::Exit; } -impl AbstractService for +impl AbstractService for Service, TSc, NetworkStatus, - NetworkService, TExPool, TOc> + NetworkService, TExPool, TOc> where TBl: BlockT + Unpin, TBackend: 'static + sc_client_api::backend::Backend, @@ -236,6 +241,7 @@ where TSc: sp_consensus::SelectChain + 'static + Clone + Send + Unpin, TExPool: 'static + TransactionPool + MallocSizeOfWasm, TOc: 'static + Send + Sync, + TNetSpec: NetworkSpecialization, { type Block = TBl; type Backend = TBackend; @@ -243,6 +249,7 @@ where type RuntimeApi = TRtApi; type SelectChain = TSc; type TransactionPool = TExPool; + type NetworkSpecialization = TNetSpec; fn telemetry_on_connect_stream(&self) -> futures::channel::mpsc::UnboundedReceiver<()> { let (sink, stream) = futures::channel::mpsc::unbounded(); @@ -308,7 +315,7 @@ where } fn network(&self) - -> Arc::Hash>> + -> Arc::Hash>> { self.network.clone() } @@ -372,10 +379,11 @@ impl Spawn for fn build_network_future< B: BlockT, C: sc_client::BlockchainEvents, + S: sc_network::specialization::NetworkSpecialization, H: sc_network::ExHashT > ( roles: Roles, - mut network: sc_network::NetworkWorker, + mut network: sc_network::NetworkWorker, client: Arc, status_sinks: Arc, NetworkState)>>>, mut rpc_rx: mpsc::UnboundedReceiver>, @@ -389,7 +397,7 @@ fn build_network_future< // We poll `imported_blocks_stream`. while let Poll::Ready(Some(notification)) = Pin::new(&mut imported_blocks_stream).poll_next(cx) { - network.on_block_imported(notification.header, Vec::new(), notification.is_new_best); + network.on_block_imported(notification.hash, notification.header, Vec::new(), notification.is_new_best); } // We poll `finality_notification_stream`, but we only take the last event.