allow for empty progress writer and remove conditional logic
To use ProgressWriter in more places, we should not have to check whether it
is None before using it. Always initialize it, possibly as an empty (no-op)
writer, so that it can be used unconditionally.

This raises the question of whether it should be a singleton, but for now it
is constructed locally in the relevant functions.
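
For illustration, here is a minimal, self-contained sketch of the pattern (simplified from the actual ProgressWriter in lib/src/progress_jsonl.rs below: the real type is built via From<fs::File>/from_raw_fd and serializes with serde, while the from_file constructor and the string-based send here are hypothetical stand-ins):

```rust
use std::fs::File;
use std::io::{BufWriter, Write};

/// Sketch of the pattern: a writer that always exists but may wrap no file.
struct ProgressWriter {
    fd: Option<BufWriter<File>>,
}

impl ProgressWriter {
    /// A writer backed by a real file (hypothetical constructor for this sketch).
    fn from_file(f: File) -> Self {
        Self {
            fd: Some(BufWriter::new(f)),
        }
    }

    /// An "empty" writer: every send becomes a no-op.
    fn from_empty() -> Self {
        Self { fd: None }
    }

    /// Write one line, or do nothing if there is no backing file.
    fn send(&mut self, line: &str) {
        let Some(fd) = self.fd.as_mut() else { return };
        if fd.write_all(line.as_bytes()).is_err()
            || fd.write_all(b"\n").is_err()
            || fd.flush().is_err()
        {
            // On error, stop writing but let the process continue.
            self.fd = None;
        }
    }
}

fn main() -> std::io::Result<()> {
    // The calling code is identical whether progress is wanted or not.
    let mut quiet = ProgressWriter::from_empty();
    quiet.send(r#"{"stage":"fetching"}"#); // silently dropped

    let mut json = ProgressWriter::from_file(File::create("progress.jsonl")?);
    json.send(r#"{"stage":"fetching"}"#); // written as one JSONL line
    Ok(())
}
```

The call sites stay identical whether progress reporting is wanted or not, which is exactly what lets cli.rs, deploy.rs, and install.rs drop their Option handling in this commit.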
antheas committed Nov 27, 2024
1 parent e2074d0 commit 9615af7
Showing 4 changed files with 88 additions and 113 deletions.
28 changes: 19 additions & 9 deletions lib/src/cli.rs
@@ -27,6 +27,7 @@ use serde::{Deserialize, Serialize};
 use crate::deploy::RequiredHostSpec;
 use crate::lints;
 use crate::progress_jsonl;
+use crate::progress_jsonl::ProgressWriter;
 use crate::spec::Host;
 use crate::spec::ImageReference;
 use crate::utils::sigpolicy_from_opts;
@@ -56,7 +57,7 @@ pub(crate) struct UpgradeOpts {
     pub(crate) apply: bool,
 
     /// Pipe download progress to this fd in a jsonl format.
-    #[clap(long)]
+    #[clap(long, conflicts_with = "quiet")]
     pub(crate) json_fd: Option<RawFd>,
 }
 
@@ -109,7 +110,7 @@ pub(crate) struct SwitchOpts {
     pub(crate) target: String,
 
     /// Pipe download progress to this fd in a jsonl format.
-    #[clap(long)]
+    #[clap(long, conflicts_with = "quiet")]
     pub(crate) json_fd: Option<RawFd>,
 }
 
@@ -624,9 +625,10 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
     let (booted_deployment, _deployments, host) =
         crate::status::get_status_require_booted(sysroot)?;
     let imgref = host.spec.image.as_ref();
-    let prog = opts
-        .json_fd
-        .map(progress_jsonl::ProgressWriter::from_raw_fd);
+    let prog = opts.json_fd.map_or_else(
+        progress_jsonl::ProgressWriter::from_empty,
+        progress_jsonl::ProgressWriter::from_raw_fd,
+    );
 
     // If there's no specified image, let's be nice and check if the booted system is using rpm-ostree
     if imgref.is_none() {
@@ -743,9 +745,10 @@ async fn switch(opts: SwitchOpts) -> Result<()> {
     );
     let target = ostree_container::OstreeImageReference { sigverify, imgref };
     let target = ImageReference::from(target);
-    let prog = opts
-        .json_fd
-        .map(progress_jsonl::ProgressWriter::from_raw_fd);
+    let prog = opts.json_fd.map_or_else(
+        progress_jsonl::ProgressWriter::from_empty,
+        progress_jsonl::ProgressWriter::from_raw_fd,
+    );
 
     // If we're doing an in-place mutation, we shortcut most of the rest of the work here
     if opts.mutate_in_place {
@@ -843,7 +846,14 @@ async fn edit(opts: EditOpts) -> Result<()> {
         return crate::deploy::rollback(sysroot).await;
     }
 
-    let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet, None).await?;
+    let fetched = crate::deploy::pull(
+        repo,
+        new_spec.image,
+        None,
+        opts.quiet,
+        ProgressWriter::from_empty(),
+    )
+    .await?;
 
     // TODO gc old layers here
 
130 changes: 40 additions & 90 deletions lib/src/deploy.rs
@@ -142,17 +142,21 @@ fn prefix_of_progress(p: &ImportProgress) -> &'static str {
 async fn handle_layer_progress_print(
     mut layers: tokio::sync::mpsc::Receiver<ostree_container::store::ImportProgress>,
     mut layer_bytes: tokio::sync::watch::Receiver<Option<ostree_container::store::LayerProgress>>,
-    n_layers_to_fetch: usize,
-    download_bytes: u64,
+    layers_download: usize,
+    layers_total: usize,
+    bytes_download: u64,
+    bytes_total: u64,
+    mut prog: ProgressWriter,
 ) {
     let start = std::time::Instant::now();
     let mut total_read = 0u64;
     let bar = indicatif::MultiProgress::new();
     let layers_bar = bar.add(indicatif::ProgressBar::new(
-        n_layers_to_fetch.try_into().unwrap(),
+        layers_download.try_into().unwrap(),
     ));
     let byte_bar = bar.add(indicatif::ProgressBar::new(0));
-    let total_byte_bar = bar.add(indicatif::ProgressBar::new(download_bytes));
+    let total_byte_bar = bar.add(indicatif::ProgressBar::new(bytes_download));
+    let mut last_json_written = std::time::Instant::now();
     // let byte_bar = indicatif::ProgressBar::new(0);
     // byte_bar.set_draw_target(indicatif::ProgressDrawTarget::hidden());
     println!("");
@@ -207,8 +211,25 @@ async fn handle_layer_progress_print(
                 }
                 let bytes = layer_bytes.borrow();
                 if let Some(bytes) = &*bytes {
+                    let bytes_done = total_read + bytes.fetched;
                     byte_bar.set_position(bytes.fetched);
-                    total_byte_bar.set_position(total_read + bytes.fetched);
+                    total_byte_bar.set_position(bytes_done);
+
+                    // Lets update the json output only on bytes fetched
+                    // They are common enough, anyhow. Debounce on time.
+                    let curr = std::time::Instant::now();
+                    if curr.duration_since(last_json_written).as_secs_f64() > 0.2 {
+                        prog.send(ProgressStage::Fetching {
+                            bytes_done,
+                            bytes_download,
+                            bytes_total,
+                            layers_done: layers_bar.position() as usize,
+                            layers_download,
+                            layers_total,
+                            layers: None,
+                        });
+                        last_json_written = curr;
+                    }
                 }
             }
         }
@@ -232,73 +253,14 @@ async fn handle_layer_progress_print(
     }
 }
 
-/// Write container fetch progress to standard output.
-async fn handle_layer_progress_print_jsonl(
-    mut layers: tokio::sync::mpsc::Receiver<ostree_container::store::ImportProgress>,
-    mut layer_bytes: tokio::sync::watch::Receiver<Option<ostree_container::store::LayerProgress>>,
-    layers_download: usize,
-    layers_total: usize,
-    bytes_download: u64,
-    bytes_total: u64,
-    mut prog: ProgressWriter,
-) {
-    let mut total_read = 0u64;
-    let mut layers_done: usize = 0;
-    let mut last_json_written = std::time::Instant::now();
-    loop {
-        tokio::select! {
-            // Always handle layer changes first.
-            biased;
-            layer = layers.recv() => {
-                if let Some(l) = layer {
-                    if !l.is_starting() {
-                        let layer = descriptor_of_progress(&l);
-                        layers_done += 1;
-                        total_read += total_read.saturating_add(layer.size());
-                    }
-                } else {
-                    // If the receiver is disconnected, then we're done
-                    break
-                };
-            },
-            r = layer_bytes.changed() => {
-                if r.is_err() {
-                    // If the receiver is disconnected, then we're done
-                    break
-                }
-                let bytes = layer_bytes.borrow();
-                if let Some(bytes) = &*bytes {
-                    let bytes_done = total_read + bytes.fetched;
-
-                    // Lets update the json output only on bytes fetched
-                    // They are common enough, anyhow. Debounce on time.
-                    let curr = std::time::Instant::now();
-                    if curr.duration_since(last_json_written).as_secs_f64() > 0.2 {
-                        prog.send(ProgressStage::Fetching {
-                            bytes_done,
-                            bytes_download,
-                            bytes_total,
-                            layers_done,
-                            layers_download,
-                            layers_total,
-                            layers: None,
-                        });
-                        last_json_written = curr;
-                    }
-                }
-            }
-        }
-    }
-}
-
 /// Wrapper for pulling a container image, wiring up status output.
 #[context("Pulling")]
 pub(crate) async fn pull(
     repo: &ostree::Repo,
     imgref: &ImageReference,
     target_imgref: Option<&OstreeImageReference>,
     quiet: bool,
-    prog: Option<ProgressWriter>,
+    prog: ProgressWriter,
 ) -> Result<Box<ImageState>> {
     let ostree_imgref = &OstreeImageReference::from(imgref.clone());
     let mut imp = new_importer(repo, ostree_imgref).await?;
@@ -323,33 +285,21 @@ pub(crate) async fn pull(
     let bytes_download: u64 = layers_to_fetch.iter().map(|(l, _)| l.layer.size()).sum();
     let bytes_total: u64 = prep.all_layers().map(|l| l.layer.size()).sum();
 
-    let printer = (!quiet || prog.is_some()).then(|| {
+    let printer = (!quiet).then(|| {
         let layer_progress = imp.request_progress();
         let layer_byte_progress = imp.request_layer_progress();
-        if let Some(prog) = prog {
-            tokio::task::spawn(async move {
-                handle_layer_progress_print_jsonl(
-                    layer_progress,
-                    layer_byte_progress,
-                    layers_download,
-                    layers_total,
-                    bytes_download,
-                    bytes_total,
-                    prog,
-                )
-                .await
-            })
-        } else {
-            tokio::task::spawn(async move {
-                handle_layer_progress_print(
-                    layer_progress,
-                    layer_byte_progress,
-                    layers_download,
-                    bytes_download,
-                )
-                .await
-            })
-        }
+        tokio::task::spawn(async move {
+            handle_layer_progress_print(
+                layer_progress,
+                layer_byte_progress,
+                layers_download,
+                layers_total,
+                bytes_download,
+                bytes_total,
+                prog,
+            )
+            .await
+        })
     });
     let import = imp.import(prep).await;
     if let Some(printer) = printer {
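The JSON emission added to handle_layer_progress_print above is debounced on time, since byte-level progress updates arrive far more often than a consumer of the JSONL stream needs. A standalone sketch of that time-based debounce, assuming a hypothetical Debounce helper (the real code inlines the check against a 0.2 s threshold rather than using a helper type):

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper mirroring the inline debounce above:
/// only let an event through when at least `min_interval` has elapsed.
struct Debounce {
    last_emitted: Instant,
    min_interval: Duration,
}

impl Debounce {
    fn new(min_interval: Duration) -> Self {
        Self {
            last_emitted: Instant::now(),
            min_interval,
        }
    }

    /// Returns true when enough time has passed to emit another progress event.
    fn ready(&mut self) -> bool {
        let now = Instant::now();
        if now.duration_since(self.last_emitted) >= self.min_interval {
            self.last_emitted = now;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut debounce = Debounce::new(Duration::from_millis(200));
    let mut emitted = 0u32;
    // Simulate a burst of very frequent byte-progress updates.
    for _ in 0..50 {
        if debounce.ready() {
            emitted += 1;
        }
        std::thread::sleep(Duration::from_millis(10));
    }
    // Roughly one event per 200 ms window instead of one per update.
    println!("emitted {emitted} of 50 updates");
}
```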
11 changes: 9 additions & 2 deletions lib/src/install.rs
@@ -47,6 +47,7 @@ use crate::boundimage::{BoundImage, ResolvedBoundImage};
 use crate::containerenv::ContainerExecutionInfo;
 use crate::lsm;
 use crate::mount::Filesystem;
+use crate::progress_jsonl::ProgressWriter;
 use crate::spec::ImageReference;
 use crate::store::Storage;
 use crate::task::Task;
@@ -744,8 +745,14 @@ async fn install_container(
         let spec_imgref = ImageReference::from(src_imageref.clone());
         let repo = &sysroot.repo();
         repo.set_disable_fsync(true);
-        let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false, None)
-            .await?;
+        let r = crate::deploy::pull(
+            repo,
+            &spec_imgref,
+            Some(&state.target_imgref),
+            false,
+            ProgressWriter::from_empty(),
+        )
+        .await?;
         repo.set_disable_fsync(false);
         r
     };
32 changes: 20 additions & 12 deletions lib/src/progress_jsonl.rs
@@ -33,15 +33,13 @@ pub enum ProgressStage {
 }
 
 pub(crate) struct ProgressWriter {
-    fd: BufWriter<fs::File>,
-    closed: bool,
+    fd: Option<BufWriter<fs::File>>,
 }
 
 impl From<fs::File> for ProgressWriter {
     fn from(value: fs::File) -> Self {
         Self {
-            fd: BufWriter::new(value),
-            closed: false,
+            fd: Some(BufWriter::new(value)),
         }
     }
 }
@@ -53,32 +51,42 @@ impl ProgressWriter {
         unsafe { fs::File::from_raw_fd(fd) }.into()
     }
 
+    pub(crate) fn from_empty() -> Self {
+        Self { fd: None }
+    }
+
     /// Serialize the target object to JSON as a single line
     pub(crate) fn send_unchecked<T: Serialize>(&mut self, v: T) -> Result<()> {
+        if self.fd.is_none() {
+            return Ok(());
+        }
+        let mut fd = self.fd.as_mut().unwrap();
+
         // serde is guaranteed not to output newlines here
-        serde_json::to_writer(&mut self.fd, &v)?;
+        serde_json::to_writer(&mut fd, &v)?;
         // We always end in a newline
-        self.fd.write_all(b"\n")?;
+        fd.write_all(b"\n")?;
         // And flush to ensure the remote side sees updates immediately
-        self.fd.flush()?;
+        fd.flush()?;
         Ok(())
     }
 
     pub(crate) fn send<T: Serialize>(&mut self, v: T) {
-        if self.closed {
-            return;
-        }
        if let Err(e) = self.send_unchecked(v) {
            eprintln!("Failed to write to jsonl: {}", e);
            // Stop writing to fd but let process continue
-            self.closed = true;
+            self.fd = None;
        }
     }
 
     /// Flush remaining data and return the underlying file.
     #[allow(dead_code)]
     pub(crate) fn into_inner(self) -> Result<fs::File> {
-        self.fd.into_inner().map_err(Into::into)
+        if let Some(fd) = self.fd {
+            return fd.into_inner().map_err(Into::into);
+        } else {
+            return Err(anyhow::anyhow!("File descriptor closed/never existed."));
+        }
     }
 }
 
