From 626365436be2315a03817af3f90058b94cb13f22 Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Wed, 14 Sep 2022 13:42:25 +0300 Subject: [PATCH 01/11] Move transaction protocol to its own crate --- client/beefy/Cargo.toml | 1 + client/beefy/src/lib.rs | 4 +- client/cli/Cargo.toml | 1 + client/cli/src/params/network_params.rs | 5 +- client/finality-grandpa/src/lib.rs | 8 +- client/network/common/Cargo.toml | 3 + client/network/common/src/config.rs | 128 +++++++++++ client/network/{ => common}/src/error.rs | 3 +- client/network/common/src/lib.rs | 8 + client/network/{ => common}/src/utils.rs | 0 client/network/src/config.rs | 209 +----------------- client/network/src/discovery.rs | 3 +- client/network/src/lib.rs | 9 - client/network/src/peer_info.rs | 2 +- client/network/src/protocol.rs | 13 +- client/network/src/protocol/message.rs | 7 +- client/network/src/service.rs | 55 ++--- client/network/src/service/tests.rs | 56 ++--- client/network/test/src/lib.rs | 14 +- client/network/transactions/Cargo.toml | 27 +++ client/network/transactions/src/config.rs | 98 ++++++++ .../src/lib.rs} | 106 ++++----- client/service/Cargo.toml | 1 + client/service/src/builder.rs | 58 +++-- client/service/src/config.rs | 6 +- client/service/src/error.rs | 3 +- client/service/src/lib.rs | 7 +- client/service/test/src/lib.rs | 7 +- 28 files changed, 429 insertions(+), 413 deletions(-) rename client/network/{ => common}/src/error.rs (96%) rename client/network/{ => common}/src/utils.rs (100%) create mode 100644 client/network/transactions/Cargo.toml create mode 100644 client/network/transactions/src/config.rs rename client/network/{src/transactions.rs => transactions/src/lib.rs} (85%) diff --git a/client/beefy/Cargo.toml b/client/beefy/Cargo.toml index 5cc2952e26ba5..a634b91f2113c 100644 --- a/client/beefy/Cargo.toml +++ b/client/beefy/Cargo.toml @@ -27,6 +27,7 @@ sc-consensus = { version = "0.10.0-dev", path = "../consensus/common" } sc-finality-grandpa = { version = "0.10.0-dev", path = "../../client/finality-grandpa" } sc-keystore = { version = "4.0.0-dev", path = "../keystore" } sc-network = { version = "0.10.0-dev", path = "../network" } +sc-network-common = { version = "0.10.0-dev", path = "../network/common" } sc-network-gossip = { version = "0.10.0-dev", path = "../network-gossip" } sc-utils = { version = "4.0.0-dev", path = "../utils" } sp-api = { version = "4.0.0-dev", path = "../../primitives/api" } diff --git a/client/beefy/src/lib.rs b/client/beefy/src/lib.rs index cdb7fca10320d..b5b3ece1d508d 100644 --- a/client/beefy/src/lib.rs +++ b/client/beefy/src/lib.rs @@ -82,8 +82,8 @@ pub(crate) mod beefy_protocol_name { /// For standard protocol name see [`beefy_protocol_name::standard_name`]. pub fn beefy_peers_set_config( protocol_name: ProtocolName, -) -> sc_network::config::NonDefaultSetConfig { - let mut cfg = sc_network::config::NonDefaultSetConfig::new(protocol_name, 1024 * 1024); +) -> sc_network_common::config::NonDefaultSetConfig { + let mut cfg = sc_network_common::config::NonDefaultSetConfig::new(protocol_name, 1024 * 1024); cfg.allow_non_reserved(25, 25); cfg.add_fallback_names(beefy_protocol_name::LEGACY_NAMES.iter().map(|&n| n.into()).collect()); diff --git a/client/cli/Cargo.toml b/client/cli/Cargo.toml index 50d110d40eabd..3cccc0bf41529 100644 --- a/client/cli/Cargo.toml +++ b/client/cli/Cargo.toml @@ -34,6 +34,7 @@ sc-client-api = { version = "4.0.0-dev", path = "../api" } sc-client-db = { version = "0.10.0-dev", default-features = false, path = "../db" } sc-keystore = { version = "4.0.0-dev", path = "../keystore" } sc-network = { version = "0.10.0-dev", path = "../network" } +sc-network-common = { version = "0.10.0-dev", path = "../network/common" } sc-service = { version = "0.10.0-dev", default-features = false, path = "../service" } sc-telemetry = { version = "4.0.0-dev", path = "../telemetry" } sc-tracing = { version = "4.0.0-dev", path = "../tracing" } diff --git a/client/cli/src/params/network_params.rs b/client/cli/src/params/network_params.rs index 74c2db92c3215..0450b5f0e2566 100644 --- a/client/cli/src/params/network_params.rs +++ b/client/cli/src/params/network_params.rs @@ -19,11 +19,10 @@ use crate::{arg_enums::SyncMode, params::node_key_params::NodeKeyParams}; use clap::Args; use sc_network::{ - config::{ - NetworkConfiguration, NodeKeyConfig, NonReservedPeerMode, SetConfig, TransportConfig, - }, + config::{NetworkConfiguration, NodeKeyConfig}, multiaddr::Protocol, }; +use sc_network_common::config::{NonReservedPeerMode, SetConfig, TransportConfig}; use sc_service::{ config::{Multiaddr, MultiaddrWithPeerId}, ChainSpec, ChainType, diff --git a/client/finality-grandpa/src/lib.rs b/client/finality-grandpa/src/lib.rs index 7e47b70bd6b98..d5c05fea78aa2 100644 --- a/client/finality-grandpa/src/lib.rs +++ b/client/finality-grandpa/src/lib.rs @@ -688,18 +688,18 @@ pub struct GrandpaParams { /// For standard protocol name see [`crate::protocol_standard_name`]. pub fn grandpa_peers_set_config( protocol_name: ProtocolName, -) -> sc_network::config::NonDefaultSetConfig { +) -> sc_network_common::config::NonDefaultSetConfig { use communication::grandpa_protocol_name; - sc_network::config::NonDefaultSetConfig { + sc_network_common::config::NonDefaultSetConfig { notifications_protocol: protocol_name, fallback_names: grandpa_protocol_name::LEGACY_NAMES.iter().map(|&n| n.into()).collect(), // Notifications reach ~256kiB in size at the time of writing on Kusama and Polkadot. max_notification_size: 1024 * 1024, - set_config: sc_network::config::SetConfig { + set_config: sc_network_common::config::SetConfig { in_peers: 0, out_peers: 0, reserved_nodes: Vec::new(), - non_reserved_mode: sc_network::config::NonReservedPeerMode::Deny, + non_reserved_mode: sc_network_common::config::NonReservedPeerMode::Deny, }, } } diff --git a/client/network/common/Cargo.toml b/client/network/common/Cargo.toml index 47d43e8b4b03f..1ee7b15538366 100644 --- a/client/network/common/Cargo.toml +++ b/client/network/common/Cargo.toml @@ -24,7 +24,10 @@ codec = { package = "parity-scale-codec", version = "3.0.0", features = [ "derive", ] } futures = "0.3.21" +futures-timer = "3.0.2" libp2p = "0.46.1" +linked_hash_set = "0.1.3" +prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../../utils/prometheus" } smallvec = "1.8.0" sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" } sc-peerset = { version = "4.0.0-dev", path = "../../peerset" } diff --git a/client/network/common/src/config.rs b/client/network/common/src/config.rs index 8b7e045780d7d..fb23cd0174922 100644 --- a/client/network/common/src/config.rs +++ b/client/network/common/src/config.rs @@ -18,6 +18,8 @@ //! Configuration of the networking layer. +use crate::protocol; + use libp2p::{multiaddr, Multiaddr, PeerId}; use std::{fmt, str, str::FromStr}; @@ -171,3 +173,129 @@ impl From for ParseErr { Self::MultiaddrParse(err) } } + +/// Configuration for a set of nodes. +#[derive(Clone, Debug)] +pub struct SetConfig { + /// Maximum allowed number of incoming substreams related to this set. + pub in_peers: u32, + /// Number of outgoing substreams related to this set that we're trying to maintain. + pub out_peers: u32, + /// List of reserved node addresses. + pub reserved_nodes: Vec, + /// Whether nodes that aren't in [`SetConfig::reserved_nodes`] are accepted or automatically + /// refused. + pub non_reserved_mode: NonReservedPeerMode, +} + +impl Default for SetConfig { + fn default() -> Self { + Self { + in_peers: 25, + out_peers: 75, + reserved_nodes: Vec::new(), + non_reserved_mode: NonReservedPeerMode::Accept, + } + } +} + +/// Extension to [`SetConfig`] for sets that aren't the default set. +/// +/// > **Note**: As new fields might be added in the future, please consider using the `new` method +/// > and modifiers instead of creating this struct manually. +#[derive(Clone, Debug)] +pub struct NonDefaultSetConfig { + /// Name of the notifications protocols of this set. A substream on this set will be + /// considered established once this protocol is open. + /// + /// > **Note**: This field isn't present for the default set, as this is handled internally + /// > by the networking code. + pub notifications_protocol: protocol::ProtocolName, + /// If the remote reports that it doesn't support the protocol indicated in the + /// `notifications_protocol` field, then each of these fallback names will be tried one by + /// one. + /// + /// If a fallback is used, it will be reported in + /// `sc_network::protocol::event::Event::NotificationStreamOpened::negotiated_fallback` + pub fallback_names: Vec, + /// Maximum allowed size of single notifications. + pub max_notification_size: u64, + /// Base configuration. + pub set_config: SetConfig, +} + +impl NonDefaultSetConfig { + /// Creates a new [`NonDefaultSetConfig`]. Zero slots and accepts only reserved nodes. + pub fn new(notifications_protocol: protocol::ProtocolName, max_notification_size: u64) -> Self { + Self { + notifications_protocol, + max_notification_size, + fallback_names: Vec::new(), + set_config: SetConfig { + in_peers: 0, + out_peers: 0, + reserved_nodes: Vec::new(), + non_reserved_mode: NonReservedPeerMode::Deny, + }, + } + } + + /// Modifies the configuration to allow non-reserved nodes. + pub fn allow_non_reserved(&mut self, in_peers: u32, out_peers: u32) { + self.set_config.in_peers = in_peers; + self.set_config.out_peers = out_peers; + self.set_config.non_reserved_mode = NonReservedPeerMode::Accept; + } + + /// Add a node to the list of reserved nodes. + pub fn add_reserved(&mut self, peer: MultiaddrWithPeerId) { + self.set_config.reserved_nodes.push(peer); + } + + /// Add a list of protocol names used for backward compatibility. + /// + /// See the explanations in [`NonDefaultSetConfig::fallback_names`]. + pub fn add_fallback_names(&mut self, fallback_names: Vec) { + self.fallback_names.extend(fallback_names); + } +} + +/// Configuration for the transport layer. +#[derive(Clone, Debug)] +pub enum TransportConfig { + /// Normal transport mode. + Normal { + /// If true, the network will use mDNS to discover other libp2p nodes on the local network + /// and connect to them if they support the same chain. + enable_mdns: bool, + + /// If true, allow connecting to private IPv4 addresses (as defined in + /// [RFC1918](https://tools.ietf.org/html/rfc1918)). Irrelevant for addresses that have + /// been passed in `::sc_network::config::NetworkConfiguration::boot_nodes`. + allow_private_ipv4: bool, + }, + + /// Only allow connections within the same process. + /// Only addresses of the form `/memory/...` will be supported. + MemoryOnly, +} + +/// The policy for connections to non-reserved peers. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum NonReservedPeerMode { + /// Accept them. This is the default. + Accept, + /// Deny them. + Deny, +} + +impl NonReservedPeerMode { + /// Attempt to parse the peer mode from a string. + pub fn parse(s: &str) -> Option { + match s { + "accept" => Some(Self::Accept), + "deny" => Some(Self::Deny), + _ => None, + } + } +} diff --git a/client/network/src/error.rs b/client/network/common/src/error.rs similarity index 96% rename from client/network/src/error.rs rename to client/network/common/src/error.rs index b4287ffbd55db..4326b1af52836 100644 --- a/client/network/src/error.rs +++ b/client/network/common/src/error.rs @@ -18,9 +18,8 @@ //! Substrate network possible errors. -use crate::config::TransportConfig; +use crate::{config::TransportConfig, protocol::ProtocolName}; use libp2p::{Multiaddr, PeerId}; -use sc_network_common::protocol::ProtocolName; use std::fmt; diff --git a/client/network/common/src/lib.rs b/client/network/common/src/lib.rs index 3a30d24900199..36e67f11e5cff 100644 --- a/client/network/common/src/lib.rs +++ b/client/network/common/src/lib.rs @@ -19,8 +19,16 @@ //! Common data structures of the networking layer. pub mod config; +pub mod error; pub mod message; pub mod protocol; pub mod request_responses; pub mod service; pub mod sync; +pub mod utils; + +/// Minimum Requirements for a Hash within Networking +pub trait ExHashT: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync + 'static {} + +impl ExHashT for T where T: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync + 'static +{} diff --git a/client/network/src/utils.rs b/client/network/common/src/utils.rs similarity index 100% rename from client/network/src/utils.rs rename to client/network/common/src/utils.rs diff --git a/client/network/src/config.rs b/client/network/src/config.rs index d7ca8b48a7c88..6bfa10e0bd398 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -27,24 +27,24 @@ pub use sc_network_common::{ IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig, }, sync::warp::WarpSyncProvider, + ExHashT, }; pub use libp2p::{build_multiaddr, core::PublicKey, identity}; -use crate::ExHashT; - use core::{fmt, iter}; -use futures::future; use libp2p::{ identity::{ed25519, Keypair}, multiaddr, Multiaddr, }; use prometheus_endpoint::Registry; use sc_consensus::ImportQueue; -use sc_network_common::{config::MultiaddrWithPeerId, protocol::ProtocolName, sync::ChainSync}; +use sc_network_common::{ + config::{MultiaddrWithPeerId, NonDefaultSetConfig, SetConfig, TransportConfig}, + sync::ChainSync, +}; use sp_runtime::traits::Block as BlockT; use std::{ - collections::HashMap, error::Error, fs, future::Future, @@ -52,16 +52,14 @@ use std::{ net::Ipv4Addr, path::{Path, PathBuf}, pin::Pin, - str, sync::Arc, }; use zeroize::Zeroize; /// Network initialization parameters. -pub struct Params +pub struct Params where B: BlockT + 'static, - H: ExHashT, { /// Assigned role for our node (full, light, ...). pub role: Role, @@ -70,21 +68,12 @@ where /// default. pub executor: Option + Send>>) + Send>>, - /// How to spawn the background task dedicated to the transactions handler. - pub transactions_handler_executor: Box + Send>>) + Send>, - /// Network layer configuration. pub network_config: NetworkConfiguration, /// Client that contains the blockchain. pub chain: Arc, - /// Pool of transactions. - /// - /// The network worker will fetch transactions from this object in order to propagate them on - /// the network. - pub transaction_pool: Arc>, - /// Legacy name of the protocol to use on the wire. Should be different for each chain. pub protocol_id: ProtocolId, @@ -166,66 +155,6 @@ impl fmt::Display for Role { } } -/// Result of the transaction import. -#[derive(Clone, Copy, Debug)] -pub enum TransactionImport { - /// Transaction is good but already known by the transaction pool. - KnownGood, - /// Transaction is good and not yet known. - NewGood, - /// Transaction is invalid. - Bad, - /// Transaction import was not performed. - None, -} - -/// Future resolving to transaction import result. -pub type TransactionImportFuture = Pin + Send>>; - -/// Transaction pool interface -pub trait TransactionPool: Send + Sync { - /// Get transactions from the pool that are ready to be propagated. - fn transactions(&self) -> Vec<(H, B::Extrinsic)>; - /// Get hash of transaction. - fn hash_of(&self, transaction: &B::Extrinsic) -> H; - /// Import a transaction into the pool. - /// - /// This will return future. - fn import(&self, transaction: B::Extrinsic) -> TransactionImportFuture; - /// Notify the pool about transactions broadcast. - fn on_broadcasted(&self, propagations: HashMap>); - /// Get transaction by hash. - fn transaction(&self, hash: &H) -> Option; -} - -/// Dummy implementation of the [`TransactionPool`] trait for a transaction pool that is always -/// empty and discards all incoming transactions. -/// -/// Requires the "hash" type to implement the `Default` trait. -/// -/// Useful for testing purposes. -pub struct EmptyTransactionPool; - -impl TransactionPool for EmptyTransactionPool { - fn transactions(&self) -> Vec<(H, B::Extrinsic)> { - Vec::new() - } - - fn hash_of(&self, _transaction: &B::Extrinsic) -> H { - Default::default() - } - - fn import(&self, _transaction: B::Extrinsic) -> TransactionImportFuture { - Box::pin(future::ready(TransactionImport::KnownGood)) - } - - fn on_broadcasted(&self, _: HashMap>) {} - - fn transaction(&self, _h: &H) -> Option { - None - } -} - /// Sync operation mode. #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub enum SyncMode { @@ -394,132 +323,6 @@ impl NetworkConfiguration { } } -/// Configuration for a set of nodes. -#[derive(Clone, Debug)] -pub struct SetConfig { - /// Maximum allowed number of incoming substreams related to this set. - pub in_peers: u32, - /// Number of outgoing substreams related to this set that we're trying to maintain. - pub out_peers: u32, - /// List of reserved node addresses. - pub reserved_nodes: Vec, - /// Whether nodes that aren't in [`SetConfig::reserved_nodes`] are accepted or automatically - /// refused. - pub non_reserved_mode: NonReservedPeerMode, -} - -impl Default for SetConfig { - fn default() -> Self { - Self { - in_peers: 25, - out_peers: 75, - reserved_nodes: Vec::new(), - non_reserved_mode: NonReservedPeerMode::Accept, - } - } -} - -/// Extension to [`SetConfig`] for sets that aren't the default set. -/// -/// > **Note**: As new fields might be added in the future, please consider using the `new` method -/// > and modifiers instead of creating this struct manually. -#[derive(Clone, Debug)] -pub struct NonDefaultSetConfig { - /// Name of the notifications protocols of this set. A substream on this set will be - /// considered established once this protocol is open. - /// - /// > **Note**: This field isn't present for the default set, as this is handled internally - /// > by the networking code. - pub notifications_protocol: ProtocolName, - /// If the remote reports that it doesn't support the protocol indicated in the - /// `notifications_protocol` field, then each of these fallback names will be tried one by - /// one. - /// - /// If a fallback is used, it will be reported in - /// [`crate::Event::NotificationStreamOpened::negotiated_fallback`]. - pub fallback_names: Vec, - /// Maximum allowed size of single notifications. - pub max_notification_size: u64, - /// Base configuration. - pub set_config: SetConfig, -} - -impl NonDefaultSetConfig { - /// Creates a new [`NonDefaultSetConfig`]. Zero slots and accepts only reserved nodes. - pub fn new(notifications_protocol: ProtocolName, max_notification_size: u64) -> Self { - Self { - notifications_protocol, - max_notification_size, - fallback_names: Vec::new(), - set_config: SetConfig { - in_peers: 0, - out_peers: 0, - reserved_nodes: Vec::new(), - non_reserved_mode: NonReservedPeerMode::Deny, - }, - } - } - - /// Modifies the configuration to allow non-reserved nodes. - pub fn allow_non_reserved(&mut self, in_peers: u32, out_peers: u32) { - self.set_config.in_peers = in_peers; - self.set_config.out_peers = out_peers; - self.set_config.non_reserved_mode = NonReservedPeerMode::Accept; - } - - /// Add a node to the list of reserved nodes. - pub fn add_reserved(&mut self, peer: MultiaddrWithPeerId) { - self.set_config.reserved_nodes.push(peer); - } - - /// Add a list of protocol names used for backward compatibility. - /// - /// See the explanations in [`NonDefaultSetConfig::fallback_names`]. - pub fn add_fallback_names(&mut self, fallback_names: Vec) { - self.fallback_names.extend(fallback_names); - } -} - -/// Configuration for the transport layer. -#[derive(Clone, Debug)] -pub enum TransportConfig { - /// Normal transport mode. - Normal { - /// If true, the network will use mDNS to discover other libp2p nodes on the local network - /// and connect to them if they support the same chain. - enable_mdns: bool, - - /// If true, allow connecting to private IPv4 addresses (as defined in - /// [RFC1918](https://tools.ietf.org/html/rfc1918)). Irrelevant for addresses that have - /// been passed in [`NetworkConfiguration::boot_nodes`]. - allow_private_ipv4: bool, - }, - - /// Only allow connections within the same process. - /// Only addresses of the form `/memory/...` will be supported. - MemoryOnly, -} - -/// The policy for connections to non-reserved peers. -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum NonReservedPeerMode { - /// Accept them. This is the default. - Accept, - /// Deny them. - Deny, -} - -impl NonReservedPeerMode { - /// Attempt to parse the peer mode from a string. - pub fn parse(s: &str) -> Option { - match s { - "accept" => Some(Self::Accept), - "deny" => Some(Self::Deny), - _ => None, - } - } -} - /// The configuration of a node's secret key, describing the type of key /// and how it is obtained. A node's identity keypair is the result of /// the evaluation of the node key configuration. diff --git a/client/network/src/discovery.rs b/client/network/src/discovery.rs index ab93662968dc2..8422e34485125 100644 --- a/client/network/src/discovery.rs +++ b/client/network/src/discovery.rs @@ -46,7 +46,6 @@ //! active mechanism that asks nodes for the addresses they are listening on. Whenever we learn //! of a node's address, you must call `add_self_reported_address`. -use crate::utils::LruHashSet; use futures::prelude::*; use futures_timer::Delay; use ip_network::IpNetwork; @@ -72,7 +71,7 @@ use libp2p::{ }, }; use log::{debug, error, info, trace, warn}; -use sc_network_common::config::ProtocolId; +use sc_network_common::{config::ProtocolId, utils::LruHashSet}; use sp_core::hexdisplay::HexDisplay; use std::{ cmp, diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index 320104d0f9554..b78947e427699 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -251,12 +251,9 @@ mod protocol; mod request_responses; mod service; mod transport; -mod utils; pub mod config; -pub mod error; pub mod network_state; -pub mod transactions; #[doc(inline)] pub use libp2p::{multiaddr, Multiaddr, PeerId}; @@ -295,9 +292,3 @@ const MAX_CONNECTIONS_PER_PEER: usize = 2; /// The maximum number of concurrent established connections that were incoming. const MAX_CONNECTIONS_ESTABLISHED_INCOMING: u32 = 10_000; - -/// Minimum Requirements for a Hash within Networking -pub trait ExHashT: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync + 'static {} - -impl ExHashT for T where T: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync + 'static -{} diff --git a/client/network/src/peer_info.rs b/client/network/src/peer_info.rs index d668cb25ea455..c62c2ea1c5d98 100644 --- a/client/network/src/peer_info.rs +++ b/client/network/src/peer_info.rs @@ -16,7 +16,6 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::utils::interval; use fnv::FnvHashMap; use futures::prelude::*; use libp2p::{ @@ -33,6 +32,7 @@ use libp2p::{ Multiaddr, }; use log::{debug, error, trace}; +use sc_network_common::utils::interval; use smallvec::SmallVec; use std::{ collections::hash_map::Entry, diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 9bcf363cd0692..9e88b02202c85 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -16,10 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::{ - config, error, - utils::{interval, LruHashSet}, -}; +use crate::config; use bytes::Bytes; use codec::{Decode, DecodeAll, Encode}; @@ -43,7 +40,8 @@ use prometheus_endpoint::{register, Gauge, GaugeVec, Opts, PrometheusError, Regi use sc_client_api::HeaderBackend; use sc_consensus::import_queue::{BlockImportError, BlockImportStatus, IncomingBlock, Origin}; use sc_network_common::{ - config::ProtocolId, + config::{NonReservedPeerMode, ProtocolId}, + error, protocol::ProtocolName, request_responses::RequestFailure, sync::{ @@ -55,6 +53,7 @@ use sc_network_common::{ OpaqueBlockResponse, OpaqueStateRequest, OpaqueStateResponse, PollBlockAnnounceValidation, SyncStatus, }, + utils::{interval, LruHashSet}, }; use sp_arithmetic::traits::SaturatedConversion; use sp_consensus::BlockOrigin; @@ -339,7 +338,7 @@ where bootnodes, reserved_nodes: default_sets_reserved.clone(), reserved_only: network_config.default_peers_set.non_reserved_mode == - config::NonReservedPeerMode::Deny, + NonReservedPeerMode::Deny, }); for set_cfg in &network_config.extra_sets { @@ -350,7 +349,7 @@ where } let reserved_only = - set_cfg.set_config.non_reserved_mode == config::NonReservedPeerMode::Deny; + set_cfg.set_config.non_reserved_mode == NonReservedPeerMode::Deny; sets.push(sc_peerset::SetConfig { in_peers: set_cfg.set_config.in_peers, diff --git a/client/network/src/protocol/message.rs b/client/network/src/protocol/message.rs index 50c4a264a5f95..9b449d6d7d994 100644 --- a/client/network/src/protocol/message.rs +++ b/client/network/src/protocol/message.rs @@ -36,9 +36,6 @@ pub type Message = generic::Message< ::Extrinsic, >; -/// A set of transactions. -pub type Transactions = Vec; - /// Remote call response. #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] pub struct RemoteCallResponse { @@ -59,7 +56,7 @@ pub struct RemoteReadResponse { /// Generic types. pub mod generic { - use super::{RemoteCallResponse, RemoteReadResponse, Transactions}; + use super::{RemoteCallResponse, RemoteReadResponse}; use bitflags::bitflags; use codec::{Decode, Encode, Input, Output}; use sc_client_api::StorageProof; @@ -147,7 +144,7 @@ pub mod generic { /// Block announce. BlockAnnounce(BlockAnnounce
), /// Transactions. - Transactions(Transactions), + Transactions, /// Consensus protocol message. Consensus(ConsensusMessage), /// Remote method call request. diff --git a/client/network/src/service.rs b/client/network/src/service.rs index fb98db3251647..6017543e528e7 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -29,9 +29,8 @@ use crate::{ behaviour::{self, Behaviour, BehaviourOut}, - config::{Params, TransportConfig}, + config::Params, discovery::DiscoveryConfig, - error::Error, network_state::{ NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer, }, @@ -39,7 +38,7 @@ use crate::{ self, message::generic::Roles, NotificationsSink, NotifsHandlerError, PeerInfo, Protocol, Ready, }, - transactions, transport, ExHashT, ReputationChange, + transport, ReputationChange, }; use codec::Encode as _; @@ -60,7 +59,8 @@ use metrics::{Histogram, HistogramVec, MetricSources, Metrics}; use parking_lot::Mutex; use sc_consensus::{BlockImportError, BlockImportStatus, ImportQueue, Link}; use sc_network_common::{ - config::MultiaddrWithPeerId, + config::{MultiaddrWithPeerId, TransportConfig}, + error::Error, protocol::{ event::{DhtEvent, Event}, ProtocolName, @@ -73,6 +73,7 @@ use sc_network_common::{ NotificationSenderReady as NotificationSenderReadyT, Signature, SigningError, }, sync::{SyncState, SyncStatus}, + ExHashT, }; use sc_peerset::PeersetHandle; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; @@ -144,7 +145,7 @@ where /// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order /// for the network processing to advance. From it, you can extract a `NetworkService` using /// `worker.service()`. The `NetworkService` can be shared through the codebase. - pub fn new(mut params: Params) -> Result { + pub fn new(mut params: Params) -> Result { // Private and public keys configuration. let local_identity = params.network_config.node_key.clone().into_keypair()?; let local_public = local_identity.public(); @@ -210,21 +211,6 @@ where fs::create_dir_all(path)?; } - let transactions_handler_proto = transactions::TransactionsHandlerPrototype::new( - params.protocol_id.clone(), - params - .chain - .hash(0u32.into()) - .ok() - .flatten() - .expect("Genesis block exists; qed"), - params.fork_id.clone(), - ); - params - .network_config - .extra_sets - .insert(0, transactions_handler_proto.set_config()); - info!( target: "sub-libp2p", "🏷 Local node identity is: {}", @@ -239,11 +225,8 @@ where params.protocol_id.clone(), ¶ms.fork_id, ¶ms.network_config, - iter::once(Vec::new()) - .chain( - (0..params.network_config.extra_sets.len() - 1) - .map(|_| default_notif_handshake_message.clone()), - ) + (0..params.network_config.extra_sets.len()) + .map(|_| default_notif_handshake_message.clone()) .collect(), params.metrics_registry.as_ref(), params.chain_sync, @@ -460,13 +443,6 @@ where _marker: PhantomData, }); - let (tx_handler, tx_handler_controller) = transactions_handler_proto.build( - service.clone(), - params.transaction_pool, - params.metrics_registry.as_ref(), - )?; - (params.transactions_handler_executor)(tx_handler.run().boxed()); - Ok(NetworkWorker { external_addresses, num_connected, @@ -477,9 +453,9 @@ where from_service, event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?, peers_notifications_sinks, - tx_handler_controller, metrics, boot_node_ids, + _marker: Default::default(), }) } @@ -1314,8 +1290,9 @@ where /// For each peer and protocol combination, an object that allows sending notifications to /// that peer. Shared with the [`NetworkService`]. peers_notifications_sinks: Arc>>, - /// Controller for the handler of incoming and outgoing transactions. - tx_handler_controller: transactions::TransactionsHandlerController, + /// Marker to pin the `H` generic. Serves no purpose except to not break backwards + /// compatibility. + _marker: PhantomData, } impl Future for NetworkWorker @@ -1371,10 +1348,8 @@ where .behaviour_mut() .user_protocol_mut() .clear_justification_requests(), - ServiceToWorkerMsg::PropagateTransaction(hash) => - this.tx_handler_controller.propagate_transaction(hash), - ServiceToWorkerMsg::PropagateTransactions => - this.tx_handler_controller.propagate_transactions(), + ServiceToWorkerMsg::PropagateTransaction(_) => {}, + ServiceToWorkerMsg::PropagateTransactions => {}, ServiceToWorkerMsg::GetValue(key) => this.network_service.behaviour_mut().get_value(key), ServiceToWorkerMsg::PutValue(key, value) => @@ -1917,8 +1892,6 @@ where SyncState::Downloading => true, }; - this.tx_handler_controller.set_gossip_enabled(!is_major_syncing); - this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed); if let Some(metrics) = this.metrics.as_ref() { diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs index a9505c5341c3d..c8f137f79c6dc 100644 --- a/client/network/src/service/tests.rs +++ b/client/network/src/service/tests.rs @@ -21,7 +21,7 @@ use crate::{config, NetworkService, NetworkWorker}; use futures::prelude::*; use libp2p::PeerId; use sc_network_common::{ - config::{MultiaddrWithPeerId, ProtocolId}, + config::{MultiaddrWithPeerId, NonDefaultSetConfig, ProtocolId, SetConfig, TransportConfig}, protocol::event::Event, service::{NetworkEventStream, NetworkNotification, NetworkPeers, NetworkStateInfo}, }; @@ -135,12 +135,8 @@ fn build_test_full_node( let worker = NetworkWorker::new(config::Params { role: config::Role::Full, executor: None, - transactions_handler_executor: Box::new(|task| { - async_std::task::spawn(task); - }), network_config, chain: client.clone(), - transaction_pool: Arc::new(config::EmptyTransactionPool), protocol_id, fork_id, import_queue, @@ -178,23 +174,23 @@ fn build_nodes_one_proto() -> ( let listen_addr = config::build_multiaddr![Memory(rand::random::())]; let (node1, events_stream1) = build_test_full_node(config::NetworkConfiguration { - extra_sets: vec![config::NonDefaultSetConfig { + extra_sets: vec![NonDefaultSetConfig { notifications_protocol: PROTOCOL_NAME.into(), fallback_names: Vec::new(), max_notification_size: 1024 * 1024, set_config: Default::default(), }], listen_addresses: vec![listen_addr.clone()], - transport: config::TransportConfig::MemoryOnly, + transport: TransportConfig::MemoryOnly, ..config::NetworkConfiguration::new_local() }); let (node2, events_stream2) = build_test_full_node(config::NetworkConfiguration { - extra_sets: vec![config::NonDefaultSetConfig { + extra_sets: vec![NonDefaultSetConfig { notifications_protocol: PROTOCOL_NAME.into(), fallback_names: Vec::new(), max_notification_size: 1024 * 1024, - set_config: config::SetConfig { + set_config: SetConfig { reserved_nodes: vec![MultiaddrWithPeerId { multiaddr: listen_addr, peer_id: node1.local_peer_id(), @@ -203,7 +199,7 @@ fn build_nodes_one_proto() -> ( }, }], listen_addresses: vec![], - transport: config::TransportConfig::MemoryOnly, + transport: TransportConfig::MemoryOnly, ..config::NetworkConfiguration::new_local() }); @@ -368,13 +364,13 @@ fn lots_of_incoming_peers_works() { let (main_node, _) = build_test_full_node(config::NetworkConfiguration { listen_addresses: vec![listen_addr.clone()], - extra_sets: vec![config::NonDefaultSetConfig { + extra_sets: vec![NonDefaultSetConfig { notifications_protocol: PROTOCOL_NAME.into(), fallback_names: Vec::new(), max_notification_size: 1024 * 1024, - set_config: config::SetConfig { in_peers: u32::MAX, ..Default::default() }, + set_config: SetConfig { in_peers: u32::MAX, ..Default::default() }, }], - transport: config::TransportConfig::MemoryOnly, + transport: TransportConfig::MemoryOnly, ..config::NetworkConfiguration::new_local() }); @@ -387,11 +383,11 @@ fn lots_of_incoming_peers_works() { for _ in 0..32 { let (_dialing_node, event_stream) = build_test_full_node(config::NetworkConfiguration { listen_addresses: vec![], - extra_sets: vec![config::NonDefaultSetConfig { + extra_sets: vec![NonDefaultSetConfig { notifications_protocol: PROTOCOL_NAME.into(), fallback_names: Vec::new(), max_notification_size: 1024 * 1024, - set_config: config::SetConfig { + set_config: SetConfig { reserved_nodes: vec![MultiaddrWithPeerId { multiaddr: listen_addr.clone(), peer_id: main_node_peer_id, @@ -399,7 +395,7 @@ fn lots_of_incoming_peers_works() { ..Default::default() }, }], - transport: config::TransportConfig::MemoryOnly, + transport: TransportConfig::MemoryOnly, ..config::NetworkConfiguration::new_local() }); @@ -504,23 +500,23 @@ fn fallback_name_working() { let listen_addr = config::build_multiaddr![Memory(rand::random::())]; let (node1, mut events_stream1) = build_test_full_node(config::NetworkConfiguration { - extra_sets: vec![config::NonDefaultSetConfig { + extra_sets: vec![NonDefaultSetConfig { notifications_protocol: NEW_PROTOCOL_NAME.into(), fallback_names: vec![PROTOCOL_NAME.into()], max_notification_size: 1024 * 1024, set_config: Default::default(), }], listen_addresses: vec![listen_addr.clone()], - transport: config::TransportConfig::MemoryOnly, + transport: TransportConfig::MemoryOnly, ..config::NetworkConfiguration::new_local() }); let (_, mut events_stream2) = build_test_full_node(config::NetworkConfiguration { - extra_sets: vec![config::NonDefaultSetConfig { + extra_sets: vec![NonDefaultSetConfig { notifications_protocol: PROTOCOL_NAME.into(), fallback_names: Vec::new(), max_notification_size: 1024 * 1024, - set_config: config::SetConfig { + set_config: SetConfig { reserved_nodes: vec![MultiaddrWithPeerId { multiaddr: listen_addr, peer_id: node1.local_peer_id(), @@ -529,7 +525,7 @@ fn fallback_name_working() { }, }], listen_addresses: vec![], - transport: config::TransportConfig::MemoryOnly, + transport: TransportConfig::MemoryOnly, ..config::NetworkConfiguration::new_local() }); @@ -572,7 +568,7 @@ fn ensure_listen_addresses_consistent_with_transport_memory() { let _ = build_test_full_node(config::NetworkConfiguration { listen_addresses: vec![listen_addr.clone()], - transport: config::TransportConfig::MemoryOnly, + transport: TransportConfig::MemoryOnly, ..config::NetworkConfiguration::new("test-node", "test-client", Default::default(), None) }); } @@ -599,7 +595,7 @@ fn ensure_boot_node_addresses_consistent_with_transport_memory() { let _ = build_test_full_node(config::NetworkConfiguration { listen_addresses: vec![listen_addr.clone()], - transport: config::TransportConfig::MemoryOnly, + transport: TransportConfig::MemoryOnly, boot_nodes: vec![boot_node], ..config::NetworkConfiguration::new("test-node", "test-client", Default::default(), None) }); @@ -632,11 +628,8 @@ fn ensure_reserved_node_addresses_consistent_with_transport_memory() { let _ = build_test_full_node(config::NetworkConfiguration { listen_addresses: vec![listen_addr.clone()], - transport: config::TransportConfig::MemoryOnly, - default_peers_set: config::SetConfig { - reserved_nodes: vec![reserved_node], - ..Default::default() - }, + transport: TransportConfig::MemoryOnly, + default_peers_set: SetConfig { reserved_nodes: vec![reserved_node], ..Default::default() }, ..config::NetworkConfiguration::new("test-node", "test-client", Default::default(), None) }); } @@ -652,10 +645,7 @@ fn ensure_reserved_node_addresses_consistent_with_transport_not_memory() { let _ = build_test_full_node(config::NetworkConfiguration { listen_addresses: vec![listen_addr.clone()], - default_peers_set: config::SetConfig { - reserved_nodes: vec![reserved_node], - ..Default::default() - }, + default_peers_set: SetConfig { reserved_nodes: vec![reserved_node], ..Default::default() }, ..config::NetworkConfiguration::new("test-node", "test-client", Default::default(), None) }); } @@ -668,7 +658,7 @@ fn ensure_public_addresses_consistent_with_transport_memory() { let _ = build_test_full_node(config::NetworkConfiguration { listen_addresses: vec![listen_addr.clone()], - transport: config::TransportConfig::MemoryOnly, + transport: TransportConfig::MemoryOnly, public_addresses: vec![public_address], ..config::NetworkConfiguration::new("test-node", "test-client", Default::default(), None) }); diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 5a29e587ceff5..c0830087ebd31 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -47,16 +47,14 @@ use sc_consensus::{ ForkChoiceStrategy, ImportResult, JustificationImport, JustificationSyncLink, LongestChain, Verifier, }; -pub use sc_network::config::EmptyTransactionPool; use sc_network::{ - config::{ - NetworkConfiguration, NonDefaultSetConfig, NonReservedPeerMode, Role, SyncMode, - TransportConfig, - }, + config::{NetworkConfiguration, Role, SyncMode}, Multiaddr, NetworkService, NetworkWorker, }; use sc_network_common::{ - config::{MultiaddrWithPeerId, ProtocolId}, + config::{ + MultiaddrWithPeerId, NonDefaultSetConfig, NonReservedPeerMode, ProtocolId, TransportConfig, + }, protocol::ProtocolName, service::{NetworkBlock, NetworkStateInfo, NetworkSyncForkRequest}, sync::warp::{AuthorityList, EncodedProof, SetId, VerificationResult, WarpSyncProvider}, @@ -872,12 +870,8 @@ where let network = NetworkWorker::new(sc_network::config::Params { role: if config.is_authority { Role::Authority } else { Role::Full }, executor: None, - transactions_handler_executor: Box::new(|task| { - async_std::task::spawn(task); - }), network_config, chain: client.clone(), - transaction_pool: Arc::new(EmptyTransactionPool), protocol_id, fork_id, import_queue, diff --git a/client/network/transactions/Cargo.toml b/client/network/transactions/Cargo.toml new file mode 100644 index 0000000000000..b9a1627c12dc0 --- /dev/null +++ b/client/network/transactions/Cargo.toml @@ -0,0 +1,27 @@ +[package] +description = "Substrate transaction protocol" +name = "sc-network-transactions" +version = "0.10.0-dev" +license = "GPL-3.0-or-later WITH Classpath-exception-2.0" +authors = ["Parity Technologies "] +edition = "2021" +homepage = "https://substrate.io" +repository = "https://github.com/paritytech/substrate/" +documentation = "https://docs.rs/sc-network-transactions" +readme = "README.md" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] + +[dependencies] +codec = { package = "parity-scale-codec", version = "3.0.0", features = ["derive"] } +futures = "0.3.21" +hex = "0.4.0" +libp2p = "0.46.1" +log = "0.4.17" +pin-project = "1.0.10" +prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../../utils/prometheus" } +sc-network-common = { version = "0.10.0-dev", path = "../common" } +sc-peerset = { version = "4.0.0-dev", path = "../../peerset" } +sp-runtime = { version = "6.0.0", path = "../../../primitives/runtime" } +sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" } diff --git a/client/network/transactions/src/config.rs b/client/network/transactions/src/config.rs new file mode 100644 index 0000000000000..abb8cccd301ac --- /dev/null +++ b/client/network/transactions/src/config.rs @@ -0,0 +1,98 @@ +// This file is part of Substrate. + +// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Configuration of the transaction protocol + +use futures::prelude::*; +use sc_network_common::ExHashT; +use sp_runtime::traits::Block as BlockT; +use std::{collections::HashMap, future::Future, pin::Pin, time}; + +/// Interval at which we propagate transactions; +pub(crate) const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(2900); + +/// Maximum number of known transaction hashes to keep for a peer. +/// +/// This should be approx. 2 blocks full of transactions for the network to function properly. +pub(crate) const MAX_KNOWN_TRANSACTIONS: usize = 10240; // ~300kb per peer + overhead. + +/// Maximum allowed size for a transactions notification. +pub(crate) const MAX_TRANSACTIONS_SIZE: u64 = 16 * 1024 * 1024; + +/// Maximum number of transaction validation request we keep at any moment. +pub(crate) const MAX_PENDING_TRANSACTIONS: usize = 8192; + +/// Result of the transaction import. +#[derive(Clone, Copy, Debug)] +pub enum TransactionImport { + /// Transaction is good but already known by the transaction pool. + KnownGood, + /// Transaction is good and not yet known. + NewGood, + /// Transaction is invalid. + Bad, + /// Transaction import was not performed. + None, +} + +/// Future resolving to transaction import result. +pub type TransactionImportFuture = Pin + Send>>; + +/// Transaction pool interface +pub trait TransactionPool: Send + Sync { + /// Get transactions from the pool that are ready to be propagated. + fn transactions(&self) -> Vec<(H, B::Extrinsic)>; + /// Get hash of transaction. + fn hash_of(&self, transaction: &B::Extrinsic) -> H; + /// Import a transaction into the pool. + /// + /// This will return future. + fn import(&self, transaction: B::Extrinsic) -> TransactionImportFuture; + /// Notify the pool about transactions broadcast. + fn on_broadcasted(&self, propagations: HashMap>); + /// Get transaction by hash. + fn transaction(&self, hash: &H) -> Option; +} + +/// Dummy implementation of the [`TransactionPool`] trait for a transaction pool that is always +/// empty and discards all incoming transactions. +/// +/// Requires the "hash" type to implement the `Default` trait. +/// +/// Useful for testing purposes. +pub struct EmptyTransactionPool; + +impl TransactionPool for EmptyTransactionPool { + fn transactions(&self) -> Vec<(H, B::Extrinsic)> { + Vec::new() + } + + fn hash_of(&self, _transaction: &B::Extrinsic) -> H { + Default::default() + } + + fn import(&self, _transaction: B::Extrinsic) -> TransactionImportFuture { + Box::pin(future::ready(TransactionImport::KnownGood)) + } + + fn on_broadcasted(&self, _: HashMap>) {} + + fn transaction(&self, _h: &H) -> Option { + None + } +} diff --git a/client/network/src/transactions.rs b/client/network/transactions/src/lib.rs similarity index 85% rename from client/network/src/transactions.rs rename to client/network/transactions/src/lib.rs index 1cf532f33ddc6..80e6434a4b68e 100644 --- a/client/network/src/transactions.rs +++ b/client/network/transactions/src/lib.rs @@ -26,27 +26,22 @@ //! - Use [`TransactionsHandlerPrototype::build`] then [`TransactionsHandler::run`] to obtain a //! `Future` that processes transactions. -use crate::{ - config::{self, TransactionImport, TransactionImportFuture, TransactionPool}, - error, - protocol::message, - service::NetworkService, - utils::{interval, LruHashSet}, - ExHashT, -}; - +use crate::config::*; use codec::{Decode, Encode}; use futures::{channel::mpsc, prelude::*, stream::FuturesUnordered}; use libp2p::{multiaddr, PeerId}; use log::{debug, trace, warn}; use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; use sc_network_common::{ - config::ProtocolId, + config::{NonDefaultSetConfig, NonReservedPeerMode, ProtocolId, SetConfig}, + error, protocol::{ event::{Event, ObservedRole}, ProtocolName, }, service::{NetworkEventStream, NetworkNotification, NetworkPeers}, + utils::{interval, LruHashSet}, + ExHashT, }; use sp_runtime::traits::Block as BlockT; use std::{ @@ -54,27 +49,14 @@ use std::{ iter, num::NonZeroUsize, pin::Pin, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, + sync::Arc, task::Poll, - time, }; -/// Interval at which we propagate transactions; -const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(2900); - -/// Maximum number of known transaction hashes to keep for a peer. -/// -/// This should be approx. 2 blocks full of transactions for the network to function properly. -const MAX_KNOWN_TRANSACTIONS: usize = 10240; // ~300kb per peer + overhead. +pub mod config; -/// Maximum allowed size for a transactions notification. -const MAX_TRANSACTIONS_SIZE: u64 = 16 * 1024 * 1024; - -/// Maximum number of transaction validation request we keep at any moment. -const MAX_PENDING_TRANSACTIONS: usize = 8192; +/// A set of transactions. +pub type Transactions = Vec; mod rep { use sc_peerset::ReputationChange as Rep; @@ -141,7 +123,7 @@ impl TransactionsHandlerPrototype { pub fn new>( protocol_id: ProtocolId, genesis_hash: Hash, - fork_id: Option, + fork_id: Option<&str>, ) -> Self { let protocol_name = if let Some(fork_id) = fork_id { format!("/{}/{}/transactions/1", hex::encode(genesis_hash), fork_id) @@ -157,16 +139,16 @@ impl TransactionsHandlerPrototype { } /// Returns the configuration of the set to put in the network configuration. - pub fn set_config(&self) -> config::NonDefaultSetConfig { - config::NonDefaultSetConfig { + pub fn set_config(&self) -> NonDefaultSetConfig { + NonDefaultSetConfig { notifications_protocol: self.protocol_name.clone(), fallback_names: self.fallback_protocol_names.clone(), max_notification_size: MAX_TRANSACTIONS_SIZE, - set_config: config::SetConfig { + set_config: SetConfig { in_peers: 0, out_peers: 0, reserved_nodes: Vec::new(), - non_reserved_mode: config::NonReservedPeerMode::Deny, + non_reserved_mode: NonReservedPeerMode::Deny, }, } } @@ -175,23 +157,25 @@ impl TransactionsHandlerPrototype { /// the behaviour of the handler while it's running. /// /// Important: the transactions handler is initially disabled and doesn't gossip transactions. - /// You must call [`TransactionsHandlerController::set_gossip_enabled`] to enable it. - pub fn build( + /// You must call `TransactionsHandlerController::set_gossip_enabled` to enable it. + pub fn build< + B: BlockT + 'static, + H: ExHashT, + S: NetworkPeers + NetworkEventStream + NetworkNotification + sp_consensus::SyncOracle, + >( self, - service: Arc>, + service: S, transaction_pool: Arc>, metrics_registry: Option<&Registry>, - ) -> error::Result<(TransactionsHandler, TransactionsHandlerController)> { + ) -> error::Result<(TransactionsHandler, TransactionsHandlerController)> { let event_stream = service.event_stream("transactions-handler"); let (to_handler, from_controller) = mpsc::unbounded(); - let gossip_enabled = Arc::new(AtomicBool::new(false)); let handler = TransactionsHandler { protocol_name: self.protocol_name, propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)), pending_transactions: FuturesUnordered::new(), pending_transactions_peers: HashMap::new(), - gossip_enabled: gossip_enabled.clone(), service, event_stream, peers: HashMap::new(), @@ -204,7 +188,7 @@ impl TransactionsHandlerPrototype { }, }; - let controller = TransactionsHandlerController { to_handler, gossip_enabled }; + let controller = TransactionsHandlerController { to_handler }; Ok((handler, controller)) } @@ -213,15 +197,9 @@ impl TransactionsHandlerPrototype { /// Controls the behaviour of a [`TransactionsHandler`] it is connected to. pub struct TransactionsHandlerController { to_handler: mpsc::UnboundedSender>, - gossip_enabled: Arc, } impl TransactionsHandlerController { - /// Controls whether transactions are being gossiped on the network. - pub fn set_gossip_enabled(&mut self, enabled: bool) { - self.gossip_enabled.store(enabled, Ordering::Relaxed); - } - /// You may call this when new transactions are imported by the transaction pool. /// /// All transactions will be fetched from the `TransactionPool` that was passed at @@ -245,7 +223,11 @@ enum ToHandler { } /// Handler for transactions. Call [`TransactionsHandler::run`] to start the processing. -pub struct TransactionsHandler { +pub struct TransactionsHandler< + B: BlockT + 'static, + H: ExHashT, + S: NetworkPeers + NetworkEventStream + NetworkNotification + sp_consensus::SyncOracle, +> { protocol_name: ProtocolName, /// Interval at which we call `propagate_transactions`. propagate_timeout: Pin + Send>>, @@ -257,13 +239,12 @@ pub struct TransactionsHandler { /// multiple times concurrently. pending_transactions_peers: HashMap>, /// Network service to use to send messages and manage peers. - service: Arc>, + service: S, /// Stream of networking events. event_stream: Pin + Send>>, // All connected peers peers: HashMap>, transaction_pool: Arc>, - gossip_enabled: Arc, from_controller: mpsc::UnboundedReceiver>, /// Prometheus metrics. metrics: Option, @@ -277,7 +258,12 @@ struct Peer { role: ObservedRole, } -impl TransactionsHandler { +impl TransactionsHandler +where + B: BlockT + 'static, + H: ExHashT, + S: NetworkPeers + NetworkEventStream + NetworkNotification + sp_consensus::SyncOracle, +{ /// Turns the [`TransactionsHandler`] into a future that should run forever and not be /// interrupted. pub async fn run(mut self) { @@ -359,9 +345,9 @@ impl TransactionsHandler { continue } - if let Ok(m) = as Decode>::decode( - &mut message.as_ref(), - ) { + if let Ok(m) = + as Decode>::decode(&mut message.as_ref()) + { self.on_transactions(remote, m); } else { warn!(target: "sub-libp2p", "Failed to decode transactions list"); @@ -375,9 +361,9 @@ impl TransactionsHandler { } /// Called when peer sends us new transactions - fn on_transactions(&mut self, who: PeerId, transactions: message::Transactions) { - // Accept transactions only when enabled - if !self.gossip_enabled.load(Ordering::Relaxed) { + fn on_transactions(&mut self, who: PeerId, transactions: Transactions) { + // Accept transactions only when node is not major syncing + if self.service.is_major_syncing() { trace!(target: "sync", "{} Ignoring transactions while disabled", who); return } @@ -427,10 +413,11 @@ impl TransactionsHandler { /// Propagate one transaction. pub fn propagate_transaction(&mut self, hash: &H) { - // Accept transactions only when enabled - if !self.gossip_enabled.load(Ordering::Relaxed) { + // Accept transactions only when node is not major syncing + if self.service.is_major_syncing() { return } + debug!(target: "sync", "Propagating transaction [{:?}]", hash); if let Some(transaction) = self.transaction_pool.transaction(hash) { let propagated_to = self.do_propagate_transactions(&[(hash.clone(), transaction)]); @@ -478,10 +465,11 @@ impl TransactionsHandler { /// Call when we must propagate ready transactions to peers. fn propagate_transactions(&mut self) { - // Accept transactions only when enabled - if !self.gossip_enabled.load(Ordering::Relaxed) { + // Accept transactions only when node is not major syncing + if self.service.is_major_syncing() { return } + debug!(target: "sync", "Propagating transactions"); let transactions = self.transaction_pool.transactions(); let propagated_to = self.do_propagate_transactions(&transactions); diff --git a/client/service/Cargo.toml b/client/service/Cargo.toml index 76c61fe17f0da..3305c377343d8 100644 --- a/client/service/Cargo.toml +++ b/client/service/Cargo.toml @@ -55,6 +55,7 @@ sc-network-bitswap = { version = "0.10.0-dev", path = "../network/bitswap" } sc-network-common = { version = "0.10.0-dev", path = "../network/common" } sc-network-light = { version = "0.10.0-dev", path = "../network/light" } sc-network-sync = { version = "0.10.0-dev", path = "../network/sync" } +sc-network-transactions = { version = "0.10.0-dev", path = "../network/transactions" } sc-chain-spec = { version = "4.0.0-dev", path = "../chain-spec" } sc-client-api = { version = "4.0.0-dev", path = "../api" } sp-api = { version = "4.0.0-dev", path = "../../primitives/api" } diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 5a2f4cf978b41..e21c9c6e6d727 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -368,6 +368,9 @@ pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> { pub network: Arc>, /// A Sender for RPC requests. pub system_rpc_tx: TracingUnboundedSender>, + /// Controller for transactions handlers + pub tx_handler_controller: + sc_network_transactions::TransactionsHandlerController<::Hash>, /// Telemetry instance for this node. pub telemetry: Option<&'a mut Telemetry>, } @@ -446,6 +449,7 @@ where rpc_builder, network, system_rpc_tx, + tx_handler_controller, telemetry, } = params; @@ -481,7 +485,11 @@ where spawn_handle.spawn( "on-transaction-imported", Some("transaction-pool"), - transaction_notifications(transaction_pool.clone(), network.clone(), telemetry.clone()), + transaction_notifications( + transaction_pool.clone(), + tx_handler_controller, + telemetry.clone(), + ), ); // Prometheus metrics. @@ -544,20 +552,21 @@ where Ok(rpc_handlers) } -async fn transaction_notifications( +async fn transaction_notifications( transaction_pool: Arc, - network: Network, + tx_handler_controller: sc_network_transactions::TransactionsHandlerController< + ::Hash, + >, telemetry: Option, ) where Block: BlockT, ExPool: MaintainedTransactionPool::Hash>, - Network: NetworkTransaction<::Hash> + Send + Sync, { // transaction notifications transaction_pool .import_notification_stream() .for_each(move |hash| { - network.propagate_transaction(hash); + tx_handler_controller.propagate_transaction(hash); let status = transaction_pool.status(); telemetry!( telemetry; @@ -719,6 +728,7 @@ pub fn build_network( ( Arc::Hash>>, TracingUnboundedSender>, + sc_network_transactions::TransactionsHandlerController<::Hash>, NetworkStarter, ), Error, @@ -761,9 +771,6 @@ where } } - let transaction_pool_adapter = - Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }); - let protocol_id = config.protocol_id(); let block_announce_validator = if let Some(f) = block_announce_validator_builder { @@ -845,7 +852,7 @@ where protocol_config })); - let network_params = sc_network::config::Params { + let mut network_params = sc_network::config::Params { role: config.role.clone(), executor: { let spawn_handle = Clone::clone(&spawn_handle); @@ -853,16 +860,9 @@ where spawn_handle.spawn("libp2p-node", Some("networking"), fut); })) }, - transactions_handler_executor: { - let spawn_handle = Clone::clone(&spawn_handle); - Box::new(move |fut| { - spawn_handle.spawn("network-transactions-handler", Some("networking"), fut); - }) - }, network_config: config.network.clone(), chain: client.clone(), - transaction_pool: transaction_pool_adapter as _, - protocol_id, + protocol_id: protocol_id.clone(), fork_id: config.chain_spec.fork_id().map(ToOwned::to_owned), import_queue: Box::new(import_queue), chain_sync: Box::new(chain_sync), @@ -877,10 +877,32 @@ where .collect::>(), }; + // crate transactions protocol and add it to the list of supported protocols of `network_params` + let transactions_handler_proto = sc_network_transactions::TransactionsHandlerPrototype::new( + protocol_id.clone(), + client + .block_hash(0u32.into()) + .ok() + .flatten() + .expect("Genesis block exists; qed"), + config.chain_spec.fork_id(), + ); + network_params + .network_config + .extra_sets + .insert(0, transactions_handler_proto.set_config()); + let has_bootnodes = !network_params.network_config.boot_nodes.is_empty(); let network_mut = sc_network::NetworkWorker::new(network_params)?; let network = network_mut.service().clone(); + let (tx_handler, tx_handler_controller) = transactions_handler_proto.build( + network.clone(), + Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }), + config.prometheus_config.as_ref().map(|config| config.registry.clone()).as_ref(), + )?; + spawn_handle.spawn("network-transactions-handler", Some("networking"), tx_handler.run()); + let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc"); let future = build_network_future( @@ -928,7 +950,7 @@ where future.await }); - Ok((network, system_rpc_tx, NetworkStarter(network_start_tx))) + Ok((network, system_rpc_tx, tx_handler_controller, NetworkStarter(network_start_tx))) } /// Object used to start the network. diff --git a/client/service/src/config.rs b/client/service/src/config.rs index 44153e3b914f3..bca0697bcbd08 100644 --- a/client/service/src/config.rs +++ b/client/service/src/config.rs @@ -24,13 +24,11 @@ pub use sc_executor::WasmExecutionMethod; #[cfg(feature = "wasmtime")] pub use sc_executor::WasmtimeInstantiationStrategy; pub use sc_network::{ - config::{ - NetworkConfiguration, NodeKeyConfig, NonDefaultSetConfig, Role, SetConfig, TransportConfig, - }, + config::{NetworkConfiguration, NodeKeyConfig, Role}, Multiaddr, }; pub use sc_network_common::{ - config::{MultiaddrWithPeerId, ProtocolId}, + config::{MultiaddrWithPeerId, NonDefaultSetConfig, ProtocolId, SetConfig, TransportConfig}, request_responses::{ IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig, }, diff --git a/client/service/src/error.rs b/client/service/src/error.rs index 0d702c7f37b98..001a83922d776 100644 --- a/client/service/src/error.rs +++ b/client/service/src/error.rs @@ -19,7 +19,6 @@ //! Errors that can occur during the service operation. use sc_keystore; -use sc_network; use sp_blockchain; use sp_consensus; @@ -41,7 +40,7 @@ pub enum Error { Consensus(#[from] sp_consensus::Error), #[error(transparent)] - Network(#[from] sc_network::error::Error), + Network(#[from] sc_network_common::error::Error), #[error(transparent)] Keystore(#[from] sc_keystore::Error), diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 19358c1e5bc4c..091b4bbe9fe5f 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -72,7 +72,7 @@ pub use sc_chain_spec::{ pub use sc_consensus::ImportQueue; pub use sc_executor::NativeExecutionDispatch; #[doc(hidden)] -pub use sc_network::config::{TransactionImport, TransactionImportFuture}; +pub use sc_network_transactions::config::{TransactionImport, TransactionImportFuture}; pub use sc_rpc::{ RandomIntegerSubscriptionId, RandomStringSubscriptionId, RpcSubscriptionIdProvider, }; @@ -148,7 +148,7 @@ async fn build_network_future< + Send + Sync + 'static, - H: sc_network::ExHashT, + H: sc_network_common::ExHashT, >( role: Role, mut network: sc_network::NetworkWorker, @@ -415,7 +415,8 @@ where .collect() } -impl sc_network::config::TransactionPool for TransactionPoolAdapter +impl sc_network_transactions::config::TransactionPool + for TransactionPoolAdapter where C: HeaderBackend + BlockBackend diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs index 11c1cbaf7afb1..1e1b2ecbbc81c 100644 --- a/client/service/test/src/lib.rs +++ b/client/service/test/src/lib.rs @@ -22,12 +22,9 @@ use futures::{task::Poll, Future, TryFutureExt as _}; use log::{debug, info}; use parking_lot::Mutex; use sc_client_api::{Backend, CallExecutor}; -use sc_network::{ - config::{NetworkConfiguration, TransportConfig}, - multiaddr, -}; +use sc_network::{config::NetworkConfiguration, multiaddr}; use sc_network_common::{ - config::MultiaddrWithPeerId, + config::{MultiaddrWithPeerId, TransportConfig}, service::{NetworkBlock, NetworkPeers, NetworkStateInfo}, }; use sc_service::{ From 7c86afef9a4216b47d1739154f5ed67978b69ea8 Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Wed, 14 Sep 2022 13:49:04 +0300 Subject: [PATCH 02/11] Update Cargo.lock --- Cargo.lock | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 34d2296e019ce..f00f87212d780 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -456,6 +456,7 @@ dependencies = [ "sc-finality-grandpa", "sc-keystore", "sc-network", + "sc-network-common", "sc-network-gossip", "sc-network-test", "sc-utils", @@ -7894,6 +7895,7 @@ dependencies = [ "sc-client-db", "sc-keystore", "sc-network", + "sc-network-common", "sc-service", "sc-telemetry", "sc-tracing", @@ -8508,7 +8510,9 @@ dependencies = [ "bitflags", "bytes", "futures", + "futures-timer", "libp2p", + "linked_hash_set", "parity-scale-codec", "prost-build 0.10.4", "sc-consensus", @@ -8519,6 +8523,7 @@ dependencies = [ "sp-consensus", "sp-finality-grandpa", "sp-runtime", + "substrate-prometheus-endpoint", "thiserror", ] @@ -8624,6 +8629,23 @@ dependencies = [ "substrate-test-runtime-client", ] +[[package]] +name = "sc-network-transactions" +version = "0.10.0-dev" +dependencies = [ + "futures", + "hex", + "libp2p", + "log", + "parity-scale-codec", + "pin-project", + "sc-network-common", + "sc-peerset", + "sp-consensus", + "sp-runtime", + "substrate-prometheus-endpoint", +] + [[package]] name = "sc-offchain" version = "4.0.0-dev" @@ -8801,6 +8823,7 @@ dependencies = [ "sc-network-common", "sc-network-light", "sc-network-sync", + "sc-network-transactions", "sc-offchain", "sc-rpc", "sc-rpc-server", From b7fa77bef41620587a44b3f963635ad486dc6dd1 Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Wed, 14 Sep 2022 14:13:20 +0300 Subject: [PATCH 03/11] Fix binaries --- bin/node-template/node/src/service.rs | 3 ++- bin/node/cli/src/service.rs | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index caa01761636df..b291ebd100404 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -191,7 +191,7 @@ pub fn new_full(mut config: Configuration) -> Result Vec::default(), )); - let (network, system_rpc_tx, network_starter) = + let (network, system_rpc_tx, tx_handler_controller, network_starter) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, client: client.clone(), @@ -238,6 +238,7 @@ pub fn new_full(mut config: Configuration) -> Result rpc_builder: rpc_extensions_builder, backend, system_rpc_tx, + tx_handler_controller, config, telemetry: telemetry.as_mut(), })?; diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index c058e22fa7a97..53c67d3e9a433 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -354,7 +354,7 @@ pub fn new_full_base( Vec::default(), )); - let (network, system_rpc_tx, network_starter) = + let (network, system_rpc_tx, tx_handler_controller, network_starter) = sc_service::build_network(sc_service::BuildNetworkParams { config: &config, client: client.clone(), @@ -392,6 +392,7 @@ pub fn new_full_base( transaction_pool: transaction_pool.clone(), task_manager: &mut task_manager, system_rpc_tx, + tx_handler_controller, telemetry: telemetry.as_mut(), })?; From d3af3206b773f4463eef129b834d441177b17b5e Mon Sep 17 00:00:00 2001 From: Aaro Altonen <48052676+altonen@users.noreply.github.com> Date: Wed, 14 Sep 2022 16:18:27 +0300 Subject: [PATCH 04/11] Update client/network/transactions/src/lib.rs Co-authored-by: Dmitry Markin --- client/network/transactions/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/transactions/src/lib.rs b/client/network/transactions/src/lib.rs index 80e6434a4b68e..f0a3c0b2f7ecf 100644 --- a/client/network/transactions/src/lib.rs +++ b/client/network/transactions/src/lib.rs @@ -364,7 +364,7 @@ where fn on_transactions(&mut self, who: PeerId, transactions: Transactions) { // Accept transactions only when node is not major syncing if self.service.is_major_syncing() { - trace!(target: "sync", "{} Ignoring transactions while disabled", who); + trace!(target: "sync", "{} Ignoring transactions while major syncing", who); return } From 19981497852e0f71c78ad92b3ece55483ac9c039 Mon Sep 17 00:00:00 2001 From: Aaro Altonen <48052676+altonen@users.noreply.github.com> Date: Wed, 14 Sep 2022 16:42:47 +0300 Subject: [PATCH 05/11] Update client/service/src/builder.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/service/src/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index e21c9c6e6d727..cda15645fabbf 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -899,7 +899,7 @@ where let (tx_handler, tx_handler_controller) = transactions_handler_proto.build( network.clone(), Arc::new(TransactionPoolAdapter { pool: transaction_pool, client: client.clone() }), - config.prometheus_config.as_ref().map(|config| config.registry.clone()).as_ref(), + config.prometheus_config.as_ref().map(|config| &config.registry), )?; spawn_handle.spawn("network-transactions-handler", Some("networking"), tx_handler.run()); From 32a1f81c4722451e4f45e8688ea1c34c286d1623 Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Wed, 14 Sep 2022 16:46:02 +0300 Subject: [PATCH 06/11] Apply review comments --- client/network/common/src/service.rs | 29 -------------------------- client/network/src/lib.rs | 2 +- client/network/src/service.rs | 26 ++++------------------- client/network/transactions/src/lib.rs | 2 +- client/service/src/builder.rs | 10 ++------- 5 files changed, 8 insertions(+), 61 deletions(-) diff --git a/client/network/common/src/service.rs b/client/network/common/src/service.rs index 88583832e4c38..aa4967ba51700 100644 --- a/client/network/common/src/service.rs +++ b/client/network/common/src/service.rs @@ -604,35 +604,6 @@ where } } -/// Provides ability to propagate transactions over the network. -pub trait NetworkTransaction { - /// You may call this when new transactions are imported by the transaction pool. - /// - /// All transactions will be fetched from the `TransactionPool` that was passed at - /// initialization as part of the configuration and propagated to peers. - fn trigger_repropagate(&self); - - /// You must call when new transaction is imported by the transaction pool. - /// - /// This transaction will be fetched from the `TransactionPool` that was passed at - /// initialization as part of the configuration and propagated to peers. - fn propagate_transaction(&self, hash: H); -} - -impl NetworkTransaction for Arc -where - T: ?Sized, - T: NetworkTransaction, -{ - fn trigger_repropagate(&self) { - T::trigger_repropagate(self) - } - - fn propagate_transaction(&self, hash: H) { - T::propagate_transaction(self, hash) - } -} - /// Provides ability to announce blocks to the network. pub trait NetworkBlock { /// Make sure an important block is propagated to peers. diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index b78947e427699..37605e10fcbdf 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -267,7 +267,7 @@ pub use sc_network_common::{ service::{ KademliaKey, NetworkBlock, NetworkDHTProvider, NetworkRequest, NetworkSigner, NetworkStateInfo, NetworkStatus, NetworkStatusProvider, NetworkSyncForkRequest, - NetworkTransaction, Signature, SigningError, + Signature, SigningError, }, sync::{ warp::{WarpSyncPhase, WarpSyncProgress}, diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 6017543e528e7..268c6e4cb9f09 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -102,7 +102,7 @@ mod out_events; mod tests; pub use libp2p::identity::{error::DecodingError, Keypair, PublicKey}; -use sc_network_common::service::{NetworkBlock, NetworkRequest, NetworkTransaction}; +use sc_network_common::service::{NetworkBlock, NetworkRequest}; /// Substrate network service. Handles network IO and manages connectivity. pub struct NetworkService { @@ -122,7 +122,7 @@ pub struct NetworkService { /// nodes it should be connected to or not. peerset: PeersetHandle, /// Channel that sends messages to the actual worker. - to_worker: TracingUnboundedSender>, + to_worker: TracingUnboundedSender>, /// For each peer and protocol combination, an object that allows sending notifications to /// that peer. Updated by the [`NetworkWorker`]. peers_notifications_sinks: Arc>>, @@ -1120,20 +1120,6 @@ where } } -impl NetworkTransaction for NetworkService -where - B: BlockT + 'static, - H: ExHashT, -{ - fn trigger_repropagate(&self) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateTransactions); - } - - fn propagate_transaction(&self, hash: H) { - let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateTransaction(hash)); - } -} - impl NetworkBlock> for NetworkService where B: BlockT + 'static, @@ -1220,9 +1206,7 @@ impl<'a> NotificationSenderReadyT for NotificationSenderReady<'a> { /// Messages sent from the `NetworkService` to the `NetworkWorker`. /// /// Each entry corresponds to a method of `NetworkService`. -enum ServiceToWorkerMsg { - PropagateTransaction(H), - PropagateTransactions, +enum ServiceToWorkerMsg { RequestJustification(B::Hash, NumberFor), ClearJustificationRequests, AnnounceBlock(B::Hash, Option>), @@ -1280,7 +1264,7 @@ where /// The import queue that was passed at initialization. import_queue: Box>, /// Messages from the [`NetworkService`] that must be processed. - from_service: TracingUnboundedReceiver>, + from_service: TracingUnboundedReceiver>, /// Senders for events that happen on the network. event_streams: out_events::OutChannels, /// Prometheus network metrics. @@ -1348,8 +1332,6 @@ where .behaviour_mut() .user_protocol_mut() .clear_justification_requests(), - ServiceToWorkerMsg::PropagateTransaction(_) => {}, - ServiceToWorkerMsg::PropagateTransactions => {}, ServiceToWorkerMsg::GetValue(key) => this.network_service.behaviour_mut().get_value(key), ServiceToWorkerMsg::PutValue(key, value) => diff --git a/client/network/transactions/src/lib.rs b/client/network/transactions/src/lib.rs index f0a3c0b2f7ecf..6f41921758926 100644 --- a/client/network/transactions/src/lib.rs +++ b/client/network/transactions/src/lib.rs @@ -157,7 +157,7 @@ impl TransactionsHandlerPrototype { /// the behaviour of the handler while it's running. /// /// Important: the transactions handler is initially disabled and doesn't gossip transactions. - /// You must call `TransactionsHandlerController::set_gossip_enabled` to enable it. + /// Gossiping is enabled when major syncing is done. pub fn build< B: BlockT + 'static, H: ExHashT, diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index cda15645fabbf..a193c089dea94 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -40,7 +40,7 @@ use sc_keystore::LocalKeystore; use sc_network::{config::SyncMode, NetworkService}; use sc_network_bitswap::BitswapRequestHandler; use sc_network_common::{ - service::{NetworkStateInfo, NetworkStatusProvider, NetworkTransaction}, + service::{NetworkStateInfo, NetworkStatusProvider}, sync::warp::WarpSyncProvider, }; use sc_network_light::light_client_requests::handler::LightClientRequestHandler; @@ -326,7 +326,6 @@ where pub trait SpawnTaskNetwork: sc_offchain::NetworkProvider + NetworkStateInfo - + NetworkTransaction + NetworkStatusProvider + Send + Sync @@ -339,7 +338,6 @@ where Block: BlockT, T: sc_offchain::NetworkProvider + NetworkStateInfo - + NetworkTransaction + NetworkStatusProvider + Send + Sync @@ -852,7 +850,7 @@ where protocol_config })); - let mut network_params = sc_network::config::Params { + let network_params = sc_network::config::Params { role: config.role.clone(), executor: { let spawn_handle = Clone::clone(&spawn_handle); @@ -887,10 +885,6 @@ where .expect("Genesis block exists; qed"), config.chain_spec.fork_id(), ); - network_params - .network_config - .extra_sets - .insert(0, transactions_handler_proto.set_config()); let has_bootnodes = !network_params.network_config.boot_nodes.is_empty(); let network_mut = sc_network::NetworkWorker::new(network_params)?; From f675f9fd72f3bb389ad0edd4c4afce8424775a28 Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Wed, 14 Sep 2022 17:07:01 +0300 Subject: [PATCH 07/11] Revert one change and apply cargo-fmt --- client/network/src/lib.rs | 4 ++-- client/service/src/builder.rs | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index 37605e10fcbdf..d17f47328b804 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -266,8 +266,8 @@ pub use sc_network_common::{ request_responses::{IfDisconnected, RequestFailure}, service::{ KademliaKey, NetworkBlock, NetworkDHTProvider, NetworkRequest, NetworkSigner, - NetworkStateInfo, NetworkStatus, NetworkStatusProvider, NetworkSyncForkRequest, - Signature, SigningError, + NetworkStateInfo, NetworkStatus, NetworkStatusProvider, NetworkSyncForkRequest, Signature, + SigningError, }, sync::{ warp::{WarpSyncPhase, WarpSyncProgress}, diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index a193c089dea94..dfd532a14c172 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -850,7 +850,7 @@ where protocol_config })); - let network_params = sc_network::config::Params { + let mut network_params = sc_network::config::Params { role: config.role.clone(), executor: { let spawn_handle = Clone::clone(&spawn_handle); @@ -885,6 +885,10 @@ where .expect("Genesis block exists; qed"), config.chain_spec.fork_id(), ); + network_params + .network_config + .extra_sets + .insert(0, transactions_handler_proto.set_config()); let has_bootnodes = !network_params.network_config.boot_nodes.is_empty(); let network_mut = sc_network::NetworkWorker::new(network_params)?; From cda101a33ea37aae6a894e2c6a34af280de3dbf7 Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Wed, 21 Sep 2022 08:14:04 +0300 Subject: [PATCH 08/11] Remove Transaction from Message --- client/network/src/protocol/message.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/client/network/src/protocol/message.rs b/client/network/src/protocol/message.rs index 9b449d6d7d994..38deecfc8d0b1 100644 --- a/client/network/src/protocol/message.rs +++ b/client/network/src/protocol/message.rs @@ -143,9 +143,8 @@ pub mod generic { BlockResponse(BlockResponse), /// Block announce. BlockAnnounce(BlockAnnounce
), - /// Transactions. - Transactions, /// Consensus protocol message. + #[codec(index = 6)] Consensus(ConsensusMessage), /// Remote method call request. RemoteCallRequest(RemoteCallRequest), From 9ffd00aa5c3edbbbe9b93353a7b0cb7c01ae7c18 Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Wed, 21 Sep 2022 10:42:31 +0300 Subject: [PATCH 09/11] Add array-bytes --- Cargo.lock | 1 + client/network/transactions/Cargo.toml | 1 + 2 files changed, 2 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 7a897bc272f31..4425ea3043bc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8646,6 +8646,7 @@ dependencies = [ name = "sc-network-transactions" version = "0.10.0-dev" dependencies = [ + "array-bytes", "futures", "hex", "libp2p", diff --git a/client/network/transactions/Cargo.toml b/client/network/transactions/Cargo.toml index b9a1627c12dc0..5578bb2c7191e 100644 --- a/client/network/transactions/Cargo.toml +++ b/client/network/transactions/Cargo.toml @@ -14,6 +14,7 @@ readme = "README.md" targets = ["x86_64-unknown-linux-gnu"] [dependencies] +array-bytes = "4.1" codec = { package = "parity-scale-codec", version = "3.0.0", features = ["derive"] } futures = "0.3.21" hex = "0.4.0" From 3d5df3f8f3fbd081dbcb01a0ff325ec0ce50b601 Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Wed, 21 Sep 2022 11:43:49 +0300 Subject: [PATCH 10/11] trigger CI From dce383820b88af43b9912b5940ff61dbcbc29c64 Mon Sep 17 00:00:00 2001 From: Aaro Altonen Date: Wed, 21 Sep 2022 14:05:26 +0300 Subject: [PATCH 11/11] Add comment about codec index --- client/network/src/protocol/message.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/network/src/protocol/message.rs b/client/network/src/protocol/message.rs index 38deecfc8d0b1..3e1281753b82c 100644 --- a/client/network/src/protocol/message.rs +++ b/client/network/src/protocol/message.rs @@ -144,6 +144,8 @@ pub mod generic { /// Block announce. BlockAnnounce(BlockAnnounce
), /// Consensus protocol message. + // NOTE: index is incremented by 1 due to transaction-related + // message that was removed #[codec(index = 6)] Consensus(ConsensusMessage), /// Remote method call request.