Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add progress info to conda install #470

Merged
merged 2 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pixi.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

125 changes: 97 additions & 28 deletions src/install.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use crate::default_retry_policy;
use crate::progress::{default_progress_style, finished_progress_style, global_multi_progress};
use crate::progress::{
default_progress_style, finished_progress_style, global_multi_progress,
ProgressBarMessageFormatter,
};
use futures::future::ready;
use futures::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use indicatif::ProgressBar;
use itertools::Itertools;
use miette::{IntoDiagnostic, WrapErr};
use rattler::install::{
link_package, InstallDriver, InstallOptions, Transaction, TransactionOperation,
};
use rattler::package_cache::PackageCache;
use rattler_conda_types::{PrefixRecord, RepoDataRecord};
use rattler_networking::AuthenticatedClient;
use std::cmp::Ordering;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::time::Duration;
Expand Down Expand Up @@ -49,23 +53,61 @@ pub async fn execute_transaction(
.with_prefix("downloading"),
);
pb.enable_steady_tick(Duration::from_millis(100));
Some(pb)
Some(ProgressBarMessageFormatter::new(pb))
} else {
None
};

// Create a progress bar to track all operations.
let total_operations = transaction.operations.len();
let link_pb = multi_progress.add(
indicatif::ProgressBar::new(total_operations as u64)
.with_style(default_progress_style())
.with_finish(indicatif::ProgressFinish::WithMessage("Done!".into()))
.with_prefix("linking"),
);
link_pb.enable_steady_tick(Duration::from_millis(100));
let link_pb = {
let pb = multi_progress.add(
indicatif::ProgressBar::new(total_operations as u64)
.with_style(default_progress_style())
.with_finish(indicatif::ProgressFinish::WithMessage("Done!".into()))
.with_prefix("linking"),
);
pb.enable_steady_tick(Duration::from_millis(100));
ProgressBarMessageFormatter::new(pb)
};

// Sort the operations to try to optimize the installation time.
let sorted_operations = transaction
.operations
.iter()
.enumerate()
.sorted_unstable_by(|&(a_idx, a), &(b_idx, b)| {
// Sort the operations so we first install packages and then remove them. We do it in
// this order because downloading takes time so we want to do that as soon as possible
match (a.record_to_install(), b.record_to_install()) {
(Some(a), Some(b)) => {
// If we have two packages sort them by size, the biggest goes first.
let a_size = a.package_record.size.or(a.package_record.legacy_bz2_size);
let b_size = b.package_record.size.or(b.package_record.legacy_bz2_size);
if let (Some(a_size), Some(b_size)) = (a_size, b_size) {
match a_size.cmp(&b_size) {
Ordering::Less => return Ordering::Greater,
Ordering::Greater => return Ordering::Less,
Ordering::Equal => {}
}
}
}
(Some(_), None) => {
return Ordering::Less;
}
(None, Some(_)) => {
return Ordering::Greater;
}
_ => {}
}

// Otherwise keep the original order as much as possible.
a_idx.cmp(&b_idx)
})
.map(|(_, op)| op);

