Skip to content

Commit

Permalink
Auto merge of #707 - Mark-Simulacrum:no-state, r=Mark-Simulacrum
Browse files Browse the repository at this point in the history
Parallelize log uploads

This should drastically speed up report generation.
  • Loading branch information
bors committed Oct 29, 2023
2 parents 00ec081 + c0882fe commit a0cb14c
Showing 1 changed file with 70 additions and 34 deletions.
104 changes: 70 additions & 34 deletions src/report/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ use mime::{self, Mime};
use percent_encoding::{utf8_percent_encode, AsciiSet};
use std::borrow::Cow;
#[cfg(test)]
use std::cell::RefCell;
#[cfg(test)]
use std::collections::HashMap;
use std::convert::AsRef;
use std::fmt::{self, Display};
Expand Down Expand Up @@ -281,40 +279,76 @@ fn write_logs<DB: ReadResults, W: ReportWriter>(
) -> Fallible<()> {
let num_crates = crates.len();
let progress_every = (num_crates / PROGRESS_FRACTION) + 1;
for (i, krate) in crates.iter().enumerate() {
if i % progress_every == 0 {
info!("wrote logs for {}/{} crates", i, num_crates)
}

if config.should_skip(krate) {
continue;
let errors = std::sync::Mutex::new(vec![]);
std::thread::scope(|s| {
let mut channels = vec![];
// This isn't really related to the number of cores on the system, since these threads are
// mostly driving network-related traffic. 8 is a reasonable number to not overwhelm
// systems while keeping things moving much faster than fully serial uploads.
for _ in 0..8 {
let (tx, rx) = std::sync::mpsc::sync_channel::<(PathBuf, Vec<u8>, EncodingType)>(32);
channels.push(tx);
let errors = &errors;
s.spawn(move || {
while let Ok((log_path, data, encoding)) = rx.recv() {
if let Err(e) =
dest.write_bytes(log_path, data, &mime::TEXT_PLAIN_UTF_8, encoding)
{
errors.lock().unwrap().push(e);
}
}
});
}

for tc in &ex.toolchains {
let log_path =
crate_to_path_fragment(tc, krate, SanitizationContext::Path).join("log.txt");
let content = db
.load_log(ex, tc, krate)
.and_then(|c| c.ok_or_else(|| err_msg("missing logs")))
.with_context(|_| format!("failed to read log of {krate} on {tc}"));
let content = match content {
Ok(c) => c,
Err(e) => {
utils::report_failure(&e);
continue;
}
};
for (i, krate) in crates.iter().enumerate() {
if i % progress_every == 0 {
info!("wrote logs for {}/{} crates", i, num_crates)
}

match content {
EncodedLog::Plain(data) => {
dest.write_bytes(log_path, data, &mime::TEXT_PLAIN_UTF_8, EncodingType::Plain)
}
EncodedLog::Gzip(data) => {
dest.write_bytes(log_path, data, &mime::TEXT_PLAIN_UTF_8, EncodingType::Gzip)
if config.should_skip(krate) {
continue;
}

for tc in &ex.toolchains {
let log_path =
crate_to_path_fragment(tc, krate, SanitizationContext::Path).join("log.txt");
let content = db
.load_log(ex, tc, krate)
.and_then(|c| c.ok_or_else(|| err_msg("missing logs")))
.with_context(|_| format!("failed to read log of {krate} on {tc}"));
let content = match content {
Ok(c) => c,
Err(e) => {
utils::report_failure(&e);
continue;
}
};

match content {
EncodedLog::Plain(data) => {
channels[i % channels.len()]
.send((log_path, data, EncodingType::Plain))
.unwrap();
}
EncodedLog::Gzip(data) => {
channels[i % channels.len()]
.send((log_path, data, EncodingType::Gzip))
.unwrap();
}
}
}?;
}
}
});

let mut errors = errors.into_inner().unwrap();
for error in errors.iter() {
utils::report_failure(&failure::format_err!("Logging upload failed: {:?}", error));
}
if !errors.is_empty() {
return Err(errors.remove(0));
}

Ok(())
}

Expand Down Expand Up @@ -510,7 +544,7 @@ fn compare(
}
}

pub trait ReportWriter {
pub trait ReportWriter: Send + Sync {
fn write_bytes<P: AsRef<Path>>(
&self,
path: P,
Expand Down Expand Up @@ -565,14 +599,15 @@ impl Display for FileWriter {
#[cfg(test)]
#[derive(Default)]
pub struct DummyWriter {
results: RefCell<HashMap<(PathBuf, Mime), Vec<u8>>>,
results: std::sync::Mutex<HashMap<(PathBuf, Mime), Vec<u8>>>,
}

#[cfg(test)]
impl DummyWriter {
pub fn get<P: AsRef<Path>>(&self, path: P, mime: &Mime) -> Vec<u8> {
self.results
.borrow()
.lock()
.unwrap()
.get(&(path.as_ref().to_path_buf(), mime.clone()))
.unwrap()
.clone()
Expand All @@ -589,13 +624,14 @@ impl ReportWriter for DummyWriter {
_: EncodingType,
) -> Fallible<()> {
self.results
.borrow_mut()
.lock()
.unwrap()
.insert((path.as_ref().to_path_buf(), mime.clone()), b);
Ok(())
}

fn write_string<P: AsRef<Path>>(&self, path: P, s: Cow<str>, mime: &Mime) -> Fallible<()> {
self.results.borrow_mut().insert(
self.results.lock().unwrap().insert(
(path.as_ref().to_path_buf(), mime.clone()),
s.bytes().collect(),
);
Expand Down

0 comments on commit a0cb14c

Please sign in to comment.