Skip to content

Commit

Permalink
Fixes #18818: Update relayd to tokio 1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
amousset committed Jan 12, 2021
1 parent 652b7b7 commit 2bb09d8
Show file tree
Hide file tree
Showing 12 changed files with 269 additions and 376 deletions.
559 changes: 237 additions & 322 deletions relay/sources/relayd/Cargo.lock

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions relay/sources/relayd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,38 @@ name = "benches"

[dependencies]
anyhow = "1"
bytes = "0.5"
bytes = "1"
chrono = { version = "0.4", default-features = false, features = ["clock", "std", "serde"] }
diesel = { version = "1", default-features = false, features = ["postgres", "chrono", "r2d2"] }
# Uses rust implementation by default
flate2 = "1"
futures = "0.3"
hex = "0.4"
humantime = "2"
# 0.13.9 for tokio 0.3
hyper = { version = "0.13.8", default-features = false }
# 0.9 for tokio 0.3
inotify = "0.8"
lazy_static = "1.4"
hyper = { version = "0.14", default-features = false }
inotify = "0.9"
lazy_static = "1"
log = "0.4"
md-5 = "0.9"
nom = "6"
openssl = "0.10"
prometheus = { version = "0.11", default-features = false, features = ["process"] }
regex = "1"
# Use openssl for TLS to be consistent
reqwest = { version = "0.10", default-features = false, features = ["stream", "blocking", "native-tls"] }
reqwest = { version = "0.11", default-features = false, features = ["stream", "blocking", "native-tls"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha2 = "0.9"
structopt = { version = "0.3", default-features = false }
thiserror = "1"
tokio = { version = "0.2", default-features = false, features = ["rt-threaded", "io-driver", "io-std", "process", "macros", "stream", "signal", "fs", "blocking"] }
tokio = { version = "1", default-features = false, features = [ "rt-multi-thread", "process", "macros", "signal", "fs"] }
tokio-stream = { version = "0.1", default-features = false, features = ["io-util"] }
toml = "0.5"
# Compile dev and release with trace logs enabled
tracing = { version = "0.1", features = ["max_level_trace", "release_max_level_trace"] }
tracing-subscriber = { version = "0.2", default-features = false, features = ["env-filter", "smallvec", "fmt", "tracing-log"] }
warp = { version = "0.2", default-features = false }
## FIXME replace by 0.3 release
warp = { git = "https://github.com/aknuds1/warp", branch = "chore/upgrade-tokio", default-features = false }
# Use rust implementation
zip = { version = "0.5", default-features = false, features = ["deflate"] }

Expand Down
4 changes: 2 additions & 2 deletions relay/sources/relayd/deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
allow = [
"MIT",
"Apache-2.0",
"GPL-3.0",
"ISC",
"Zlib",
"BSD-2-Clause",
"BSD-3-Clause",
"GPL-3.0",
]

[bans]
# Lint level for when multiple versions of the same crate are detected
multiple-versions = "allow"

4 changes: 2 additions & 2 deletions relay/sources/relayd/src/api/remote_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tokio::{
io::{AsyncBufReadExt, BufReader},
process::{Child, Command},
};
use tokio_stream::wrappers::LinesStream;
use tracing::{debug, error, span, trace, Level};
use warp::{body, filters::method, path, Filter, Reply};

Expand Down Expand Up @@ -472,8 +473,7 @@ impl RunParameters {
.expect("child did not have a handle to stdout");

Box::new(
BufReader::new(stdout)
.lines()
LinesStream::new(BufReader::new(stdout).lines())
.map_err(Error::from)
.inspect(|line| trace!("output: {:?}", line))
.map(|r| {
Expand Down
2 changes: 1 addition & 1 deletion relay/sources/relayd/src/api/shared_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
JobConfig,
};
use anyhow::Error;
use bytes::{buf::BufExt, Bytes};
use bytes::{Buf, Bytes};
use chrono::Utc;
use humantime::parse_duration;
use serde::{Deserialize, Serialize};
Expand Down
10 changes: 2 additions & 8 deletions relay/sources/relayd/src/configuration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,8 @@ pub struct GeneralConfig {
pub listen: String,
/// None means using the number of available CPUs
pub core_threads: Option<usize>,
/// Deprecated in 7.0, tokio changed the way it counts threads
/// Max number of threads for the blocking operations
pub blocking_threads: Option<usize>,
/// Total number of threads used by tokio
// Take default from tokio, currently 512
// https://docs.rs/tokio/0.2.23/tokio/runtime/struct.Builder.html#method.max_threads
pub max_threads: Option<usize>,
}

impl GeneralConfig {
Expand Down Expand Up @@ -484,7 +480,6 @@ mod tests {
node_id: "root".to_string(),
listen: "127.0.0.1:3030".parse().unwrap(),
core_threads: None,
max_threads: None,
blocking_threads: None,
},
processing: ProcessingConfig {
Expand Down Expand Up @@ -572,8 +567,7 @@ mod tests {
node_id: "root".to_string(),
listen: "127.0.0.1:3030".parse().unwrap(),
core_threads: None,
max_threads: Some(512),
blocking_threads: None,
blocking_threads: Some(512),
},
processing: ProcessingConfig {
inventory: InventoryConfig {
Expand Down
12 changes: 6 additions & 6 deletions relay/sources/relayd/src/input/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ pub async fn cleanup(path: WatchedDirectory, cfg: CleanupConfig) -> Result<(), E
continue;
}
};
while let Some(entry) = files.next().await {
let entry = match entry {
Ok(e) => e,
loop {
let entry = match files.next_entry().await {
Ok(Some(e)) => e,
// Nothing to do
Ok(None) => break,
Err(e) => {
error!("entry error: {}", e);
continue;
Expand Down Expand Up @@ -103,9 +105,7 @@ async fn list_files(
}
};

while let Some(entry) = files.next().await {
let entry = entry?;

while let Some(entry) = files.next_entry().await? {
let metadata = entry.metadata().await?;
let since = sys_time
.duration_since(metadata.modified().unwrap_or(sys_time))
Expand Down
25 changes: 6 additions & 19 deletions relay/sources/relayd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use tokio::{
signal::unix::{signal, SignalKind},
sync::RwLock,
};
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info};
use tracing_subscriber::{
filter::EnvFilter,
fmt::{
Expand Down Expand Up @@ -125,29 +125,16 @@ pub fn start(cli_cfg: CliConfiguration, reload_handle: LogHandle) -> Result<(),

// Optimize for big servers: use multi-threaded scheduler

let mut builder = tokio::runtime::Builder::new();
let mut builder = tokio::runtime::Builder::new_multi_thread();
if let Some(threads) = job_config.cfg.general.core_threads {
builder.core_threads(threads);
}
if let Some(threads) = job_config.cfg.general.max_threads {
builder.max_threads(threads);
builder.worker_threads(threads);
}
if let Some(threads) = job_config.cfg.general.blocking_threads {
warn!("blocking_threads is deprecated, replaced by max_threads");

if job_config.cfg.general.max_threads.is_some() {
warn!("max_threads was provided, ignoring blocking_threads");
} else {
warn!("using blocking_threads value as max_threads");
// max_threads ~= core_threads + blocking_threads
// and core_threads (=num cores) << blocking_threads so it is should
// be good enough to approximate
builder.max_threads(threads);
}
builder.max_blocking_threads(threads);
}
let mut runtime = builder.threaded_scheduler().enable_all().build()?;
let runtime = builder.enable_all().build()?;

// TODO: recheck panic/error behavior on tokio 0.2
// TODO: recheck panic/error behavior on tokio 1.0
// don't use block_on_all as it panics on main future panic but not others
runtime.block_on(async {
// Setup signal handlers first
Expand Down
2 changes: 1 addition & 1 deletion relay/sources/relayd/tests/files/config/main.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ listen = "127.0.0.1:3030"

# By default, the number of CPUs
#core_threads = "4"
max_threads = 512
blocking_threads = 512

[processing.inventory]
directory = "target/tmp/inventories/"
Expand Down
2 changes: 1 addition & 1 deletion relay/sources/relayd/tests/processing_reporting_old.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn it_cleans_old_reports() {
set_file_times(file_very_old, FileTime::zero(), FileTime::zero()).unwrap();

thread::spawn(move || {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let rt = tokio::runtime::Runtime::new().unwrap();

rt.block_on(cleanup(
PathBuf::from("target/tmp/reporting_old/incoming"),
Expand Down
3 changes: 1 addition & 2 deletions relay/sources/relayd/tools/config/main.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ listen = "127.0.0.1:3030"

# By default, the number of CPUs
#core_threads = 4
#max_threads = 512
# blocking_threads is deprecated, replaced by max_threads
#blocking_threads = 512

### Processing

Expand Down
4 changes: 1 addition & 3 deletions rudder-lang/deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@
allow = [
"MIT",
"Apache-2.0",
"ISC",
"BSD-2-Clause",
"BSD-3-Clause",
"GPL-3.0",
"Zlib",
"MPL-2.0",
]

Expand Down

0 comments on commit 2bb09d8

Please sign in to comment.