// Perform all transactions operations in parallel.
let result = stream::iter(transaction.operations.iter())
let result = stream::iter(sorted_operations.into_iter())
.map(Ok)
.try_for_each_concurrent(50, |op| {
let target_prefix = target_prefix.clone();
Expand Down Expand Up @@ -93,9 +135,9 @@ pub async fn execute_transaction(

// Clear progress bars
if let Some(download_pb) = download_pb {
download_pb.finish_and_clear();
download_pb.into_progress_bar().finish_and_clear();
}
link_pb.finish_and_clear();
link_pb.into_progress_bar().finish_and_clear();

result
}
Expand All @@ -108,8 +150,8 @@ async fn execute_operation(
download_client: AuthenticatedClient,
package_cache: &PackageCache,
install_driver: &InstallDriver,
download_pb: Option<&ProgressBar>,
link_pb: &ProgressBar,
download_pb: Option<&ProgressBarMessageFormatter>,
link_pb: &ProgressBarMessageFormatter,
op: &TransactionOperation<PrefixRecord, RepoDataRecord>,
install_options: &InstallOptions,
) -> miette::Result<()> {
Expand All @@ -119,14 +161,36 @@ async fn execute_operation(

// Create a future to remove the existing package
let remove_future = if let Some(remove_record) = remove_record {
remove_package_from_environment(target_prefix, remove_record).left_future()
link_pb
.wrap(
format!(
"removing {} {}",
&remove_record
.repodata_record
.package_record
.name
.as_source(),
&remove_record.repodata_record.package_record.version
),
remove_package_from_environment(target_prefix, remove_record),
)
.left_future()
} else {
ready(Ok(())).right_future()
};

// Create a future to download the package
let cached_package_dir_fut = if let Some(install_record) = install_record {
async {
let task = if let Some(pb) = download_pb {
Some(
pb.start(install_record.package_record.name.as_source().to_string())
.await,
)
} else {
None
};

// Make sure the package is available in the package cache.
let result = package_cache
.get_or_fetch_from_url_with_retry(
Expand All @@ -140,7 +204,8 @@ async fn execute_operation(
.into_diagnostic();

// Increment the download progress bar.
if let Some(pb) = download_pb {
if let Some(task) = task {
let pb = task.finish().await;
pb.inc(1);
if pb.length() == Some(pb.position()) {
pb.set_style(finished_progress_style());
Expand All @@ -159,20 +224,24 @@ async fn execute_operation(

// If there is a package to install, do that now.
if let Some((record, package_dir)) = install_package {
install_package_to_environment(
target_prefix,
package_dir,
record.clone(),
install_driver,
install_options,
)
.await?;
link_pb
.wrap(
record.package_record.name.as_source().to_string(),
install_package_to_environment(
target_prefix,
package_dir,
record.clone(),
install_driver,
install_options,
),
)
.await?;
}

// Increment the link progress bar since we finished a step!
link_pb.inc(1);
if link_pb.length() == Some(link_pb.position()) {
link_pb.set_style(finished_progress_style());
link_pb.progress_bar().inc(1);
if link_pb.progress_bar().length() == Some(link_pb.progress_bar().position()) {
link_pb.progress_bar().set_style(finished_progress_style());
}

Ok(())
Expand Down
33 changes: 31 additions & 2 deletions src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ pub async fn await_in_progress<T, F: Future<Output = T>>(
#[derive(Debug, Clone)]
pub struct ProgressBarMessageFormatter {
sender: Sender<Operation>,
pb: ProgressBar,
}

enum Operation {
Expand All @@ -122,6 +123,7 @@ enum Operation {
pub struct ScopedTask {
name: String,
sender: Option<Sender<Operation>>,
pb: ProgressBar,
}

impl Drop for ScopedTask {
Expand All @@ -136,20 +138,27 @@ impl Drop for ScopedTask {

impl ScopedTask {
/// Finishes the execution of the task.
pub async fn finish(mut self) {
pub async fn finish(mut self) -> ProgressBar {
// Send the finished operation. If this fails the receiving end was most likely already
// closed and we can just ignore the error.
if let Some(sender) = self.sender.take() {
let _ = sender
.send(Operation::Finished(std::mem::take(&mut self.name)))
.await;
}
self.pb.clone()
}

/// Returns the progress bar associated with the task
pub fn progress_bar(&self) -> &ProgressBar {
&self.pb
}
}

impl ProgressBarMessageFormatter {
/// Construct a new instance that will update the given progress bar.
pub fn new(progress_bar: ProgressBar) -> Self {
let pb = progress_bar.clone();
let (tx, mut rx) = channel::<Operation>(20);
tokio::spawn(async move {
let mut pending = VecDeque::with_capacity(20);
Expand All @@ -173,7 +182,12 @@ impl ProgressBarMessageFormatter {
}
}
});
Self { sender: tx }
Self { sender: tx, pb }
}

/// Returns the associated progress bar
pub fn progress_bar(&self) -> &ProgressBar {
&self.pb
}

/// Adds the start of another task to the progress bar and returns an object that is used to
Expand All @@ -187,6 +201,21 @@ impl ProgressBarMessageFormatter {
ScopedTask {
name: op,
sender: Some(self.sender.clone()),
pb: self.pb.clone(),
}
}

/// Wraps an future into a task which starts when the task starts and ends when the future
/// returns.
pub async fn wrap<T, F: Future<Output = T>>(&self, name: impl Into<String>, fut: F) -> T {
let task = self.start(name.into()).await;
let result = fut.await;
task.finish().await;
result
}

/// Convert this instance into the underlying progress bar.
pub fn into_progress_bar(self) -> ProgressBar {
self.pb
}
}
Loading