From 31026673c5119636c8cd2a45e627723da06e9e8d Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 27 Oct 2020 12:15:20 -0700 Subject: [PATCH 1/4] test: add tracing to tests --- tower/tests/balance/main.rs | 3 ++ tower/tests/buffer/main.rs | 16 +++++++++- tower/tests/filter/main.rs | 7 +++- tower/tests/hedge/main.rs | 7 ++++ tower/tests/limit/concurrency.rs | 10 ++++++ tower/tests/limit/main.rs | 3 +- tower/tests/limit/rate.rs | 4 ++- tower/tests/load_shed/main.rs | 6 ++++ tower/tests/ready_cache/main.rs | 18 +++++++++++ tower/tests/retry/main.rs | 12 +++++++ tower/tests/spawn_ready/main.rs | 6 ++++ tower/tests/steer/main.rs | 55 ++++++++++++++++---------------- tower/tests/support.rs | 8 +++++ 13 files changed, 123 insertions(+), 32 deletions(-) create mode 100644 tower/tests/support.rs diff --git a/tower/tests/balance/main.rs b/tower/tests/balance/main.rs index 8ca3ce943..bc323da58 100644 --- a/tower/tests/balance/main.rs +++ b/tower/tests/balance/main.rs @@ -1,4 +1,6 @@ #![cfg(feature = "balance")] +#[path = "../support.rs"] +mod support; use std::future::Future; use std::task::{Context, Poll}; @@ -32,6 +34,7 @@ impl tower::load::Load for Mock { #[test] fn stress() { + let _t = support::trace_init(); let mut task = task::spawn(()); let (tx, rx) = tokio::sync::mpsc::unbounded_channel::>(); let mut cache = Balance::<_, Req>::new(rx); diff --git a/tower/tests/buffer/main.rs b/tower/tests/buffer/main.rs index 67c75aefe..d951e6eb8 100644 --- a/tower/tests/buffer/main.rs +++ b/tower/tests/buffer/main.rs @@ -1,5 +1,6 @@ #![cfg(feature = "buffer")] - +#[path = "../support.rs"] +mod support; use std::thread; use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task}; use tower::buffer::{error, Buffer}; @@ -12,6 +13,8 @@ fn let_worker_work() { #[tokio::test] async fn req_and_res() { + let _t = support::trace_init(); + let (mut service, mut handle) = new_service(); assert_ready_ok!(service.poll_ready()); @@ -25,6 +28,8 @@ async fn req_and_res() { #[tokio::test] async fn clears_canceled_requests() { + let _t = support::trace_init(); + let (mut service, mut handle) = new_service(); handle.allow(1); @@ -61,6 +66,8 @@ async fn clears_canceled_requests() { #[tokio::test] async fn when_inner_is_not_ready() { + let _t = support::trace_init(); + let (mut service, mut handle) = new_service(); // Make the service NotReady @@ -84,6 +91,7 @@ async fn when_inner_is_not_ready() { #[tokio::test] async fn when_inner_fails() { use std::error::Error as StdError; + let _t = support::trace_init(); let (mut service, mut handle) = new_service(); @@ -107,6 +115,8 @@ async fn when_inner_fails() { #[tokio::test] async fn poll_ready_when_worker_is_dropped_early() { + let _t = support::trace_init(); + let (service, _handle) = mock::pair::<(), ()>(); let (service, worker) = Buffer::pair(service, 1); @@ -122,6 +132,8 @@ async fn poll_ready_when_worker_is_dropped_early() { #[tokio::test] async fn response_future_when_worker_is_dropped_early() { + let _t = support::trace_init(); + let (service, mut handle) = mock::pair::<_, ()>(); let (service, worker) = Buffer::pair(service, 1); @@ -142,6 +154,8 @@ async fn response_future_when_worker_is_dropped_early() { #[tokio::test] async fn waits_for_channel_capacity() { + let _t = support::trace_init(); + let (service, mut handle) = mock::pair::<&'static str, &'static str>(); let (service, worker) = Buffer::pair(service, 3); diff --git a/tower/tests/filter/main.rs b/tower/tests/filter/main.rs index 62741258d..8a7789bb5 100644 --- a/tower/tests/filter/main.rs +++ b/tower/tests/filter/main.rs @@ -1,5 +1,6 @@ #![cfg(feature = "filter")] - +#[path = "../support.rs"] +mod support; use futures_util::{future::poll_fn, pin_mut}; use std::future::Future; use tower::filter::{error::Error, Filter}; @@ -8,6 +9,8 @@ use tower_test::{assert_request_eq, mock}; #[tokio::test] async fn passthrough_sync() { + let _t = support::trace_init(); + let (mut service, handle) = new_service(|_| async { Ok(()) }); let th = tokio::spawn(async move { @@ -39,6 +42,8 @@ async fn passthrough_sync() { #[tokio::test] async fn rejected_sync() { + let _t = support::trace_init(); + let (mut service, _handle) = new_service(|_| async { Err(Error::rejected()) }); service.call("hello".into()).await.unwrap_err(); diff --git a/tower/tests/hedge/main.rs b/tower/tests/hedge/main.rs index a4e286730..3364d4427 100644 --- a/tower/tests/hedge/main.rs +++ b/tower/tests/hedge/main.rs @@ -1,4 +1,6 @@ #![cfg(feature = "hedge")] +#[path = "../support.rs"] +mod support; use std::time::Duration; use tokio::time; @@ -8,6 +10,7 @@ use tower_test::{assert_request_eq, mock}; #[tokio::test] async fn hedge_orig_completes_first() { + let _t = support::trace_init(); time::pause(); let (mut service, mut handle) = new_service(TestPolicy); @@ -35,6 +38,7 @@ async fn hedge_orig_completes_first() { #[tokio::test] async fn hedge_hedge_completes_first() { + let _t = support::trace_init(); time::pause(); let (mut service, mut handle) = new_service(TestPolicy); @@ -63,6 +67,7 @@ async fn hedge_hedge_completes_first() { #[tokio::test] async fn completes_before_hedge() { + let _t = support::trace_init(); let (mut service, mut handle) = new_service(TestPolicy); assert_ready_ok!(service.poll_ready()); @@ -82,6 +87,7 @@ async fn completes_before_hedge() { #[tokio::test] async fn request_not_retyable() { + let _t = support::trace_init(); time::pause(); let (mut service, mut handle) = new_service(TestPolicy); @@ -109,6 +115,7 @@ async fn request_not_retyable() { #[tokio::test] async fn request_not_clonable() { + let _t = support::trace_init(); time::pause(); let (mut service, mut handle) = new_service(TestPolicy); diff --git a/tower/tests/limit/concurrency.rs b/tower/tests/limit/concurrency.rs index 7e2ba4f99..d68d31265 100644 --- a/tower/tests/limit/concurrency.rs +++ b/tower/tests/limit/concurrency.rs @@ -1,9 +1,12 @@ +#[path = "../support.rs"] +mod support; use tokio_test::{assert_pending, assert_ready, assert_ready_ok}; use tower::limit::concurrency::ConcurrencyLimitLayer; use tower_test::{assert_request_eq, mock}; #[tokio::test] async fn basic_service_limit_functionality_with_poll_ready() { + let _t = support::trace_init(); let limit = ConcurrencyLimitLayer::new(2); let (mut service, mut handle) = mock::spawn_layer(limit); @@ -47,6 +50,7 @@ async fn basic_service_limit_functionality_with_poll_ready() { #[tokio::test] async fn basic_service_limit_functionality_without_poll_ready() { + let _t = support::trace_init(); let limit = ConcurrencyLimitLayer::new(2); let (mut service, mut handle) = mock::spawn_layer(limit); @@ -92,6 +96,7 @@ async fn basic_service_limit_functionality_without_poll_ready() { #[tokio::test] async fn request_without_capacity() { + let _t = support::trace_init(); let limit = ConcurrencyLimitLayer::new(0); let (mut service, _) = mock::spawn_layer::<(), (), _>(limit); @@ -100,6 +105,7 @@ async fn request_without_capacity() { #[tokio::test] async fn reserve_capacity_without_sending_request() { + let _t = support::trace_init(); let limit = ConcurrencyLimitLayer::new(1); let (mut s1, mut handle) = mock::spawn_layer(limit); @@ -125,6 +131,7 @@ async fn reserve_capacity_without_sending_request() { #[tokio::test] async fn service_drop_frees_capacity() { + let _t = support::trace_init(); let limit = ConcurrencyLimitLayer::new(1); let (mut s1, _handle) = mock::spawn_layer::<(), (), _>(limit); @@ -144,6 +151,7 @@ async fn service_drop_frees_capacity() { #[tokio::test] async fn response_error_releases_capacity() { + let _t = support::trace_init(); let limit = ConcurrencyLimitLayer::new(1); let (mut s1, mut handle) = mock::spawn_layer::<_, (), _>(limit); @@ -164,6 +172,7 @@ async fn response_error_releases_capacity() { #[tokio::test] async fn response_future_drop_releases_capacity() { + let _t = support::trace_init(); let limit = ConcurrencyLimitLayer::new(1); let (mut s1, _handle) = mock::spawn_layer::<_, (), _>(limit); @@ -184,6 +193,7 @@ async fn response_future_drop_releases_capacity() { #[tokio::test] async fn multi_waiters() { + let _t = support::trace_init(); let limit = ConcurrencyLimitLayer::new(1); let (mut s1, _handle) = mock::spawn_layer::<(), (), _>(limit); let mut s2 = s1.clone(); diff --git a/tower/tests/limit/main.rs b/tower/tests/limit/main.rs index f80d68b54..12744547d 100644 --- a/tower/tests/limit/main.rs +++ b/tower/tests/limit/main.rs @@ -1,4 +1,5 @@ #![cfg(feature = "limit")] - mod concurrency; mod rate; +#[path = "../support.rs"] +pub(crate) mod support; diff --git a/tower/tests/limit/rate.rs b/tower/tests/limit/rate.rs index 946f86f00..bd7f67c29 100644 --- a/tower/tests/limit/rate.rs +++ b/tower/tests/limit/rate.rs @@ -1,3 +1,4 @@ +use super::support; use std::time::Duration; use tokio::time; use tokio_test::{assert_pending, assert_ready, assert_ready_ok}; @@ -6,6 +7,7 @@ use tower_test::{assert_request_eq, mock}; #[tokio::test] async fn reaching_capacity() { + let _t = support::trace_init(); time::pause(); let rate_limit = RateLimitLayer::new(1, Duration::from_millis(100)); @@ -42,7 +44,7 @@ async fn remaining_gets_reset() { // as ready. Then we can advance the clock to put us beyond the current period. When we make // subsequent requests the `rem` for the next window is continued from the previous when // it should be totally reset. - + let _t = support::trace_init(); time::pause(); let rate_limit = RateLimitLayer::new(3, Duration::from_millis(100)); diff --git a/tower/tests/load_shed/main.rs b/tower/tests/load_shed/main.rs index 6bad7ab21..2bebe04f8 100644 --- a/tower/tests/load_shed/main.rs +++ b/tower/tests/load_shed/main.rs @@ -1,4 +1,6 @@ #![cfg(feature = "load-shed")] +#[path = "../support.rs"] +mod support; use tokio_test::{assert_ready_err, assert_ready_ok, task}; use tower::load_shed::LoadShedLayer; @@ -6,6 +8,8 @@ use tower_test::{assert_request_eq, mock}; #[tokio::test] async fn when_ready() { + let _t = support::trace_init(); + let layer = LoadShedLayer::new(); let (mut service, mut handle) = mock::spawn_layer(layer); @@ -19,6 +23,8 @@ async fn when_ready() { #[tokio::test] async fn when_not_ready() { + let _t = support::trace_init(); + let layer = LoadShedLayer::new(); let (mut service, mut handle) = mock::spawn_layer::<_, (), _>(layer); diff --git a/tower/tests/ready_cache/main.rs b/tower/tests/ready_cache/main.rs index 4578e4ef8..f7acbf81c 100644 --- a/tower/tests/ready_cache/main.rs +++ b/tower/tests/ready_cache/main.rs @@ -1,4 +1,6 @@ #![cfg(feature = "ready-cache")] +#[path = "../support.rs"] +mod support; use tokio_test::{assert_pending, assert_ready, task}; use tower::ready_cache::ReadyCache; @@ -9,6 +11,8 @@ type Mock = mock::Mock; #[test] fn poll_ready_inner_failure() { + let _t = support::trace_init(); + let mut task = task::spawn(()); let mut cache = ReadyCache::::default(); @@ -30,6 +34,8 @@ fn poll_ready_inner_failure() { #[test] fn poll_ready_not_ready() { + let _t = support::trace_init(); + let mut task = task::spawn(()); let mut cache = ReadyCache::::default(); @@ -50,6 +56,8 @@ fn poll_ready_not_ready() { #[test] fn poll_ready_promotes_inner() { + let _t = support::trace_init(); + let mut task = task::spawn(()); let mut cache = ReadyCache::::default(); @@ -74,6 +82,8 @@ fn poll_ready_promotes_inner() { #[test] fn evict_ready_then_error() { + let _t = support::trace_init(); + let mut task = task::spawn(()); let mut cache = ReadyCache::::default(); @@ -94,6 +104,8 @@ fn evict_ready_then_error() { #[test] fn evict_pending_then_error() { + let _t = support::trace_init(); + let mut task = task::spawn(()); let mut cache = ReadyCache::::default(); @@ -111,6 +123,8 @@ fn evict_pending_then_error() { #[test] fn push_then_evict() { + let _t = support::trace_init(); + let mut task = task::spawn(()); let mut cache = ReadyCache::::default(); @@ -125,6 +139,8 @@ fn push_then_evict() { #[test] fn error_after_promote() { + let _t = support::trace_init(); + let mut task = task::spawn(()); let mut cache = ReadyCache::::default(); @@ -143,6 +159,8 @@ fn error_after_promote() { #[test] fn duplicate_key_by_index() { + let _t = support::trace_init(); + let mut task = task::spawn(()); let mut cache = ReadyCache::::default(); diff --git a/tower/tests/retry/main.rs b/tower/tests/retry/main.rs index ae97fc504..227252dbc 100644 --- a/tower/tests/retry/main.rs +++ b/tower/tests/retry/main.rs @@ -1,4 +1,6 @@ #![cfg(feature = "retry")] +#[path = "../support.rs"] +mod support; use futures_util::future; use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok, task}; @@ -7,6 +9,8 @@ use tower_test::{assert_request_eq, mock}; #[tokio::test] async fn retry_errors() { + let _t = support::trace_init(); + let (mut service, mut handle) = new_service(RetryErrors); assert_ready_ok!(service.poll_ready()); @@ -24,6 +28,8 @@ async fn retry_errors() { #[tokio::test] async fn retry_limit() { + let _t = support::trace_init(); + let (mut service, mut handle) = new_service(Limit(2)); assert_ready_ok!(service.poll_ready()); @@ -42,6 +48,8 @@ async fn retry_limit() { #[tokio::test] async fn retry_error_inspection() { + let _t = support::trace_init(); + let (mut service, mut handle) = new_service(UnlessErr("reject")); assert_ready_ok!(service.poll_ready()); @@ -56,6 +64,8 @@ async fn retry_error_inspection() { #[tokio::test] async fn retry_cannot_clone_request() { + let _t = support::trace_init(); + let (mut service, mut handle) = new_service(CannotClone); assert_ready_ok!(service.poll_ready()); @@ -67,6 +77,8 @@ async fn retry_cannot_clone_request() { #[tokio::test] async fn success_with_cannot_clone() { + let _t = support::trace_init(); + // Even though the request couldn't be cloned, if the first request succeeds, // it should succeed overall. let (mut service, mut handle) = new_service(CannotClone); diff --git a/tower/tests/spawn_ready/main.rs b/tower/tests/spawn_ready/main.rs index db28e395e..b44aa6959 100644 --- a/tower/tests/spawn_ready/main.rs +++ b/tower/tests/spawn_ready/main.rs @@ -1,4 +1,6 @@ #![cfg(feature = "spawn-ready")] +#[path = "../support.rs"] +mod support; use std::{thread, time::Duration}; use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok}; @@ -7,6 +9,8 @@ use tower_test::mock; #[tokio::test] async fn when_inner_is_not_ready() { + let _t = support::trace_init(); + let layer = SpawnReadyLayer::new(); let (mut service, mut handle) = mock::spawn_layer::<(), (), _>(layer); @@ -23,6 +27,8 @@ async fn when_inner_is_not_ready() { #[tokio::test] async fn when_inner_fails() { + let _t = support::trace_init(); + let layer = SpawnReadyLayer::new(); let (mut service, mut handle) = mock::spawn_layer::<(), (), _>(layer); diff --git a/tower/tests/steer/main.rs b/tower/tests/steer/main.rs index 092b09433..d14279594 100644 --- a/tower/tests/steer/main.rs +++ b/tower/tests/steer/main.rs @@ -1,4 +1,6 @@ #![cfg(feature = "steer")] +#[path = "../support.rs"] +mod support; use futures_util::future::{ready, Ready}; use std::task::{Context, Poll}; @@ -27,34 +29,31 @@ impl Service for MyService { } } -#[test] -fn pick_correctly() { - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async move { - let srvs = vec![MyService(42, true), MyService(57, true)]; - let mut st = Steer::new(srvs, |_: &_, _: &[_]| 1); - - futures_util::future::poll_fn(|cx| st.poll_ready(cx)) - .await - .unwrap(); - let r = st.call(String::from("foo")).await.unwrap(); - assert_eq!(r, 57); - }); +#[tokio::test] +async fn pick_correctly() { + let _t = support::trace_init(); + let srvs = vec![MyService(42, true), MyService(57, true)]; + let mut st = Steer::new(srvs, |_: &_, _: &[_]| 1); + + futures_util::future::poll_fn(|cx| st.poll_ready(cx)) + .await + .unwrap(); + let r = st.call(String::from("foo")).await.unwrap(); + assert_eq!(r, 57); } -#[test] -fn pending_all_ready() { - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async move { - let srvs = vec![MyService(42, true), MyService(57, false)]; - let mut st = Steer::new(srvs, |_: &_, _: &[_]| 0); - - let p = futures_util::poll!(futures_util::future::poll_fn(|cx| st.poll_ready(cx))); - match p { - Poll::Pending => (), - _ => panic!( - "Steer should not return poll_ready if at least one component service is not ready" - ), - } - }); +#[tokio::test] +async fn pending_all_ready() { + let _t = support::trace_init(); + + let srvs = vec![MyService(42, true), MyService(57, false)]; + let mut st = Steer::new(srvs, |_: &_, _: &[_]| 0); + + let p = futures_util::poll!(futures_util::future::poll_fn(|cx| st.poll_ready(cx))); + match p { + Poll::Pending => (), + _ => panic!( + "Steer should not return poll_ready if at least one component service is not ready" + ), + } } diff --git a/tower/tests/support.rs b/tower/tests/support.rs new file mode 100644 index 000000000..1c4147578 --- /dev/null +++ b/tower/tests/support.rs @@ -0,0 +1,8 @@ +pub(crate) fn trace_init() -> tracing::subscriber::DefaultGuard { + let subscriber = tracing_subscriber::fmt() + .with_test_writer() + .with_max_level(tracing::Level::TRACE) + .with_thread_names(true) + .finish(); + tracing::subscriber::set_default(subscriber) +} From ea05fdc01226f536f87d38eb2495782a6068ee6e Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 27 Oct 2020 12:20:19 -0700 Subject: [PATCH 2/4] ensure tests use current-thread runtime Signed-off-by: Eliza Weisman --- tower-test/tests/mock.rs | 4 ++-- tower/tests/buffer/main.rs | 14 +++++++------- tower/tests/builder.rs | 6 ++++-- tower/tests/filter/main.rs | 4 ++-- tower/tests/hedge/main.rs | 10 +++++----- tower/tests/limit/concurrency.rs | 16 ++++++++-------- tower/tests/limit/rate.rs | 4 ++-- tower/tests/load_shed/main.rs | 4 ++-- tower/tests/retry/main.rs | 10 +++++----- tower/tests/spawn_ready/main.rs | 4 ++-- tower/tests/steer/main.rs | 4 ++-- tower/tests/support.rs | 2 ++ tower/tests/util/call_all.rs | 6 +++++- tower/tests/util/main.rs | 2 ++ tower/tests/util/oneshot.rs | 3 ++- tower/tests/util/service_fn.rs | 4 +++- 16 files changed, 55 insertions(+), 42 deletions(-) diff --git a/tower-test/tests/mock.rs b/tower-test/tests/mock.rs index 1dd43b54d..0c6e73554 100644 --- a/tower-test/tests/mock.rs +++ b/tower-test/tests/mock.rs @@ -1,7 +1,7 @@ use tokio_test::{assert_pending, assert_ready}; use tower_test::{assert_request_eq, mock}; -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn single_request_ready() { let (mut service, mut handle) = mock::spawn(); @@ -16,7 +16,7 @@ async fn single_request_ready() { assert_eq!(response.await.unwrap(), "world"); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] #[should_panic] async fn backpressure() { let (mut service, mut handle) = mock::spawn::<_, ()>(); diff --git a/tower/tests/buffer/main.rs b/tower/tests/buffer/main.rs index d951e6eb8..66a768d2e 100644 --- a/tower/tests/buffer/main.rs +++ b/tower/tests/buffer/main.rs @@ -11,7 +11,7 @@ fn let_worker_work() { thread::sleep(::std::time::Duration::from_millis(100)); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn req_and_res() { let _t = support::trace_init(); @@ -26,7 +26,7 @@ async fn req_and_res() { assert_eq!(assert_ready_ok!(response.poll()), "world"); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn clears_canceled_requests() { let _t = support::trace_init(); @@ -64,7 +64,7 @@ async fn clears_canceled_requests() { assert_eq!(assert_ready_ok!(res3.poll()), "world3"); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn when_inner_is_not_ready() { let _t = support::trace_init(); @@ -88,7 +88,7 @@ async fn when_inner_is_not_ready() { assert_eq!(assert_ready_ok!(res1.poll()), "world"); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn when_inner_fails() { use std::error::Error as StdError; let _t = support::trace_init(); @@ -113,7 +113,7 @@ async fn when_inner_fails() { } } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn poll_ready_when_worker_is_dropped_early() { let _t = support::trace_init(); @@ -130,7 +130,7 @@ async fn poll_ready_when_worker_is_dropped_early() { assert!(err.is::(), "should be a Closed: {:?}", err); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn response_future_when_worker_is_dropped_early() { let _t = support::trace_init(); @@ -152,7 +152,7 @@ async fn response_future_when_worker_is_dropped_early() { assert!(err.is::(), "should be a Closed: {:?}", err); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn waits_for_channel_capacity() { let _t = support::trace_init(); diff --git a/tower/tests/builder.rs b/tower/tests/builder.rs index 96ffdc721..335ee5c9e 100644 --- a/tower/tests/builder.rs +++ b/tower/tests/builder.rs @@ -1,5 +1,5 @@ #![cfg(all(feature = "buffer", feature = "limit", feature = "retry"))] - +mod support; use futures_util::{future::Ready, pin_mut}; use std::time::Duration; use tower::builder::ServiceBuilder; @@ -8,8 +8,10 @@ use tower::util::ServiceExt; use tower_service::*; use tower_test::{assert_request_eq, mock}; -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn builder_service() { + let _t = support::trace_init(); + let (service, handle) = mock::pair(); pin_mut!(handle); diff --git a/tower/tests/filter/main.rs b/tower/tests/filter/main.rs index 8a7789bb5..2bcd7b0e5 100644 --- a/tower/tests/filter/main.rs +++ b/tower/tests/filter/main.rs @@ -7,7 +7,7 @@ use tower::filter::{error::Error, Filter}; use tower_service::Service; use tower_test::{assert_request_eq, mock}; -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn passthrough_sync() { let _t = support::trace_init(); @@ -40,7 +40,7 @@ async fn passthrough_sync() { th.await.unwrap(); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn rejected_sync() { let _t = support::trace_init(); diff --git a/tower/tests/hedge/main.rs b/tower/tests/hedge/main.rs index 3364d4427..3008a64e4 100644 --- a/tower/tests/hedge/main.rs +++ b/tower/tests/hedge/main.rs @@ -8,7 +8,7 @@ use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task}; use tower::hedge::{Hedge, Policy}; use tower_test::{assert_request_eq, mock}; -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn hedge_orig_completes_first() { let _t = support::trace_init(); time::pause(); @@ -36,7 +36,7 @@ async fn hedge_orig_completes_first() { assert_eq!(assert_ready_ok!(fut.poll()), "orig-done"); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn hedge_hedge_completes_first() { let _t = support::trace_init(); time::pause(); @@ -65,7 +65,7 @@ async fn hedge_hedge_completes_first() { assert_eq!(assert_ready_ok!(fut.poll()), "hedge-done"); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn completes_before_hedge() { let _t = support::trace_init(); let (mut service, mut handle) = new_service(TestPolicy); @@ -85,7 +85,7 @@ async fn completes_before_hedge() { assert_eq!(assert_ready_ok!(fut.poll()), "orig-done"); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn request_not_retyable() { let _t = support::trace_init(); time::pause(); @@ -113,7 +113,7 @@ async fn request_not_retyable() { assert_eq!(assert_ready_ok!(fut.poll()), "orig-done"); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn request_not_clonable() { let _t = support::trace_init(); time::pause(); diff --git a/tower/tests/limit/concurrency.rs b/tower/tests/limit/concurrency.rs index d68d31265..a471c52f5 100644 --- a/tower/tests/limit/concurrency.rs +++ b/tower/tests/limit/concurrency.rs @@ -4,7 +4,7 @@ use tokio_test::{assert_pending, assert_ready, assert_ready_ok}; use tower::limit::concurrency::ConcurrencyLimitLayer; use tower_test::{assert_request_eq, mock}; -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn basic_service_limit_functionality_with_poll_ready() { let _t = support::trace_init(); let limit = ConcurrencyLimitLayer::new(2); @@ -48,7 +48,7 @@ async fn basic_service_limit_functionality_with_poll_ready() { assert_eq!(r3.await.unwrap(), "world 3"); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn basic_service_limit_functionality_without_poll_ready() { let _t = support::trace_init(); let limit = ConcurrencyLimitLayer::new(2); @@ -94,7 +94,7 @@ async fn basic_service_limit_functionality_without_poll_ready() { assert_eq!(r4.await.unwrap(), "world 4"); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn request_without_capacity() { let _t = support::trace_init(); let limit = ConcurrencyLimitLayer::new(0); @@ -103,7 +103,7 @@ async fn request_without_capacity() { assert_pending!(service.poll_ready()); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn reserve_capacity_without_sending_request() { let _t = support::trace_init(); let limit = ConcurrencyLimitLayer::new(1); @@ -129,7 +129,7 @@ async fn reserve_capacity_without_sending_request() { assert_ready_ok!(s2.poll_ready()); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn service_drop_frees_capacity() { let _t = support::trace_init(); let limit = ConcurrencyLimitLayer::new(1); @@ -149,7 +149,7 @@ async fn service_drop_frees_capacity() { assert_ready_ok!(s2.poll_ready()); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn response_error_releases_capacity() { let _t = support::trace_init(); let limit = ConcurrencyLimitLayer::new(1); @@ -170,7 +170,7 @@ async fn response_error_releases_capacity() { assert_ready_ok!(s2.poll_ready()); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn response_future_drop_releases_capacity() { let _t = support::trace_init(); let limit = ConcurrencyLimitLayer::new(1); @@ -191,7 +191,7 @@ async fn response_future_drop_releases_capacity() { assert_ready_ok!(s2.poll_ready()); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn multi_waiters() { let _t = support::trace_init(); let limit = ConcurrencyLimitLayer::new(1); diff --git a/tower/tests/limit/rate.rs b/tower/tests/limit/rate.rs index bd7f67c29..9c49e4baf 100644 --- a/tower/tests/limit/rate.rs +++ b/tower/tests/limit/rate.rs @@ -5,7 +5,7 @@ use tokio_test::{assert_pending, assert_ready, assert_ready_ok}; use tower::limit::rate::RateLimitLayer; use tower_test::{assert_request_eq, mock}; -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn reaching_capacity() { let _t = support::trace_init(); time::pause(); @@ -35,7 +35,7 @@ async fn reaching_capacity() { assert_eq!(response.await.unwrap(), "done"); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn remaining_gets_reset() { // This test checks for the case where the `until` state gets reset // but the `rem` does not. This was a bug found `cd7dd12315706fc0860a35646b1eb7b60c50a5c1`. diff --git a/tower/tests/load_shed/main.rs b/tower/tests/load_shed/main.rs index 2bebe04f8..98eba860a 100644 --- a/tower/tests/load_shed/main.rs +++ b/tower/tests/load_shed/main.rs @@ -6,7 +6,7 @@ use tokio_test::{assert_ready_err, assert_ready_ok, task}; use tower::load_shed::LoadShedLayer; use tower_test::{assert_request_eq, mock}; -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn when_ready() { let _t = support::trace_init(); @@ -21,7 +21,7 @@ async fn when_ready() { assert_eq!(assert_ready_ok!(response.poll()), "world"); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn when_not_ready() { let _t = support::trace_init(); diff --git a/tower/tests/retry/main.rs b/tower/tests/retry/main.rs index 227252dbc..a07242b0b 100644 --- a/tower/tests/retry/main.rs +++ b/tower/tests/retry/main.rs @@ -7,7 +7,7 @@ use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok, task}; use tower::retry::Policy; use tower_test::{assert_request_eq, mock}; -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn retry_errors() { let _t = support::trace_init(); @@ -26,7 +26,7 @@ async fn retry_errors() { assert_eq!(fut.into_inner().await.unwrap(), "world"); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn retry_limit() { let _t = support::trace_init(); @@ -46,7 +46,7 @@ async fn retry_limit() { assert_eq!(assert_ready_err!(fut.poll()).to_string(), "retry 3"); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn retry_error_inspection() { let _t = support::trace_init(); @@ -62,7 +62,7 @@ async fn retry_error_inspection() { assert_eq!(assert_ready_err!(fut.poll()).to_string(), "reject"); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn retry_cannot_clone_request() { let _t = support::trace_init(); @@ -75,7 +75,7 @@ async fn retry_cannot_clone_request() { assert_eq!(assert_ready_err!(fut.poll()).to_string(), "retry 1"); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn success_with_cannot_clone() { let _t = support::trace_init(); diff --git a/tower/tests/spawn_ready/main.rs b/tower/tests/spawn_ready/main.rs index b44aa6959..682b2901c 100644 --- a/tower/tests/spawn_ready/main.rs +++ b/tower/tests/spawn_ready/main.rs @@ -7,7 +7,7 @@ use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok use tower::spawn_ready::SpawnReadyLayer; use tower_test::mock; -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn when_inner_is_not_ready() { let _t = support::trace_init(); @@ -25,7 +25,7 @@ async fn when_inner_is_not_ready() { assert_ready_ok!(service.poll_ready()); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn when_inner_fails() { let _t = support::trace_init(); diff --git a/tower/tests/steer/main.rs b/tower/tests/steer/main.rs index d14279594..1ff08d32c 100644 --- a/tower/tests/steer/main.rs +++ b/tower/tests/steer/main.rs @@ -29,7 +29,7 @@ impl Service for MyService { } } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn pick_correctly() { let _t = support::trace_init(); let srvs = vec![MyService(42, true), MyService(57, true)]; @@ -42,7 +42,7 @@ async fn pick_correctly() { assert_eq!(r, 57); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn pending_all_ready() { let _t = support::trace_init(); diff --git a/tower/tests/support.rs b/tower/tests/support.rs index 1c4147578..ba67c0cb5 100644 --- a/tower/tests/support.rs +++ b/tower/tests/support.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + pub(crate) fn trace_init() -> tracing::subscriber::DefaultGuard { let subscriber = tracing_subscriber::fmt() .with_test_writer() diff --git a/tower/tests/util/call_all.rs b/tower/tests/util/call_all.rs index 5964f9da0..a7633814c 100644 --- a/tower/tests/util/call_all.rs +++ b/tower/tests/util/call_all.rs @@ -39,6 +39,8 @@ impl Service<&'static str> for Srv { #[test] fn ordered() { + let _t = super::support::trace_init(); + let mut mock = task::spawn(()); let admit = Rc::new(Cell::new(false)); @@ -108,8 +110,10 @@ fn ordered() { ); } -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn unordered() { + let _t = super::support::trace_init(); + let (mock, handle) = mock::pair::<_, &'static str>(); pin_mut!(handle); diff --git a/tower/tests/util/main.rs b/tower/tests/util/main.rs index 1f7f773b5..e80bb0f2c 100644 --- a/tower/tests/util/main.rs +++ b/tower/tests/util/main.rs @@ -3,3 +3,5 @@ mod call_all; mod oneshot; mod service_fn; +#[path = "../support.rs"] +pub(crate) mod support; diff --git a/tower/tests/util/oneshot.rs b/tower/tests/util/oneshot.rs index c2b95e183..3025edae9 100644 --- a/tower/tests/util/oneshot.rs +++ b/tower/tests/util/oneshot.rs @@ -3,10 +3,11 @@ use std::{future::Future, pin::Pin}; use tower::util::ServiceExt; use tower_service::Service; -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn service_driven_to_readiness() { // This test ensures that `oneshot` will repeatedly call `poll_ready` until // the service is ready. + let _t = super::support::trace_init(); struct PollMeTwice { ready: bool, diff --git a/tower/tests/util/service_fn.rs b/tower/tests/util/service_fn.rs index d02415936..ac6bf06f3 100644 --- a/tower/tests/util/service_fn.rs +++ b/tower/tests/util/service_fn.rs @@ -2,8 +2,10 @@ use futures_util::future::ready; use tower::util::service_fn; use tower_service::Service; -#[tokio::test] +#[tokio::test(flavor = "current_thread")] async fn simple() { + let _t = super::support::trace_init(); + let mut add_one = service_fn(|req| ready(Ok::<_, ()>(req + 1))); let answer = add_one.call(1).await.unwrap(); assert_eq!(answer, 2); From e796887f257567b8d43306682670049dc2b7764e Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 27 Oct 2020 09:48:24 -0700 Subject: [PATCH 3/4] add tests for pending waiters on buffer --- tower/tests/buffer/main.rs | 121 +++++++++++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) diff --git a/tower/tests/buffer/main.rs b/tower/tests/buffer/main.rs index 66a768d2e..77bef8538 100644 --- a/tower/tests/buffer/main.rs +++ b/tower/tests/buffer/main.rs @@ -227,6 +227,127 @@ async fn waits_for_channel_capacity() { assert_ready_ok!(response4.poll()); } +#[tokio::test(flavor = "current_thread")] +async fn wakes_pending_waiters_on_close() { + let _t = support::trace_init(); + use tower::{util::ServiceExt, Service}; + + let (service, mut handle) = mock::pair::<_, ()>(); + + let (mut service, worker) = Buffer::pair(service, 1); + let mut worker = task::spawn(worker); + + // keep the request in the worker + handle.allow(0); + let service1 = service.ready_and().await.unwrap(); + assert_pending!(worker.poll()); + let mut response = task::spawn(service1.call("hello")); + + let mut service1 = service.clone(); + let mut ready_and1 = task::spawn(service1.ready_and()); + assert_pending!(worker.poll()); + assert_pending!(ready_and1.poll(), "no capacity"); + + let mut service1 = service.clone(); + let mut ready_and2 = task::spawn(service1.ready_and()); + assert_pending!(worker.poll()); + assert_pending!(ready_and2.poll(), "no capacity"); + + // kill the worker task + drop(worker); + + let err = assert_ready_err!(response.poll()); + assert!( + err.is::(), + "response should fail with a Closed, got: {:?}", + err + ); + + assert!( + ready_and1.is_woken(), + "dropping worker should wake ready_and task 1" + ); + let err = assert_ready_err!(ready_and1.poll()); + assert!( + err.is::(), + "ready_and 1 should fail with a Closed, got: {:?}", + err + ); + + assert!( + ready_and2.is_woken(), + "dropping worker should wake ready_and task 2" + ); + let err = assert_ready_err!(ready_and1.poll()); + assert!( + err.is::(), + "ready_and 2 should fail with a Closed, got: {:?}", + err + ); +} + +#[tokio::test(flavor = "current_thread")] +async fn wakes_pending_waiters_on_failure() { + let _t = support::trace_init(); + use std::error::Error as StdError; + use tower::{util::ServiceExt, Service}; + + let (service, mut handle) = mock::pair::<_, ()>(); + + let (mut service, worker) = Buffer::pair(service, 1); + let mut worker = task::spawn(worker); + + // keep the request in the worker + handle.allow(0); + let service1 = service.ready_and().await.unwrap(); + assert_pending!(worker.poll()); + let mut response = task::spawn(service1.call("hello")); + + let mut service1 = service.clone(); + let mut ready_and1 = task::spawn(service1.ready_and()); + assert_pending!(worker.poll()); + assert_pending!(ready_and1.poll(), "no capacity"); + + let mut service1 = service.clone(); + let mut ready_and2 = task::spawn(service1.ready_and()); + assert_pending!(worker.poll()); + assert_pending!(ready_and2.poll(), "no capacity"); + + // fail the inner service + handle.send_error("foobar"); + // worker task terminates + assert_ready!(worker.poll()); + + let err = assert_ready_err!(response.poll()); + assert!( + err.is::(), + "response should fail with a ServiceError, got: {:?}", + err + ); + + assert!( + ready_and1.is_woken(), + "dropping worker should wake ready_and task 1" + ); + let err = assert_ready_err!(ready_and1.poll()); + assert!( + err.is::(), + "ready_and 1 should fail with a ServiceError, got: {:?}", + err + ); + + assert!( + ready_and2.is_woken(), + "dropping worker should wake ready_and task 2" + ); + let err = assert_ready_err!(ready_and1.poll()); + assert!( + err.is::(), + "ready_and 2 should fail with a ServiceError, got: {:?}", + err + ); +} + type Mock = mock::Mock<&'static str, &'static str>; type Handle = mock::Handle<&'static str, &'static str>; From 9cecf463cafcfc3666e4a1be40c6dbc482dd7967 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 27 Oct 2020 11:56:28 -0700 Subject: [PATCH 4/4] fix buffer not notifying waiters when closing/erroring --- tower/src/buffer/service.rs | 4 ++-- tower/src/buffer/worker.rs | 23 ++++++++++++++++++++- tower/src/semaphore.rs | 41 ++++++++++++++++++++++++++++++++++++- tower/tests/buffer/main.rs | 4 +--- 4 files changed, 65 insertions(+), 7 deletions(-) diff --git a/tower/src/buffer/service.rs b/tower/src/buffer/service.rs index 882a35edc..d106e760b 100644 --- a/tower/src/buffer/service.rs +++ b/tower/src/buffer/service.rs @@ -80,8 +80,8 @@ where Request: Send + 'static, { let (tx, rx) = mpsc::unbounded_channel(); - let (handle, worker) = Worker::new(service, rx); - let semaphore = Semaphore::new(bound); + let (semaphore, wake_waiters) = Semaphore::new_with_close(bound); + let (handle, worker) = Worker::new(service, rx, wake_waiters); ( Buffer { tx, diff --git a/tower/src/buffer/worker.rs b/tower/src/buffer/worker.rs index ca5640bad..1f70c8e2f 100644 --- a/tower/src/buffer/worker.rs +++ b/tower/src/buffer/worker.rs @@ -20,7 +20,7 @@ use tower_service::Service; /// as part of the public API. This is the "sealed" pattern to include "private" /// types in public traits that are not meant for consumers of the library to /// implement (only call). -#[pin_project] +#[pin_project(PinnedDrop)] #[derive(Debug)] pub struct Worker where @@ -33,6 +33,7 @@ where finish: bool, failed: Option, handle: Handle, + close: Option, } /// Get the error out @@ -49,6 +50,7 @@ where pub(crate) fn new( service: T, rx: mpsc::UnboundedReceiver>, + close: crate::semaphore::Close, ) -> (Handle, Worker) { let handle = Handle { inner: Arc::new(Mutex::new(None)), @@ -61,6 +63,7 @@ where rx, service, handle: handle.clone(), + close: Some(close), }; (handle, worker) @@ -195,6 +198,11 @@ where .as_ref() .expect("Worker::failed did not set self.failed?") .clone())); + // Wake any tasks waiting on channel capacity. + if let Some(close) = self.close.take() { + tracing::debug!("waking pending tasks"); + close.close(); + } } } } @@ -208,6 +216,19 @@ where } } +#[pin_project::pinned_drop] +impl PinnedDrop for Worker +where + T: Service, + T::Error: Into, +{ + fn drop(mut self: Pin<&mut Self>) { + if let Some(close) = self.as_mut().close.take() { + close.close(); + } + } +} + impl Handle { pub(crate) fn get_error_on_closed(&self) -> crate::BoxError { self.inner diff --git a/tower/src/semaphore.rs b/tower/src/semaphore.rs index e15b923de..ea1c005c2 100644 --- a/tower/src/semaphore.rs +++ b/tower/src/semaphore.rs @@ -5,7 +5,7 @@ use std::{ future::Future, mem, pin::Pin, - sync::Arc, + sync::{Arc, Weak}, task::{Context, Poll}, }; use tokio::sync; @@ -16,6 +16,12 @@ pub(crate) struct Semaphore { state: State, } +#[derive(Debug)] +pub(crate) struct Close { + semaphore: Weak, + permits: usize, +} + enum State { Waiting(Pin + Send + 'static>>), Ready(Permit), @@ -23,6 +29,19 @@ enum State { } impl Semaphore { + pub(crate) fn new_with_close(permits: usize) -> (Self, Close) { + let semaphore = Arc::new(sync::Semaphore::new(permits)); + let close = Close { + semaphore: Arc::downgrade(&semaphore), + permits, + }; + let semaphore = Self { + semaphore, + state: State::Empty, + }; + (semaphore, close) + } + pub(crate) fn new(permits: usize) -> Self { Self { semaphore: Arc::new(sync::Semaphore::new(permits)), @@ -72,3 +91,23 @@ impl fmt::Debug for State { } } } + +impl Close { + /// Close the semaphore, waking any remaining tasks currently awaiting a permit. + pub(crate) fn close(self) { + // The maximum number of permits that a `tokio::sync::Semaphore` + // can hold is usize::MAX >> 3. If we attempt to add more than that + // number of permits, the semaphore will panic. + // XXX(eliza): another shift is kinda janky but if we add (usize::MAX + // > 3 - initial permits) the semaphore impl panics (I think due to a + // bug in tokio?). + // TODO(eliza): Tokio should _really_ just expose `Semaphore::close` + // publicly so we don't have to do this nonsense... + const MAX: usize = std::usize::MAX >> 4; + if let Some(semaphore) = self.semaphore.upgrade() { + // If we added `MAX - available_permits`, any tasks that are + // currently holding permits could drop them, overflowing the max. + semaphore.add_permits(MAX - self.permits); + } + } +} diff --git a/tower/tests/buffer/main.rs b/tower/tests/buffer/main.rs index 77bef8538..41b0336f4 100644 --- a/tower/tests/buffer/main.rs +++ b/tower/tests/buffer/main.rs @@ -4,6 +4,7 @@ mod support; use std::thread; use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task}; use tower::buffer::{error, Buffer}; +use tower::{util::ServiceExt, Service}; use tower_test::{assert_request_eq, mock}; fn let_worker_work() { @@ -230,7 +231,6 @@ async fn waits_for_channel_capacity() { #[tokio::test(flavor = "current_thread")] async fn wakes_pending_waiters_on_close() { let _t = support::trace_init(); - use tower::{util::ServiceExt, Service}; let (service, mut handle) = mock::pair::<_, ()>(); @@ -289,8 +289,6 @@ async fn wakes_pending_waiters_on_close() { #[tokio::test(flavor = "current_thread")] async fn wakes_pending_waiters_on_failure() { let _t = support::trace_init(); - use std::error::Error as StdError; - use tower::{util::ServiceExt, Service}; let (service, mut handle) = mock::pair::<_, ()>();