Skip to content

Commit

Permalink
Custom yamux mode (#1691)
Browse files Browse the repository at this point in the history
* Allow override the yamux connection mode.

* Add `multiplex_ext` to transport `Builder`.

This method exposes the connection info and connected point to a provided
function which creates the upgrade and can base the decision on `PeerId`
or other connection information such as IP address.

* Re-export `yamux::Mode`.

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>
  • Loading branch information
twittner and romanb authored Aug 17, 2020
1 parent d1024d2 commit 91d50b2
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 10 deletions.
31 changes: 31 additions & 0 deletions core/src/transport/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,37 @@ where
Multiplex { info: Some(i), upgrade }
})
}

/// Like [`Builder::multiplex`] but accepts a function which returns the upgrade.
///
/// The supplied function is applied to [`ConnectionInfo`] and [`ConnectedPoint`]
/// and returns an upgrade which receives the I/O resource `C` and must
/// produce a [`StreamMuxer`] `M`. The transport must already be authenticated.
/// This ends the (regular) transport upgrade process, yielding the underlying,
/// configured transport.
///
/// ## Transitions
///
/// * I/O upgrade: `C -> M`.
/// * Transport output: `(I, C) -> (I, M)`.
pub fn multiplex_ext<C, M, U, I, E, F>(self, up: F)
-> AndThen<T, impl FnOnce((I, C), ConnectedPoint) -> Multiplex<C, U, I> + Clone>
where
T: Transport<Output = (I, C)>,
C: AsyncRead + AsyncWrite + Unpin,
M: StreamMuxer,
I: ConnectionInfo,
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
E: Error + 'static,
F: for<'a> FnOnce(&'a I, &'a ConnectedPoint) -> U + Clone
{
let version = self.version;
self.inner.and_then(move |(i, c), endpoint| {
let upgrade = upgrade::apply(c, up(&i, &endpoint), endpoint, version);
Multiplex { info: Some(i), upgrade }
})
}
}

/// An upgrade that authenticates the remote peer, typically
Expand Down
33 changes: 23 additions & 10 deletions muxers/yamux/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use parking_lot::Mutex;
use std::{fmt, io, iter, ops::{Deref, DerefMut}, pin::Pin, task::Context};
use thiserror::Error;

pub use yamux::WindowUpdateMode;
pub use yamux::{Mode, WindowUpdateMode};

/// A Yamux connection.
///
Expand Down Expand Up @@ -165,15 +165,26 @@ where

/// The yamux configuration.
#[derive(Clone)]
pub struct Config(yamux::Config);
pub struct Config {
config: yamux::Config,
mode: Option<yamux::Mode>
}

/// The yamux configuration for upgrading I/O resources which are ![`Send`].
#[derive(Clone)]
pub struct LocalConfig(Config);

impl Config {
pub fn new(cfg: yamux::Config) -> Self {
Config(cfg)
Config { config: cfg, mode: None }
}

/// Override the connection mode.
///
/// This will always use the provided mode during the connection upgrade,
/// irrespective of whether an inbound or outbound upgrade happens.
pub fn override_mode(&mut self, mode: yamux::Mode) {
self.mode = Some(mode)
}

/// Turn this into a [`LocalConfig`] for use with upgrades of ![`Send`] resources.
Expand All @@ -184,21 +195,21 @@ impl Config {

impl Default for Config {
fn default() -> Self {
Config(yamux::Config::default())
Config::new(yamux::Config::default())
}
}

impl Deref for Config {
type Target = yamux::Config;

fn deref(&self) -> &Self::Target {
&self.0
&self.config
}
}

impl DerefMut for Config {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
&mut self.config
}
}

Expand Down Expand Up @@ -229,7 +240,7 @@ where
type Future = future::Ready<Result<Self::Output, Self::Error>>;

fn upgrade_inbound(self, io: C, _: Self::Info) -> Self::Future {
future::ready(Ok(Yamux::new(io, self.0, yamux::Mode::Server)))
future::ready(Ok(Yamux::new(io, self.config, self.mode.unwrap_or(yamux::Mode::Server))))
}
}

Expand All @@ -242,7 +253,8 @@ where
type Future = future::Ready<Result<Self::Output, Self::Error>>;

fn upgrade_inbound(self, io: C, _: Self::Info) -> Self::Future {
future::ready(Ok(Yamux::local(io, (self.0).0, yamux::Mode::Server)))
let cfg = self.0;
future::ready(Ok(Yamux::local(io, cfg.config, cfg.mode.unwrap_or(yamux::Mode::Server))))
}
}

Expand All @@ -255,7 +267,7 @@ where
type Future = future::Ready<Result<Self::Output, Self::Error>>;

fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future {
future::ready(Ok(Yamux::new(io, self.0, yamux::Mode::Client)))
future::ready(Ok(Yamux::new(io, self.config, self.mode.unwrap_or(yamux::Mode::Client))))
}
}

Expand All @@ -268,7 +280,8 @@ where
type Future = future::Ready<Result<Self::Output, Self::Error>>;

fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future {
future::ready(Ok(Yamux::local(io, (self.0).0, yamux::Mode::Client)))
let cfg = self.0;
future::ready(Ok(Yamux::local(io, cfg.config, cfg.mode.unwrap_or(yamux::Mode::Client))))
}
}

Expand Down

0 comments on commit 91d50b2

Please sign in to comment.