diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f2823272..43be12f72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ * fix: connect using DialPeer instead of DialAddress [#454] * fix: compilation error when used as a dependency [#470] * perf: use hash_hasher where the key is Cid [#467] +* chore: upgrade to libp2p 0.39.1, update most of the other deps with the notable exception of cid and multihash [#472] [#429]: https://github.com/rs-ipfs/rust-ipfs/pull/429 [#428]: https://github.com/rs-ipfs/rust-ipfs/pull/428 @@ -23,6 +24,7 @@ [#454]: https://github.com/rs-ipfs/rust-ipfs/pull/454 [#470]: https://github.com/rs-ipfs/rust-ipfs/pull/470 [#467]: https://github.com/rs-ipfs/rust-ipfs/pull/467 +[#472]: https://github.com/rs-ipfs/rust-ipfs/pull/472 # 0.2.1 diff --git a/Cargo.toml b/Cargo.toml index 7e7513e38..d492b456b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ test_js_interop = [] anyhow = "1.0" async-stream = { default-features = false, version = "0.3" } async-trait = { default-features = false, version = "0.1" } -base64 = { default-features = false, features = ["alloc"], version = "0.12" } +base64 = { default-features = false, features = ["alloc"], version = "0.13" } ipfs-bitswap = { version = "0.1", path = "bitswap" } byteorder = { default-features = false, version = "1.3" } bytes = { default-features = false, version = "1" } @@ -31,10 +31,10 @@ either = { default-features = false, version = "1.5" } futures = { default-features = false, version = "0.3.9", features = ["alloc", "std"] } hash_hasher = "2.0.3" ipfs-unixfs = { version = "0.2", path = "unixfs" } -libp2p = { default-features = false, features = ["floodsub", "identify", "kad", "tcp-tokio", "mplex", "noise", "ping", "yamux", "dns"], version = "0.34" } -multibase = { default-features = false, version = "0.8" } +libp2p = { default-features = false, features = ["floodsub", "identify", "kad", "tcp-tokio", "mplex", "noise", "ping", "yamux", "dns-tokio"], version = "0.39" } +multibase = { default-features = false, version = "0.9" } multihash = { default-features = false, version = "0.11" } -prost = { default-features = false, version = "0.7" } +prost = { default-features = false, version = "0.8" } serde = { default-features = false, features = ["derive"], version = "1.0" } serde_json = { default-features = false, features = ["std"], version = "1.0" } thiserror = { default-features = false, version = "1.0" } @@ -49,7 +49,7 @@ sled = "0.34" once_cell = "1.5.2" [build-dependencies] -prost-build = { default-features = false, version = "0.7" } +prost-build = { default-features = false, version = "0.8" } [dev-dependencies] criterion = { default-features = false, version = "0.3" } diff --git a/bitswap/Cargo.toml b/bitswap/Cargo.toml index 3fcbb3302..e171aefdc 100644 --- a/bitswap/Cargo.toml +++ b/bitswap/Cargo.toml @@ -8,17 +8,17 @@ license = "MIT OR Apache-2.0" repository = "https://github.com/rs-ipfs/rust-ipfs" [build-dependencies] -prost-build = { default-features = false, version = "0.7" } +prost-build = { default-features = false, version = "0.8" } [dependencies] cid = { default-features = false, version = "0.5" } fnv = { default-features = false, version = "1.0" } futures = { default-features = false, version = "0.3" } hash_hasher = "2.0.3" -libp2p-core = { default-features = false, version = "0.27" } -libp2p-swarm = { default-features = false, version = "0.27" } +libp2p-core = { default-features = false, version = "0.29" } +libp2p-swarm = { default-features = false, version = "0.30" } multihash = { default-features = false, version = "0.11" } -prost = { default-features = false, version = "0.7" } +prost = { default-features = false, version = "0.8" } thiserror = { default-features = false, version = "1.0" } tokio = { default-features = false, version = "1", features = ["rt"] } tracing = { default-features = false, version = "0.1" } diff --git a/bitswap/src/error.rs b/bitswap/src/error.rs index f2a26874e..e66160b1f 100644 --- a/bitswap/src/error.rs +++ b/bitswap/src/error.rs @@ -3,7 +3,7 @@ use thiserror::Error; #[derive(Debug, Error)] pub enum BitswapError { #[error("Error while reading from socket: {0}")] - ReadError(#[from] libp2p_core::upgrade::ReadOneError), + ReadError(#[from] std::io::Error), #[error("Error while decoding bitswap message: {0}")] ProtobufError(#[from] prost::DecodeError), #[error("Error while parsing cid: {0}")] diff --git a/bitswap/src/protocol.rs b/bitswap/src/protocol.rs index 093413fa8..91e99df07 100644 --- a/bitswap/src/protocol.rs +++ b/bitswap/src/protocol.rs @@ -8,7 +8,10 @@ use crate::ledger::Message; use core::future::Future; use core::iter; use core::pin::Pin; -use futures::io::{AsyncRead, AsyncWrite}; +use futures::{ + io::{AsyncRead, AsyncWrite}, + AsyncWriteExt, +}; use libp2p_core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use std::io; @@ -42,7 +45,7 @@ where #[inline] fn upgrade_inbound(self, mut socket: TSocket, _info: Self::Info) -> Self::Future { Box::pin(async move { - let packet = upgrade::read_one(&mut socket, MAX_BUF_SIZE).await?; + let packet = upgrade::read_length_prefixed(&mut socket, MAX_BUF_SIZE).await?; let message = Message::from_bytes(&packet)?; Ok(message) }) @@ -71,7 +74,8 @@ where fn upgrade_outbound(self, mut socket: TSocket, _info: Self::Info) -> Self::Future { Box::pin(async move { let bytes = self.to_bytes(); - upgrade::write_one(&mut socket, bytes).await + upgrade::write_length_prefixed(&mut socket, bytes).await?; + socket.close().await }) } } diff --git a/http/Cargo.toml b/http/Cargo.toml index 087fbafc2..ab4ff1d63 100644 --- a/http/Cargo.toml +++ b/http/Cargo.toml @@ -6,7 +6,7 @@ name = "ipfs-http" version = "0.1.0" [build-dependencies] -prost-build = { default-features = false, version = "0.7" } +prost-build = { default-features = false, version = "0.8" } vergen = { default-features = false, version = "3.1" } [dependencies] @@ -19,12 +19,12 @@ humantime = { default-features = false, version = "2.0" } ipfs = { path = "../" } mime = { default-features = false, version = "0.3" } mpart-async = { default-features = false, version = "0.5" } -multibase = { default-features = false, version = "0.8" } +multibase = { default-features = false, features = ["std"], version = "0.9" } multihash = { default-features = false, version = "0.11" } # openssl is required for rsa keygen but not used by the rust-ipfs or its dependencies openssl = { default-features = false, version = "0.10" } percent-encoding = { default-features = false, version = "2.1" } -prost = { default-features = false, version = "0.7" } +prost = { default-features = false, version = "0.8" } serde = { default-features = false, features = ["derive"], version = "1.0" } serde_json = { default-features = false, version = "1.0" } structopt = { default-features = false, version = "0.3" } diff --git a/src/lib.rs b/src/lib.rs index 5b7d85c76..d9991263b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1358,7 +1358,7 @@ impl Future for IpfsFuture { type Output = (); fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll { - use libp2p::{swarm::SwarmEvent, Swarm}; + use libp2p::swarm::SwarmEvent; // begin by polling the swarm so that initially it'll first have chance to bind listeners // and such. @@ -1368,7 +1368,8 @@ impl Future for IpfsFuture { loop { loop { let inner = { - let next = self.swarm.next_event(); + use futures::StreamExt; + let next = self.swarm.select_next_some(); futures::pin_mut!(next); match next.poll(ctx) { Poll::Ready(inner) => inner, @@ -1381,8 +1382,8 @@ impl Future for IpfsFuture { // off the events from Ipfs and ... this looping goes on for a while. done = false; match inner { - SwarmEvent::NewListenAddr(addr) => { - self.complete_listening_address_adding(addr); + SwarmEvent::NewListenAddr { address, .. } => { + self.complete_listening_address_adding(address); } _ => trace!("{:?}", inner), } @@ -1401,24 +1402,22 @@ impl Future for IpfsFuture { match inner { IpfsEvent::Connect(target, ret) => { - ret.send(self.swarm.connect(target)).ok(); + ret.send(self.swarm.behaviour_mut().connect(target)).ok(); } IpfsEvent::Addresses(ret) => { - let addrs = self.swarm.addrs(); + let addrs = self.swarm.behaviour_mut().addrs(); ret.send(Ok(addrs)).ok(); } IpfsEvent::Listeners(ret) => { - let listeners = Swarm::listeners(&self.swarm) - .cloned() - .collect::>(); + let listeners = self.swarm.listeners().cloned().collect::>(); ret.send(Ok(listeners)).ok(); } IpfsEvent::Connections(ret) => { - let connections = self.swarm.connections(); + let connections = self.swarm.behaviour_mut().connections(); ret.send(Ok(connections.collect())).ok(); } IpfsEvent::Disconnect(addr, ret) => { - if let Some(disconnector) = self.swarm.disconnect(addr) { + if let Some(disconnector) = self.swarm.behaviour_mut().disconnect(addr) { disconnector.disconnect(&mut self.swarm); } ret.send(Ok(())).ok(); @@ -1426,48 +1425,49 @@ impl Future for IpfsFuture { IpfsEvent::GetAddresses(ret) => { // perhaps this could be moved under `IpfsEvent` or free functions? let mut addresses = Vec::new(); - addresses.extend(Swarm::listeners(&self.swarm).cloned()); - addresses.extend( - Swarm::external_addresses(&self.swarm).map(|ar| ar.addr.clone()), - ); + addresses.extend(self.swarm.listeners().map(|a| a.to_owned())); + addresses + .extend(self.swarm.external_addresses().map(|ar| ar.addr.to_owned())); // ignore error, perhaps caller went away already let _ = ret.send(addresses); } IpfsEvent::PubsubSubscribe(topic, ret) => { - let _ = ret.send(self.swarm.pubsub().subscribe(topic)); + let _ = ret.send(self.swarm.behaviour_mut().pubsub().subscribe(topic)); } IpfsEvent::PubsubUnsubscribe(topic, ret) => { - let _ = ret.send(self.swarm.pubsub().unsubscribe(topic)); + let _ = ret.send(self.swarm.behaviour_mut().pubsub().unsubscribe(topic)); } IpfsEvent::PubsubPublish(topic, data, ret) => { - self.swarm.pubsub().publish(topic, data); + self.swarm.behaviour_mut().pubsub().publish(topic, data); let _ = ret.send(()); } IpfsEvent::PubsubPeers(Some(topic), ret) => { let topic = libp2p::floodsub::Topic::new(topic); - let _ = ret.send(self.swarm.pubsub().subscribed_peers(&topic)); + let _ = + ret.send(self.swarm.behaviour_mut().pubsub().subscribed_peers(&topic)); } IpfsEvent::PubsubPeers(None, ret) => { - let _ = ret.send(self.swarm.pubsub().known_peers()); + let _ = ret.send(self.swarm.behaviour_mut().pubsub().known_peers()); } IpfsEvent::PubsubSubscribed(ret) => { - let _ = ret.send(self.swarm.pubsub().subscribed_topics()); + let _ = ret.send(self.swarm.behaviour_mut().pubsub().subscribed_topics()); } IpfsEvent::WantList(peer, ret) => { let list = if let Some(peer) = peer { self.swarm + .behaviour_mut() .bitswap() .peer_wantlist(&peer) .unwrap_or_default() } else { - self.swarm.bitswap().local_wantlist() + self.swarm.behaviour_mut().bitswap().local_wantlist() }; let _ = ret.send(list); } IpfsEvent::BitswapStats(ret) => { - let stats = self.swarm.bitswap().stats(); - let peers = self.swarm.bitswap().peers(); - let wantlist = self.swarm.bitswap().local_wantlist(); + let stats = self.swarm.behaviour_mut().bitswap().stats(); + let peers = self.swarm.behaviour_mut().bitswap().peers(); + let wantlist = self.swarm.behaviour_mut().bitswap().local_wantlist(); let _ = ret.send((stats, peers, wantlist).into()); } IpfsEvent::AddListeningAddress(addr, ret) => { @@ -1476,7 +1476,7 @@ impl Future for IpfsFuture { IpfsEvent::RemoveListeningAddress(addr, ret) => { let removed = if let Some((id, _)) = self.listening_addresses.remove(&addr) { - Swarm::remove_listener(&mut self.swarm, id).map_err(|_: ()| { + self.swarm.remove_listener(id).map_err(|_: ()| { format_err!( "Failed to remove previously added listening address: {}", addr @@ -1489,19 +1489,20 @@ impl Future for IpfsFuture { let _ = ret.send(removed); } IpfsEvent::Bootstrap(ret) => { - let future = self.swarm.bootstrap(); + let future = self.swarm.behaviour_mut().bootstrap(); let _ = ret.send(future); } IpfsEvent::AddPeer(peer_id, addr) => { - self.swarm.add_peer(peer_id, addr); + self.swarm.behaviour_mut().add_peer(peer_id, addr); } IpfsEvent::GetClosestPeers(peer_id, ret) => { - let future = self.swarm.get_closest_peers(peer_id); + let future = self.swarm.behaviour_mut().get_closest_peers(peer_id); let _ = ret.send(future); } IpfsEvent::GetBitswapPeers(ret) => { let peers = self .swarm + .behaviour_mut() .bitswap() .connected_peers .keys() @@ -1510,52 +1511,55 @@ impl Future for IpfsFuture { let _ = ret.send(peers); } IpfsEvent::FindPeer(peer_id, local_only, ret) => { - let swarm_addrs = self.swarm.swarm.connections_to(&peer_id); + let swarm_addrs = self.swarm.behaviour_mut().swarm.connections_to(&peer_id); let locally_known_addrs = if !swarm_addrs.is_empty() { swarm_addrs } else { - self.swarm.kademlia().addresses_of_peer(&peer_id) + self.swarm + .behaviour_mut() + .kademlia() + .addresses_of_peer(&peer_id) }; let addrs = if !locally_known_addrs.is_empty() || local_only { Either::Left(locally_known_addrs) } else { - Either::Right(self.swarm.get_closest_peers(peer_id)) + Either::Right(self.swarm.behaviour_mut().get_closest_peers(peer_id)) }; let _ = ret.send(addrs); } IpfsEvent::GetProviders(cid, ret) => { - let future = self.swarm.get_providers(cid); + let future = self.swarm.behaviour_mut().get_providers(cid); let _ = ret.send(future); } IpfsEvent::Provide(cid, ret) => { - let _ = ret.send(self.swarm.start_providing(cid)); + let _ = ret.send(self.swarm.behaviour_mut().start_providing(cid)); } IpfsEvent::DhtGet(key, quorum, ret) => { - let future = self.swarm.dht_get(key, quorum); + let future = self.swarm.behaviour_mut().dht_get(key, quorum); let _ = ret.send(future); } IpfsEvent::DhtPut(key, value, quorum, ret) => { - let future = self.swarm.dht_put(key, value, quorum); + let future = self.swarm.behaviour_mut().dht_put(key, value, quorum); let _ = ret.send(future); } IpfsEvent::GetBootstrappers(ret) => { - let list = self.swarm.get_bootstrappers(); + let list = self.swarm.behaviour_mut().get_bootstrappers(); let _ = ret.send(list); } IpfsEvent::AddBootstrapper(addr, ret) => { - let result = self.swarm.add_bootstrapper(addr); + let result = self.swarm.behaviour_mut().add_bootstrapper(addr); let _ = ret.send(result); } IpfsEvent::RemoveBootstrapper(addr, ret) => { - let result = self.swarm.remove_bootstrapper(addr); + let result = self.swarm.behaviour_mut().remove_bootstrapper(addr); let _ = ret.send(result); } IpfsEvent::ClearBootstrappers(ret) => { - let list = self.swarm.clear_bootstrappers(); + let list = self.swarm.behaviour_mut().clear_bootstrappers(); let _ = ret.send(list); } IpfsEvent::RestoreBootstrappers(ret) => { - let list = self.swarm.restore_bootstrappers(); + let list = self.swarm.behaviour_mut().restore_bootstrappers(); let _ = ret.send(list); } IpfsEvent::Exit => { @@ -1569,21 +1573,25 @@ impl Future for IpfsFuture { // wants this to be written with a `while let`. while let Poll::Ready(Some(evt)) = Pin::new(&mut self.repo_events).poll_next(ctx) { match evt { - RepoEvent::WantBlock(cid) => self.swarm.want_block(cid), - RepoEvent::UnwantBlock(cid) => self.swarm.bitswap().cancel_block(&cid), + RepoEvent::WantBlock(cid) => self.swarm.behaviour_mut().want_block(cid), + RepoEvent::UnwantBlock(cid) => { + self.swarm.behaviour_mut().bitswap().cancel_block(&cid) + } RepoEvent::NewBlock(cid, ret) => { // TODO: consider if cancel is applicable in cases where we provide the // associated Block ourselves - self.swarm.bitswap().cancel_block(&cid); + self.swarm.behaviour_mut().bitswap().cancel_block(&cid); // currently disabled; see https://github.com/rs-ipfs/rust-ipfs/pull/281#discussion_r465583345 // for details regarding the concerns about enabling this functionality as-is if false { - let _ = ret.send(self.swarm.start_providing(cid)); + let _ = ret.send(self.swarm.behaviour_mut().start_providing(cid)); } else { let _ = ret.send(Err(anyhow!("not actively providing blocks yet"))); } } - RepoEvent::RemovedBlock(cid) => self.swarm.stop_providing_block(&cid), + RepoEvent::RemovedBlock(cid) => { + self.swarm.behaviour_mut().stop_providing_block(&cid) + } } } diff --git a/src/p2p/addr.rs b/src/p2p/addr.rs index 9648c1fc1..b4368ffeb 100644 --- a/src/p2p/addr.rs +++ b/src/p2p/addr.rs @@ -215,6 +215,14 @@ pub(crate) fn could_be_bound_from_ephemeral( } } +// Checks if two instances of multiaddr are equal comparing as many protocol segments as possible +pub(crate) fn eq_greedy(addr0: &Multiaddr, addr1: &Multiaddr) -> bool { + if addr0.is_empty() != addr1.is_empty() { + return false; + } + addr0.iter().zip(addr1.iter()).all(|(a, b)| a == b) +} + #[cfg(test)] mod tests { use super::*; @@ -301,4 +309,40 @@ mod tests { &build_multiaddr!(Ip4([127, 0, 0, 1]), Tcp(44444u16)) )); } + + #[test] + fn greedy_multiaddr_comparison() { + assert!(eq_greedy(&Multiaddr::empty(), &Multiaddr::empty())); + assert!(eq_greedy( + &build_multiaddr!(Ip4([192, 168, 0, 1])), + &build_multiaddr!(Ip4([192, 168, 0, 1])) + )); + assert!(eq_greedy( + &build_multiaddr!(Ip4([192, 168, 0, 1]), Tcp(44444u16)), + &build_multiaddr!(Ip4([192, 168, 0, 1])) + )); + assert!(eq_greedy( + &build_multiaddr!(Ip4([192, 168, 0, 1])), + &build_multiaddr!(Ip4([192, 168, 0, 1]), Tcp(44444u16)) + )); + + // At least one protocol segment needs to be there + assert!(!eq_greedy( + &Multiaddr::empty(), + &build_multiaddr!(Ip4([192, 168, 0, 1])) + )); + assert!(!eq_greedy( + &build_multiaddr!(Ip4([192, 168, 0, 1])), + &Multiaddr::empty() + )); + + assert!(!eq_greedy( + &build_multiaddr!(Ip4([192, 168, 0, 1]), Tcp(44444u16)), + &build_multiaddr!(Ip4([192, 168, 0, 2])) + )); + assert!(!eq_greedy( + &build_multiaddr!(Ip4([192, 168, 0, 2])), + &build_multiaddr!(Ip4([192, 168, 0, 1]), Tcp(44444u16)) + )); + } } diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index 616a2acc1..d3a813c9c 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -9,7 +9,7 @@ use anyhow::anyhow; use cid::Cid; use ipfs_bitswap::{Bitswap, BitswapEvent}; use libp2p::core::{Multiaddr, PeerId}; -use libp2p::identify::{Identify, IdentifyEvent}; +use libp2p::identify::{Identify, IdentifyConfig, IdentifyEvent}; use libp2p::kad::record::{store::MemoryStore, Key, Record}; use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, Quorum}; // use libp2p::mdns::{MdnsEvent, TokioMdns}; @@ -84,7 +84,10 @@ impl NetworkBehaviourEventProcess for Behaviour }; match event { - QueryResult { result, id, .. } => { + InboundRequestServed { request } => { + trace!("kad: inbound {:?} request handled", request); + } + OutboundQueryCompleted { result, id, .. } => { // make sure the query is exhausted if self.kademlia.query(&id).is_none() { match result { @@ -185,7 +188,7 @@ impl NetworkBehaviourEventProcess for Behaviour let key = multibase::encode(Base::Base32Lower, key); warn!("kad: timed out while trying to republish provider {}", key); } - GetRecord(Ok(GetRecordOk { records })) => { + GetRecord(Ok(GetRecordOk { records, .. })) => { if self.kademlia.query(&id).is_none() { let records = records.into_iter().map(|rec| rec.record).collect(); self.kad_subscriptions @@ -296,7 +299,9 @@ impl NetworkBehaviourEventProcess for Behaviour } RoutingUpdated { peer, + is_new_peer: _, addresses, + bucket_range: _, old_peer: _, } => { trace!("kad: routing updated; {}: {:?}", peer, addresses); @@ -445,9 +450,8 @@ impl Behaviour { let bitswap = Bitswap::default(); let ping = Ping::default(); let identify = Identify::new( - "/ipfs/0.1.0".into(), - "rust-ipfs".into(), - options.keypair.public(), + IdentifyConfig::new("/ipfs/0.1.0".into(), options.keypair.public()) + .with_agent_version("rust-ipfs".into()), ); let pubsub = Pubsub::new(options.peer_id); let mut swarm = SwarmApi::default(); diff --git a/src/p2p/pubsub.rs b/src/p2p/pubsub.rs index c5f181c3e..4aa6e4e43 100644 --- a/src/p2p/pubsub.rs +++ b/src/p2p/pubsub.rs @@ -300,12 +300,12 @@ impl NetworkBehaviour for Pubsub { self.floodsub.inject_dial_failure(peer_id) } - fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { - self.floodsub.inject_new_listen_addr(addr) + fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) { + self.floodsub.inject_new_listen_addr(id, addr) } - fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { - self.floodsub.inject_expired_listen_addr(addr) + fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) { + self.floodsub.inject_expired_listen_addr(id, addr) } fn inject_new_external_addr(&mut self, addr: &Multiaddr) { @@ -431,6 +431,15 @@ impl NetworkBehaviour for Pubsub { score, }); } + NetworkBehaviourAction::CloseConnection { + peer_id, + connection, + } => { + return Poll::Ready(NetworkBehaviourAction::CloseConnection { + peer_id, + connection, + }); + } } } } diff --git a/src/p2p/swarm.rs b/src/p2p/swarm.rs index 8c1c8f2e4..7a068db6f 100644 --- a/src/p2p/swarm.rs +++ b/src/p2p/swarm.rs @@ -1,4 +1,4 @@ -use crate::p2p::{MultiaddrWithPeerId, MultiaddrWithoutPeerId}; +use crate::p2p::{addr::eq_greedy, MultiaddrWithPeerId, MultiaddrWithoutPeerId}; use crate::subscription::{SubscriptionFuture, SubscriptionRegistry}; use core::task::{Context, Poll}; use libp2p::core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId}; @@ -174,7 +174,7 @@ impl NetworkBehaviour for SwarmApi { ) { // TODO: could be that the connection is not yet fully established at this point trace!("inject_connection_established {} {:?}", peer_id, cp); - let addr: MultiaddrWithoutPeerId = connection_point_addr(cp).to_owned().try_into().unwrap(); + let addr = connection_point_addr(cp); self.peers.insert(*peer_id); let connections = self.connected_peers.entry(*peer_id).or_default(); @@ -194,16 +194,15 @@ impl NetworkBehaviour for SwarmApi { match self.pending_connections.entry(*peer_id) { Entry::Occupied(mut oe) => { let addresses = oe.get_mut(); - let just_connected = addresses.iter().position(|x| x == address); + let just_connected = addresses.iter().position(|x| eq_greedy(x, address)); if let Some(just_connected) = just_connected { addresses.swap_remove(just_connected); if addresses.is_empty() { oe.remove(); } - let addr = MultiaddrWithoutPeerId::try_from(address.clone()) - .expect("dialed address did not contain peerid in libp2p 0.34") - .with(*peer_id); + let addr = MultiaddrWithPeerId::try_from(address.clone()) + .expect("dialed address contains peerid in libp2p 0.38"); self.connect_registry .finish_subscription(addr.into(), Ok(())); @@ -258,7 +257,7 @@ impl NetworkBehaviour for SwarmApi { cp: &ConnectedPoint, ) { trace!("inject_connection_closed {} {:?}", peer_id, cp); - let closed_addr = connection_point_addr(cp).to_owned().try_into().unwrap(); + let closed_addr = connection_point_addr(cp); match self.connected_peers.entry(*peer_id) { Entry::Occupied(mut oe) => { @@ -383,13 +382,12 @@ impl NetworkBehaviour for SwarmApi { match self.pending_connections.entry(*peer_id) { Entry::Occupied(mut oe) => { let addresses = oe.get_mut(); - let pos = addresses.iter().position(|a| a == addr); + let pos = addresses.iter().position(|a| eq_greedy(a, addr)); if let Some(pos) = pos { addresses.swap_remove(pos); - let addr = MultiaddrWithoutPeerId::try_from(addr.clone()) - .expect("multiaddr didn't contain peer id in libp2p 0.34") - .with(*peer_id); + let addr = MultiaddrWithPeerId::try_from(addr.clone()) + .expect("dialed address contains peerid in libp2p 0.38"); self.connect_registry .finish_subscription(addr.into(), Err(error.to_string())); } @@ -416,10 +414,15 @@ impl NetworkBehaviour for SwarmApi { } } -fn connection_point_addr(cp: &ConnectedPoint) -> &Multiaddr { +fn connection_point_addr(cp: &ConnectedPoint) -> MultiaddrWithoutPeerId { match cp { - ConnectedPoint::Dialer { address } => address, - ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, + ConnectedPoint::Dialer { address } => MultiaddrWithPeerId::try_from(address.to_owned()) + .expect("dialed address contains peerid in libp2p 0.38") + .into(), + ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr + .to_owned() + .try_into() + .expect("send back address does not contain peerid in libp2p 0.38"), } } @@ -444,7 +447,7 @@ mod tests { Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); loop { - if let SwarmEvent::NewListenAddr(_) = swarm1.next_event().await { + if let Some(SwarmEvent::NewListenAddr { .. }) = swarm1.next().await { break; } } @@ -456,12 +459,15 @@ mod tests { Multihash::from_bytes(&peer1_id.to_bytes()).unwrap(), )); - let mut sub = swarm2.connect(addr.try_into().unwrap()).unwrap(); + let mut sub = swarm2 + .behaviour_mut() + .connect(addr.try_into().unwrap()) + .unwrap(); loop { tokio::select! { - _ = (&mut swarm1).next_event() => {}, - _ = (&mut swarm2).next_event() => {}, + _ = (&mut swarm1).next() => {}, + _ = (&mut swarm2).next() => {}, res = (&mut sub) => { // this is currently a success even though the connection is never really // established, the DummyProtocolsHandler doesn't do anything nor want the @@ -481,7 +487,7 @@ mod tests { res.unwrap(); // just to confirm that there are no connections. - assert_eq!(Vec::::new(), swarm1.connections_to(&peer2_id)); + assert_eq!(Vec::::new(), swarm1.behaviour().connections_to(&peer2_id)); break; } } @@ -498,19 +504,20 @@ mod tests { Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); - let address; + let addr; loop { - if let SwarmEvent::NewListenAddr(addr) = swarm1.next_event().await { + if let Some(SwarmEvent::NewListenAddr { address, .. }) = swarm1.next().await { // wonder if there should be a timeout? - address = addr; + addr = address; break; } } let mut fut = swarm2 + .behaviour_mut() .connect( - MultiaddrWithoutPeerId::try_from(address) + MultiaddrWithoutPeerId::try_from(addr) .unwrap() .with(peer3_id), ) @@ -520,8 +527,8 @@ mod tests { loop { tokio::select! { - _ = swarm1.next_event() => {}, - _ = swarm2.next_event() => {}, + _ = swarm1.next() => {}, + _ = swarm2.next() => {}, res = &mut fut => { assert_eq!(res.unwrap_err(), Some("Pending connection: Invalid peer ID.".into())); return; @@ -538,19 +545,19 @@ mod tests { Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); Swarm::listen_on(&mut swarm1, "/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); - let mut addresses = Vec::with_capacity(2); + let mut addr = Vec::with_capacity(2); - while addresses.len() < 2 { - if let SwarmEvent::NewListenAddr(addr) = swarm1.next_event().await { - addresses.push(addr); + while addr.len() < 2 { + if let Some(SwarmEvent::NewListenAddr { address, .. }) = swarm1.next().await { + addr.push(address); } } let targets = ( - MultiaddrWithoutPeerId::try_from(addresses[0].clone()) + MultiaddrWithoutPeerId::try_from(addr[0].clone()) .unwrap() .with(peer1_id), - MultiaddrWithoutPeerId::try_from(addresses[1].clone()) + MultiaddrWithoutPeerId::try_from(addr[1].clone()) .unwrap() .with(peer1_id), ); @@ -559,8 +566,8 @@ mod tests { // these two should be attempted in parallel. since we know both of them work, and they are // given in this order, we know that in libp2p 0.34 only the first should win, however // both should always be finished. - connections.push(swarm2.connect(targets.0).unwrap()); - connections.push(swarm2.connect(targets.1).unwrap()); + connections.push(swarm2.behaviour_mut().connect(targets.0).unwrap()); + connections.push(swarm2.behaviour_mut().connect(targets.1).unwrap()); let ready = connections // turn the private error type into Option .map_err(|e| e.into_inner()) @@ -570,8 +577,8 @@ mod tests { loop { tokio::select! { - _ = swarm1.next_event() => {} - _ = swarm2.next_event() => {} + _ = swarm1.next() => {} + _ = swarm2.next() => {} res = &mut ready => { assert_eq!( diff --git a/src/p2p/transport.rs b/src/p2p/transport.rs index 358f7d6eb..d6ead3c7d 100644 --- a/src/p2p/transport.rs +++ b/src/p2p/transport.rs @@ -2,7 +2,7 @@ use libp2p::core::muxing::StreamMuxerBox; use libp2p::core::transport::upgrade::Version; use libp2p::core::transport::Boxed; use libp2p::core::upgrade::SelectUpgrade; -use libp2p::dns::DnsConfig; +use libp2p::dns::TokioDnsConfig; use libp2p::identity; use libp2p::mplex::MplexConfig; use libp2p::noise::{self, NoiseConfig}; @@ -24,7 +24,7 @@ pub fn build_transport(keypair: identity::Keypair) -> io::Result { .unwrap(); let noise_config = NoiseConfig::xx(xx_keypair).into_authenticated(); - Ok(DnsConfig::new(TokioTcpConfig::new().nodelay(true))? + Ok(TokioDnsConfig::system(TokioTcpConfig::new())? .upgrade(Version::V1) .authenticate(noise_config) .multiplex(SelectUpgrade::new( diff --git a/unixfs/Cargo.toml b/unixfs/Cargo.toml index 52279bf83..2238bf0b8 100644 --- a/unixfs/Cargo.toml +++ b/unixfs/Cargo.toml @@ -16,14 +16,14 @@ cid = { default-features = false, version = "0.5" } either = { default-features = false, version = "1.5" } filetime = { optional = true, version = "0.2.12" } multihash = { default-features = false, version = "0.11" } -quick-protobuf = { default-features = false, features = ["std"], version = "0.7" } +quick-protobuf = { default-features = false, features = ["std"], version = "0.8" } sha2 = { default-features = false, version = "0.9" } [dev-dependencies] hash_hasher = "2.0.3" hex-literal = { default-features = false, version = "0.3" } libc = { default-features = false, version = "0.2.71" } -multibase = { default-features = false, version = "0.8.0" } +multibase = { default-features = false, version = "0.9" } tar = { default-features = false, version = "0.4" } criterion = { default-features = false, version = "0.3" }