Skip to content

Commit

Permalink
Remove tokio-codec dependency from multistream-select. (#1203)
Browse files Browse the repository at this point in the history
* Remove tokio-codec dependency from multistream-select.

In preparation for the eventual switch from tokio to std futures.

Includes some initial refactoring in preparation for further work
in the context of #659.

* Reduce default buffer sizes.

* Allow more than one frame to be buffered for sending.

* Doc tweaks.

* Remove superfluous (duplicated) Message types.
  • Loading branch information
romanb authored Jul 29, 2019
1 parent bcc7c4d commit 2fd9411
Show file tree
Hide file tree
Showing 11 changed files with 383 additions and 401 deletions.
3 changes: 1 addition & 2 deletions misc/multistream-select/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ bytes = "0.4"
futures = { version = "0.1" }
log = "0.4"
smallvec = "0.6"
tokio-codec = "0.1"
tokio-io = "0.1"
unsigned-varint = { version = "0.2.1", features = ["codec"] }
unsigned-varint = { version = "0.2.2" }

[dev-dependencies]
tokio = "0.1"
Expand Down
43 changes: 18 additions & 25 deletions misc/multistream-select/src/dialer_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,14 @@
//! `multistream-select` for the dialer.
use futures::{future::Either, prelude::*, stream::StreamFuture};
use crate::protocol::{
Dialer,
DialerFuture,
DialerToListenerMessage,
ListenerToDialerMessage
};
use crate::protocol::{Dialer, DialerFuture, Request, Response};
use log::trace;
use std::mem;
use tokio_io::{AsyncRead, AsyncWrite};
use crate::{Negotiated, ProtocolChoiceError};

/// Future, returned by `dialer_select_proto`, which selects a protocol and dialer
/// either sequentially of by considering all protocols in parallel.
/// either sequentially or by considering all protocols in parallel.
pub type DialerSelectFuture<R, I> = Either<DialerSelectSeq<R, I>, DialerSelectPar<R, I>>;

/// Helps selecting a protocol amongst the ones supported.
Expand Down Expand Up @@ -75,7 +70,10 @@ where
{
let protocols = protocols.into_iter();
DialerSelectSeq {
inner: DialerSelectSeqState::AwaitDialer { dialer_fut: Dialer::dial(inner), protocols }
inner: DialerSelectSeqState::AwaitDialer {
dialer_fut: Dialer::dial(inner),
protocols
}
}
}

Expand Down Expand Up @@ -148,9 +146,7 @@ where
}
DialerSelectSeqState::NextProtocol { mut dialer, protocols, proto_name } => {
trace!("sending {:?}", proto_name.as_ref());
let req = DialerToListenerMessage::ProtocolRequest {
name: proto_name.clone()
};
let req = Request::Protocol { name: proto_name.clone() };
match dialer.start_send(req)? {
AsyncSink::Ready => {
self.inner = DialerSelectSeqState::FlushProtocol {
Expand Down Expand Up @@ -204,12 +200,12 @@ where
};
trace!("received {:?}", m);
match m.ok_or(ProtocolChoiceError::UnexpectedMessage)? {
ListenerToDialerMessage::ProtocolAck { ref name }
Response::Protocol { ref name }
if name.as_ref() == proto_name.as_ref() =>
{
return Ok(Async::Ready((proto_name, Negotiated(r.into_inner()))))
}
ListenerToDialerMessage::NotAvailable => {
Response::ProtocolNotAvailable => {
let proto_name = protocols.next()
.ok_or(ProtocolChoiceError::NoProtocolFound)?;
self.inner = DialerSelectSeqState::NextProtocol {
Expand Down Expand Up @@ -244,9 +240,8 @@ where
}
}


/// Future, returned by `dialer_select_proto_parallel`, which selects a protocol and dialer in
/// parellel, by first requesting the liste of protocols supported by the remote endpoint and
/// parallel, by first requesting the list of protocols supported by the remote endpoint and
/// then selecting the most appropriate one by applying a match predicate to the result.
pub struct DialerSelectPar<R, I>
where
Expand Down Expand Up @@ -319,7 +314,7 @@ where
}
DialerSelectParState::ProtocolList { mut dialer, protocols } => {
trace!("requesting protocols list");
match dialer.start_send(DialerToListenerMessage::ProtocolsListRequest)? {
match dialer.start_send(Request::ListProtocols)? {
AsyncSink::Ready => {
self.inner = DialerSelectParState::FlushListRequest {
dialer,
Expand Down Expand Up @@ -359,15 +354,15 @@ where
Err((e, _)) => return Err(ProtocolChoiceError::from(e))
};
trace!("protocols list response: {:?}", resp);
let list =
if let Some(ListenerToDialerMessage::ProtocolsListResponse { list }) = resp {
list
let supported =
if let Some(Response::SupportedProtocols { protocols }) = resp {
protocols
} else {
return Err(ProtocolChoiceError::UnexpectedMessage)
};
let mut found = None;
for local_name in protocols {
for remote_name in &list {
for remote_name in &supported {
if remote_name.as_ref() == local_name.as_ref() {
found = Some(local_name);
break;
Expand All @@ -381,10 +376,8 @@ where
self.inner = DialerSelectParState::Protocol { dialer, proto_name }
}
DialerSelectParState::Protocol { mut dialer, proto_name } => {
trace!("requesting protocol: {:?}", proto_name.as_ref());
let req = DialerToListenerMessage::ProtocolRequest {
name: proto_name.clone()
};
trace!("Requesting protocol: {:?}", proto_name.as_ref());
let req = Request::Protocol { name: proto_name.clone() };
match dialer.start_send(req)? {
AsyncSink::Ready => {
self.inner = DialerSelectParState::FlushProtocol { dialer, proto_name }
Expand Down Expand Up @@ -420,7 +413,7 @@ where
};
trace!("received {:?}", resp);
match resp {
Some(ListenerToDialerMessage::ProtocolAck { ref name })
Some(Response::Protocol { ref name })
if name.as_ref() == proto_name.as_ref() =>
{
return Ok(Async::Ready((proto_name, Negotiated(dialer.into_inner()))))
Expand Down
19 changes: 7 additions & 12 deletions misc/multistream-select/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
//! Main `ProtocolChoiceError` error.
use crate::protocol::MultistreamSelectError;
use std::error;
use std::fmt;
use std::io::Error as IoError;
use std::error::Error;
use std::{fmt, io};

/// Error that can happen when negotiating a protocol with the remote.
#[derive(Debug)]
Expand All @@ -39,21 +38,18 @@ pub enum ProtocolChoiceError {
}

impl From<MultistreamSelectError> for ProtocolChoiceError {
#[inline]
fn from(err: MultistreamSelectError) -> ProtocolChoiceError {
ProtocolChoiceError::MultistreamSelectError(err)
}
}

impl From<IoError> for ProtocolChoiceError {
#[inline]
fn from(err: IoError) -> ProtocolChoiceError {
impl From<io::Error> for ProtocolChoiceError {
fn from(err: io::Error) -> ProtocolChoiceError {
MultistreamSelectError::from(err).into()
}
}

impl error::Error for ProtocolChoiceError {
#[inline]
impl Error for ProtocolChoiceError {
fn description(&self) -> &str {
match *self {
ProtocolChoiceError::MultistreamSelectError(_) => "error in the protocol",
Expand All @@ -66,7 +62,7 @@ impl error::Error for ProtocolChoiceError {
}
}

fn cause(&self) -> Option<&dyn error::Error> {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match *self {
ProtocolChoiceError::MultistreamSelectError(ref err) => Some(err),
_ => None,
Expand All @@ -75,8 +71,7 @@ impl error::Error for ProtocolChoiceError {
}

impl fmt::Display for ProtocolChoiceError {
#[inline]
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(fmt, "{}", error::Error::description(self))
write!(fmt, "{}", Error::description(self))
}
}
Loading

0 comments on commit 2fd9411

Please sign in to comment.