From f45e9c8e4fcacc2bd7fed84ef0df6d2fcf8c1134 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 18 Jan 2017 14:09:20 -0800 Subject: [PATCH] refactor(server): expose Http that implements ServerProto The main changes are: * The entry point is how `Http`, the implementation of `ServerProto`. This type has a `new` constructor as well as builder methods to configure it. * A high-level entry point of `Http::bind` was added which returns a `Server`. Binding a protocol to a port requires a socket address (where to bind) as well as the instance of `NewService`. Internally this creates a core and a TCP listener. * The returned `Server` has a few methods to learn about itself, e.g. `local_addr` and `handle`, but mainly has two methods: `run` and `run_until`. * The `Server::run` entry point will execute a server infinitely, never having it exit. * The `Server::run_until` method is intended as a graceful shutdown mechanism. When the provided future resolves the server stops accepting connections immediately and then waits for a fixed period of time for all active connections to get torn down, after which the whole server is torn down anyway. * Finally a `Http::bind_connection` method exists as a low-level entry point to spawning a server connection. This is used by `Server::run` as is intended for external use in other event loops if necessary or otherwise low-level needs. BREAKING CHANGE: `Server` is no longer the pimary entry point. Instead, an `Http` type is created and then either `bind` to receiver a `Server`, or it can be passed to other Tokio things. --- benches/end_to_end.rs | 30 ++- examples/hello.rs | 10 +- examples/server.rs | 13 +- src/lib.rs | 2 +- src/server/mod.rs | 548 +++++++++++++++++++++++++----------------- tests/server.rs | 53 ++-- 6 files changed, 385 insertions(+), 271 deletions(-) diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index 6e6a504322..88585cfa54 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -3,13 +3,15 @@ extern crate futures; extern crate hyper; -extern crate tokio_core; extern crate pretty_env_logger; - extern crate test; +extern crate tokio_core; + +use std::net::SocketAddr; use futures::{Future, Stream}; -use tokio_core::reactor::Core; +use tokio_core::reactor::{Core, Handle}; +use tokio_core::net::TcpListener; use hyper::client; use hyper::header::{ContentLength, ContentType}; @@ -22,9 +24,7 @@ fn get_one_at_a_time(b: &mut test::Bencher) { let _ = pretty_env_logger::init(); let mut core = Core::new().unwrap(); let handle = core.handle(); - - let addr = hyper::Server::http(&"127.0.0.1:0".parse().unwrap(), &handle).unwrap() - .handle(|| Ok(Hello), &handle).unwrap(); + let addr = spawn_hello(&handle); let client = hyper::Client::new(&handle); @@ -47,9 +47,7 @@ fn post_one_at_a_time(b: &mut test::Bencher) { let _ = pretty_env_logger::init(); let mut core = Core::new().unwrap(); let handle = core.handle(); - - let addr = hyper::Server::http(&"127.0.0.1:0".parse().unwrap(), &handle).unwrap() - .handle(|| Ok(Hello), &handle).unwrap(); + let addr = spawn_hello(&handle); let client = hyper::Client::new(&handle); @@ -92,3 +90,17 @@ impl Service for Hello { } } + +fn spawn_hello(handle: &Handle) -> SocketAddr { + let addr = "127.0.0.1:0".parse().unwrap(); + let listener = TcpListener::bind(&addr, handle).unwrap(); + let addr = listener.local_addr().unwrap(); + + let handle2 = handle.clone(); + handle.spawn(listener.incoming().for_each(move |(socket, addr)| { + let http = hyper::server::Http::new(); + http.bind_connection(&handle2, socket, addr, Hello); + Ok(()) + }).then(|_| Ok(()))); + return addr +} diff --git a/examples/hello.rs b/examples/hello.rs index a9e45d3cd3..c01eff7d3a 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -5,7 +5,7 @@ extern crate pretty_env_logger; //extern crate num_cpus; use hyper::header::{ContentLength, ContentType}; -use hyper::server::{Server, Service, Request, Response}; +use hyper::server::{Http, Service, Request, Response}; static PHRASE: &'static [u8] = b"Hello World!"; @@ -31,9 +31,7 @@ impl Service for Hello { fn main() { pretty_env_logger::init().unwrap(); let addr = "127.0.0.1:3000".parse().unwrap(); - let _server = Server::standalone(|tokio| { - Server::http(&addr, tokio)? - .handle(|| Ok(Hello), tokio) - }).unwrap(); - println!("Listening on http://{}", addr); + let server = Http::new().bind(&addr, || Ok(Hello)).unwrap(); + println!("Listening on http://{}", server.local_addr().unwrap()); + server.run().unwrap(); } diff --git a/examples/server.rs b/examples/server.rs index a876df36bc..d67163725e 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -7,8 +7,7 @@ extern crate log; use hyper::{Get, Post, StatusCode}; use hyper::header::ContentLength; -use hyper::server::{Server, Service, Request, Response}; - +use hyper::server::{Http, Service, Request, Response}; static INDEX: &'static [u8] = b"Try POST /echo"; @@ -48,10 +47,8 @@ impl Service for Echo { fn main() { pretty_env_logger::init().unwrap(); let addr = "127.0.0.1:1337".parse().unwrap(); - let (listening, server) = Server::standalone(|tokio| { - Server::http(&addr, tokio)? - .handle(|| Ok(Echo), tokio) - }).unwrap(); - println!("Listening on http://{}", listening); - server.run(); + + let server = Http::new().bind(&addr, || Ok(Echo)).unwrap(); + println!("Listening on http://{}", server.local_addr().unwrap()); + server.run().unwrap(); } diff --git a/src/lib.rs b/src/lib.rs index 6cab904516..4e93960562 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,7 +14,7 @@ //! [Server](server/index.html), along with a //! [typed Headers system](header/index.html). -extern crate futures; +#[macro_use] extern crate futures; extern crate futures_cpupool; extern crate httparse; #[macro_use] extern crate language_tags; diff --git a/src/server/mod.rs b/src/server/mod.rs index c2ae861b96..ca69db6461 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -2,23 +2,26 @@ //! //! A `Server` is created to listen on a port, parse HTTP requests, and hand //! them off to a `Service`. + +use std::cell::RefCell; use std::fmt; use std::io; -use std::net::{SocketAddr, TcpListener as StdTcpListener}; +use std::net::SocketAddr; +use std::rc::{Rc, Weak}; +use std::time::Duration; -use futures::{Future, Map}; -use futures::stream::{Stream}; -use futures::sync::oneshot; +use futures::future; +use futures::task::{self, Task}; +use futures::{Future, Map, Stream, Poll, Async, Sink, StartSend, AsyncSink}; use tokio::io::Io; +use tokio::reactor::{Core, Handle, Timeout}; use tokio::net::TcpListener; -use tokio::reactor::{Core, Handle}; use tokio_proto::BindServer; use tokio_proto::streaming::Message; -use tokio_proto::streaming::pipeline::ServerProto; +use tokio_proto::streaming::pipeline::{Transport, Frame, ServerProto}; pub use tokio_service::{NewService, Service}; -pub use self::accept::Accept; pub use self::request::Request; pub use self::response::Response; @@ -27,219 +30,209 @@ use http; mod request; mod response; -type HttpIncoming = ::tokio::net::Incoming; - -/// A Server that can accept incoming network requests. -#[derive(Debug)] -pub struct Server { - accepter: A, - addr: SocketAddr, +/// An instance of the HTTP protocol, and implementation of tokio-proto's +/// `ServerProto` trait. +/// +/// This structure is used to create instances of `Server` or to spawn off tasks +/// which handle a connection to an HTTP server. Each instance of `Http` can be +/// configured with various protocol-level options such as keepalive. +#[derive(Debug, Clone)] +pub struct Http { keep_alive: bool, - //idle_timeout: Option, - //max_sockets: usize, } -impl Server { - /// Creates a new Server from a Stream of Ios. - /// - /// The addr is the socket address the accepter is listening on. - pub fn new(accepter: A, addr: SocketAddr) -> Server { - Server { - accepter: accepter, - addr: addr, +/// An instance of a server created through `Http::bind`. +/// +/// This server is intended as a convenience for creating a TCP listener on an +/// address and then serving TCP connections accepted with the service provided. +pub struct Server { + protocol: Http, + new_service: S, + core: Core, + listener: TcpListener, + shutdown_timeout: Duration, +} + +impl Http { + /// Creates a new instance of the HTTP protocol, ready to spawn a server or + /// start accepting connections. + pub fn new() -> Http { + Http { keep_alive: true, - //idle_timeout: Some(Duration::from_secs(75)), - //max_sockets: 4096, } } /// Enables or disables HTTP keep-alive. /// /// Default is true. - pub fn keep_alive(mut self, val: bool) -> Server { + pub fn keep_alive(&mut self, val: bool) -> &mut Self { self.keep_alive = val; self } - /* - /// Sets how long an idle connection will be kept before closing. + /// Bind the provided `addr` and return a server ready to handle + /// connections. /// - /// Default is 75 seconds. - pub fn idle_timeout(mut self, val: Option) -> Server { - self.idle_timeout = val; - self + /// This method will bind the `addr` provided with a new TCP listener ready + /// to accept connections. Each connection will be processed with the + /// `new_service` object provided as well, creating a new service per + /// connection. + /// + /// The returned `Server` contains one method, `run`, which is used to + /// actually run the server. + pub fn bind(&self, addr: &SocketAddr, new_service: S) -> ::Result> + where S: NewService + + Send + Sync + 'static, + { + let core = try!(Core::new()); + let handle = core.handle(); + let listener = try!(TcpListener::bind(addr, &handle)); + + Ok(Server { + new_service: new_service, + core: core, + listener: listener, + protocol: self.clone(), + shutdown_timeout: Duration::new(1, 0), + }) } - */ - /* - /// Sets the maximum open sockets for this Server. + /// Use this `Http` instance to create a new server task which handles the + /// connection `io` provided. /// - /// Default is 4096, but most servers can handle much more than this. - pub fn max_sockets(mut self, val: usize) -> Server { - self.max_sockets = val; - self + /// This is the low-level method used to actually spawn handling a TCP + /// connection, typically. The `handle` provided is the event loop on which + /// the server task will be spawned, `io` is the I/O object associated with + /// this connection (data that's read/written), `remote_addr` is the remote + /// peer address of the HTTP client, and `service` defines how HTTP requests + /// will be handled (and mapped to responses). + /// + /// This method is typically not invoked directly but is rather transitively + /// used through the `serve` helper method above. This can be useful, + /// however, when writing mocks or accepting sockets from a non-TCP + /// location. + pub fn bind_connection(&self, + handle: &Handle, + io: I, + remote_addr: SocketAddr, + service: S) + where S: Service + 'static, + I: Io + 'static, + { + self.bind_server(handle, io, HttpService { + inner: service, + remote_addr: remote_addr, + }) } - */ } -impl Server { - /// Creates a new HTTP server config listening on the provided address. - pub fn http(addr: &SocketAddr, handle: &Handle) -> ::Result> { - let listener = try!(StdTcpListener::bind(addr)); - let addr = try!(listener.local_addr()); - let listener = try!(TcpListener::from_listener(listener, &addr, handle)); - Ok(Server::new(listener.incoming(), addr)) - } +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct ProtoRequest(http::RequestHead); +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct ProtoResponse(ResponseHead); +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct ProtoTransport(http::Conn); +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct ProtoBindTransport { + inner: future::FutureResult, io::Error>, } +impl ServerProto for Http { + type Request = ProtoRequest; + type RequestBody = http::Chunk; + type Response = ProtoResponse; + type ResponseBody = http::Chunk; + type Error = ::Error; + type Transport = ProtoTransport; + type BindTransport = ProtoBindTransport; -/* -impl Server> { - /// Creates a new server config that will handle `HttpStream`s over SSL. - /// - /// You can use any SSL implementation, as long as it implements `hyper::net::Ssl`. - pub fn https(addr: &SocketAddr, ssl: S) -> ::Result>> { - HttpsListener::new(addr, ssl) - .map(Server::new) - .map_err(From::from) + fn bind_transport(&self, io: T) -> Self::BindTransport { + let ka = if self.keep_alive { + http::KA::Busy + } else { + http::KA::Disabled + }; + ProtoBindTransport { + inner: future::ok(http::Conn::new(io, ka)), + } } } -*/ +impl Sink for ProtoTransport { + type SinkItem = Frame; + type SinkError = io::Error; -impl Server { - /// Binds to a socket and starts handling connections. - pub fn handle(self, factory: H, handle: &Handle) -> ::Result - where H: NewService + 'static { - let binder = HttpServer { - keep_alive: self.keep_alive, + fn start_send(&mut self, item: Self::SinkItem) + -> StartSend { + let item = match item { + Frame::Message { message, body } => { + Frame::Message { message: message.0, body: body } + } + Frame::Body { chunk } => Frame::Body { chunk: chunk }, + Frame::Error { error } => Frame::Error { error: error }, }; - let inner_handle = handle.clone(); - handle.spawn(self.accepter.accept().for_each(move |(socket, remote_addr)| { - let service = HttpService { - inner: try!(factory.new_service()), - remote_addr: remote_addr, - }; - binder.bind_server(&inner_handle, socket, service); - Ok(()) - }).map_err(|e| { - error!("listener io error: {:?}", e); - () - })); - - Ok(self.addr) - } -} - -impl Server<()> { - /// Create a server that owns its event loop. - /// - /// The returned `ServerLoop` can be used to run the loop forever in the - /// thread. The returned `Listening` can be sent to another thread, and - /// used to shutdown the `ServerLoop`. - pub fn standalone(closure: F) -> ::Result<(Listening, ServerLoop)> - where F: FnOnce(&Handle) -> ::Result { - let core = try!(Core::new()); - let handle = core.handle(); - let addr = try!(closure(&handle)); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - Ok(( - Listening { - addr: addr, - shutdown: shutdown_tx, - }, - ServerLoop { - inner: Some((core, shutdown_rx)), + match try!(self.0.start_send(item)) { + AsyncSink::Ready => Ok(AsyncSink::Ready), + AsyncSink::NotReady(Frame::Message { message, body }) => { + Ok(AsyncSink::NotReady(Frame::Message { + message: ProtoResponse(message), + body: body, + })) } - )) - - } -} - -/// A configured `Server` ready to run. -pub struct ServerLoop { - inner: Option<(Core, oneshot::Receiver<()>)>, -} - -impl fmt::Debug for ServerLoop { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.pad("ServerLoop") - } -} - -impl ServerLoop { - /// Runs the server forever in this loop. - /// - /// This will block the current thread. - pub fn run(self) { - // drop will take care of it. - trace!("ServerLoop::run()"); + AsyncSink::NotReady(Frame::Body { chunk }) => { + Ok(AsyncSink::NotReady(Frame::Body { chunk: chunk })) + } + AsyncSink::NotReady(Frame::Error { error }) => { + Ok(AsyncSink::NotReady(Frame::Error { error: error })) + } + } } -} -impl Drop for ServerLoop { - fn drop(&mut self) { - self.inner.take().map(|(mut loop_, shutdown)| { - debug!("ServerLoop::drop running"); - let _ = loop_.run(shutdown.or_else(|_dropped| ::futures::future::empty::<(), oneshot::Canceled>())); - debug!("Server closed"); - }); + fn poll_complete(&mut self) -> Poll<(), io::Error> { + self.0.poll_complete() } } -/// A handle of the running server. -pub struct Listening { - addr: SocketAddr, - shutdown: ::futures::sync::oneshot::Sender<()>, -} +impl Stream for ProtoTransport { + type Item = Frame; + type Error = io::Error; -impl fmt::Debug for Listening { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Listening") - .field("addr", &self.addr) - .finish() - } -} - -impl fmt::Display for Listening { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - fmt::Display::fmt(&self.addr, f) + fn poll(&mut self) -> Poll, io::Error> { + let item = match try_ready!(self.0.poll()) { + Some(item) => item, + None => return Ok(None.into()), + }; + let item = match item { + Frame::Message { message, body } => { + Frame::Message { message: ProtoRequest(message), body: body } + } + Frame::Body { chunk } => Frame::Body { chunk: chunk }, + Frame::Error { error } => Frame::Error { error: error }, + }; + Ok(Some(item).into()) } } -impl Listening { - /// The addresses this server is listening on. - pub fn addr(&self) -> &SocketAddr { - &self.addr +impl Transport for ProtoTransport { + fn tick(&mut self) { + self.0.tick() } - /// Stop the server from listening to its socket address. - pub fn close(self) { - debug!("closing server {}", self); - self.shutdown.complete(()); + fn cancel(&mut self) -> io::Result<()> { + self.0.cancel() } } -struct HttpServer { - keep_alive: bool, -} +impl Future for ProtoBindTransport { + type Item = ProtoTransport; + type Error = io::Error; -impl ServerProto for HttpServer { - type Request = http::RequestHead; - type RequestBody = http::Chunk; - type Response = ResponseHead; - type ResponseBody = http::Chunk; - type Error = ::Error; - type Transport = http::Conn; - type BindTransport = io::Result>; - - fn bind_transport(&self, io: T) -> Self::BindTransport { - let ka = if self.keep_alive { - http::KA::Busy - } else { - http::KA::Disabled - }; - Ok(http::Conn::new(io, ka)) + fn poll(&mut self) -> Poll, io::Error> { + self.inner.poll().map(|a| a.map(ProtoTransport)) } } @@ -248,12 +241,12 @@ struct HttpService { remote_addr: SocketAddr, } -fn map_response_to_message(res: Response) -> Message { +fn map_response_to_message(res: Response) -> Message { let (head, body) = response::split(res); if let Some(body) = body { - Message::WithBody(head, body.into()) + Message::WithBody(ProtoResponse(head), body.into()) } else { - Message::WithoutBody(head) + Message::WithoutBody(ProtoResponse(head)) } } @@ -262,69 +255,184 @@ type ResponseHead = http::MessageHead<::StatusCode>; impl Service for HttpService where T: Service, { - type Request = Message; - type Response = Message; + type Request = Message; + type Response = Message; type Error = ::Error; - type Future = Map Message>; + type Future = Map Message>; fn call(&self, message: Self::Request) -> Self::Future { let (head, body) = match message { - Message::WithoutBody(head) => (head, http::Body::empty()), - Message::WithBody(head, body) => (head, body.into()), + Message::WithoutBody(head) => (head.0, http::Body::empty()), + Message::WithBody(head, body) => (head.0, body.into()), }; let req = request::new(self.remote_addr, head, body); self.inner.call(req).map(map_response_to_message) } } -//private so the `Acceptor` type can stay internal -mod accept { - use std::io; - use std::net::SocketAddr; - use futures::{Stream, Poll}; - use tokio::io::Io; +impl Server + where S: NewService + + Send + Sync + 'static, +{ + /// Returns the local address that this server is bound to. + pub fn local_addr(&self) -> ::Result { + Ok(try!(self.listener.local_addr())) + } + + /// Returns a handle to the underlying event loop that this server will be + /// running on. + pub fn handle(&self) -> Handle { + self.core.handle() + } - /// An Acceptor is an incoming Stream of Io. + /// Configure the amount of time this server will wait for a "graceful + /// shutdown". /// - /// This trait is not implemented directly, and only exists to make the - /// intent clearer. A `Stream` - /// should be implemented instead. - pub trait Accept: Stream { - #[doc(hidden)] - type Output: Io + 'static; - #[doc(hidden)] - type Stream: Stream + 'static; - - #[doc(hidden)] - fn accept(self) -> Accepter - where Self: Sized; + /// This is the amount of time after the shutdown signal is received the + /// server will wait for all pending connections to finish. If the timeout + /// elapses then the server will be forcibly shut down. + /// + /// This defaults to 1s. + pub fn shutdown_timeout(&mut self, timeout: Duration) -> &mut Self { + self.shutdown_timeout = timeout; + self } - #[allow(missing_debug_implementations)] - pub struct Accepter + 'static, I: Io + 'static>(T, ::std::marker::PhantomData); + /// Execute this server infinitely. + /// + /// This method does not currently return, but it will return an error if + /// one occurs. + pub fn run(self) -> ::Result<()> { + self.run_until(future::empty()) + } - impl Stream for Accepter - where T: Stream, - I: Io + 'static, + /// Execute this server until the given future, `shutdown_signal`, resolves. + /// + /// This method, like `run` above, is used to execute this HTTP server. The + /// difference with `run`, however, is that this method allows for shutdown + /// in a graceful fashion. The future provided is interpreted as a signal to + /// shut down the server when it resolves. + /// + /// This method will block the current thread executing the HTTP server. + /// When the `shutdown_signal` has resolved then the TCP listener will be + /// unbound (dropped). The thread will continue to block for a maximum of + /// `shutdown_timeout` time waiting for active connections to shut down. + /// Once the `shutdown_timeout` elapses or all active connections are + /// cleaned out then this method will return. + pub fn run_until(self, shutdown_signal: F) -> ::Result<()> + where F: Future, { - type Item = T::Item; - type Error = io::Error; + let Server { protocol, new_service, mut core, listener, shutdown_timeout } = self; + let handle = core.handle(); - #[inline] - fn poll(&mut self) -> Poll, Self::Error> { - self.0.poll() + // Mini future to track the number of active services + let info = Rc::new(RefCell::new(Info { + active: 0, + blocker: None, + })); + + // Future for our server's execution + let srv = listener.incoming().for_each(|(socket, addr)| { + let s = NotifyService { + inner: try!(new_service.new_service()), + info: Rc::downgrade(&info), + }; + info.borrow_mut().active += 1; + protocol.bind_connection(&handle, socket, addr, s); + Ok(()) + }); + + // Main execution of the server. Here we use `select` to wait for either + // `incoming` or `f` to resolve. We know that `incoming` will never + // resolve with a success (it's infinite) so we're actually just waiting + // for an error or for `f`, our shutdown signal. + // + // When we get a shutdown signal (`Ok`) then we drop the TCP listener to + // stop accepting incoming connections. + match core.run(shutdown_signal.select(srv.map_err(|e| e.into()))) { + Ok(((), _incoming)) => {} + Err((e, _other)) => return Err(e), + } + + // Ok we've stopped accepting new connections at this point, but we want + // to give existing connections a chance to clear themselves out. Wait + // at most `shutdown_timeout` time before we just return clearing + // everything out. + // + // Our custom `WaitUntilZero` will resolve once all services constructed + // here have been destroyed. + let timeout = try!(Timeout::new(shutdown_timeout, &handle)); + let wait = WaitUntilZero { info: info.clone() }; + match core.run(wait.select(timeout)) { + Ok(_) => Ok(()), + Err((e, _)) => return Err(e.into()) } } +} - impl Accept for T - where T: Stream + 'static, - I: Io + 'static, - { - type Output = I; - type Stream = T; +impl fmt::Debug for Server { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Server") + .field("core", &"...") + .field("listener", &self.listener) + .field("new_service", &self.new_service) + .field("protocol", &self.protocol) + .finish() + } +} + +struct NotifyService { + inner: S, + info: Weak>, +} + +struct WaitUntilZero { + info: Rc>, +} + +struct Info { + active: usize, + blocker: Option, +} - fn accept(self) -> Accepter { - Accepter(self, ::std::marker::PhantomData) +impl Service for NotifyService { + type Request = S::Request; + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn call(&self, message: Self::Request) -> Self::Future { + self.inner.call(message) + } +} + +impl Drop for NotifyService { + fn drop(&mut self) { + let info = match self.info.upgrade() { + Some(info) => info, + None => return, + }; + let mut info = info.borrow_mut(); + info.active -= 1; + if info.active == 0 { + if let Some(task) = info.blocker.take() { + task.unpark(); + } + } + } +} + +impl Future for WaitUntilZero { + type Item = (); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(), io::Error> { + let mut info = self.info.borrow_mut(); + if info.active == 0 { + Ok(().into()) + } else { + info.blocker = Some(task::park()); + Ok(Async::NotReady) } } } diff --git a/tests/server.rs b/tests/server.rs index d38b9b4b38..ad05f97519 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -4,27 +4,29 @@ extern crate futures; extern crate spmc; extern crate pretty_env_logger; -use futures::Future; -use futures::stream::Stream; +use futures::{Future, Stream}; +use futures::sync::oneshot; use std::net::{TcpStream, SocketAddr}; use std::io::{Read, Write}; use std::sync::mpsc; +use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; -use hyper::server::{Server, Request, Response, Service, NewService}; +use hyper::server::{Http, Request, Response, Service, NewService}; struct Serve { - listening: Option, + addr: SocketAddr, msg_rx: mpsc::Receiver, reply_tx: spmc::Sender, - spawn_rx: mpsc::Receiver<()>, + shutdown_signal: Option>, + thread: Option>, } impl Serve { fn addr(&self) -> &SocketAddr { - self.listening.as_ref().unwrap().addr() + &self.addr } fn body(&self) -> Vec { @@ -66,14 +68,14 @@ impl<'a> ReplyBuilder<'a> { impl Drop for Serve { fn drop(&mut self) { - self.listening.take().unwrap().close(); - self.spawn_rx.recv().expect("server thread should shutdown cleanly"); + drop(self.shutdown_signal.take()); + self.thread.take().unwrap().join().unwrap(); } } #[derive(Clone)] struct TestService { - tx: mpsc::Sender, + tx: Arc>>, reply: spmc::Receiver, _timeout: Option, } @@ -94,7 +96,7 @@ impl NewService for TestService { type Request = Request; type Response = Response; type Error = hyper::Error; - + type Instance = TestService; fn new_service(&self) -> std::io::Result { @@ -113,7 +115,7 @@ impl Service for TestService { let tx = self.tx.clone(); let replies = self.reply.clone(); req.body().for_each(move |chunk| { - tx.send(Msg::Chunk(chunk.to_vec())).unwrap(); + tx.lock().unwrap().send(Msg::Chunk(chunk.to_vec())).unwrap(); Ok(()) }).map(move |_| { let mut res = Response::new(); @@ -150,35 +152,32 @@ fn serve() -> Serve { fn serve_with_timeout(dur: Option) -> Serve { let _ = pretty_env_logger::init(); - let (thread_tx, thread_rx) = mpsc::channel(); - let (spawn_tx, spawn_rx) = mpsc::channel(); + let (addr_tx, addr_rx) = mpsc::channel(); let (msg_tx, msg_rx) = mpsc::channel(); let (reply_tx, reply_rx) = spmc::channel(); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); let addr = "127.0.0.1:0".parse().unwrap(); let thread_name = format!("test-server-{:?}", dur); - thread::Builder::new().name(thread_name).spawn(move || { - let (listening, server) = Server::standalone(move |tokio| { - Server::http(&addr, tokio).unwrap() - .handle(TestService { - tx: msg_tx.clone(), - _timeout: dur, - reply: reply_rx, - }, tokio) + let thread = thread::Builder::new().name(thread_name).spawn(move || { + let srv = Http::new().bind(&addr, TestService { + tx: Arc::new(Mutex::new(msg_tx.clone())), + _timeout: dur, + reply: reply_rx, }).unwrap(); - thread_tx.send(listening).unwrap(); - server.run(); - spawn_tx.send(()).unwrap(); + addr_tx.send(srv.local_addr().unwrap()).unwrap(); + srv.run_until(shutdown_rx.then(|_| Ok(()))).unwrap(); }).unwrap(); - let listening = thread_rx.recv().unwrap(); + let addr = addr_rx.recv().unwrap(); Serve { - listening: Some(listening), msg_rx: msg_rx, reply_tx: reply_tx, - spawn_rx: spawn_rx, + addr: addr, + shutdown_signal: Some(shutdown_tx), + thread: Some(thread), } }