From 36e4846d55f94a5aac77612985fd768b0d3882b9 Mon Sep 17 00:00:00 2001 From: Tim Kuehn Date: Sun, 14 Oct 2018 21:17:43 +0100 Subject: [PATCH] Make tests compile and use current_thread in tests. --- Cargo.toml | 3 ++ bincode-transport/src/lib.rs | 6 ++-- rpc/src/util/deadline_compat.rs | 7 ++--- tarpc/Cargo.toml | 1 + tarpc/examples/pubsub.rs | 34 +++++++++----------- tarpc/examples/readme.rs | 8 ++--- tarpc/examples/server_calling_server.rs | 12 +++---- tarpc/src/lib.rs | 14 ++++----- tarpc/src/macros.rs | 42 +++++++++++++++---------- tarpc/tests/latency.rs | 12 ++++--- 10 files changed, 73 insertions(+), 66 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3ab4bb2b..005ff334 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,3 +8,6 @@ members = [ "tarpc", "plugins", ] + +[patch."https://github.com/rust-lang-nursery/futures-rs"] +futures-preview = { git = "https://github.com/andreasots/futures-rs", branch = "compat-stack-overflow", features = ["compat", "tokio-compat"] } diff --git a/bincode-transport/src/lib.rs b/bincode-transport/src/lib.rs index 48b5c53e..5abeca84 100644 --- a/bincode-transport/src/lib.rs +++ b/bincode-transport/src/lib.rs @@ -22,7 +22,7 @@ use bytes::{Bytes, BytesMut}; use crate::vendored::tokio_serde_bincode::{IoErrorWrapper, ReadBincode, WriteBincode}; use futures::{ Poll, - compat::{Compat, Future01CompatExt, Stream01CompatExt}, + compat::{Compat01As03, Future01CompatExt, Stream01CompatExt}, prelude::*, ready, task, }; @@ -100,13 +100,13 @@ where /// A [`TcpListener`] that wraps connections in bincode transports. #[derive(Debug)] pub struct Incoming { - incoming: Compat, + incoming: Compat01As03, local_addr: SocketAddr, ghost: PhantomData<(Item, SinkItem)>, } impl Incoming { - unsafe_pinned!(incoming: Compat); + unsafe_pinned!(incoming: Compat01As03); /// Returns the address being listened on. pub fn local_addr(&self) -> SocketAddr { diff --git a/rpc/src/util/deadline_compat.rs b/rpc/src/util/deadline_compat.rs index c81f6099..3a73bfd1 100644 --- a/rpc/src/util/deadline_compat.rs +++ b/rpc/src/util/deadline_compat.rs @@ -1,9 +1,8 @@ use futures::{ - compat::{Compat, Future01CompatExt}, + compat::{Compat01As03, Future01CompatExt}, prelude::*, ready, task::{Poll, LocalWaker}, }; -use log::trace; use pin_utils::unsafe_pinned; use std::pin::Pin; use std::time::Instant; @@ -13,12 +12,12 @@ use tokio_timer::{timeout, Delay}; #[derive(Debug)] pub struct Deadline { future: T, - delay: Compat, + delay: Compat01As03, } impl Deadline { unsafe_pinned!(future: T); - unsafe_pinned!(delay: Compat); + unsafe_pinned!(delay: Compat01As03); /// Create a new `Deadline` that completes when `future` completes or when /// `deadline` is reached. diff --git a/tarpc/Cargo.toml b/tarpc/Cargo.toml index cd489881..cc9d21b6 100644 --- a/tarpc/Cargo.toml +++ b/tarpc/Cargo.toml @@ -36,3 +36,4 @@ futures-preview = { git = "https://github.com/rust-lang-nursery/futures-rs", fea bincode-transport = { path = "../bincode-transport" } env_logger = "0.5" tokio = "0.1" +tokio-executor = "0.1" diff --git a/tarpc/examples/pubsub.rs b/tarpc/examples/pubsub.rs index f3af6108..40824333 100644 --- a/tarpc/examples/pubsub.rs +++ b/tarpc/examples/pubsub.rs @@ -13,10 +13,9 @@ #![plugin(tarpc_plugins)] use futures::{ - compat::TokioDefaultSpawner, future::{self, Ready}, prelude::*, - spawn, Future, + Future, }; use rpc::{ client, context, @@ -65,17 +64,15 @@ impl Subscriber { async fn listen(id: u32, config: server::Config) -> io::Result { let incoming = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?; let addr = incoming.local_addr(); - spawn!( + tokio_executor::spawn( Server::new(config) .incoming(incoming) .take(1) .respond_with(subscriber::serve(Subscriber { id })) - ).map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!("Could not spawn server: {:?}", e), - ) - })?; + .unit_error() + .boxed() + .compat() + ); Ok(addr) } } @@ -119,7 +116,7 @@ impl publisher::Service for Publisher { addr: SocketAddr, ) -> io::Result<()> { let conn = await!(bincode_transport::connect(&addr))?; - let subscriber = await!(subscriber::new_stub(client::Config::default(), conn)); + let subscriber = await!(subscriber::new_stub(client::Config::default(), conn))?; println!("Subscribing {}.", id); clients.lock().unwrap().insert(id, subscriber); Ok(()) @@ -147,17 +144,15 @@ async fn run() -> io::Result<()> { env_logger::init(); let transport = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?; let publisher_addr = transport.local_addr(); - spawn!( + tokio_executor::spawn( Server::new(server::Config::default()) .incoming(transport) .take(1) .respond_with(publisher::serve(Publisher::new())) - ).map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!("Could not spawn server: {:?}", e), - ) - })?; + .unit_error() + .boxed() + .compat() + ); let subscriber1 = await!(Subscriber::listen(0, server::Config::default()))?; let subscriber2 = await!(Subscriber::listen(1, server::Config::default()))?; @@ -167,7 +162,7 @@ async fn run() -> io::Result<()> { let mut publisher = await!(publisher::new_stub( client::Config::default(), publisher_conn - )); + ))?; if let Err(e) = await!(publisher.subscribe(context::current(), 0, subscriber1))? { eprintln!("Couldn't subscribe subscriber 0: {}", e); @@ -188,7 +183,8 @@ fn main() { run() .boxed() .map_err(|e| panic!(e)) - .compat(TokioDefaultSpawner), + .boxed() + .compat(), ); thread::sleep(Duration::from_millis(100)); } diff --git a/tarpc/examples/readme.rs b/tarpc/examples/readme.rs index b7ed2c32..ae268576 100644 --- a/tarpc/examples/readme.rs +++ b/tarpc/examples/readme.rs @@ -15,10 +15,8 @@ #![plugin(tarpc_plugins)] use futures::{ - compat::TokioDefaultSpawner, future::{self, Ready}, prelude::*, - spawn, }; use rpc::{ client, context, @@ -64,14 +62,14 @@ async fn run() -> io::Result<()> { // the generated Service trait. .respond_with(serve(HelloServer)); - spawn!(server).unwrap(); + tokio_executor::spawn(server.unit_error().boxed().compat()); let transport = await!(bincode_transport::connect(&addr))?; // new_stub is generated by the tarpc::service! macro. Like Server, it takes a config and any // Transport as input, and returns a Client, also generated by the macro. // by the service mcro. - let mut client = await!(new_stub(client::Config::default(), transport)); + let mut client = await!(new_stub(client::Config::default(), transport))?; // The client has an RPC method for each RPC defined in tarpc::service!. It takes the same args // as defined, with the addition of a Context, which is always the first arg. The Context @@ -88,6 +86,6 @@ fn main() { run() .map_err(|e| eprintln!("Oh no: {}", e)) .boxed() - .compat(TokioDefaultSpawner), + .compat(), ); } diff --git a/tarpc/examples/server_calling_server.rs b/tarpc/examples/server_calling_server.rs index be3316b9..6fffc676 100644 --- a/tarpc/examples/server_calling_server.rs +++ b/tarpc/examples/server_calling_server.rs @@ -14,10 +14,8 @@ use crate::{add::Service as AddService, double::Service as DoubleService}; use futures::{ - compat::TokioDefaultSpawner, future::{self, Ready}, prelude::*, - spawn, }; use rpc::{ client, context, @@ -75,10 +73,10 @@ async fn run() -> io::Result<()> { .incoming(add_listener) .take(1) .respond_with(add::serve(AddServer)); - spawn!(add_server); + tokio_executor::spawn(add_server.unit_error().boxed().compat()); let to_add_server = await!(bincode_transport::connect(&addr))?; - let add_client = await!(add::new_stub(client::Config::default(), to_add_server)); + let add_client = await!(add::new_stub(client::Config::default(), to_add_server))?; let double_listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?; let addr = double_listener.local_addr(); @@ -86,13 +84,13 @@ async fn run() -> io::Result<()> { .incoming(double_listener) .take(1) .respond_with(double::serve(DoubleServer { add_client })); - spawn!(double_server); + tokio_executor::spawn(double_server.unit_error().boxed().compat()); let to_double_server = await!(bincode_transport::connect(&addr))?; let mut double_client = await!(double::new_stub( client::Config::default(), to_double_server - )); + ))?; for i in 1..=5 { println!("{:?}", await!(double_client.double(context::current(), i))?); @@ -106,6 +104,6 @@ fn main() { run() .map_err(|e| panic!(e)) .boxed() - .compat(TokioDefaultSpawner), + .compat(), ); } diff --git a/tarpc/src/lib.rs b/tarpc/src/lib.rs index 6bb906e4..b631d81a 100644 --- a/tarpc/src/lib.rs +++ b/tarpc/src/lib.rs @@ -32,13 +32,13 @@ //! #![feature(plugin, futures_api, pin, arbitrary_self_types, await_macro, async_await, existential_type)] //! #![plugin(tarpc_plugins)] //! +//! //! use futures::{ //! compat::TokioDefaultSpawner, //! future::{self, Ready}, //! prelude::*, -//! spawn, //! }; -//! use tarpc::rpc::{ +//! use tarpc::{ //! client, context, //! server::{self, Handler, Server}, //! }; @@ -83,14 +83,14 @@ //! // the generated Service trait. //! .respond_with(serve(HelloServer)); //! -//! spawn!(server).unwrap(); +//! tokio_executor::spawn(server.unit_error().boxed().compat()); //! //! let transport = await!(bincode_transport::connect(&addr))?; //! //! // new_stub is generated by the service! macro. Like Server, it takes a config and any //! // Transport as input, and returns a Client, also generated by the macro. //! // by the service mcro. -//! let mut client = await!(new_stub(client::Config::default(), transport)); +//! let mut client = await!(new_stub(client::Config::default(), transport))?; //! //! // The client has an RPC method for each RPC defined in service!. It takes the same args //! // as defined, with the addition of a Context, which is always the first arg. The Context @@ -103,10 +103,11 @@ //! } //! //! fn main() { +//! tarpc::init(TokioDefaultSpawner); //! tokio::run(run() //! .map_err(|e| eprintln!("Oh no: {}", e)) //! .boxed() -//! .compat(TokioDefaultSpawner), +//! .compat(), //! ); //! } //! ``` @@ -125,8 +126,7 @@ #[doc(hidden)] pub use futures; -#[doc(hidden)] -pub use rpc; +pub use rpc::*; #[cfg(feature = "serde")] #[doc(hidden)] pub use serde; diff --git a/tarpc/src/macros.rs b/tarpc/src/macros.rs index ae2f461e..a64aa6fe 100644 --- a/tarpc/src/macros.rs +++ b/tarpc/src/macros.rs @@ -145,7 +145,7 @@ macro_rules! service { } $(#[$attr])* - fn $fn_name(&self, ctx: $crate::rpc::context::Context, $($arg:$in_),*) -> ty_snake_to_camel!(Self::$fn_name); + fn $fn_name(&self, ctx: $crate::context::Context, $($arg:$in_),*) -> ty_snake_to_camel!(Self::$fn_name); )* } @@ -153,7 +153,7 @@ macro_rules! service { /// Returns a serving function to use with rpc::server::Server. pub fn serve(service: S) - -> impl FnMut($crate::rpc::context::Context, Request__) -> Resp + Send + 'static + Clone { + -> impl FnMut($crate::context::Context, Request__) -> Resp + Send + 'static + Clone { move |ctx, req| { let mut service = service.clone(); async move { @@ -173,24 +173,24 @@ macro_rules! service { #[allow(unused)] #[derive(Clone, Debug)] /// The client stub that makes RPC calls to the server. Exposes a Future interface. - pub struct Client($crate::rpc::client::Client); + pub struct Client($crate::client::Client); /// Returns a new client stub that sends requests over the given transport. - pub async fn new_stub(config: $crate::rpc::client::Config, transport: T) + pub async fn new_stub(config: $crate::client::Config, transport: T) -> ::std::io::Result where - T: $crate::rpc::Transport< - Item = $crate::rpc::Response, - SinkItem = $crate::rpc::ClientMessage> + Send, + T: $crate::Transport< + Item = $crate::Response, + SinkItem = $crate::ClientMessage> + Send, { - Ok(Client(await!($crate::rpc::client::Client::new(config, transport))?)) + Ok(Client(await!($crate::client::Client::new(config, transport))?)) } impl Client { $( #[allow(unused)] $(#[$attr])* - pub fn $fn_name(&mut self, ctx: $crate::rpc::context::Context, $($arg: $in_),*) + pub fn $fn_name(&mut self, ctx: $crate::context::Context, $($arg: $in_),*) -> impl ::std::future::Future> + '_ { let request__ = Request__::$fn_name { $($arg,)* }; let resp = self.0.call(ctx, request__); @@ -236,7 +236,6 @@ mod functional_test { compat::TokioDefaultSpawner, future::{ready, Ready}, prelude::*, - spawn, }; use rpc::{ client, context, @@ -244,6 +243,7 @@ mod functional_test { transport::channel, }; use std::io; + use tokio::runtime::current_thread; service! { rpc add(x: i32, y: i32) -> i32; @@ -270,16 +270,20 @@ mod functional_test { #[test] fn sequential() { let _ = env_logger::try_init(); + rpc::init(TokioDefaultSpawner); let test = async { let (tx, rx) = channel::unbounded(); - spawn!( + tokio_executor::spawn( rpc::Server::new(server::Config::default()) .incoming(stream::once(ready(Ok(rx)))) .respond_with(serve(Server)) + .unit_error() + .boxed() + .compat() ); - let mut client = await!(new_stub(client::Config::default(), tx)); + let mut client = await!(new_stub(client::Config::default(), tx))?; assert_eq!(3, await!(client.add(context::current(), 1, 2))?); assert_eq!( "Hey, Tim.", @@ -289,22 +293,26 @@ mod functional_test { } .map_err(|e| panic!(e.to_string())); - tokio::run(test.boxed().compat(TokioDefaultSpawner)); + current_thread::block_on_all(test.boxed().compat()).unwrap(); } #[test] fn concurrent() { let _ = env_logger::try_init(); + rpc::init(TokioDefaultSpawner); let test = async { let (tx, rx) = channel::unbounded(); - spawn!( + tokio_executor::spawn( rpc::Server::new(server::Config::default()) .incoming(stream::once(ready(Ok(rx)))) .respond_with(serve(Server)) + .unit_error() + .boxed() + .compat() ); - let client = await!(new_stub(client::Config::default(), tx)); + let client = await!(new_stub(client::Config::default(), tx))?; let mut c = client.clone(); let req1 = c.add(context::current(), 1, 2); let mut c = client.clone(); @@ -317,8 +325,8 @@ mod functional_test { assert_eq!("Hey, Tim.", await!(req3)?); Ok::<_, io::Error>(()) } - .map_err(|e| panic!(e.to_string())); + .map_err(|e| panic!("test failed: {}", e)); - tokio::run(test.boxed().compat(TokioDefaultSpawner)); + current_thread::block_on_all(test.boxed().compat()).unwrap(); } } diff --git a/tarpc/tests/latency.rs b/tarpc/tests/latency.rs index 732533bb..04c4bd12 100644 --- a/tarpc/tests/latency.rs +++ b/tarpc/tests/latency.rs @@ -18,7 +18,7 @@ extern crate test; use self::test::stats::Stats; -use futures::{compat::TokioDefaultSpawner, future, prelude::*, spawn}; +use futures::{compat::TokioDefaultSpawner, future, prelude::*}; use rpc::{ client, context, server::{self, Handler, Server}, @@ -49,15 +49,18 @@ async fn bench() -> io::Result<()> { let listener = bincode_transport::listen(&"0.0.0.0:0".parse().unwrap())?; let addr = listener.local_addr(); - spawn!( + tokio_executor::spawn( Server::new(server::Config::default()) .incoming(listener) .take(1) .respond_with(ack::serve(Serve)) + .unit_error() + .boxed() + .compat() ); let conn = await!(bincode_transport::connect(&addr))?; - let mut client = await!(ack::new_stub(client::Config::default(), conn)); + let mut client = await!(ack::new_stub(client::Config::default(), conn))?; let total = 10_000usize; let mut successful = 0u32; @@ -116,11 +119,12 @@ async fn bench() -> io::Result<()> { #[test] fn bench_small_packet() { env_logger::init(); + tarpc::init(TokioDefaultSpawner); tokio::run( bench() .map_err(|e| panic!(e.to_string())) .boxed() - .compat(TokioDefaultSpawner), + .compat(), ) }