From ed497bf5e6f1d651e3b30fd42c10245c560aff5b Mon Sep 17 00:00:00 2001 From: Sam Rijs Date: Sat, 23 Sep 2017 15:44:14 +1000 Subject: [PATCH] feat(client): allow custom executors for HttpConnector --- Cargo.toml | 2 +- src/client/connect.rs | 70 ++++++++++++++++++++++++++++++++++++------- src/client/dns.rs | 39 +++++++----------------- 3 files changed, 71 insertions(+), 40 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4d302269c4..176d6ca1bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ include = [ base64 = "0.6" bytes = "0.4.4" futures = "0.1.14" -futures-cpupool = "0.1" +futures-cpupool = "0.1.6" http = { version = "0.1", optional = true } httparse = "1.0" language-tags = "0.2" diff --git a/src/client/connect.rs b/src/client/connect.rs index 5420bdf761..a1214bb820 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -2,9 +2,13 @@ use std::error::Error as StdError; use std::fmt; use std::io; use std::mem; +use std::sync::Arc; //use std::net::SocketAddr; use futures::{Future, Poll, Async}; +use futures::future::{Executor, ExecuteError}; +use futures::sync::oneshot; +use futures_cpupool::{Builder as CpuPoolBuilder}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio::reactor::Handle; use tokio::net::{TcpStream, TcpStreamNew}; @@ -43,22 +47,35 @@ where T: Service + 'static, /// A connector for the `http` scheme. #[derive(Clone)] pub struct HttpConnector { - dns: dns::Dns, + executor: HttpConnectExecutor, enforce_http: bool, handle: Handle, } impl HttpConnector { - /// Construct a new HttpConnector. /// /// Takes number of DNS worker threads. #[inline] pub fn new(threads: usize, handle: &Handle) -> HttpConnector { + let pool = CpuPoolBuilder::new() + .name_prefix("hyper-dns") + .pool_size(threads) + .create(); + HttpConnector::new_with_executor(pool, handle) + } + + /// Construct a new HttpConnector. + /// + /// Takes an executor to run blocking tasks on. + #[inline] + pub fn new_with_executor(executor: E, handle: &Handle) -> HttpConnector + where E: Executor + { HttpConnector { - dns: dns::Dns::new(threads), + executor: HttpConnectExecutor(Arc::new(executor)), enforce_http: true, - handle: handle.clone(), + handle: handle.clone() } } @@ -109,7 +126,7 @@ impl Service for HttpConnector { }; HttpConnecting { - state: State::Lazy(self.dns.clone(), host.into(), port), + state: State::Lazy(self.executor.clone(), host.into(), port), handle: self.handle.clone(), } } @@ -154,8 +171,8 @@ pub struct HttpConnecting { } enum State { - Lazy(dns::Dns, String, u16), - Resolving(dns::Query), + Lazy(HttpConnectExecutor, String, u16), + Resolving(oneshot::SpawnHandle), Connecting(ConnectingTcp), Error(Option), } @@ -168,12 +185,13 @@ impl Future for HttpConnecting { loop { let state; match self.state { - State::Lazy(ref dns, ref mut host, port) => { + State::Lazy(ref executor, ref mut host, port) => { let host = mem::replace(host, String::new()); - state = State::Resolving(dns.resolve(host, port)); + let work = dns::Work::new(host, port); + state = State::Resolving(oneshot::spawn(work, executor)); }, - State::Resolving(ref mut query) => { - match try!(query.poll()) { + State::Resolving(ref mut future) => { + match try!(future.poll()) { Async::NotReady => return Ok(Async::NotReady), Async::Ready(addrs) => { state = State::Connecting(ConnectingTcp { @@ -231,6 +249,36 @@ impl ConnectingTcp { } } +/// Blocking task to be executed on a thread pool. +pub struct HttpConnectorBlockingTask { + work: oneshot::Execute +} + +impl fmt::Debug for HttpConnectorBlockingTask { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.pad("HttpConnectorBlockingTask") + } +} + +impl Future for HttpConnectorBlockingTask { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + self.work.poll() + } +} + +#[derive(Clone)] +struct HttpConnectExecutor(Arc>); + +impl Executor> for HttpConnectExecutor { + fn execute(&self, future: oneshot::Execute) -> Result<(), ExecuteError>> { + self.0.execute(HttpConnectorBlockingTask { work: future }) + .map_err(|err| ExecuteError::new(err.kind(), err.into_future().work)) + } +} + /* impl HttpsConnector { /// Create a new connector using the provided SSL implementation. diff --git a/src/client/dns.rs b/src/client/dns.rs index 1fb6ff9591..cf522801e4 100644 --- a/src/client/dns.rs +++ b/src/client/dns.rs @@ -2,37 +2,27 @@ use std::io; use std::net::{SocketAddr, ToSocketAddrs}; use std::vec; -use ::futures::{Future, Poll}; -use ::futures_cpupool::{CpuPool, CpuFuture, Builder}; +use ::futures::{Async, Future, Poll}; -#[derive(Clone)] -pub struct Dns { - pool: CpuPool, +pub struct Work { + host: String, + port: u16 } -impl Dns { - pub fn new(threads: usize) -> Dns { - Dns { - pool: Builder::new() - .name_prefix("hyper-dns") - .pool_size(threads) - .create() - } - } - - pub fn resolve(&self, host: String, port: u16) -> Query { - Query(self.pool.spawn_fn(move || work(host, port))) +impl Work { + pub fn new(host: String, port: u16) -> Work { + Work { host: host, port: port } } } -pub struct Query(CpuFuture); - -impl Future for Query { +impl Future for Work { type Item = IpAddrs; type Error = io::Error; fn poll(&mut self) -> Poll { - self.0.poll() + debug!("resolve host={:?}, port={:?}", self.host, self.port); + (&*self.host, self.port).to_socket_addrs() + .map(|i| Async::Ready(IpAddrs { iter: i })) } } @@ -47,10 +37,3 @@ impl Iterator for IpAddrs { self.iter.next() } } - -pub type Answer = io::Result; - -fn work(hostname: String, port: u16) -> Answer { - debug!("resolve host={:?}, port={:?}", hostname, port); - (&*hostname, port).to_socket_addrs().map(|i| IpAddrs { iter: i }) -}