Skip to content

Commit

Permalink
swarm/: Rename references of TProtoHandler, ToggleProtoHandler and To…
Browse files Browse the repository at this point in the history
…ggleIntoProtoHandler
  • Loading branch information
maschad committed May 11, 2022
1 parent 725722a commit e2fd6f2
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 60 deletions.
28 changes: 14 additions & 14 deletions swarm/src/behaviour/toggle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ impl<TBehaviour> NetworkBehaviour for Toggle<TBehaviour>
where
TBehaviour: NetworkBehaviour,
{
type ConnectionHandler = ToggleIntoProtoHandler<TBehaviour::ConnectionHandler>;
type ConnectionHandler = ToggleIntoConnectionHandler<TBehaviour::ConnectionHandler>;
type OutEvent = TBehaviour::OutEvent;

fn new_handler(&mut self) -> Self::ConnectionHandler {
ToggleIntoProtoHandler {
ToggleIntoConnectionHandler {
inner: self.inner.as_mut().map(|i| i.new_handler()),
}
}
Expand Down Expand Up @@ -223,9 +223,9 @@ where
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some(inner) = self.inner.as_mut() {
inner
.poll(cx, params)
.map(|action| action.map_handler(|h| ToggleIntoProtoHandler { inner: Some(h) }))
inner.poll(cx, params).map(|action| {
action.map_handler(|h| ToggleIntoConnectionHandler { inner: Some(h) })
})
} else {
Poll::Pending
}
Expand All @@ -244,22 +244,22 @@ where
}

/// Implementation of `IntoConnectionHandler` that can be in the disabled state.
pub struct ToggleIntoProtoHandler<TInner> {
pub struct ToggleIntoConnectionHandler<TInner> {
inner: Option<TInner>,
}

impl<TInner> IntoConnectionHandler for ToggleIntoProtoHandler<TInner>
impl<TInner> IntoConnectionHandler for ToggleIntoConnectionHandler<TInner>
where
TInner: IntoConnectionHandler,
{
type Handler = ToggleProtoHandler<TInner::Handler>;
type Handler = ToggleConnectionHandler<TInner::Handler>;

fn into_handler(
self,
remote_peer_id: &PeerId,
connected_point: &ConnectedPoint,
) -> Self::Handler {
ToggleProtoHandler {
ToggleConnectionHandler {
inner: self
.inner
.map(|h| h.into_handler(remote_peer_id, connected_point)),
Expand All @@ -276,11 +276,11 @@ where
}

/// Implementation of [`ConnectionHandler`] that can be in the disabled state.
pub struct ToggleProtoHandler<TInner> {
pub struct ToggleConnectionHandler<TInner> {
inner: Option<TInner>,
}

impl<TInner> ConnectionHandler for ToggleProtoHandler<TInner>
impl<TInner> ConnectionHandler for ToggleConnectionHandler<TInner>
where
TInner: ConnectionHandler,
{
Expand Down Expand Up @@ -426,7 +426,7 @@ mod tests {
use super::*;
use crate::handler::DummyConnectionHandler;

/// A disabled [`ToggleProtoHandler`] can receive listen upgrade errors in
/// A disabled [`ToggleConnectionHandler`] can receive listen upgrade errors in
/// the following two cases:
///
/// 1. Protocol negotiation on an incoming stream failed with no protocol
Expand All @@ -439,10 +439,10 @@ mod tests {
/// [`ConnectionHandlerSelect`](crate::connection_handler::ConnectionHandlerSelect)
/// the former might receive an inbound upgrade error even when disabled.
///
/// [`ToggleProtoHandler`] should ignore the error in both of these cases.
/// [`ToggleConnectionHandler`] should ignore the error in both of these cases.
#[test]
fn ignore_listen_upgrade_error_when_disabled() {
let mut handler = ToggleProtoHandler::<DummyConnectionHandler> { inner: None };
let mut handler = ToggleConnectionHandler::<DummyConnectionHandler> { inner: None };

handler.inject_listen_upgrade_error(Either::Right(()), ConnectionHandlerUpgrErr::Timeout);
}
Expand Down
5 changes: 4 additions & 1 deletion swarm/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,10 @@ where
/// Begins an orderly shutdown of the connection, returning the connection
/// handler and a `Future` that resolves when connection shutdown is complete.
pub fn close(self) -> (THandler, Close<StreamMuxerBox>) {
(self.handler.into_connection_handler(), self.muxing.close().0)
(
self.handler.into_connection_handler(),
self.muxing.close().0,
)
}

/// Polls the handler and the substream, forwarding events from the former to the latter and
Expand Down
38 changes: 19 additions & 19 deletions swarm/src/connection/handler_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,35 +42,35 @@ use std::{error, fmt, pin::Pin, task::Context, task::Poll, time::Duration};
/// - Driving substream upgrades
/// - Handling connection timeout
// TODO: add a caching system for protocols that are supported or not
pub struct HandlerWrapper<TProtoHandler>
pub struct HandlerWrapper<TConnectionHandler>
where
TProtoHandler: ConnectionHandler,
TConnectionHandler: ConnectionHandler,
{
/// The underlying handler.
handler: TProtoHandler,
handler: TConnectionHandler,
/// Futures that upgrade incoming substreams.
negotiating_in: FuturesUnordered<
SubstreamUpgrade<
TProtoHandler::InboundOpenInfo,
TConnectionHandler::InboundOpenInfo,
InboundUpgradeApply<
Substream<StreamMuxerBox>,
SendWrapper<TProtoHandler::InboundProtocol>,
SendWrapper<TConnectionHandler::InboundProtocol>,
>,
>,
>,
/// Futures that upgrade outgoing substreams.
negotiating_out: FuturesUnordered<
SubstreamUpgrade<
TProtoHandler::OutboundOpenInfo,
TConnectionHandler::OutboundOpenInfo,
OutboundUpgradeApply<
Substream<StreamMuxerBox>,
SendWrapper<TProtoHandler::OutboundProtocol>,
SendWrapper<TConnectionHandler::OutboundProtocol>,
>,
>,
>,
/// For each outbound substream request, how to upgrade it. The first element of the tuple
/// is the unique identifier (see `unique_dial_upgrade_id`).
queued_dial_upgrades: Vec<(u64, SendWrapper<TProtoHandler::OutboundProtocol>)>,
queued_dial_upgrades: Vec<(u64, SendWrapper<TConnectionHandler::OutboundProtocol>)>,
/// Unique identifier assigned to each queued dial upgrade.
unique_dial_upgrade_id: u64,
/// The currently planned connection & handler shutdown.
Expand All @@ -79,7 +79,7 @@ where
substream_upgrade_protocol_override: Option<upgrade::Version>,
}

impl<TProtoHandler: ConnectionHandler> std::fmt::Debug for HandlerWrapper<TProtoHandler> {
impl<TConnectionHandler: ConnectionHandler> std::fmt::Debug for HandlerWrapper<TConnectionHandler> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("HandlerWrapper")
.field("negotiating_in", &self.negotiating_in)
Expand All @@ -94,9 +94,9 @@ impl<TProtoHandler: ConnectionHandler> std::fmt::Debug for HandlerWrapper<TProto
}
}

impl<TProtoHandler: ConnectionHandler> HandlerWrapper<TProtoHandler> {
impl<TConnectionHandler: ConnectionHandler> HandlerWrapper<TConnectionHandler> {
pub(crate) fn new(
handler: TProtoHandler,
handler: TConnectionHandler,
substream_upgrade_protocol_override: Option<upgrade::Version>,
) -> Self {
Self {
Expand Down Expand Up @@ -224,22 +224,22 @@ where
}
}

pub type OutboundOpenInfo<TProtoHandler> = (
pub type OutboundOpenInfo<TConnectionHandler> = (
u64,
<TProtoHandler as ConnectionHandler>::OutboundOpenInfo,
<TConnectionHandler as ConnectionHandler>::OutboundOpenInfo,
Duration,
);

impl<TProtoHandler> HandlerWrapper<TProtoHandler>
impl<TConnectionHandler> HandlerWrapper<TConnectionHandler>
where
TProtoHandler: ConnectionHandler,
TConnectionHandler: ConnectionHandler,
{
pub fn inject_substream(
&mut self,
substream: Substream<StreamMuxerBox>,
// The first element of the tuple is the unique upgrade identifier
// (see `unique_dial_upgrade_id`).
endpoint: SubstreamEndpoint<OutboundOpenInfo<TProtoHandler>>,
endpoint: SubstreamEndpoint<OutboundOpenInfo<TConnectionHandler>>,
) {
match endpoint {
SubstreamEndpoint::Listener => {
Expand Down Expand Up @@ -290,7 +290,7 @@ where
}
}

pub fn inject_event(&mut self, event: TProtoHandler::InEvent) {
pub fn inject_event(&mut self, event: TConnectionHandler::InEvent) {
self.handler.inject_event(event);
}

Expand All @@ -303,8 +303,8 @@ where
cx: &mut Context<'_>,
) -> Poll<
Result<
Event<OutboundOpenInfo<TProtoHandler>, TProtoHandler::OutEvent>,
Error<TProtoHandler::Error>,
Event<OutboundOpenInfo<TConnectionHandler>, TConnectionHandler::OutEvent>,
Error<TConnectionHandler::Error>,
>,
> {
while let Poll::Ready(Some((user_data, res))) = self.negotiating_in.poll_next_unpin(cx) {
Expand Down
27 changes: 14 additions & 13 deletions swarm/src/handler/map_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ use libp2p_core::Multiaddr;
use std::{fmt::Debug, marker::PhantomData, task::Context, task::Poll};

/// Wrapper around a protocol handler that turns the input event into something else.
pub struct MapInEvent<TProtoHandler, TNewIn, TMap> {
inner: TProtoHandler,
pub struct MapInEvent<TConnectionHandler, TNewIn, TMap> {
inner: TConnectionHandler,
map: TMap,
marker: PhantomData<TNewIn>,
}

impl<TProtoHandler, TMap, TNewIn> MapInEvent<TProtoHandler, TNewIn, TMap> {
impl<TConnectionHandler, TMap, TNewIn> MapInEvent<TConnectionHandler, TNewIn, TMap> {
/// Creates a `MapInEvent`.
pub(crate) fn new(inner: TProtoHandler, map: TMap) -> Self {
pub(crate) fn new(inner: TConnectionHandler, map: TMap) -> Self {
MapInEvent {
inner,
map,
Expand All @@ -44,20 +44,21 @@ impl<TProtoHandler, TMap, TNewIn> MapInEvent<TProtoHandler, TNewIn, TMap> {
}
}

impl<TProtoHandler, TMap, TNewIn> ConnectionHandler for MapInEvent<TProtoHandler, TNewIn, TMap>
impl<TConnectionHandler, TMap, TNewIn> ConnectionHandler
for MapInEvent<TConnectionHandler, TNewIn, TMap>
where
TProtoHandler: ConnectionHandler,
TMap: Fn(TNewIn) -> Option<TProtoHandler::InEvent>,
TConnectionHandler: ConnectionHandler,
TMap: Fn(TNewIn) -> Option<TConnectionHandler::InEvent>,
TNewIn: Debug + Send + 'static,
TMap: Send + 'static,
{
type InEvent = TNewIn;
type OutEvent = TProtoHandler::OutEvent;
type Error = TProtoHandler::Error;
type InboundProtocol = TProtoHandler::InboundProtocol;
type OutboundProtocol = TProtoHandler::OutboundProtocol;
type InboundOpenInfo = TProtoHandler::InboundOpenInfo;
type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo;
type OutEvent = TConnectionHandler::OutEvent;
type Error = TConnectionHandler::Error;
type InboundProtocol = TConnectionHandler::InboundProtocol;
type OutboundProtocol = TConnectionHandler::OutboundProtocol;
type InboundOpenInfo = TConnectionHandler::InboundOpenInfo;
type OutboundOpenInfo = TConnectionHandler::OutboundOpenInfo;

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
self.inner.listen_protocol()
Expand Down
26 changes: 13 additions & 13 deletions swarm/src/handler/map_out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,32 @@ use std::fmt::Debug;
use std::task::{Context, Poll};

/// Wrapper around a protocol handler that turns the output event into something else.
pub struct MapOutEvent<TProtoHandler, TMap> {
inner: TProtoHandler,
pub struct MapOutEvent<TConnectionHandler, TMap> {
inner: TConnectionHandler,
map: TMap,
}

impl<TProtoHandler, TMap> MapOutEvent<TProtoHandler, TMap> {
impl<TConnectionHandler, TMap> MapOutEvent<TConnectionHandler, TMap> {
/// Creates a `MapOutEvent`.
pub(crate) fn new(inner: TProtoHandler, map: TMap) -> Self {
pub(crate) fn new(inner: TConnectionHandler, map: TMap) -> Self {
MapOutEvent { inner, map }
}
}

impl<TProtoHandler, TMap, TNewOut> ConnectionHandler for MapOutEvent<TProtoHandler, TMap>
impl<TConnectionHandler, TMap, TNewOut> ConnectionHandler for MapOutEvent<TConnectionHandler, TMap>
where
TProtoHandler: ConnectionHandler,
TMap: FnMut(TProtoHandler::OutEvent) -> TNewOut,
TConnectionHandler: ConnectionHandler,
TMap: FnMut(TConnectionHandler::OutEvent) -> TNewOut,
TNewOut: Debug + Send + 'static,
TMap: Send + 'static,
{
type InEvent = TProtoHandler::InEvent;
type InEvent = TConnectionHandler::InEvent;
type OutEvent = TNewOut;
type Error = TProtoHandler::Error;
type InboundProtocol = TProtoHandler::InboundProtocol;
type OutboundProtocol = TProtoHandler::OutboundProtocol;
type InboundOpenInfo = TProtoHandler::InboundOpenInfo;
type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo;
type Error = TConnectionHandler::Error;
type InboundProtocol = TConnectionHandler::InboundProtocol;
type OutboundProtocol = TConnectionHandler::OutboundProtocol;
type InboundOpenInfo = TConnectionHandler::InboundOpenInfo;
type OutboundOpenInfo = TConnectionHandler::OutboundOpenInfo;

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
self.inner.listen_protocol()
Expand Down

0 comments on commit e2fd6f2

Please sign in to comment.