Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(swarm)!: don't be generic over Transport #3272

Merged
merged 7 commits into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions misc/metrics/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,10 +370,8 @@ enum PendingInboundConnectionError {
ConnectionLimit,
}

impl<TTransErr> From<&libp2p_swarm::PendingInboundConnectionError<TTransErr>>
for PendingInboundConnectionError
{
fn from(error: &libp2p_swarm::PendingInboundConnectionError<TTransErr>) -> Self {
impl From<&libp2p_swarm::PendingInboundConnectionError> for PendingInboundConnectionError {
fn from(error: &libp2p_swarm::PendingInboundConnectionError) -> Self {
match error {
libp2p_swarm::PendingInboundConnectionError::WrongPeerId { .. } => {
PendingInboundConnectionError::WrongPeerId
Expand Down
4 changes: 4 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@

- Add `estblished_in` to `SwarmEvent::ConnectionEstablished`. See [PR 3134].

- Remove type parameter from `PendingOutboundConnectionError` and `PendingInboundConnectionError`.
These two types are always used with `std::io::Error`. See [PR 3272].

[PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170
[PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134
[PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153
[PR 3272]: https://github.com/libp2p/rust-libp2p/pull/3272

# 0.41.1

Expand Down
7 changes: 3 additions & 4 deletions swarm/src/connection/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,11 @@ impl<THandlerErr> From<io::Error> for ConnectionError<THandlerErr> {
/// Note: Addresses for an outbound connection are dialed in parallel. Thus, compared to
/// [`PendingInboundConnectionError`], one or more [`TransportError`]s can occur for a single
/// connection.
pub type PendingOutboundConnectionError<TTransErr> =
PendingConnectionError<Vec<(Multiaddr, TransportError<TTransErr>)>>;
pub type PendingOutboundConnectionError =
PendingConnectionError<Vec<(Multiaddr, TransportError<io::Error>)>>;

/// Errors that can occur in the context of a pending incoming `Connection`.
pub type PendingInboundConnectionError<TTransErr> =
PendingConnectionError<TransportError<TTransErr>>;
pub type PendingInboundConnectionError = PendingConnectionError<TransportError<io::Error>>;

/// Errors that can occur in the context of a pending `Connection`.
#[derive(Debug)]
Expand Down
48 changes: 16 additions & 32 deletions swarm/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
Connected, ConnectionError, ConnectionLimit, IncomingInfo, PendingConnectionError,
PendingInboundConnectionError, PendingOutboundConnectionError,
},
transport::{Transport, TransportError},
transport::TransportError,
ConnectedPoint, ConnectionHandler, Executor, IntoConnectionHandler, Multiaddr, PeerId,
};
use concurrent_dial::ConcurrentDial;
Expand Down Expand Up @@ -79,9 +79,8 @@ impl ExecSwitch {
}

/// A connection `Pool` manages a set of connections for each peer.
pub struct Pool<THandler, TTrans>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you think of making this pub (crate) instead? I know that it doesn't change anything as pool is not public but it helps visually understand that the type is shared across the crate (as opposed to just being private, which it can't be). pool is also pub (crate).
(this suggestion also applies to PoolEvent)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general I think I'd prefer to not use pub(crate) but I could be convinced otherwise.

It would be nice if we can assume that:

  • All modules are private
  • pub types therefore don't leak into the public API
  • pub(crate) is therefore a workaround because rhe module isn't private for some reason

I am not fully convinced it is a good idea though, might cause a lot of complexity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish Rust had a way to whitelist what is in the public API of a crate and fail to compile if something is being added.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we decide that every pub type follows the convention used in #3238 we could use pub (crate) only inside the the crate itself to distinguish from the actually pub types on the main lib.rs for example, one issue with the assumptions above for example is that we have pub mod but in the end no strong opinion, I can understand your POV as well and share the same frustrations.

pub struct Pool<THandler>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
{
local_id: PeerId,
Expand Down Expand Up @@ -124,10 +123,10 @@ where

/// Sender distributed to pending tasks for reporting events back
/// to the pool.
pending_connection_events_tx: mpsc::Sender<task::PendingConnectionEvent<TTrans>>,
pending_connection_events_tx: mpsc::Sender<task::PendingConnectionEvent>,

/// Receiver for events reported from pending tasks.
pending_connection_events_rx: mpsc::Receiver<task::PendingConnectionEvent<TTrans>>,
pending_connection_events_rx: mpsc::Receiver<task::PendingConnectionEvent>,

/// Sender distributed to established tasks for reporting events back
/// to the pool.
Expand Down Expand Up @@ -213,7 +212,7 @@ impl<THandler> PendingConnection<THandler> {
}
}

impl<THandler: IntoConnectionHandler, TTrans: Transport> fmt::Debug for Pool<THandler, TTrans> {
impl<THandler: IntoConnectionHandler> fmt::Debug for Pool<THandler> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("Pool")
.field("counters", &self.counters)
Expand All @@ -223,10 +222,7 @@ impl<THandler: IntoConnectionHandler, TTrans: Transport> fmt::Debug for Pool<THa

/// Event that can happen on the `Pool`.
#[derive(Debug)]
pub enum PoolEvent<THandler: IntoConnectionHandler, TTrans>
where
TTrans: Transport,
{
pub enum PoolEvent<THandler: IntoConnectionHandler> {
/// A new connection has been established.
ConnectionEstablished {
id: ConnectionId,
Expand All @@ -239,7 +235,7 @@ where
/// [`Some`] when the new connection is an outgoing connection.
/// Addresses are dialed in parallel. Contains the addresses and errors
/// of dial attempts that failed before the one successful dial.
concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<TTrans::Error>)>>,
concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<std::io::Error>)>>,
/// How long it took to establish this connection.
established_in: std::time::Duration,
},
Expand Down Expand Up @@ -272,7 +268,7 @@ where
/// The ID of the failed connection.
id: ConnectionId,
/// The error that occurred.
error: PendingOutboundConnectionError<TTrans::Error>,
error: PendingOutboundConnectionError,
/// The handler that was supposed to handle the connection.
handler: THandler,
/// The (expected) peer of the failed connection.
Expand All @@ -288,7 +284,7 @@ where
/// Local connection address.
local_addr: Multiaddr,
/// The error that occurred.
error: PendingInboundConnectionError<TTrans::Error>,
error: PendingInboundConnectionError,
/// The handler that was supposed to handle the connection.
handler: THandler,
},
Expand All @@ -312,10 +308,9 @@ where
},
}

impl<THandler, TTrans> Pool<THandler, TTrans>
impl<THandler> Pool<THandler>
where
THandler: IntoConnectionHandler,
TTrans: Transport,
{
/// Creates a new empty `Pool`.
pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self {
Expand Down Expand Up @@ -429,12 +424,9 @@ where
}
}

impl<THandler, TTrans> Pool<THandler, TTrans>
impl<THandler> Pool<THandler>
where
THandler: IntoConnectionHandler,
TTrans: Transport + 'static,
TTrans::Output: Send + 'static,
TTrans::Error: Send + 'static,
{
/// Adds a pending outgoing connection to the pool in the form of a `Future`
/// that establishes and negotiates the connection.
Expand All @@ -448,22 +440,15 @@ where
'static,
(
Multiaddr,
Result<
<TTrans as Transport>::Output,
TransportError<<TTrans as Transport>::Error>,
>,
Result<(PeerId, StreamMuxerBox), TransportError<std::io::Error>>,
),
>,
>,
peer: Option<PeerId>,
handler: THandler,
role_override: Endpoint,
dial_concurrency_factor_override: Option<NonZeroU8>,
) -> Result<ConnectionId, (ConnectionLimit, THandler)>
where
TTrans: Send,
TTrans::Dial: Send + 'static,
{
) -> Result<ConnectionId, (ConnectionLimit, THandler)> {
if let Err(limit) = self.counters.check_max_pending_outgoing() {
return Err((limit, handler));
};
Expand Down Expand Up @@ -515,7 +500,7 @@ where
info: IncomingInfo<'_>,
) -> Result<ConnectionId, (ConnectionLimit, THandler)>
where
TFut: Future<Output = Result<TTrans::Output, TTrans::Error>> + Send + 'static,
TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
{
let endpoint = info.create_connected_point();

Expand Down Expand Up @@ -552,9 +537,8 @@ where
}

/// Polls the connection pool for events.
pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PoolEvent<THandler, TTrans>>
pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PoolEvent<THandler>>
where
TTrans: Transport<Output = (PeerId, StreamMuxerBox)>,
THandler: IntoConnectionHandler + 'static,
THandler::Handler: ConnectionHandler + Send,
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
Expand Down Expand Up @@ -677,7 +661,7 @@ where
),
};

let error: Result<(), PendingInboundConnectionError<_>> = self
let error = self
.counters
// Check general established connection limit.
.check_max_established(&endpoint)
Expand Down
42 changes: 16 additions & 26 deletions swarm/src/connection/pool/concurrent_dial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,38 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::{
transport::{Transport, TransportError},
Multiaddr,
};
use crate::{transport::TransportError, Multiaddr};
use futures::{
future::{BoxFuture, Future},
ready,
stream::{FuturesUnordered, StreamExt},
};
use libp2p_core::muxing::StreamMuxerBox;
use libp2p_core::PeerId;
use std::{
num::NonZeroU8,
pin::Pin,
task::{Context, Poll},
};

type Dial<TTrans> = BoxFuture<
type Dial = BoxFuture<
'static,
(
Multiaddr,
Result<<TTrans as Transport>::Output, TransportError<<TTrans as Transport>::Error>>,
Result<(PeerId, StreamMuxerBox), TransportError<std::io::Error>>,
),
>;

pub struct ConcurrentDial<TTrans: Transport> {
dials: FuturesUnordered<Dial<TTrans>>,
pending_dials: Box<dyn Iterator<Item = Dial<TTrans>> + Send>,
errors: Vec<(Multiaddr, TransportError<TTrans::Error>)>,
pub struct ConcurrentDial {
dials: FuturesUnordered<Dial>,
pending_dials: Box<dyn Iterator<Item = Dial> + Send>,
errors: Vec<(Multiaddr, TransportError<std::io::Error>)>,
}

impl<TTrans: Transport> Unpin for ConcurrentDial<TTrans> {}
impl Unpin for ConcurrentDial {}

impl<TTrans> ConcurrentDial<TTrans>
where
TTrans: Transport + Send + 'static,
TTrans::Output: Send,
TTrans::Error: Send,
TTrans::Dial: Send + 'static,
{
pub(crate) fn new(pending_dials: Vec<Dial<TTrans>>, concurrency_factor: NonZeroU8) -> Self {
impl ConcurrentDial {
pub(crate) fn new(pending_dials: Vec<Dial>, concurrency_factor: NonZeroU8) -> Self {
let mut pending_dials = pending_dials.into_iter();

let dials = FuturesUnordered::new();
Expand All @@ -75,20 +68,17 @@ where
}
}

impl<TTrans> Future for ConcurrentDial<TTrans>
where
TTrans: Transport,
{
impl Future for ConcurrentDial {
type Output = Result<
// Either one dial succeeded, returning the negotiated [`PeerId`], the address, the
// muxer and the addresses and errors of the dials that failed before.
(
Multiaddr,
TTrans::Output,
Vec<(Multiaddr, TransportError<TTrans::Error>)>,
(PeerId, StreamMuxerBox),
Vec<(Multiaddr, TransportError<std::io::Error>)>,
),
// Or all dials failed, thus returning the address and error for each dial.
Vec<(Multiaddr, TransportError<TTrans::Error>)>,
Vec<(Multiaddr, TransportError<std::io::Error>)>,
>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
Expand Down
35 changes: 13 additions & 22 deletions swarm/src/connection/pool/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
connection::{
self, ConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError,
},
transport::{Transport, TransportError},
transport::TransportError,
ConnectionHandler, Multiaddr, PeerId,
};
use futures::{
Expand All @@ -35,6 +35,7 @@ use futures::{
SinkExt, StreamExt,
};
use libp2p_core::connection::ConnectionId;
use libp2p_core::muxing::StreamMuxerBox;
use std::pin::Pin;
use void::Void;

Expand All @@ -48,26 +49,19 @@ pub enum Command<T> {
Close,
}

#[derive(Debug)]
pub enum PendingConnectionEvent<TTrans>
where
TTrans: Transport,
{
pub enum PendingConnectionEvent {
ConnectionEstablished {
id: ConnectionId,
output: TTrans::Output,
output: (PeerId, StreamMuxerBox),
/// [`Some`] when the new connection is an outgoing connection.
/// Addresses are dialed in parallel. Contains the addresses and errors
/// of dial attempts that failed before the one successful dial.
outgoing: Option<(Multiaddr, Vec<(Multiaddr, TransportError<TTrans::Error>)>)>,
outgoing: Option<(Multiaddr, Vec<(Multiaddr, TransportError<std::io::Error>)>)>,
},
/// A pending connection failed.
PendingFailed {
id: ConnectionId,
error: Either<
PendingOutboundConnectionError<TTrans::Error>,
PendingInboundConnectionError<TTrans::Error>,
>,
error: Either<PendingOutboundConnectionError, PendingInboundConnectionError>,
},
}

Expand Down Expand Up @@ -97,14 +91,12 @@ pub enum EstablishedConnectionEvent<THandler: ConnectionHandler> {
},
}

pub async fn new_for_pending_outgoing_connection<TTrans>(
pub async fn new_for_pending_outgoing_connection(
connection_id: ConnectionId,
dial: ConcurrentDial<TTrans>,
dial: ConcurrentDial,
abort_receiver: oneshot::Receiver<Void>,
mut events: mpsc::Sender<PendingConnectionEvent<TTrans>>,
) where
TTrans: Transport,
{
mut events: mpsc::Sender<PendingConnectionEvent>,
) {
match futures::future::select(abort_receiver, Box::pin(dial)).await {
Either::Left((Err(oneshot::Canceled), _)) => {
let _ = events
Expand Down Expand Up @@ -135,14 +127,13 @@ pub async fn new_for_pending_outgoing_connection<TTrans>(
}
}

pub async fn new_for_pending_incoming_connection<TFut, TTrans>(
pub async fn new_for_pending_incoming_connection<TFut>(
connection_id: ConnectionId,
future: TFut,
abort_receiver: oneshot::Receiver<Void>,
mut events: mpsc::Sender<PendingConnectionEvent<TTrans>>,
mut events: mpsc::Sender<PendingConnectionEvent>,
) where
TTrans: Transport,
TFut: Future<Output = Result<TTrans::Output, TTrans::Error>> + Send + 'static,
TFut: Future<Output = Result<(PeerId, StreamMuxerBox), std::io::Error>> + Send + 'static,
{
match futures::future::select(abort_receiver, Box::pin(future)).await {
Either::Left((Err(oneshot::Canceled), _)) => {
Expand Down
Loading