From d9cca330cb27a2046b2175561b57cf2b2341d0fd Mon Sep 17 00:00:00 2001 From: Corentin Henry Date: Wed, 24 Jan 2018 19:17:35 -0800 Subject: [PATCH] provide a sync api This is an attempt to provide a sync api, which makes much more sense for expect. The idea is to run the event loop in a separate thread so that we can call `Future::wait()` in the client handle without preventing the event loop from making progress. Note that this currently rely on https://github.com/rust-lang/rust/pull/47760 Fwiw, running our own even loop is not considered good practice, but it the approach reqwest has taken too, so I guess it's not that bad. --- src/lib.rs | 91 ++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 67 insertions(+), 24 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 07d2590..5be6760 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ use std::fmt; use std::io::{self, Read, Write}; use std::collections::VecDeque; use std::process::Command; +use std::thread; use std::time::Duration; use futures::{Async, Canceled, Future, Poll, Stream}; @@ -79,6 +80,8 @@ pub struct Session { /// FIFO storage for the matching requests. Requests are processed one after another, not /// concurrently. match_requests: VecDeque, + + drop_rx: oneshot::Receiver<()>, } #[derive(Debug)] @@ -140,40 +143,48 @@ impl fmt::Display for MatchError { type MatchOutcome = Result<(usize, Vec), MatchError>; -#[derive(Clone)] pub struct Handle { match_requests_tx: mpsc::UnboundedSender, input_requests_tx: mpsc::UnboundedSender, + thread: Option>, + drop_tx: Option>, } impl Handle { - pub fn send(&self, bytes: Vec) -> Box> { + pub fn send(&self, bytes: Vec) -> () { let handle = self.clone(); let (response_tx, response_rx) = oneshot::channel::<()>(); handle .input_requests_tx .unbounded_send(InputRequest(bytes, response_tx)) .unwrap(); - Box::new(response_rx) + response_rx.wait().unwrap() } pub fn expect( &mut self, matches: Vec, timeout: Option, - ) -> Box> { + ) -> Result<(usize, std::vec::Vec), MatchError> { let (response_tx, response_rx) = oneshot::channel::(); let request = MatchRequest { matches, response_tx, timeout, }; - let handle = self.clone(); - handle.match_requests_tx.unbounded_send(request).unwrap(); - Box::new(response_rx.map_err(|_| ())) + self.match_requests_tx.unbounded_send(request).unwrap(); + response_rx.wait().unwrap() + } +} + +impl Drop for Handle { + fn drop(&mut self) { + self.drop_tx.take().unwrap().send(()).unwrap(); + self.thread.take().unwrap().join(); } } + // TODO: // // - Make stdin evented PollEvented? @@ -184,25 +195,46 @@ impl Handle { // precedence over the MIN and TIME settings. impl Session { - pub fn spawn(cmd: Command, handle: &TokioHandle) -> Result { + pub fn spawn(cmd: Command) -> Result { debug!("spawning new command {:?}", cmd); let (input_tx, input_rx) = mpsc::unbounded::(); let (match_tx, match_rx) = mpsc::unbounded::(); - let mut pty = Pty::new::<::std::fs::File>(None, handle).unwrap(); - let mut _child = pty.spawn(cmd).unwrap(); - let session = Session { - pty: pty, - handle: handle.clone(), - buffer: Vec::new(), - input_requests_rx: input_rx, - match_requests_rx: match_rx, - input_requests: VecDeque::new(), - match_requests: VecDeque::new(), - }; - handle.spawn(session); + let (drop_tx, drop_rx) = oneshot::channel::<()>(); + + // spawn the core future in a separate thread. + // + // it's bad practice to spawn our own core but otherwise, it's complicated to provide a + // synchronous client, since calling `wait()` blocks the current thread, preventing the + // event loop from making progress... But running the event loop in a separate thread, we + // can call `wait()` in the client. + let thread = thread::Builder::new() + .name("expect-internal-core".into()) + .spawn(move || { + use tokio_core::reactor::Core; + let mut core = Core::new().unwrap(); + + let mut pty = Pty::new::<::std::fs::File>(None, &core.handle()).unwrap(); + // FIXME: I guess we should do something with the child? + let _child = pty.spawn(cmd).unwrap(); + + let session = Session { + pty: pty, + handle: core.handle(), + buffer: Vec::new(), + input_requests_rx: input_rx, + match_requests_rx: match_rx, + input_requests: VecDeque::new(), + match_requests: VecDeque::new(), + drop_rx: drop_rx, + }; + core.run(session); + }) + .unwrap(); Ok(Handle { match_requests_tx: match_tx.clone(), input_requests_tx: input_tx.clone(), + thread: Some(thread), + drop_tx: Some(drop_tx), }) } @@ -216,7 +248,7 @@ impl Session { if size == req.0.len() { return Ok(Async::Ready(())); } - // FIXME: do we need to check if we wrote 0 bytes to avoid infinite looping? + // FIXME: do we need to check if we wrote 0 bytes to avoid infinite loops? continue; } Err(e) => { @@ -459,10 +491,21 @@ impl Future for Session { type Error = (); fn poll(&mut self) -> Poll { - self.get_input_requests().unwrap(); - self.get_match_requests().unwrap(); + if let Err(_e) = self.get_input_requests() { + return Err(()); + } + if let Err(_e) = self.get_match_requests() { + return Err(()); + } self.process_input(); - self.process_matches().unwrap(); + if let Err(_e) = self.process_matches() { + return Err(()); + } + match self.drop_rx.poll() { + Ok(Async::Ready(())) => return Ok(Async::Ready(())), + Ok(Async::NotReady) => {}, + Err(Canceled) => return Err(()), + } Ok(Async::NotReady) } }