diff --git a/Cargo.toml b/Cargo.toml
index b0e87668d5..2750432a75 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,18 +22,20 @@ include = [
[dependencies]
bytes = "0.4.4"
-futures = "=0.2.0-beta"
-futures-core = "=0.2.0-beta"
-futures-timer = { git = "https://github.com/alexcrichton/futures-timer.git", rev = "682e792" }
+futures = "0.1.17"
+futures-cpupool = "0.1.6"
+futures-timer = "0.1.0"
http = "0.1.5"
httparse = "1.0"
iovec = "0.1"
log = "0.4"
net2 = "0.2.32"
time = "0.1"
-tokio = { git = "https://github.com/seanmonstar/tokio.git", branch = "futures2-use-after-free", features = ["unstable-futures"] }
-tokio-executor = { git = "https://github.com/seanmonstar/tokio.git", branch = "futures2-use-after-free", features = ["unstable-futures"] }
-want = { git = "https://github.com/srijs/want.git", branch = "futures-0.2" }
+tokio = "0.1.5"
+tokio-executor = "0.1.0"
+tokio-service = "0.1"
+tokio-io = "0.1"
+want = "0.0.2"
[dev-dependencies]
num_cpus = "1.0"
@@ -43,6 +45,3 @@ url = "1.0"
[features]
nightly = []
-
-[replace]
-"futures:0.2.0-beta" = { git = "https://github.com/rust-lang-nursery/futures-rs", rev = "30473ba" }
diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs
index fe803b7d51..075f98d6e0 100644
--- a/benches/end_to_end.rs
+++ b/benches/end_to_end.rs
@@ -8,8 +8,7 @@ extern crate tokio;
use std::net::SocketAddr;
-use futures::{FutureExt, StreamExt};
-use futures::executor::block_on;
+use futures::{Future, Stream};
use tokio::runtime::Runtime;
use tokio::net::TcpListener;
@@ -23,20 +22,19 @@ fn get_one_at_a_time(b: &mut test::Bencher) {
let addr = spawn_hello(&mut rt);
let client = hyper::Client::configure()
- .build(&rt.handle());
+ .build_with_executor(&rt.reactor(), rt.executor());
let url: hyper::Uri = format!("http://{}/get", addr).parse().unwrap();
b.bytes = 160 * 2 + PHRASE.len() as u64;
b.iter(move || {
- block_on(client.get(url.clone())
- .with_executor(rt.executor())
+ client.get(url.clone())
.and_then(|res| {
res.into_body().into_stream().for_each(|_chunk| {
Ok(())
- }).map(|_| ())
+ })
})
- ).expect("client wait");
+ .wait().expect("client wait");
});
}
@@ -46,7 +44,7 @@ fn post_one_at_a_time(b: &mut test::Bencher) {
let addr = spawn_hello(&mut rt);
let client = hyper::Client::configure()
- .build(&rt.handle());
+ .build_with_executor(&rt.reactor(), rt.executor());
let url: hyper::Uri = format!("http://{}/post", addr).parse().unwrap();
@@ -56,14 +54,11 @@ fn post_one_at_a_time(b: &mut test::Bencher) {
let mut req = Request::new(post.into());
*req.method_mut() = Method::POST;
*req.uri_mut() = url.clone();
- block_on(client.request(req)
- .with_executor(rt.executor())
- .and_then(|res| {
- res.into_body().into_stream().for_each(|_chunk| {
- Ok(())
- }).map(|_| ())
+ client.request(req).and_then(|res| {
+ res.into_body().into_stream().for_each(|_chunk| {
+ Ok(())
})
- ).expect("client wait");
+ }).wait().expect("client wait");
});
}
@@ -81,22 +76,21 @@ fn spawn_hello(rt: &mut Runtime) -> SocketAddr {
let service = const_service(service_fn(|req: Request
| {
req.into_body()
.into_stream()
- .concat()
+ .concat2()
.map(|_| {
Response::new(Body::from(PHRASE))
})
}));
let srv = listener.incoming()
- .next()
+ .into_future()
.map_err(|(e, _inc)| panic!("accept error: {}", e))
.and_then(move |(accepted, _inc)| {
let socket = accepted.expect("accepted socket");
http.serve_connection(socket, service.new_service().expect("new_service"))
.map(|_| ())
.map_err(|_| ())
- })
- .map_err(|_| panic!("server error"));
- rt.spawn2(srv);
+ });
+ rt.spawn(srv);
return addr
}
diff --git a/benches/server.rs b/benches/server.rs
index 25c98da372..158bdb1ffc 100644
--- a/benches/server.rs
+++ b/benches/server.rs
@@ -11,8 +11,8 @@ use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc;
-use futures::{future, stream, FutureExt, StreamExt};
-use futures::channel::oneshot;
+use futures::{future, stream, Future, Stream};
+use futures::sync::oneshot;
use hyper::{Body, Request, Response};
use hyper::server::Service;
@@ -31,7 +31,7 @@ macro_rules! bench_server {
})).unwrap();
let addr = srv.local_addr().unwrap();
addr_tx.send(addr).unwrap();
- tokio::runtime::run2(srv.run_until(until_rx.map_err(|_| ())).map_err(|e| panic!("server error: {}", e)));
+ tokio::run(srv.run_until(until_rx.map_err(|_| ())).map_err(|e| panic!("server error: {}", e)));
});
addr_rx.recv().unwrap()
diff --git a/examples/client.rs b/examples/client.rs
index 53507e81bd..e5d9a2862a 100644
--- a/examples/client.rs
+++ b/examples/client.rs
@@ -8,7 +8,7 @@ extern crate pretty_env_logger;
use std::env;
use std::io::{self, Write};
-use futures::{FutureExt, StreamExt};
+use futures::{Future, Stream};
use futures::future::lazy;
use hyper::{Body, Client, Request};
@@ -30,7 +30,7 @@ fn main() {
return;
}
- tokio::runtime::run2(lazy(move |_| {
+ tokio::run(lazy(move || {
let client = Client::default();
let mut req = Request::new(Body::empty());
@@ -43,13 +43,10 @@ fn main() {
res.into_parts().1.into_stream().for_each(|chunk| {
io::stdout().write_all(&chunk).map_err(From::from)
})
- }).then(|result| {
- if let Some(err) = result.err() {
- eprintln!("Error {}", err);
- } else {
- println!("\n\nDone.");
- }
- Ok(())
+ }).map(|_| {
+ println!("\n\nDone.");
+ }).map_err(|err| {
+ eprintln!("Error {}", err);
})
}));
}
diff --git a/examples/hello.rs b/examples/hello.rs
index b4195d5d6e..80d33542db 100644
--- a/examples/hello.rs
+++ b/examples/hello.rs
@@ -4,7 +4,7 @@ extern crate futures;
extern crate pretty_env_logger;
extern crate tokio;
-use futures::FutureExt;
+use futures::Future;
use futures::future::lazy;
use hyper::{Body, Response};
@@ -20,13 +20,13 @@ fn main() {
Ok(Response::new(Body::from(PHRASE)))
}));
- tokio::runtime::run2(lazy(move |_| {
+ tokio::run(lazy(move || {
let server = Http::new()
.sleep_on_errors(true)
.bind(&addr, new_service)
.unwrap();
- println!("Listening on http://{}", server.local_addr().unwrap());
- server.run().map_err(|err| panic!("Server error {}", err))
+ println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
+ server.run().map_err(|err| eprintln!("Server error {}", err))
}));
}
diff --git a/examples/multi_server.rs b/examples/multi_server.rs
index a42fc5bfa5..239d7abb90 100644
--- a/examples/multi_server.rs
+++ b/examples/multi_server.rs
@@ -4,9 +4,8 @@ extern crate futures;
extern crate pretty_env_logger;
extern crate tokio;
-use futures::{FutureExt, StreamExt};
+use futures::{Future, Stream};
use futures::future::{FutureResult, lazy};
-use futures::executor::spawn;
use hyper::{Body, Method, Request, Response, StatusCode};
use hyper::server::{Http, Service};
@@ -44,20 +43,22 @@ fn main() {
let addr1 = "127.0.0.1:1337".parse().unwrap();
let addr2 = "127.0.0.1:1338".parse().unwrap();
- tokio::runtime::run2(lazy(move |_| {
+ tokio::run(lazy(move || {
let srv1 = Http::new().serve_addr(&addr1, || Ok(Srv(INDEX1))).unwrap();
let srv2 = Http::new().serve_addr(&addr2, || Ok(Srv(INDEX2))).unwrap();
println!("Listening on http://{}", srv1.incoming_ref().local_addr());
println!("Listening on http://{}", srv2.incoming_ref().local_addr());
- spawn(srv1.map_err(|err| panic!("srv1 error: {:?}", err)).for_each(move |conn| {
- spawn(conn.map(|_| ()).map_err(|err| panic!("srv1 error: {:?}", err)))
- }).map(|_| ()));
+ tokio::spawn(srv1.for_each(move |conn| {
+ tokio::spawn(conn.map(|_| ()).map_err(|err| println!("srv1 error: {:?}", err)));
+ Ok(())
+ }).map_err(|_| ()));
- spawn(srv2.map_err(|err| panic!("srv2 error: {:?}", err)).for_each(move |conn| {
- spawn(conn.map(|_| ()).map_err(|err| panic!("srv2 error: {:?}", err)))
- }).map(|_| ()));
+ tokio::spawn(srv2.for_each(move |conn| {
+ tokio::spawn(conn.map(|_| ()).map_err(|err| println!("srv2 error: {:?}", err)));
+ Ok(())
+ }).map_err(|_| ()));
Ok(())
}));
diff --git a/examples/params.rs b/examples/params.rs
index eb576cc1bf..d362840f52 100644
--- a/examples/params.rs
+++ b/examples/params.rs
@@ -5,7 +5,7 @@ extern crate pretty_env_logger;
extern crate tokio;
extern crate url;
-use futures::{Future, FutureExt, StreamExt};
+use futures::{Future, Stream};
use futures::future::lazy;
use hyper::{Body, Method, Request, Response, StatusCode};
@@ -32,7 +32,7 @@ impl Service for ParamExample {
Box::new(futures::future::ok(Response::new(INDEX.into())))
},
(&Method::POST, "/post") => {
- Box::new(req.into_parts().1.into_stream().concat().map(|b| {
+ Box::new(req.into_parts().1.into_stream().concat2().map(|b| {
// Parse the request body. form_urlencoded::parse
// always succeeds, but in general parsing may
// fail (for example, an invalid post of json), so
@@ -98,11 +98,9 @@ fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();
- tokio::runtime::run2(lazy(move |_| {
+ tokio::run(lazy(move || {
let server = Http::new().bind(&addr, || Ok(ParamExample)).unwrap();
- println!("Listening on http://{}", server.local_addr().unwrap());
- server.run().recover(|err| {
- eprintln!("Server error {}", err)
- })
+ println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
+ server.run().map_err(|err| eprintln!("Server error {}", err))
}));
}
diff --git a/examples/send_file.rs b/examples/send_file.rs
index ad0cdc12d9..8e6fe917a0 100644
--- a/examples/send_file.rs
+++ b/examples/send_file.rs
@@ -4,9 +4,9 @@ extern crate hyper;
extern crate pretty_env_logger;
extern crate tokio;
-use futures::{Future, FutureExt};
+use futures::{Future/*, Sink*/};
use futures::future::lazy;
-use futures::channel::oneshot;
+use futures::sync::oneshot;
use hyper::{Body, /*Chunk,*/ Method, Request, Response, StatusCode};
use hyper::error::Error;
@@ -141,9 +141,9 @@ fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();
- tokio::runtime::run2(lazy(move |_| {
+ tokio::run(lazy(move || {
let server = Http::new().bind(&addr, || Ok(ResponseExamples)).unwrap();
- println!("Listening on http://{}", server.local_addr().unwrap());
- server.run().map_err(|err| panic!("Server error {}", err))
+ println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
+ server.run().map_err(|err| eprintln!("Server error {}", err))
}));
}
diff --git a/examples/server.rs b/examples/server.rs
index 9486ab8d04..b5d2153958 100644
--- a/examples/server.rs
+++ b/examples/server.rs
@@ -4,7 +4,7 @@ extern crate hyper;
extern crate pretty_env_logger;
extern crate tokio;
-use futures::FutureExt;
+use futures::Future;
use futures::future::{FutureResult, lazy};
use hyper::{Body, Method, Request, Response, StatusCode};
@@ -43,11 +43,9 @@ fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();
- tokio::runtime::run2(lazy(move |_| {
+ tokio::run(lazy(move || {
let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
- println!("Listening on http://{}", server.local_addr().unwrap());
- server.run().recover(|err| {
- eprintln!("Server error {}", err)
- })
+ println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
+ server.run().map_err(|err| eprintln!("Server error {}", err))
}));
}
diff --git a/examples/web_api.rs b/examples/web_api.rs
index d792eeafd8..7923d4afd9 100644
--- a/examples/web_api.rs
+++ b/examples/web_api.rs
@@ -4,8 +4,7 @@ extern crate hyper;
extern crate pretty_env_logger;
extern crate tokio;
-use futures::{Future, FutureExt, StreamExt};
-use futures::executor::spawn;
+use futures::{Future, Stream};
use futures::future::lazy;
use tokio::reactor::Handle;
@@ -79,15 +78,13 @@ fn main() {
pretty_env_logger::init();
let addr = "127.0.0.1:1337".parse().unwrap();
- tokio::runtime::run2(lazy(move |_| {
+ tokio::run(lazy(move || {
let handle = Handle::current();
let serve = Http::new().serve_addr(&addr, move || Ok(ResponseExamples(handle.clone()))).unwrap();
- println!("Listening on http://{}", serve.incoming_ref().local_addr());
+ println!("Listening on http://{} with 1 thread.", serve.incoming_ref().local_addr());
- serve.map_err(|err| panic!("server error {:?}", err)).for_each(move |conn| {
- spawn(conn.recover(|err| {
- println!("connection error: {:?}", err);
- }))
- }).map(|_| ())
+ serve.map_err(|_| ()).for_each(move |conn| {
+ tokio::spawn(conn.map(|_| ()).map_err(|err| println!("serve error: {:?}", err)))
+ })
}));
}
diff --git a/src/client/conn.rs b/src/client/conn.rs
index c01b24e488..852dc9ffa2 100644
--- a/src/client/conn.rs
+++ b/src/client/conn.rs
@@ -11,10 +11,9 @@ use std::fmt;
use std::marker::PhantomData;
use bytes::Bytes;
-use futures::{Async, Future, FutureExt, Poll};
+use futures::{Async, Future, Poll};
use futures::future::{self, Either};
-use futures::task;
-use futures::io::{AsyncRead, AsyncWrite};
+use tokio_io::{AsyncRead, AsyncWrite};
use proto;
use proto::body::Entity;
@@ -124,8 +123,8 @@ impl SendRequest
/// Polls to determine whether this sender can be used yet for a request.
///
/// If the associated connection is closed, this returns an Error.
- pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> {
- self.dispatch.poll_ready(cx)
+ pub fn poll_ready(&mut self) -> Poll<(), ::Error> {
+ self.dispatch.poll_ready()
}
pub(super) fn is_ready(&self) -> bool {
@@ -167,7 +166,7 @@ where
/// # use http::header::HOST;
/// # use hyper::client::conn::SendRequest;
/// # use hyper::Body;
- /// use futures::FutureExt;
+ /// use futures::Future;
/// use hyper::Request;
///
/// # fn doc(mut tx: SendRequest) {
@@ -191,19 +190,19 @@ where
pub fn send_request(&mut self, req: Request) -> ResponseFuture {
let inner = match self.dispatch.send(req) {
Ok(rx) => {
- rx.then(move |res| {
+ Either::A(rx.then(move |res| {
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
}
- }).left()
+ }))
},
Err(_req) => {
debug!("connection was not ready");
let err = ::Error::new_canceled(Some("connection was not ready"));
- future::err(err).right()
+ Either::B(future::err(err))
}
};
@@ -219,7 +218,7 @@ where
{
let inner = match self.dispatch.try_send(req) {
Ok(rx) => {
- Either::Left(rx.then(move |res| {
+ Either::A(rx.then(move |res| {
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
@@ -231,7 +230,7 @@ where
Err(req) => {
debug!("connection was not ready");
let err = ::Error::new_canceled(Some("connection was not ready"));
- Either::Right(future::err((err, Some(req))))
+ Either::B(future::err((err, Some(req))))
}
};
Box::new(inner)
@@ -282,8 +281,8 @@ where
/// upgrade. Once the upgrade is completed, the connection would be "done",
/// but it is not desired to actally shutdown the IO object. Instead you
/// would take it back using `into_parts`.
- pub fn poll_without_shutdown(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> {
- self.inner.poll_without_shutdown(cx)
+ pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> {
+ self.inner.poll_without_shutdown()
}
}
@@ -295,8 +294,8 @@ where
type Item = ();
type Error = ::Error;
- fn poll(&mut self, cx: &mut task::Context) -> Poll {
- self.inner.poll(cx)
+ fn poll(&mut self) -> Poll {
+ self.inner.poll()
}
}
@@ -368,8 +367,8 @@ where
type Item = (SendRequest, Connection);
type Error = ::Error;
- fn poll(&mut self, cx: &mut task::Context) -> Poll {
- self.inner.poll(cx)
+ fn poll(&mut self) -> Poll {
+ self.inner.poll()
.map(|async| {
async.map(|(tx, dispatch)| {
(tx, Connection { inner: dispatch })
@@ -399,8 +398,8 @@ where
>);
type Error = ::Error;
- fn poll(&mut self, cx: &mut task::Context) -> Poll {
- self.inner.poll(cx)
+ fn poll(&mut self) -> Poll {
+ self.inner.poll()
}
}
@@ -422,7 +421,7 @@ where
>);
type Error = ::Error;
- fn poll(&mut self, _cx: &mut task::Context) -> Poll {
+ fn poll(&mut self) -> Poll {
let io = self.io.take().expect("polled more than once");
let (tx, rx) = dispatch::channel();
let mut conn = proto::Conn::new(io);
@@ -446,8 +445,8 @@ impl Future for ResponseFuture {
type Error = ::Error;
#[inline]
- fn poll(&mut self, cx: &mut task::Context) -> Poll {
- self.inner.poll(cx)
+ fn poll(&mut self) -> Poll {
+ self.inner.poll()
}
}
diff --git a/src/client/connect.rs b/src/client/connect.rs
index ad5ab7b518..33e636c067 100644
--- a/src/client/connect.rs
+++ b/src/client/connect.rs
@@ -8,21 +8,24 @@
use std::error::Error as StdError;
use std::fmt;
use std::io;
+use std::mem;
use std::net::SocketAddr;
+use std::sync::Arc;
use std::time::Duration;
-use futures::{Future, Never, Poll, Async};
-use futures::executor::{Executor, SpawnError, ThreadPoolBuilder};
-use futures::task;
-use futures::io::{AsyncRead, AsyncWrite};
+use futures::{Future, Poll, Async};
+use futures::future::{Executor, ExecuteError};
+use futures::sync::oneshot;
+use futures_cpupool::{Builder as CpuPoolBuilder};
use http::Uri;
use http::uri::Scheme;
use net2::TcpBuilder;
+use tokio_io::{AsyncRead, AsyncWrite};
use tokio::reactor::Handle;
use tokio::net::{TcpStream, ConnectFuture};
-use executor::CloneBoxedExecutor;
use super::dns;
+use self::http_connector::HttpConnectorBlockingTask;
/// Connect to a destination, returning an IO transport.
///
@@ -171,7 +174,7 @@ impl HttpConnector {
/// Takes number of DNS worker threads.
#[inline]
pub fn new(threads: usize, handle: &Handle) -> HttpConnector {
- let pool = ThreadPoolBuilder::new()
+ let pool = CpuPoolBuilder::new()
.name_prefix("hyper-dns")
.pool_size(threads)
.create();
@@ -183,10 +186,10 @@ impl HttpConnector {
/// Takes an executor to run blocking tasks on.
#[inline]
pub fn new_with_executor(executor: E, handle: &Handle) -> HttpConnector
- where E: Executor + Clone + Send + Sync
+ where E: Executor + Send + Sync
{
HttpConnector {
- executor: HttpConnectExecutor(Box::new(executor)),
+ executor: HttpConnectExecutor(Arc::new(executor)),
enforce_http: true,
handle: handle.clone(),
keep_alive_timeout: None,
@@ -295,7 +298,7 @@ pub struct HttpConnecting {
enum State {
Lazy(HttpConnectExecutor, String, u16),
- Resolving(dns::Resolving),
+ Resolving(oneshot::SpawnHandle),
Connecting(ConnectingTcp),
Error(Option),
}
@@ -304,11 +307,11 @@ impl Future for HttpConnecting {
type Item = (TcpStream, Connected);
type Error = io::Error;
- fn poll(&mut self, cx: &mut task::Context) -> Poll {
+ fn poll(&mut self) -> Poll {
loop {
let state;
match self.state {
- State::Lazy(ref mut executor, ref mut host, port) => {
+ State::Lazy(ref executor, ref mut host, port) => {
// If the host is already an IP addr (v4 or v6),
// skip resolving the dns and start connecting right away.
if let Some(addrs) = dns::IpAddrs::try_parse(host, port) {
@@ -317,19 +320,24 @@ impl Future for HttpConnecting {
current: None
})
} else {
- let host = ::std::mem::replace(host, String::new());
- state = State::Resolving(dns::Resolving::spawn(host, port, executor));
+ let host = mem::replace(host, String::new());
+ let work = dns::Work::new(host, port);
+ state = State::Resolving(oneshot::spawn(work, executor));
}
},
State::Resolving(ref mut future) => {
- let addrs = try_ready!(future.poll(cx));
- state = State::Connecting(ConnectingTcp {
- addrs: addrs,
- current: None,
- });
+ match try!(future.poll()) {
+ Async::NotReady => return Ok(Async::NotReady),
+ Async::Ready(addrs) => {
+ state = State::Connecting(ConnectingTcp {
+ addrs: addrs,
+ current: None,
+ })
+ }
+ };
},
State::Connecting(ref mut c) => {
- let sock = try_ready!(c.poll(cx, &self.handle));
+ let sock = try_ready!(c.poll(&self.handle));
if let Some(dur) = self.keep_alive_timeout {
sock.set_keepalive(Some(dur))?;
@@ -357,11 +365,11 @@ struct ConnectingTcp {
impl ConnectingTcp {
// not a Future, since passing a &Handle to poll
- fn poll(&mut self, cx: &mut task::Context, handle: &Handle) -> Poll {
+ fn poll(&mut self, handle: &Handle) -> Poll {
let mut err = None;
loop {
if let Some(ref mut current) = self.current {
- match current.poll(cx) {
+ match current.poll() {
Ok(ok) => return Ok(ok),
Err(e) => {
trace!("connect error {:?}", e);
@@ -384,19 +392,37 @@ impl ConnectingTcp {
}
}
-#[derive(Clone)]
-struct HttpConnectExecutor(Box);
-
-impl Executor for HttpConnectExecutor {
- fn spawn(
- &mut self,
- f: Box + 'static + Send>
- ) -> Result<(), SpawnError> {
- self.0.spawn(f)
+// Make this Future unnameable outside of this crate.
+mod http_connector {
+ use super::*;
+ // Blocking task to be executed on a thread pool.
+ pub struct HttpConnectorBlockingTask {
+ pub(super) work: oneshot::Execute
+ }
+
+ impl fmt::Debug for HttpConnectorBlockingTask {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.pad("HttpConnectorBlockingTask")
+ }
+ }
+
+ impl Future for HttpConnectorBlockingTask {
+ type Item = ();
+ type Error = ();
+
+ fn poll(&mut self) -> Poll<(), ()> {
+ self.work.poll()
+ }
}
+}
+
+#[derive(Clone)]
+struct HttpConnectExecutor(Arc + Send + Sync>);
- fn status(&self) -> Result<(), SpawnError> {
- self.0.status()
+impl Executor> for HttpConnectExecutor {
+ fn execute(&self, future: oneshot::Execute) -> Result<(), ExecuteError>> {
+ self.0.execute(HttpConnectorBlockingTask { work: future })
+ .map_err(|err| ExecuteError::new(err.kind(), err.into_future().work))
}
}
@@ -404,7 +430,7 @@ impl Executor for HttpConnectExecutor {
mod tests {
#![allow(deprecated)]
use std::io;
- use futures::executor::block_on;
+ use futures::Future;
use tokio::runtime::Runtime;
use super::{Connect, Destination, HttpConnector};
@@ -417,7 +443,7 @@ mod tests {
};
let connector = HttpConnector::new(1, runtime.handle());
- assert_eq!(block_on(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
+ assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
#[test]
@@ -429,7 +455,7 @@ mod tests {
};
let connector = HttpConnector::new(1, runtime.handle());
- assert_eq!(block_on(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
+ assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
@@ -442,6 +468,6 @@ mod tests {
};
let connector = HttpConnector::new(1, runtime.handle());
- assert_eq!(block_on(connector.connect(dst)).unwrap_err().kind(), io::ErrorKind::InvalidInput);
+ assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
}
diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs
index 98c591521e..f8240c9678 100644
--- a/src/client/dispatch.rs
+++ b/src/client/dispatch.rs
@@ -1,8 +1,9 @@
-use futures::{Async, Never, Poll, Stream};
-use futures::channel::{mpsc, oneshot};
-use futures::task;
+use futures::{Async, Poll, Stream};
+use futures::sync::{mpsc, oneshot};
use want;
+use common::Never;
+
//pub type Callback = oneshot::Sender)>>;
pub type RetryPromise = oneshot::Receiver)>>;
pub type Promise = oneshot::Receiver>;
@@ -32,15 +33,15 @@ pub struct Sender {
}
impl Sender {
- pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), ::Error> {
- match self.inner.poll_ready(cx) {
+ pub fn poll_ready(&mut self) -> Poll<(), ::Error> {
+ match self.inner.poll_ready() {
Ok(Async::Ready(())) => {
// there's room in the queue, but does the Connection
// want a message yet?
- self.giver.poll_want(cx)
+ self.giver.poll_want()
.map_err(|_| ::Error::Closed)
},
- Ok(Async::Pending) => Ok(Async::Pending),
+ Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => Err(::Error::Closed),
}
}
@@ -78,15 +79,16 @@ impl Stream for Receiver {
type Item = (T, Callback);
type Error = Never;
- fn poll_next(&mut self, cx: &mut task::Context) -> Poll