diff --git a/benches/benches/benchmarks/overall.rs b/benches/benches/benchmarks/overall.rs index 103cab0893..17b8c91ce8 100644 --- a/benches/benches/benchmarks/overall.rs +++ b/benches/benches/benchmarks/overall.rs @@ -5,7 +5,7 @@ use ckb_chain::{start_chain_services, ChainController}; use ckb_chain_spec::consensus::{ConsensusBuilder, ProposalWindow}; use ckb_dao_utils::genesis_dao_data; use ckb_jsonrpc_types::JsonBytes; -use ckb_network::{Flags, NetworkController, NetworkService, NetworkState}; +use ckb_network::{network::TransportType, Flags, NetworkController, NetworkService, NetworkState}; use ckb_shared::{Shared, SharedBuilder}; use ckb_store::ChainStore; use ckb_types::{ @@ -77,6 +77,7 @@ fn dummy_network(shared: &Shared) -> NetworkController { "test".to_string(), Flags::COMPATIBILITY, ), + TransportType::Tcp, ) .start(shared.async_handle()) .expect("Start network service failed") diff --git a/chain/src/tests/util.rs b/chain/src/tests/util.rs index f29cd97ad7..2d1cde059f 100644 --- a/chain/src/tests/util.rs +++ b/chain/src/tests/util.rs @@ -4,7 +4,7 @@ use ckb_app_config::{BlockAssemblerConfig, NetworkConfig}; use ckb_chain_spec::consensus::{Consensus, ConsensusBuilder}; use ckb_dao_utils::genesis_dao_data; use ckb_jsonrpc_types::ScriptHashType; -use ckb_network::{Flags, NetworkController, NetworkService, NetworkState}; +use ckb_network::{network::TransportType, Flags, NetworkController, NetworkService, NetworkState}; use ckb_shared::{Shared, SharedBuilder}; use ckb_store::ChainStore; use ckb_test_chain_utils::{always_success_cell, create_always_success_tx}; @@ -123,6 +123,7 @@ pub(crate) fn dummy_network(shared: &Shared) -> NetworkController { "test".to_string(), Flags::COMPATIBILITY, ), + TransportType::Tcp, ) .start(shared.async_handle()) .expect("Start network service failed") diff --git a/network/src/network.rs b/network/src/network.rs index d51fe87916..bc19edf9ac 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -105,15 +105,14 @@ impl NetworkState { .iter() .chain(config.public_addresses.iter()) .cloned() - .filter_map(|mut addr| { - multiaddr_to_socketaddr(&addr) - .filter(|addr| is_reachable(addr.ip())) - .and({ - if extract_peer_id(&addr).is_none() { - addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes()))); - } - Some(addr) - }) + .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) { + Some(socket_addr) if !is_reachable(socket_addr.ip()) => None, + _ => { + if extract_peer_id(&addr).is_none() { + addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes()))); + } + Some(addr) + } }) .collect(); info!("Loading the peer store. This process may take a few seconds to complete."); @@ -158,15 +157,14 @@ impl NetworkState { .iter() .chain(config.public_addresses.iter()) .cloned() - .filter_map(|mut addr| { - multiaddr_to_socketaddr(&addr) - .filter(|addr| is_reachable(addr.ip())) - .and({ - if extract_peer_id(&addr).is_none() { - addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes()))); - } - Some(addr) - }) + .filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) { + Some(socket_addr) if !is_reachable(socket_addr.ip()) => None, + _ => { + if extract_peer_id(&addr).is_none() { + addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes()))); + } + Some(addr) + } }) .collect(); info!("Loading the peer store. This process may take a few seconds to complete."); @@ -831,6 +829,7 @@ impl NetworkService { required_protocol_ids: Vec, // name, version, flags identify_announce: (String, String, Flags), + transport_type: TransportType, ) -> Self { let config = &network_state.config; @@ -1017,7 +1016,7 @@ impl NetworkService { service_builder = service_builder.tcp_config(bind_fn); } } - TransportType::Ws => { + TransportType::Ws | TransportType::Wss => { // only bind once if matches!(init, BindType::Ws) { continue; @@ -1074,6 +1073,7 @@ impl NetworkService { Arc::clone(&network_state), p2p_service.control().to_owned().into(), Duration::from_secs(config.connect_outbound_interval_secs), + transport_type, ); bg_services.push(Box::pin(outbound_peer_service) as Pin>); }; @@ -1520,19 +1520,24 @@ pub(crate) async fn async_disconnect_with_message( control.disconnect(peer_index).await } +/// Transport type on ckb #[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)] -pub(crate) enum TransportType { +pub enum TransportType { + /// Tcp Tcp, + /// Ws Ws, + /// Wss only on wasm + Wss, } pub(crate) fn find_type(addr: &Multiaddr) -> TransportType { - if addr - .iter() - .any(|proto| matches!(proto, Protocol::Ws | Protocol::Wss)) - { - TransportType::Ws - } else { - TransportType::Tcp - } + let mut iter = addr.iter(); + + iter.find_map(|proto| match proto { + Protocol::Ws => Some(TransportType::Ws), + Protocol::Wss => Some(TransportType::Wss), + _ => None, + }) + .unwrap_or(TransportType::Tcp) } diff --git a/network/src/peer_registry.rs b/network/src/peer_registry.rs index dcc97d160f..c1719c99ad 100644 --- a/network/src/peer_registry.rs +++ b/network/src/peer_registry.rs @@ -1,6 +1,7 @@ //! Peer registry use crate::network_group::Group; use crate::peer_store::PeerStore; +use crate::Flags; use crate::{ errors::{Error, PeerError}, extract_peer_id, Peer, PeerId, SessionType, @@ -24,7 +25,7 @@ pub struct PeerRegistry { // Only whitelist peers or allow all peers. whitelist_only: bool, whitelist_peers: HashSet, - feeler_peers: HashSet, + feeler_peers: HashMap, } /// Global network connection status @@ -63,7 +64,7 @@ impl PeerRegistry { PeerRegistry { peers: HashMap::with_capacity_and_hasher(20, Default::default()), whitelist_peers: whitelist_peers.iter().filter_map(extract_peer_id).collect(), - feeler_peers: HashSet::default(), + feeler_peers: HashMap::default(), max_inbound, max_outbound, whitelist_only, @@ -191,10 +192,26 @@ impl PeerRegistry { /// Add feeler dail task pub fn add_feeler(&mut self, addr: &Multiaddr) { if let Some(peer_id) = extract_peer_id(addr) { - self.feeler_peers.insert(peer_id); + self.feeler_peers.insert(peer_id, Flags::COMPATIBILITY); } } + /// Identify change feeler flags + pub fn change_feeler_flags(&mut self, addr: &Multiaddr, flags: Flags) -> bool { + if let Some(peer_id) = extract_peer_id(addr) { + if let Some(i) = self.feeler_peers.get_mut(&peer_id) { + *i = flags; + return true; + } + } + false + } + + /// Get feeler session flags + pub fn feeler_flags(&self, addr: &Multiaddr) -> Option { + extract_peer_id(addr).and_then(|peer_id| self.feeler_peers.get(&peer_id).cloned()) + } + /// Remove feeler dail task on session disconnects or fails pub fn remove_feeler(&mut self, addr: &Multiaddr) { if let Some(peer_id) = extract_peer_id(addr) { @@ -205,7 +222,7 @@ impl PeerRegistry { /// Whether this session is feeler session pub fn is_feeler(&self, addr: &Multiaddr) -> bool { extract_peer_id(addr) - .map(|peer_id| self.feeler_peers.contains(&peer_id)) + .map(|peer_id| self.feeler_peers.contains_key(&peer_id)) .unwrap_or_default() } diff --git a/network/src/peer_store/addr_manager.rs b/network/src/peer_store/addr_manager.rs index 127b5e75a1..62e96ee169 100644 --- a/network/src/peer_store/addr_manager.rs +++ b/network/src/peer_store/addr_manager.rs @@ -1,17 +1,17 @@ //! Address manager -#[cfg(target_family = "wasm")] -use crate::network::{find_type, TransportType}; use crate::peer_store::types::AddrInfo; -use p2p::{multiaddr::Multiaddr, utils::multiaddr_to_socketaddr}; +use p2p::{ + multiaddr::{Multiaddr, Protocol}, + utils::multiaddr_to_socketaddr, +}; use rand::Rng; use std::collections::{HashMap, HashSet}; -use std::net::SocketAddr; /// Address manager #[derive(Default)] pub struct AddrManager { next_id: u64, - addr_to_id: HashMap, + addr_to_id: HashMap, id_to_info: HashMap, random_ids: Vec, } @@ -19,27 +19,25 @@ pub struct AddrManager { impl AddrManager { /// Add an address information to address manager pub fn add(&mut self, mut addr_info: AddrInfo) { - if let Some(key) = multiaddr_to_socketaddr(&addr_info.addr) { - if let Some(&id) = self.addr_to_id.get(&key) { - let (exist_last_connected_at_ms, random_id_pos) = { - let info = self.id_to_info.get(&id).expect("must exists"); - (info.last_connected_at_ms, info.random_id_pos) - }; - // Get time earlier than record time, return directly - if addr_info.last_connected_at_ms >= exist_last_connected_at_ms { - addr_info.random_id_pos = random_id_pos; - self.id_to_info.insert(id, addr_info); - } - return; + if let Some(&id) = self.addr_to_id.get(&addr_info.addr) { + let (exist_last_connected_at_ms, random_id_pos) = { + let info = self.id_to_info.get(&id).expect("must exists"); + (info.last_connected_at_ms, info.random_id_pos) + }; + // Get time earlier than record time, return directly + if addr_info.last_connected_at_ms >= exist_last_connected_at_ms { + addr_info.random_id_pos = random_id_pos; + self.id_to_info.insert(id, addr_info); } - - let id = self.next_id; - self.addr_to_id.insert(key, id); - addr_info.random_id_pos = self.random_ids.len(); - self.id_to_info.insert(id, addr_info); - self.random_ids.push(id); - self.next_id += 1; + return; } + + let id = self.next_id; + self.addr_to_id.insert(addr_info.addr.clone(), id); + addr_info.random_id_pos = self.random_ids.len(); + self.id_to_info.insert(id, addr_info); + self.random_ids.push(id); + self.next_id += 1; } /// Randomly return addrs that worth to try or connect. @@ -51,33 +49,36 @@ impl AddrManager { let mut addr_infos = Vec::with_capacity(count); let mut rng = rand::thread_rng(); let now_ms = ckb_systemtime::unix_time_as_millis(); - #[cfg(target_family = "wasm")] - let filter = |peer_addr: &AddrInfo| { - filter(peer_addr) && matches!(find_type(&peer_addr.addr), TransportType::Ws) - }; for i in 0..self.random_ids.len() { // reuse the for loop to shuffle random ids // https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle let j = rng.gen_range(i..self.random_ids.len()); self.swap_random_id(j, i); let addr_info: AddrInfo = self.id_to_info[&self.random_ids[i]].to_owned(); - if let Some(socket_addr) = multiaddr_to_socketaddr(&addr_info.addr) { - let ip = socket_addr.ip(); - let is_unique_ip = !duplicate_ips.contains(&ip); - // A trick to make our tests work - // TODO remove this after fix the network tests. - let is_test_ip = ip.is_unspecified() || ip.is_loopback(); - if (is_test_ip || is_unique_ip) - && addr_info.is_connectable(now_ms) - && filter(&addr_info) - { - duplicate_ips.insert(ip); - addr_infos.push(addr_info); + match multiaddr_to_socketaddr(&addr_info.addr) { + Some(socket_addr) => { + let ip = socket_addr.ip(); + let is_unique_ip = !duplicate_ips.contains(&ip); + // A trick to make our tests work + // TODO remove this after fix the network tests. + let is_test_ip = ip.is_unspecified() || ip.is_loopback(); + if (is_test_ip || is_unique_ip) + && addr_info.is_connectable(now_ms) + && filter(&addr_info) + { + duplicate_ips.insert(ip); + addr_infos.push(addr_info); + } } - if addr_infos.len() == count { - break; + None => { + if addr_info.is_connectable(now_ms) && filter(&addr_info) { + addr_infos.push(addr_info); + } } } + if addr_infos.len() == count { + break; + } } addr_infos } @@ -94,34 +95,65 @@ impl AddrManager { /// Remove an address by ip and port pub fn remove(&mut self, addr: &Multiaddr) -> Option { - multiaddr_to_socketaddr(addr).and_then(|addr| { - self.addr_to_id.remove(&addr).and_then(|id| { - let random_id_pos = self.id_to_info.get(&id).expect("exists").random_id_pos; - // swap with last index, then remove the last index - self.swap_random_id(random_id_pos, self.random_ids.len() - 1); - self.random_ids.pop(); - self.id_to_info.remove(&id) + let base_addr = addr + .iter() + .filter_map(|p| { + if matches!( + p, + Protocol::Ws | Protocol::Wss | Protocol::Memory(_) | Protocol::Tls(_) + ) { + None + } else { + Some(p) + } }) + .collect(); + self.addr_to_id.remove(&base_addr).and_then(|id| { + let random_id_pos = self.id_to_info.get(&id).expect("exists").random_id_pos; + // swap with last index, then remove the last index + self.swap_random_id(random_id_pos, self.random_ids.len() - 1); + self.random_ids.pop(); + self.id_to_info.remove(&id) }) } /// Get an address information by ip and port pub fn get(&self, addr: &Multiaddr) -> Option<&AddrInfo> { - multiaddr_to_socketaddr(addr).and_then(|addr| { - self.addr_to_id - .get(&addr) - .and_then(|id| self.id_to_info.get(id)) - }) + let base_addr = addr + .iter() + .filter_map(|p| { + if matches!( + p, + Protocol::Ws | Protocol::Wss | Protocol::Memory(_) | Protocol::Tls(_) + ) { + None + } else { + Some(p) + } + }) + .collect(); + self.addr_to_id + .get(&base_addr) + .and_then(|id| self.id_to_info.get(id)) } /// Get a mutable address information by ip and port pub fn get_mut(&mut self, addr: &Multiaddr) -> Option<&mut AddrInfo> { - if let Some(addr) = multiaddr_to_socketaddr(addr) { - if let Some(id) = self.addr_to_id.get(&addr) { - self.id_to_info.get_mut(id) - } else { - None - } + let base_addr = addr + .iter() + .filter_map(|p| { + if matches!( + p, + Protocol::Ws | Protocol::Wss | Protocol::Memory(_) | Protocol::Tls(_) + ) { + None + } else { + Some(p) + } + }) + .collect(); + if let Some(id) = self.addr_to_id.get(&base_addr) { + self.id_to_info.get_mut(id) } else { None } diff --git a/network/src/peer_store/peer_store_impl.rs b/network/src/peer_store/peer_store_impl.rs index 0dd50d6e1e..a8078b8877 100644 --- a/network/src/peer_store/peer_store_impl.rs +++ b/network/src/peer_store/peer_store_impl.rs @@ -1,4 +1,3 @@ -use crate::network::{find_type, TransportType}; use crate::{ errors::{PeerStoreError, Result}, extract_peer_id, multiaddr_to_socketaddr, @@ -65,10 +64,6 @@ impl PeerStore { if self.ban_list.is_addr_banned(&addr) { return Ok(()); } - #[cfg(target_family = "wasm")] - if !matches!(find_type(&addr), TransportType::Ws) { - return Ok(()); - } self.check_purge()?; let score = self.score_config.default_score; self.addr_manager @@ -180,12 +175,6 @@ impl PeerStore { && required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags)) }; - // Any protocol expect websocket - #[cfg(not(target_family = "wasm"))] - let filter = |peer_addr: &AddrInfo| { - filter(peer_addr) && !matches!(find_type(&peer_addr.addr), TransportType::Ws) - }; - // get addrs that can attempt. self.addr_manager.fetch_random(count, filter) } diff --git a/network/src/peer_store/types.rs b/network/src/peer_store/types.rs index 58712c7bd0..2db34c306b 100644 --- a/network/src/peer_store/types.rs +++ b/network/src/peer_store/types.rs @@ -62,7 +62,20 @@ impl AddrInfo { /// Init pub fn new(addr: Multiaddr, last_connected_at_ms: u64, score: Score, flags: u64) -> Self { AddrInfo { - addr, + // only store tcp protocol + addr: addr + .iter() + .filter_map(|p| { + if matches!( + p, + Protocol::Ws | Protocol::Wss | Protocol::Memory(_) | Protocol::Tls(_) + ) { + None + } else { + Some(p) + } + }) + .collect(), score, last_connected_at_ms, last_tried_at_ms: 0, diff --git a/network/src/protocols/discovery/mod.rs b/network/src/protocols/discovery/mod.rs index 573fad175a..b64ca17141 100644 --- a/network/src/protocols/discovery/mod.rs +++ b/network/src/protocols/discovery/mod.rs @@ -325,10 +325,10 @@ impl AddressManager for DiscoveryAddressManager { fn is_valid_addr(&self, addr: &Multiaddr) -> bool { if !self.discovery_local_address { - let local_or_invalid = multiaddr_to_socketaddr(addr) - .map(|socket_addr| !is_reachable(socket_addr.ip())) - .unwrap_or(true); - !local_or_invalid + match multiaddr_to_socketaddr(addr) { + Some(socket_addr) => is_reachable(socket_addr.ip()), + None => true, + } } else { true } diff --git a/network/src/protocols/feeler.rs b/network/src/protocols/feeler.rs index 95d68c1bdc..6fe688d9d7 100644 --- a/network/src/protocols/feeler.rs +++ b/network/src/protocols/feeler.rs @@ -34,11 +34,8 @@ impl ServiceProtocol for Feeler { .remove(&session.address); } else if context.session.ty.is_outbound() { let flags = self.network_state.with_peer_registry(|reg| { - if let Some(p) = reg.get_peer(session.id) { - p.identify_info - .as_ref() - .map(|i| i.flags) - .unwrap_or(Flags::COMPATIBILITY) + if let Some(p) = reg.feeler_flags(&session.address) { + p } else { Flags::COMPATIBILITY } diff --git a/network/src/protocols/identify/mod.rs b/network/src/protocols/identify/mod.rs index e534dda78f..84f2f3ffd7 100644 --- a/network/src/protocols/identify/mod.rs +++ b/network/src/protocols/identify/mod.rs @@ -139,10 +139,9 @@ impl IdentifyProtocol { let global_ip_only = self.global_ip_only; let reachable_addrs = listens .into_iter() - .filter(|addr| { - multiaddr_to_socketaddr(addr) - .map(|socket_addr| !global_ip_only || is_reachable(socket_addr.ip())) - .unwrap_or(false) + .filter(|addr| match multiaddr_to_socketaddr(addr) { + Some(socket_addr) => !global_ip_only || is_reachable(socket_addr.ip()), + None => true, }) .collect::>(); self.callback @@ -463,10 +462,9 @@ impl Callback for IdentifyCallback { }); } - if self - .network_state - .with_peer_registry(|reg| reg.is_feeler(&context.session.address)) - { + if self.network_state.with_peer_registry_mut(|reg| { + reg.change_feeler_flags(&context.session.address, flags) + }) { let _ = context .open_protocols( context.session.id, diff --git a/network/src/services/outbound_peer.rs b/network/src/services/outbound_peer.rs index 5a59d76684..0836bbabbb 100644 --- a/network/src/services/outbound_peer.rs +++ b/network/src/services/outbound_peer.rs @@ -1,4 +1,5 @@ use crate::{ + network::TransportType, peer_store::{types::AddrInfo, PeerStore}, NetworkState, }; @@ -6,7 +7,10 @@ use ckb_logger::trace; use ckb_systemtime::unix_time_as_millis; use futures::{Future, StreamExt}; use p2p::runtime::{Interval, MissedTickBehavior}; -use p2p::{multiaddr::MultiAddr, service::ServiceControl}; +use p2p::{ + multiaddr::{MultiAddr, Protocol}, + service::ServiceControl, +}; use rand::prelude::IteratorRandom; use std::{ pin::Pin, @@ -27,6 +31,8 @@ pub struct OutboundPeerService { interval: Option, try_connect_interval: Duration, try_identify_count: u8, + transport_type: TransportType, + update_outbound_connected_count: u8, } impl OutboundPeerService { @@ -34,6 +40,7 @@ impl OutboundPeerService { network_state: Arc, p2p_control: ServiceControl, try_connect_interval: Duration, + transport_type: TransportType, ) -> Self { OutboundPeerService { network_state, @@ -41,6 +48,8 @@ impl OutboundPeerService { interval: None, try_connect_interval, try_identify_count: 0, + update_outbound_connected_count: 0, + transport_type, } } @@ -63,8 +72,15 @@ impl OutboundPeerService { attempt_peers, ); - for addr in attempt_peers.into_iter().map(|info| info.addr) { - self.network_state.dial_feeler(&self.p2p_control, addr); + for mut addr in attempt_peers.into_iter().map(|info| info.addr) { + self.network_state.dial_feeler(&self.p2p_control, { + match &self.transport_type { + TransportType::Tcp => (), + TransportType::Ws => addr.push(Protocol::Ws), + TransportType::Wss => addr.push(Protocol::Wss), + } + addr + }); } } @@ -132,14 +148,28 @@ impl OutboundPeerService { Box::new(attempt_peers.into_iter().map(|info| info.addr)) }; - for addr in peers { - self.network_state.dial_identify(&self.p2p_control, addr); + for mut addr in peers { + self.network_state.dial_identify(&self.p2p_control, { + match &self.transport_type { + TransportType::Tcp => (), + TransportType::Ws => addr.push(Protocol::Ws), + TransportType::Wss => addr.push(Protocol::Wss), + } + addr + }); } } fn try_dial_whitelist(&self) { - for addr in self.network_state.config.whitelist_peers() { - self.network_state.dial_identify(&self.p2p_control, addr); + for mut addr in self.network_state.config.whitelist_peers() { + self.network_state.dial_identify(&self.p2p_control, { + match &self.transport_type { + TransportType::Tcp => (), + TransportType::Ws => addr.push(Protocol::Ws), + TransportType::Wss => addr.push(Protocol::Wss), + } + addr + }); } } @@ -147,6 +177,33 @@ impl OutboundPeerService { self.network_state .try_dial_observed_addrs(&self.p2p_control); } + + fn update_outbound_connected_ms(&mut self) { + if self.update_outbound_connected_count > 10 { + let connected_outbounds: Vec = + self.network_state.with_peer_registry(|re| { + re.peers() + .values() + .filter_map(|p| { + if p.is_outbound() { + Some(p.connected_addr.clone()) + } else { + None + } + }) + .collect() + }); + + self.network_state.with_peer_store_mut(|p| { + for addr in connected_outbounds { + p.update_outbound_addr_last_connected_ms(addr) + } + }); + self.update_outbound_connected_count = 0; + } else { + self.update_outbound_connected_count += 1; + } + } } impl Future for OutboundPeerService { @@ -178,6 +235,8 @@ impl Future for OutboundPeerService { self.try_dial_peers(); // try dial observed addrs self.try_dial_observed(); + // Keep connected nodes up to date in the peer store + self.update_outbound_connected_ms(); } Poll::Pending } diff --git a/network/src/tests/mod.rs b/network/src/tests/mod.rs index dde66545f0..d40867f40f 100644 --- a/network/src/tests/mod.rs +++ b/network/src/tests/mod.rs @@ -20,7 +20,7 @@ fn random_addr_v6() -> crate::multiaddr::Multiaddr { multi_addr.push(crate::multiaddr::Protocol::Tcp(43)); multi_addr.push(crate::multiaddr::Protocol::P2P( - crate::PeerId::random().to_base58().into_bytes().into(), + crate::PeerId::random().into_bytes().into(), )); multi_addr } diff --git a/network/src/tests/peer_store.rs b/network/src/tests/peer_store.rs index ee713f5a7f..85881b278e 100644 --- a/network/src/tests/peer_store.rs +++ b/network/src/tests/peer_store.rs @@ -588,3 +588,36 @@ fn test_addr_unique() { assert_eq!(peer_store.addr_manager().addrs_iter().count(), 2); } + +#[test] +fn test_only_tcp_store() { + let mut peer_store = PeerStore::default(); + let mut addr = random_addr(); + addr.push(p2p::multiaddr::Protocol::Ws); + peer_store + .add_addr(addr.clone(), Flags::COMPATIBILITY) + .unwrap(); + assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1); + assert_eq!(peer_store.fetch_addrs_to_feeler(1)[0].addr, { + addr.pop(); + addr + }); +} + +#[test] +fn test_support_dns_store() { + let mut peer_store = PeerStore::default(); + let addr: Multiaddr = format!( + "/dns4/www.abc.com/tcp/{}/p2p/{}", + rand::random::(), + crate::PeerId::random().to_base58() + ) + .parse() + .unwrap(); + + peer_store + .add_addr(addr.clone(), Flags::COMPATIBILITY) + .unwrap(); + assert_eq!(peer_store.fetch_addrs_to_feeler(2).len(), 1); + assert_eq!(peer_store.fetch_addrs_to_feeler(1)[0].addr, addr); +} diff --git a/rpc/src/tests/setup.rs b/rpc/src/tests/setup.rs index 888186fe57..4afea1fa6d 100644 --- a/rpc/src/tests/setup.rs +++ b/rpc/src/tests/setup.rs @@ -9,7 +9,7 @@ use ckb_chain::start_chain_services; use ckb_chain_spec::consensus::{Consensus, ConsensusBuilder}; use ckb_chain_spec::versionbits::{ActiveMode, Deployment, DeploymentPos}; use ckb_dao_utils::genesis_dao_data; -use ckb_network::{Flags, NetworkService, NetworkState}; +use ckb_network::{network::TransportType, Flags, NetworkService, NetworkState}; use ckb_network_alert::alert_relayer::AlertRelayer; use ckb_notify::NotifyService; use ckb_shared::SharedBuilder; @@ -112,6 +112,7 @@ pub(crate) fn setup_rpc_test_suite(height: u64, consensus: Option) -> "0.1.0".to_string(), Flags::COMPATIBILITY, ), + TransportType::Tcp, ) .start(shared.async_handle()) .expect("Start network service failed") diff --git a/sync/src/relayer/tests/helper.rs b/sync/src/relayer/tests/helper.rs index a11ceb1fe8..ed872ac550 100644 --- a/sync/src/relayer/tests/helper.rs +++ b/sync/src/relayer/tests/helper.rs @@ -5,9 +5,9 @@ use ckb_chain_spec::consensus::{build_genesis_epoch_ext, ConsensusBuilder}; use ckb_dao::DaoCalculator; use ckb_dao_utils::genesis_dao_data; use ckb_network::{ - async_trait, bytes::Bytes as P2pBytes, Behaviour, CKBProtocolContext, Error, Flags, - NetworkController, NetworkService, NetworkState, Peer, PeerIndex, ProtocolId, SupportProtocols, - TargetSession, + async_trait, bytes::Bytes as P2pBytes, network::TransportType, Behaviour, CKBProtocolContext, + Error, Flags, NetworkController, NetworkService, NetworkState, Peer, PeerIndex, ProtocolId, + SupportProtocols, TargetSession, }; use ckb_reward_calculator::RewardCalculator; use ckb_shared::{Shared, SharedBuilder, Snapshot}; @@ -127,6 +127,7 @@ pub(crate) fn dummy_network(shared: &Shared) -> NetworkController { "test".to_string(), Flags::COMPATIBILITY, ), + TransportType::Tcp, ) .start(shared.async_handle()) .expect("Start network service failed") diff --git a/test/src/net.rs b/test/src/net.rs index b8f1375f7f..a1de238d0b 100644 --- a/test/src/net.rs +++ b/test/src/net.rs @@ -6,9 +6,9 @@ use ckb_chain_spec::consensus::Consensus; use ckb_channel::{self as channel, unbounded, Receiver, RecvTimeoutError, Sender}; use ckb_logger::info; use ckb_network::{ - async_trait, bytes::Bytes, extract_peer_id, CKBProtocol, CKBProtocolContext, - CKBProtocolHandler, Flags, NetworkController, NetworkService, NetworkState, PeerIndex, - ProtocolId, SupportProtocols, + async_trait, bytes::Bytes, extract_peer_id, network::TransportType, CKBProtocol, + CKBProtocolContext, CKBProtocolHandler, Flags, NetworkController, NetworkService, NetworkState, + PeerIndex, ProtocolId, SupportProtocols, }; use ckb_util::Mutex; use std::collections::HashMap; @@ -73,6 +73,7 @@ impl Net { "0.1.0".to_string(), Flags::COMPATIBILITY, ), + TransportType::Tcp, ) .start(&async_handle) .unwrap(); diff --git a/util/launcher/src/lib.rs b/util/launcher/src/lib.rs index 87e87a2a2f..8bf7105b16 100644 --- a/util/launcher/src/lib.rs +++ b/util/launcher/src/lib.rs @@ -15,8 +15,8 @@ use ckb_light_client_protocol_server::LightClientProtocol; use ckb_logger::info; use ckb_logger::internal::warn; use ckb_network::{ - observe_listen_port_occupancy, CKBProtocol, Flags, NetworkController, NetworkService, - NetworkState, SupportProtocols, + network::TransportType, observe_listen_port_occupancy, CKBProtocol, Flags, NetworkController, + NetworkService, NetworkState, SupportProtocols, }; use ckb_network_alert::alert_relayer::AlertRelayer; use ckb_resource::Resource; @@ -384,6 +384,7 @@ impl Launcher { self.version.to_string(), flags, ), + TransportType::Tcp, ) .start(shared.async_handle()) .expect("Start network service failed"); diff --git a/util/light-client-protocol-server/src/tests/utils/chain.rs b/util/light-client-protocol-server/src/tests/utils/chain.rs index 03e37e704b..94777dfa67 100644 --- a/util/light-client-protocol-server/src/tests/utils/chain.rs +++ b/util/light-client-protocol-server/src/tests/utils/chain.rs @@ -8,7 +8,7 @@ use ckb_chain::{start_chain_services, ChainController}; use ckb_chain_spec::consensus::{build_genesis_epoch_ext, ConsensusBuilder}; use ckb_dao_utils::genesis_dao_data; use ckb_jsonrpc_types::ScriptHashType; -use ckb_network::{Flags, NetworkController, NetworkService, NetworkState}; +use ckb_network::{network::TransportType, Flags, NetworkController, NetworkService, NetworkState}; use ckb_shared::{Shared, SharedBuilder}; use ckb_systemtime::unix_time_as_millis; use ckb_test_chain_utils::always_success_cell; @@ -240,6 +240,7 @@ fn dummy_network(shared: &Shared) -> NetworkController { "test".to_string(), Flags::all(), ), + TransportType::Tcp, ) .start(shared.async_handle()) .expect("Start network service failed")