From 65b6d0b900995781dd4b8c881629a028cf09c738 Mon Sep 17 00:00:00 2001 From: arya2 Date: Mon, 29 Aug 2022 18:30:02 -0400 Subject: [PATCH 01/17] adds non-blocking writer for tracing subscriber --- Cargo.lock | 12 ++++++++++++ zebrad/Cargo.toml | 1 + zebrad/src/components/tracing/component.rs | 14 +++++++++++--- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 332b59dc58f..d265af0287e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5524,6 +5524,17 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e" +dependencies = [ + "crossbeam-channel", + "time 0.3.7", + "tracing-subscriber 0.3.11", +] + [[package]] name = "tracing-attributes" version = "0.1.19" @@ -6630,6 +6641,7 @@ dependencies = [ "tonic-build", "tower", "tracing", + "tracing-appender", "tracing-error", "tracing-flame", "tracing-futures", diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml index d33d2389476..fbf316b4d98 100644 --- a/zebrad/Cargo.toml +++ b/zebrad/Cargo.toml @@ -99,6 +99,7 @@ tinyvec = { version = "1.6.0", features = ["rustc_1_55"] } thiserror = "1.0.33" tracing-subscriber = { version = "0.3.11", features = ["env-filter"] } +tracing-appender = "0.2.2" tracing-error = "0.2.0" tracing-futures = "0.2.5" tracing = "0.1.31" diff --git a/zebrad/src/components/tracing/component.rs b/zebrad/src/components/tracing/component.rs index bab5675d100..7c37c38e4cd 100644 --- a/zebrad/src/components/tracing/component.rs +++ b/zebrad/src/components/tracing/component.rs @@ -24,6 +24,9 @@ pub struct Tracing { /// The installed flame graph collector, if enabled. #[cfg(feature = "flamegraph")] flamegrapher: Option, + + #[cfg(not(all(feature = "tokio-console", tokio_unstable)))] + _guard: tracing_appender::non_blocking::WorkerGuard, } impl Tracing { @@ -42,12 +45,15 @@ impl Tracing { // // TODO: when fmt::Subscriber supports per-layer filtering, always enable this code #[cfg(not(all(feature = "tokio-console", tokio_unstable)))] - let (subscriber, filter_handle) = { + let (subscriber, _guard, filter_handle) = { use tracing_subscriber::FmtSubscriber; + // By default, the built NonBlocking will be lossy. (with a line limit of 128_000) + let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout()); let logger = FmtSubscriber::builder() .with_ansi(use_color) - .with_env_filter(&filter); + .with_env_filter(&filter) + .with_writer(non_blocking); // Enable reloading if that feature is selected. #[cfg(feature = "filter-reload")] @@ -62,7 +68,7 @@ impl Tracing { let subscriber = logger.finish().with(ErrorLayer::default()); - (subscriber, filter_handle) + (subscriber, _guard, filter_handle) }; // Construct a tracing registry with the supplied per-layer logging filter, @@ -185,6 +191,8 @@ impl Tracing { initial_filter: filter, #[cfg(feature = "flamegraph")] flamegrapher, + #[cfg(not(all(feature = "tokio-console", tokio_unstable)))] + _guard, }) } From aad7f868032655f74c9354f34ad4fe8f43845bb8 Mon Sep 17 00:00:00 2001 From: arya2 Date: Mon, 29 Aug 2022 21:17:26 -0400 Subject: [PATCH 02/17] use non_blocking writer for the fmt::Layer with the tokio-console feature as well --- zebrad/src/components/tracing/component.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/zebrad/src/components/tracing/component.rs b/zebrad/src/components/tracing/component.rs index 7c37c38e4cd..97c7bd45498 100644 --- a/zebrad/src/components/tracing/component.rs +++ b/zebrad/src/components/tracing/component.rs @@ -6,6 +6,8 @@ use tracing_subscriber::{ fmt::Formatter, layer::SubscriberExt, reload::Handle, util::SubscriberInitExt, EnvFilter, }; +use tracing_appender::non_blocking::WorkerGuard; + use crate::{application::app_version, config::TracingSection}; #[cfg(feature = "flamegraph")] @@ -25,8 +27,7 @@ pub struct Tracing { #[cfg(feature = "flamegraph")] flamegrapher: Option, - #[cfg(not(all(feature = "tokio-console", tokio_unstable)))] - _guard: tracing_appender::non_blocking::WorkerGuard, + _guard: WorkerGuard, } impl Tracing { @@ -34,6 +35,7 @@ impl Tracing { pub fn new(config: TracingSection) -> Result { let filter = config.filter.unwrap_or_else(|| "".to_string()); let flame_root = &config.flamegraph; + let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout()); // Only use color if tracing output is being sent to a terminal or if it was explicitly // forced to. @@ -45,10 +47,9 @@ impl Tracing { // // TODO: when fmt::Subscriber supports per-layer filtering, always enable this code #[cfg(not(all(feature = "tokio-console", tokio_unstable)))] - let (subscriber, _guard, filter_handle) = { + let (subscriber, filter_handle) = { use tracing_subscriber::FmtSubscriber; // By default, the built NonBlocking will be lossy. (with a line limit of 128_000) - let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout()); let logger = FmtSubscriber::builder() .with_ansi(use_color) @@ -68,7 +69,7 @@ impl Tracing { let subscriber = logger.finish().with(ErrorLayer::default()); - (subscriber, _guard, filter_handle) + (subscriber, filter_handle) }; // Construct a tracing registry with the supplied per-layer logging filter, @@ -88,6 +89,7 @@ impl Tracing { // Using `FmtSubscriber` as the base subscriber, all the logs get filtered. let logger = fmt::Layer::new() .with_ansi(use_color) + .with_writer(non_blocking) .with_filter(EnvFilter::from(&filter)); let subscriber = subscriber.with(logger); @@ -191,7 +193,6 @@ impl Tracing { initial_filter: filter, #[cfg(feature = "flamegraph")] flamegrapher, - #[cfg(not(all(feature = "tokio-console", tokio_unstable)))] _guard, }) } From 3428c092cfe99fd76c330192ca5e794c10901b95 Mon Sep 17 00:00:00 2001 From: arya2 Date: Wed, 31 Aug 2022 17:06:26 -0400 Subject: [PATCH 03/17] adds doc comment to _guard field --- zebrad/src/components/tracing/component.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/zebrad/src/components/tracing/component.rs b/zebrad/src/components/tracing/component.rs index 97c7bd45498..cb5ea9b935f 100644 --- a/zebrad/src/components/tracing/component.rs +++ b/zebrad/src/components/tracing/component.rs @@ -27,6 +27,8 @@ pub struct Tracing { #[cfg(feature = "flamegraph")] flamegrapher: Option, + /// Drop guard for worker thread of non-blocking logger, + /// responsible for flushing any remaining logs when the program terminates _guard: WorkerGuard, } @@ -35,6 +37,8 @@ impl Tracing { pub fn new(config: TracingSection) -> Result { let filter = config.filter.unwrap_or_else(|| "".to_string()); let flame_root = &config.flamegraph; + + // By default, the built NonBlocking will be lossy. (with a line limit of 128_000) let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout()); // Only use color if tracing output is being sent to a terminal or if it was explicitly @@ -49,7 +53,6 @@ impl Tracing { #[cfg(not(all(feature = "tokio-console", tokio_unstable)))] let (subscriber, filter_handle) = { use tracing_subscriber::FmtSubscriber; - // By default, the built NonBlocking will be lossy. (with a line limit of 128_000) let logger = FmtSubscriber::builder() .with_ansi(use_color) From f4e6abf35ed992d9622a51f9b4bb850c0fa0bb14 Mon Sep 17 00:00:00 2001 From: arya2 Date: Thu, 1 Sep 2022 14:00:45 -0400 Subject: [PATCH 04/17] adds acceptance test --- zebrad/tests/acceptance.rs | 45 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index 47058a2e254..f7a7e3da8a5 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -1229,6 +1229,51 @@ async fn rpc_endpoint(parallel_cpu_threads: bool) -> Result<()> { Ok(()) } +#[tokio::test] +async fn non_blocking_logger() -> Result<()> { + let _init_guard = zebra_test::init(); + + // Write a configuration that has RPC listen_addr set + // [Note on port conflict](#Note on port conflict) + let mut config = random_known_rpc_port_config()?; + let zebra_rpc_address = config.rpc.listen_addr.unwrap(); + + let dir = testdir()?.with_config(&mut config)?; + let mut child = dir.spawn_child(args!["start"])?; + + // Wait until port is open. + child.expect_stdout_line_matches( + format!("Opened RPC endpoint at {}", config.rpc.listen_addr.unwrap()).as_str(), + )?; + + // Create an http client + let client = reqwest::Client::new(); + + for _ in 0..10_000 { + let res = client + .post(format!("http://{}", &zebra_rpc_address)) + .body(r#"{"jsonrpc":"1.0","method":"getinfo","params":[],"id":123}"#) + .header("Content-Type", "application/json") + .send() + .await?; + + // Test that zebrad rpc endpoint is still responding to requests + assert!(res.status().is_success()); + } + + child.kill(false)?; + + let output = child.wait_with_output()?; + let output = output.assert_failure()?; + + // [Note on port conflict](#Note on port conflict) + output + .assert_was_killed() + .wrap_err("Possible port conflict. Are there other acceptance tests running?")?; + + Ok(()) +} + /// Make sure `lightwalletd` works with Zebra, when both their states are empty. /// /// This test only runs when the `ZEBRA_TEST_LIGHTWALLETD` env var is set. From e9650c1a5e6975f8b23c922a612ee209fce7545a Mon Sep 17 00:00:00 2001 From: arya2 Date: Thu, 1 Sep 2022 17:02:52 -0400 Subject: [PATCH 05/17] update filter_handle type to use NonBlocking --- zebrad/src/components/tracing/component.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/zebrad/src/components/tracing/component.rs b/zebrad/src/components/tracing/component.rs index cb5ea9b935f..5ba243f6d5b 100644 --- a/zebrad/src/components/tracing/component.rs +++ b/zebrad/src/components/tracing/component.rs @@ -3,10 +3,14 @@ use abscissa_core::{Component, FrameworkError, Shutdown}; use tracing_error::ErrorLayer; use tracing_subscriber::{ - fmt::Formatter, layer::SubscriberExt, reload::Handle, util::SubscriberInitExt, EnvFilter, + fmt::{format, Formatter}, + layer::SubscriberExt, + reload::Handle, + util::SubscriberInitExt, + EnvFilter, }; -use tracing_appender::non_blocking::WorkerGuard; +use tracing_appender::non_blocking::{NonBlocking, WorkerGuard}; use crate::{application::app_version, config::TracingSection}; @@ -18,7 +22,12 @@ pub struct Tracing { /// The installed filter reloading handle, if enabled. // // TODO: when fmt::Subscriber supports per-layer filtering, remove the Option - filter_handle: Option>, + filter_handle: Option< + Handle< + EnvFilter, + Formatter, NonBlocking>, + >, + >, /// The originally configured filter. initial_filter: String, @@ -56,8 +65,8 @@ impl Tracing { let logger = FmtSubscriber::builder() .with_ansi(use_color) - .with_env_filter(&filter) - .with_writer(non_blocking); + .with_writer(non_blocking) + .with_env_filter(&filter); // Enable reloading if that feature is selected. #[cfg(feature = "filter-reload")] From 93e6678a8f051c885cf50c7c08484426b9342c1d Mon Sep 17 00:00:00 2001 From: arya2 Date: Thu, 1 Sep 2022 19:25:24 -0400 Subject: [PATCH 06/17] adds more detail on lossy non-blocking writer and sets tracing.filter to "trace" in acceptance test --- zebrad/src/components/tracing/component.rs | 3 ++- zebrad/tests/acceptance.rs | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/zebrad/src/components/tracing/component.rs b/zebrad/src/components/tracing/component.rs index 5ba243f6d5b..450b804ba68 100644 --- a/zebrad/src/components/tracing/component.rs +++ b/zebrad/src/components/tracing/component.rs @@ -47,7 +47,8 @@ impl Tracing { let filter = config.filter.unwrap_or_else(|| "".to_string()); let flame_root = &config.flamegraph; - // By default, the built NonBlocking will be lossy. (with a line limit of 128_000) + // By default, the built NonBlocking will be lossy with a line limit of 128_000, lines sent to the worker past + // this limit will be dropped without being written to stdout let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout()); // Only use color if tracing output is being sent to a terminal or if it was explicitly diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index f7a7e3da8a5..3cfdb962604 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -1237,6 +1237,7 @@ async fn non_blocking_logger() -> Result<()> { // [Note on port conflict](#Note on port conflict) let mut config = random_known_rpc_port_config()?; let zebra_rpc_address = config.rpc.listen_addr.unwrap(); + config.tracing.filter = Some("trace".to_string()); let dir = testdir()?.with_config(&mut config)?; let mut child = dir.spawn_child(args!["start"])?; From 27207651cd2eb27a24ac86e091b87b750a21b6ba Mon Sep 17 00:00:00 2001 From: arya2 Date: Thu, 1 Sep 2022 20:26:21 -0400 Subject: [PATCH 07/17] drops ZebradApp before process::exit(1) in the event of a FrameworkError --- zebrad/src/application.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/zebrad/src/application.rs b/zebrad/src/application.rs index 6ebd51c987b..49d6c7f20a7 100644 --- a/zebrad/src/application.rs +++ b/zebrad/src/application.rs @@ -6,7 +6,7 @@ use self::entry_point::EntryPoint; use std::{fmt::Write as _, io::Write as _, process}; use abscissa_core::{ - application::{self, fatal_error, AppCell}, + application::{self, AppCell}, config::{self, Configurable}, status_err, terminal::{component::Terminal, stderr, stdout, ColorChoice}, @@ -18,6 +18,13 @@ use zebra_state::constants::{DATABASE_FORMAT_VERSION, LOCK_FILE_ERROR}; use crate::{commands::ZebradCmd, components::tracing::Tracing, config::ZebradConfig}; +/// See https://docs.rs/abscissa_core/latest/src/abscissa_core/application/exit.rs.html#7-10Z +/// Print a fatal error message and exit +fn fatal_error(app_name: String, err: &dyn std::error::Error) { + status_err!("{} fatal error: {}", app_name, err); + process::exit(1) +} + /// Application state pub static APPLICATION: AppCell = AppCell::new(); @@ -462,7 +469,9 @@ impl Application for ZebradApp { let _ = stderr().lock().flush(); if let Err(e) = self.state().components.shutdown(self, shutdown) { - fatal_error(self, &e) + let app_name = self.name().to_string(); + let _ = std::mem::take(self); + fatal_error(app_name, &e); } // Swap out a fake app so we can trigger the destructor on the original From 11a7cf4589ebb285139bfc708ac4054ac0a78a14 Mon Sep 17 00:00:00 2001 From: arya2 Date: Thu, 1 Sep 2022 21:15:34 -0400 Subject: [PATCH 08/17] reduces buffered lines limit to 8000 --- zebrad/src/components/tracing/component.rs | 12 ++++++++---- zebrad/tests/acceptance.rs | 2 +- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/zebrad/src/components/tracing/component.rs b/zebrad/src/components/tracing/component.rs index 450b804ba68..5e4d7236cd6 100644 --- a/zebrad/src/components/tracing/component.rs +++ b/zebrad/src/components/tracing/component.rs @@ -10,7 +10,7 @@ use tracing_subscriber::{ EnvFilter, }; -use tracing_appender::non_blocking::{NonBlocking, WorkerGuard}; +use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard}; use crate::{application::app_version, config::TracingSection}; @@ -47,9 +47,13 @@ impl Tracing { let filter = config.filter.unwrap_or_else(|| "".to_string()); let flame_root = &config.flamegraph; - // By default, the built NonBlocking will be lossy with a line limit of 128_000, lines sent to the worker past - // this limit will be dropped without being written to stdout - let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout()); + // Builds a lossy NonBlocking logger with a line limit of 8_000, lines sent to the worker past + // this limit (via the write method which sends it to a crossbeam::bounded channel) + // will be dropped without being written to stdout + let (non_blocking, _guard) = NonBlockingBuilder::default() + .lossy(true) + .buffered_lines_limit(8_000) + .finish(std::io::stdout()); // Only use color if tracing output is being sent to a terminal or if it was explicitly // forced to. diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index 3cfdb962604..61a4e4f1348 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -1250,7 +1250,7 @@ async fn non_blocking_logger() -> Result<()> { // Create an http client let client = reqwest::Client::new(); - for _ in 0..10_000 { + for _ in 0..20_000 { let res = client .post(format!("http://{}", &zebra_rpc_address)) .body(r#"{"jsonrpc":"1.0","method":"getinfo","params":[],"id":123}"#) From 1f227a8cf328dbe96340cf7680f5f545e5f15c83 Mon Sep 17 00:00:00 2001 From: arya2 Date: Fri, 2 Sep 2022 13:56:54 -0400 Subject: [PATCH 09/17] adds tracing.buffer_limit config and some comments --- zebra-state/src/service/finalized_state.rs | 13 +++++++++++++ zebrad/src/application.rs | 4 +++- zebrad/src/components/tracing/component.rs | 9 ++++----- zebrad/src/config.rs | 7 +++++++ zebrad/tests/acceptance.rs | 7 +++++-- 5 files changed, 32 insertions(+), 8 deletions(-) diff --git a/zebra-state/src/service/finalized_state.rs b/zebra-state/src/service/finalized_state.rs index cc2207b9e95..343c7ed56f1 100644 --- a/zebra-state/src/service/finalized_state.rs +++ b/zebra-state/src/service/finalized_state.rs @@ -106,6 +106,11 @@ impl FinalizedState { // So we want to drop it before we exit. std::mem::drop(new_state); + // Drops tracing log output that's hasn't already been written to stdout + // since this exits before calling drop on the WorkerGuard for the logger thread. + // This is okay for now because this is test-only code + // + // TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout Self::exit_process(); } } @@ -307,6 +312,11 @@ impl FinalizedState { // We're just about to do a forced exit, so it's ok to do a forced db shutdown self.db.shutdown(true); + // Drops tracing log output that's hasn't already been written to stdout + // since this exits before calling drop on the WorkerGuard for the logger thread. + // This is okay for now because this is test-only code + // + // TODO: Call ZebradApp.shutdown or drop its Tracing component before calling exit_process to flush logs to stdout Self::exit_process(); } @@ -343,6 +353,9 @@ impl FinalizedState { let _ = stdout().lock().flush(); let _ = stderr().lock().flush(); + // Exits before calling drop on the WorkerGuard for the logger thread, + // dropping any lines that haven't already been written to stdout. + // This is okay for now because this is test-only code std::process::exit(0); } } diff --git a/zebrad/src/application.rs b/zebrad/src/application.rs index 49d6c7f20a7..607105ecbd4 100644 --- a/zebrad/src/application.rs +++ b/zebrad/src/application.rs @@ -20,7 +20,7 @@ use crate::{commands::ZebradCmd, components::tracing::Tracing, config::ZebradCon /// See https://docs.rs/abscissa_core/latest/src/abscissa_core/application/exit.rs.html#7-10Z /// Print a fatal error message and exit -fn fatal_error(app_name: String, err: &dyn std::error::Error) { +fn fatal_error(app_name: String, err: &dyn std::error::Error) -> ! { status_err!("{} fatal error: {}", app_name, err); process::exit(1) } @@ -470,6 +470,8 @@ impl Application for ZebradApp { if let Err(e) = self.state().components.shutdown(self, shutdown) { let app_name = self.name().to_string(); + + // Swap out a fake app so we can trigger the destructor on the original let _ = std::mem::take(self); fatal_error(app_name, &e); } diff --git a/zebrad/src/components/tracing/component.rs b/zebrad/src/components/tracing/component.rs index 5e4d7236cd6..d2bca164a3d 100644 --- a/zebrad/src/components/tracing/component.rs +++ b/zebrad/src/components/tracing/component.rs @@ -47,12 +47,11 @@ impl Tracing { let filter = config.filter.unwrap_or_else(|| "".to_string()); let flame_root = &config.flamegraph; - // Builds a lossy NonBlocking logger with a line limit of 8_000, lines sent to the worker past - // this limit (via the write method which sends it to a crossbeam::bounded channel) - // will be dropped without being written to stdout + // Builds a lossy NonBlocking logger with a default line limit of 128_000 or an explicit buffer_limit. + // The write method queues lines down a bounded channel with this capacity to a worker thread that writes to stdout. + // Increments error_counter and drops lines when the buffer is full. let (non_blocking, _guard) = NonBlockingBuilder::default() - .lossy(true) - .buffered_lines_limit(8_000) + .buffered_lines_limit(config.buffer_limit.max(100)) .finish(std::io::stdout()); // Only use color if tracing output is being sent to a terminal or if it was explicitly diff --git a/zebrad/src/config.rs b/zebrad/src/config.rs index da5c2322c56..8622857704f 100644 --- a/zebrad/src/config.rs +++ b/zebrad/src/config.rs @@ -93,6 +93,12 @@ pub struct TracingSection { /// verification of every 1000th block. pub filter: Option, + /// The buffer_limit size sets the number of log lines that can be queued by the tracing subscriber + /// to be written to stdout before logs are dropped. + /// + /// Defaults to 128,000 with a minimum of 100. + pub buffer_limit: usize, + /// The address used for an ad-hoc RPC endpoint allowing dynamic control of the tracing filter. /// /// Install Zebra using `cargo install --features=filter-reload` to enable this config. @@ -140,6 +146,7 @@ impl Default for TracingSection { use_color: true, force_use_color: false, filter: None, + buffer_limit: 128_000, endpoint_addr: None, flamegraph: None, use_journald: false, diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index 61a4e4f1348..f3fa8fbd75d 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -1236,8 +1236,9 @@ async fn non_blocking_logger() -> Result<()> { // Write a configuration that has RPC listen_addr set // [Note on port conflict](#Note on port conflict) let mut config = random_known_rpc_port_config()?; - let zebra_rpc_address = config.rpc.listen_addr.unwrap(); config.tracing.filter = Some("trace".to_string()); + config.tracing.buffer_limit = 100; + let zebra_rpc_address = config.rpc.listen_addr.unwrap(); let dir = testdir()?.with_config(&mut config)?; let mut child = dir.spawn_child(args!["start"])?; @@ -1250,7 +1251,9 @@ async fn non_blocking_logger() -> Result<()> { // Create an http client let client = reqwest::Client::new(); - for _ in 0..20_000 { + // Most of Zebra's lines are 100-200 characters long, so 500 requests should print enough to fill the unix pipe, + // fill the channel logs are queued onto, and drop logs rather than block execution. + for _ in 0..5_000 { let res = client .post(format!("http://{}", &zebra_rpc_address)) .body(r#"{"jsonrpc":"1.0","method":"getinfo","params":[],"id":123}"#) From 31769ad043c57d26c5a805297fbbc7ddeee6f44d Mon Sep 17 00:00:00 2001 From: arya2 Date: Fri, 2 Sep 2022 15:45:41 -0400 Subject: [PATCH 10/17] update acceptance.rs --- Cargo.lock | 2 +- zebrad/tests/acceptance.rs | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index d265af0287e..1fe07ffbcb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5531,7 +5531,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e" dependencies = [ "crossbeam-channel", - "time 0.3.7", + "time 0.3.14", "tracing-subscriber 0.3.11", ] diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index f3fa8fbd75d..ae75b405245 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -1248,6 +1248,11 @@ async fn non_blocking_logger() -> Result<()> { format!("Opened RPC endpoint at {}", config.rpc.listen_addr.unwrap()).as_str(), )?; + let mut cmd2 = std::process::Command::new("echo") + .stdin(Stdio::piped()) + .stdout(Stdio::null()) + .spawn()?; + // Create an http client let client = reqwest::Client::new(); @@ -1265,6 +1270,7 @@ async fn non_blocking_logger() -> Result<()> { assert!(res.status().is_success()); } + cmd2.kill()?; child.kill(false)?; let output = child.wait_with_output()?; From 18397ca374bc21f5899f91a61694b90c127f4049 Mon Sep 17 00:00:00 2001 From: arya2 Date: Fri, 2 Sep 2022 16:44:29 -0400 Subject: [PATCH 11/17] fix acceptance test --- zebrad/tests/acceptance.rs | 102 +++++++++++++++++++++---------------- 1 file changed, 59 insertions(+), 43 deletions(-) diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index ae75b405245..3b175869d0b 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -1229,59 +1229,75 @@ async fn rpc_endpoint(parallel_cpu_threads: bool) -> Result<()> { Ok(()) } -#[tokio::test] -async fn non_blocking_logger() -> Result<()> { - let _init_guard = zebra_test::init(); +#[test] +fn non_blocking_logger() -> Result<()> { + use futures::FutureExt as _; - // Write a configuration that has RPC listen_addr set - // [Note on port conflict](#Note on port conflict) - let mut config = random_known_rpc_port_config()?; - config.tracing.filter = Some("trace".to_string()); - config.tracing.buffer_limit = 100; - let zebra_rpc_address = config.rpc.listen_addr.unwrap(); + let rt = tokio::runtime::Runtime::new().unwrap(); - let dir = testdir()?.with_config(&mut config)?; - let mut child = dir.spawn_child(args!["start"])?; + let test_task_handle: tokio::task::JoinHandle> = rt.spawn(async { + let _init_guard = zebra_test::init(); - // Wait until port is open. - child.expect_stdout_line_matches( - format!("Opened RPC endpoint at {}", config.rpc.listen_addr.unwrap()).as_str(), - )?; + // Write a configuration that has RPC listen_addr set + // [Note on port conflict](#Note on port conflict) + let mut config = random_known_rpc_port_config()?; + config.tracing.filter = Some("trace".to_string()); + config.tracing.buffer_limit = 100; + let zebra_rpc_address = config.rpc.listen_addr.unwrap(); - let mut cmd2 = std::process::Command::new("echo") - .stdin(Stdio::piped()) - .stdout(Stdio::null()) - .spawn()?; + let dir = testdir()?.with_config(&mut config)?; + let mut child = dir.spawn_child(args!["start"])?; + // Wait until port is open. + child.expect_stdout_line_matches( + format!("Opened RPC endpoint at {}", config.rpc.listen_addr.unwrap()).as_str(), + )?; + + let mut cmd2 = std::process::Command::new("echo") + .stdin(Stdio::piped()) + .stdout(Stdio::null()) + .spawn()?; + + // Create an http client + let client = reqwest::Client::new(); + + // Most of Zebra's lines are 100-200 characters long, so 500 requests should print enough to fill the unix pipe, + // fill the channel logs are queued onto, and drop logs rather than block execution. + for _ in 0..5_000 { + let res = client + .post(format!("http://{}", &zebra_rpc_address)) + .body(r#"{"jsonrpc":"1.0","method":"getinfo","params":[],"id":123}"#) + .header("Content-Type", "application/json") + .send() + .await?; + + // Test that zebrad rpc endpoint is still responding to requests + assert!(res.status().is_success()); + } - // Create an http client - let client = reqwest::Client::new(); + cmd2.kill()?; + child.kill(false)?; - // Most of Zebra's lines are 100-200 characters long, so 500 requests should print enough to fill the unix pipe, - // fill the channel logs are queued onto, and drop logs rather than block execution. - for _ in 0..5_000 { - let res = client - .post(format!("http://{}", &zebra_rpc_address)) - .body(r#"{"jsonrpc":"1.0","method":"getinfo","params":[],"id":123}"#) - .header("Content-Type", "application/json") - .send() - .await?; - - // Test that zebrad rpc endpoint is still responding to requests - assert!(res.status().is_success()); - } + let output = child.wait_with_output()?; + let output = output.assert_failure()?; - cmd2.kill()?; - child.kill(false)?; + // [Note on port conflict](#Note on port conflict) + output + .assert_was_killed() + .wrap_err("Possible port conflict. Are there other acceptance tests running?")?; - let output = child.wait_with_output()?; - let output = output.assert_failure()?; + Ok(()) + }); - // [Note on port conflict](#Note on port conflict) - output - .assert_was_killed() - .wrap_err("Possible port conflict. Are there other acceptance tests running?")?; + // Wait until the spawned task finishes + std::thread::sleep(Duration::from_secs(10)); - Ok(()) + rt.shutdown_timeout(Duration::from_secs(3)); + + match test_task_handle.now_or_never() { + Some(Ok(result)) => result, + Some(Err(_)) => Err(eyre!("join error")), + None => Err(eyre!("unexpected test task hang")), + } } /// Make sure `lightwalletd` works with Zebra, when both their states are empty. From 9b9c7bb7474c4d514345a151b008978861e16e99 Mon Sep 17 00:00:00 2001 From: arya2 Date: Fri, 2 Sep 2022 16:57:19 -0400 Subject: [PATCH 12/17] fixes ambigious phrasing in comment --- zebrad/tests/acceptance.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index 3b175869d0b..4519a043ec7 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -1261,7 +1261,7 @@ fn non_blocking_logger() -> Result<()> { let client = reqwest::Client::new(); // Most of Zebra's lines are 100-200 characters long, so 500 requests should print enough to fill the unix pipe, - // fill the channel logs are queued onto, and drop logs rather than block execution. + // fill the channel that tracing logs are queued onto, and drop logs rather than block execution. for _ in 0..5_000 { let res = client .post(format!("http://{}", &zebra_rpc_address)) From 77176873817cae38f9cf3ea2a60f24f9df126d43 Mon Sep 17 00:00:00 2001 From: arya2 Date: Fri, 2 Sep 2022 18:41:15 -0400 Subject: [PATCH 13/17] updates zebrad/src/application.rs --- zebrad/src/application.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zebrad/src/application.rs b/zebrad/src/application.rs index 607105ecbd4..521fec67afd 100644 --- a/zebrad/src/application.rs +++ b/zebrad/src/application.rs @@ -18,7 +18,7 @@ use zebra_state::constants::{DATABASE_FORMAT_VERSION, LOCK_FILE_ERROR}; use crate::{commands::ZebradCmd, components::tracing::Tracing, config::ZebradConfig}; -/// See https://docs.rs/abscissa_core/latest/src/abscissa_core/application/exit.rs.html#7-10Z +/// See /// Print a fatal error message and exit fn fatal_error(app_name: String, err: &dyn std::error::Error) -> ! { status_err!("{} fatal error: {}", app_name, err); From fc6a983e20042c1a4bc8a593c674730d5b5643e3 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 5 Sep 2022 10:31:05 +1000 Subject: [PATCH 14/17] Find out what the join error is in the GitHub runner tests --- zebrad/tests/acceptance.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index 4519a043ec7..5675ecd58fe 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -1295,7 +1295,7 @@ fn non_blocking_logger() -> Result<()> { match test_task_handle.now_or_never() { Some(Ok(result)) => result, - Some(Err(_)) => Err(eyre!("join error")), + Some(Err(error)) => Err(eyre!("join error: {:?}", error)), None => Err(eyre!("unexpected test task hang")), } } From 590bb914f65ffd8fd0da57446077bf6f6f644e21 Mon Sep 17 00:00:00 2001 From: arya2 Date: Mon, 5 Sep 2022 14:43:06 -0400 Subject: [PATCH 15/17] updates acceptance test to use recv_timeout instead of always waiting 10 seconds, removes unnecessary echo command, and reduces # of rpc requests to 500 --- zebrad/tests/acceptance.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index 5675ecd58fe..36c2f71b3be 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -1231,16 +1231,18 @@ async fn rpc_endpoint(parallel_cpu_threads: bool) -> Result<()> { #[test] fn non_blocking_logger() -> Result<()> { - use futures::FutureExt as _; + use futures::FutureExt; + use std::{sync::mpsc, time::Duration}; let rt = tokio::runtime::Runtime::new().unwrap(); + let (done_tx, done_rx) = mpsc::channel(); - let test_task_handle: tokio::task::JoinHandle> = rt.spawn(async { + let test_task_handle: tokio::task::JoinHandle> = rt.spawn(async move { let _init_guard = zebra_test::init(); // Write a configuration that has RPC listen_addr set // [Note on port conflict](#Note on port conflict) - let mut config = random_known_rpc_port_config()?; + let mut config = random_known_rpc_port_config(false)?; config.tracing.filter = Some("trace".to_string()); config.tracing.buffer_limit = 100; let zebra_rpc_address = config.rpc.listen_addr.unwrap(); @@ -1252,17 +1254,12 @@ fn non_blocking_logger() -> Result<()> { format!("Opened RPC endpoint at {}", config.rpc.listen_addr.unwrap()).as_str(), )?; - let mut cmd2 = std::process::Command::new("echo") - .stdin(Stdio::piped()) - .stdout(Stdio::null()) - .spawn()?; - // Create an http client let client = reqwest::Client::new(); // Most of Zebra's lines are 100-200 characters long, so 500 requests should print enough to fill the unix pipe, // fill the channel that tracing logs are queued onto, and drop logs rather than block execution. - for _ in 0..5_000 { + for _ in 0..500 { let res = client .post(format!("http://{}", &zebra_rpc_address)) .body(r#"{"jsonrpc":"1.0","method":"getinfo","params":[],"id":123}"#) @@ -1274,7 +1271,6 @@ fn non_blocking_logger() -> Result<()> { assert!(res.status().is_success()); } - cmd2.kill()?; child.kill(false)?; let output = child.wait_with_output()?; @@ -1285,11 +1281,15 @@ fn non_blocking_logger() -> Result<()> { .assert_was_killed() .wrap_err("Possible port conflict. Are there other acceptance tests running?")?; + done_tx.send(())?; + Ok(()) }); - // Wait until the spawned task finishes - std::thread::sleep(Duration::from_secs(10)); + // Wait until the spawned task finishes or return an error in 45 seconds + if done_rx.recv_timeout(Duration::from_secs(45)).is_err() { + return Err(eyre!("unexpected test task hang")); + } rt.shutdown_timeout(Duration::from_secs(3)); From 175579cf61fa2e5c87f2da0a82018cb8d7519585 Mon Sep 17 00:00:00 2001 From: arya2 Date: Tue, 6 Sep 2022 16:22:22 -0400 Subject: [PATCH 16/17] see if sleeping for a few seconds before exiting helps the macOS test pass --- zebra-state/src/service/finalized_state.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/zebra-state/src/service/finalized_state.rs b/zebra-state/src/service/finalized_state.rs index 343c7ed56f1..c351fe2e527 100644 --- a/zebra-state/src/service/finalized_state.rs +++ b/zebra-state/src/service/finalized_state.rs @@ -353,6 +353,9 @@ impl FinalizedState { let _ = stdout().lock().flush(); let _ = stderr().lock().flush(); + // Give some time to logger thread to flush out any remaining lines to stdout + std::thread::sleep(std::time::Duration::from_secs(3)); + // Exits before calling drop on the WorkerGuard for the logger thread, // dropping any lines that haven't already been written to stdout. // This is okay for now because this is test-only code From 3e203d78244f46351ea457674f7af3f9dc8681c3 Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 7 Sep 2022 12:41:15 +1000 Subject: [PATCH 17/17] Expand exit sleep docs Co-authored-by: Arya --- zebra-state/src/service/finalized_state.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/zebra-state/src/service/finalized_state.rs b/zebra-state/src/service/finalized_state.rs index c351fe2e527..afa1b4f17f1 100644 --- a/zebra-state/src/service/finalized_state.rs +++ b/zebra-state/src/service/finalized_state.rs @@ -354,6 +354,7 @@ impl FinalizedState { let _ = stderr().lock().flush(); // Give some time to logger thread to flush out any remaining lines to stdout + // and yield so that tests pass on MacOS std::thread::sleep(std::time::Duration::from_secs(3)); // Exits before calling drop on the WorkerGuard for the logger thread,