From 91d50b2723fcacef39c7529887208445f7fc1e7c Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Mon, 17 Aug 2020 10:07:41 +0200 Subject: [PATCH] Custom yamux mode (#1691) * Allow override the yamux connection mode. * Add `multiplex_ext` to transport `Builder`. This method exposes the connection info and connected point to a provided function which creates the upgrade and can base the decision on `PeerId` or other connection information such as IP address. * Re-export `yamux::Mode`. Co-authored-by: Roman Borschel --- core/src/transport/upgrade.rs | 31 +++++++++++++++++++++++++++++++ muxers/yamux/src/lib.rs | 33 +++++++++++++++++++++++---------- 2 files changed, 54 insertions(+), 10 deletions(-) diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 16784178271..b8f0a89c036 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -168,6 +168,37 @@ where Multiplex { info: Some(i), upgrade } }) } + + /// Like [`Builder::multiplex`] but accepts a function which returns the upgrade. + /// + /// The supplied function is applied to [`ConnectionInfo`] and [`ConnectedPoint`] + /// and returns an upgrade which receives the I/O resource `C` and must + /// produce a [`StreamMuxer`] `M`. The transport must already be authenticated. + /// This ends the (regular) transport upgrade process, yielding the underlying, + /// configured transport. + /// + /// ## Transitions + /// + /// * I/O upgrade: `C -> M`. + /// * Transport output: `(I, C) -> (I, M)`. + pub fn multiplex_ext(self, up: F) + -> AndThen Multiplex + Clone> + where + T: Transport, + C: AsyncRead + AsyncWrite + Unpin, + M: StreamMuxer, + I: ConnectionInfo, + U: InboundUpgrade, Output = M, Error = E>, + U: OutboundUpgrade, Output = M, Error = E> + Clone, + E: Error + 'static, + F: for<'a> FnOnce(&'a I, &'a ConnectedPoint) -> U + Clone + { + let version = self.version; + self.inner.and_then(move |(i, c), endpoint| { + let upgrade = upgrade::apply(c, up(&i, &endpoint), endpoint, version); + Multiplex { info: Some(i), upgrade } + }) + } } /// An upgrade that authenticates the remote peer, typically diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 6d510997b1b..c7f093ef69f 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -28,7 +28,7 @@ use parking_lot::Mutex; use std::{fmt, io, iter, ops::{Deref, DerefMut}, pin::Pin, task::Context}; use thiserror::Error; -pub use yamux::WindowUpdateMode; +pub use yamux::{Mode, WindowUpdateMode}; /// A Yamux connection. /// @@ -165,7 +165,10 @@ where /// The yamux configuration. #[derive(Clone)] -pub struct Config(yamux::Config); +pub struct Config { + config: yamux::Config, + mode: Option +} /// The yamux configuration for upgrading I/O resources which are ![`Send`]. #[derive(Clone)] @@ -173,7 +176,15 @@ pub struct LocalConfig(Config); impl Config { pub fn new(cfg: yamux::Config) -> Self { - Config(cfg) + Config { config: cfg, mode: None } + } + + /// Override the connection mode. + /// + /// This will always use the provided mode during the connection upgrade, + /// irrespective of whether an inbound or outbound upgrade happens. + pub fn override_mode(&mut self, mode: yamux::Mode) { + self.mode = Some(mode) } /// Turn this into a [`LocalConfig`] for use with upgrades of ![`Send`] resources. @@ -184,7 +195,7 @@ impl Config { impl Default for Config { fn default() -> Self { - Config(yamux::Config::default()) + Config::new(yamux::Config::default()) } } @@ -192,13 +203,13 @@ impl Deref for Config { type Target = yamux::Config; fn deref(&self) -> &Self::Target { - &self.0 + &self.config } } impl DerefMut for Config { fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 + &mut self.config } } @@ -229,7 +240,7 @@ where type Future = future::Ready>; fn upgrade_inbound(self, io: C, _: Self::Info) -> Self::Future { - future::ready(Ok(Yamux::new(io, self.0, yamux::Mode::Server))) + future::ready(Ok(Yamux::new(io, self.config, self.mode.unwrap_or(yamux::Mode::Server)))) } } @@ -242,7 +253,8 @@ where type Future = future::Ready>; fn upgrade_inbound(self, io: C, _: Self::Info) -> Self::Future { - future::ready(Ok(Yamux::local(io, (self.0).0, yamux::Mode::Server))) + let cfg = self.0; + future::ready(Ok(Yamux::local(io, cfg.config, cfg.mode.unwrap_or(yamux::Mode::Server)))) } } @@ -255,7 +267,7 @@ where type Future = future::Ready>; fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future { - future::ready(Ok(Yamux::new(io, self.0, yamux::Mode::Client))) + future::ready(Ok(Yamux::new(io, self.config, self.mode.unwrap_or(yamux::Mode::Client)))) } } @@ -268,7 +280,8 @@ where type Future = future::Ready>; fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future { - future::ready(Ok(Yamux::local(io, (self.0).0, yamux::Mode::Client))) + let cfg = self.0; + future::ready(Ok(Yamux::local(io, cfg.config, cfg.mode.unwrap_or(yamux::Mode::Client)))) } }