Finish parallelizing materialization of Process inputs #18469

Merged
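At a high level, this change moves the creation of sandbox symlinks (immutable inputs, named caches, the optional .jdk link) and of the empty parent directories for declared outputs out of a blocking post-materialization step and into a synthetic Digest that is merged with the Process's input digest, so the whole sandbox can be materialized in one parallel materialize_directory call. A rough sketch of that core step, using the fs and store APIs exactly as they appear in this diff (the workdir_symlinks values and the "dist" output parent are illustrative):

// Build the additional sandbox entries: absolute symlinks plus empty output parents.
let mut paths: Vec<TypedPath> = Vec::new();
for symlink in &workdir_symlinks {
    // e.g. .cache/pex_root -> <named caches base>/pex_root
    paths.push(TypedPath::Link { path: &symlink.src, target: &symlink.dst });
}
paths.push(TypedPath::Dir(Path::new("dist"))); // illustrative parent dir of an output file

// Turn the entries into a DigestTrie and merge it with the Process's input digest; the
// merged digest can then be materialized in a single parallel pass.
let additions = DigestTrie::from_unique_paths(paths, &HashMap::new())?;
let complete_input_digest = store.merge(vec![input_digest, additions.into()]).await?;
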
3 changes: 1 addition & 2 deletions src/rust/engine/process_execution/docker/src/docker.rs
@@ -398,8 +398,7 @@ impl<'a> process_execution::CommandRunner for CommandRunner<'a> {
workdir.path().to_owned(),
&req,
req.input_digests.inputs.clone(),
self.store.clone(),
self.executor.clone(),
&self.store,
&named_caches,
&self.immutable_inputs,
Some(Path::new(NAMED_CACHES_BASE_PATH_IN_CONTAINER)),
3 changes: 1 addition & 2 deletions src/rust/engine/process_execution/pe_nailgun/src/lib.rs
@@ -209,8 +209,7 @@ impl process_execution::CommandRunner for CommandRunner {
nailgun_process.workdir_path().to_owned(),
&client_req,
client_req.input_digests.inputs.clone(),
self.store.clone(),
self.executor.clone(),
&self.store,
&self.named_caches,
&self.immutable_inputs,
None,
@@ -369,8 +369,7 @@ impl NailgunProcess {
workdir.path().to_owned(),
&startup_options,
startup_options.input_digests.inputs.clone(),
store.clone(),
executor.clone(),
&store,
named_caches,
immutable_inputs,
None,
251 changes: 116 additions & 135 deletions src/rust/engine/process_execution/src/local.rs
@@ -1,15 +1,11 @@
// Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ffi::OsStr;
use std::fmt::{self, Debug};
use std::fs::create_dir_all;
use std::io::Write;
use std::ops::Neg;
use std::os::unix::{
fs::{symlink, OpenOptionsExt},
process::ExitStatusExt,
};
use std::os::unix::{fs::OpenOptionsExt, process::ExitStatusExt};
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::str;
@@ -19,16 +15,18 @@ use std::time::Instant;
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use fs::{
self, DirectoryDigest, GlobExpansionConjunction, GlobMatching, PathGlobs, Permissions,
RelativePath, StrictGlobMatching, SymlinkBehavior, EMPTY_DIRECTORY_DIGEST,
self, DigestTrie, DirectoryDigest, GlobExpansionConjunction, GlobMatching, PathGlobs,
Permissions, RelativePath, StrictGlobMatching, SymlinkBehavior, TypedPath,
EMPTY_DIRECTORY_DIGEST,
};
use futures::stream::{BoxStream, StreamExt, TryStreamExt};
use futures::{try_join, FutureExt, TryFutureExt};
use log::{debug, info};
use nails::execution::ExitCode;
use shell_quote::bash;
use store::{
ImmutableInputs, OneOffStoreFileByDigest, Snapshot, Store, StoreError, WorkdirSymlink,
ImmutableInputs, OneOffStoreFileByDigest, Snapshot, SnapshotOps, Store, StoreError,
WorkdirSymlink,
};
use task_executor::Executor;
use tempfile::TempDir;
@@ -276,8 +274,7 @@ impl super::CommandRunner for CommandRunner {
workdir.path().to_owned(),
&req,
req.input_digests.inputs.clone(),
self.store.clone(),
self.executor.clone(),
&self.store,
&self.named_caches,
&self.immutable_inputs,
None,
@@ -631,52 +628,43 @@ pub fn apply_chroot(chroot_path: &str, req: &mut Process) {
}
}

/// Prepares the given workdir for use by the given Process.
///
/// Returns true if the executable for the Process was created in the workdir, indicating that
/// `exclusive_spawn` is required.
///
/// TODO: Both the symlinks for named_caches/immutable_inputs and the empty output directories
/// required by the spec should be created via a synthetic Digest containing SymlinkNodes and
/// the empty output directories. That would:
/// 1. improve validation that nothing we create collides.
/// 2. allow for materialization to safely occur fully in parallel, rather than partially
/// synchronously in the background.
///
pub async fn prepare_workdir(
workdir_path: PathBuf,
/// Creates a Digest for the entire input sandbox contents of the given Process, including absolute
/// symlinks to immutable inputs, named caches, and JDKs (if configured).
pub async fn prepare_workdir_digest(
req: &Process,
materialized_input_digest: DirectoryDigest,
store: Store,
executor: Executor,
input_digest: DirectoryDigest,
store: &Store,
named_caches: &NamedCaches,
immutable_inputs: &ImmutableInputs,
immutable_inputs: Option<&ImmutableInputs>,
named_caches_prefix: Option<&Path>,
immutable_inputs_prefix: Option<&Path>,
) -> Result<bool, StoreError> {
// Collect the symlinks to create for immutable inputs or named caches.
let immutable_inputs_symlinks = {
let symlinks = immutable_inputs
.local_paths(&req.input_digests.immutable_inputs)
.await?;
) -> Result<DirectoryDigest, StoreError> {
let mut paths = Vec::new();

match immutable_inputs_prefix {
Some(prefix) => symlinks
.into_iter()
.map(|symlink| WorkdirSymlink {
src: symlink.src,
dst: prefix.join(
symlink
.dst
.strip_prefix(immutable_inputs.workdir())
.unwrap(),
),
})
.collect::<Vec<_>>(),
None => symlinks,
// Symlinks for immutable inputs and named caches.
let mut workdir_symlinks = Vec::new();
{
if let Some(immutable_inputs) = immutable_inputs {
let symlinks = immutable_inputs
.local_paths(&req.input_digests.immutable_inputs)
.await?;

match immutable_inputs_prefix {
Some(prefix) => workdir_symlinks.extend(symlinks.into_iter().map(|symlink| {
WorkdirSymlink {
src: symlink.src,
dst: prefix.join(
symlink
.dst
.strip_prefix(immutable_inputs.workdir())
.unwrap(),
),
}
})),
None => workdir_symlinks.extend(symlinks),
}
}
};
let named_caches_symlinks = {

let symlinks = named_caches
.paths(&req.append_only_caches)
.await
@@ -686,21 +674,59 @@ pub async fn prepare_workdir(
))
})?;
match named_caches_prefix {
Some(prefix) => symlinks
.into_iter()
.map(|symlink| WorkdirSymlink {
src: symlink.src,
dst: prefix.join(symlink.dst.strip_prefix(named_caches.base_path()).unwrap()),
})
.collect::<Vec<_>>(),
None => symlinks,
Some(prefix) => workdir_symlinks.extend(symlinks.into_iter().map(|symlink| WorkdirSymlink {
src: symlink.src,
dst: prefix.join(symlink.dst.strip_prefix(named_caches.base_path()).unwrap()),
})),
None => workdir_symlinks.extend(symlinks),
}
};
let workdir_symlinks = immutable_inputs_symlinks
.into_iter()
.chain(named_caches_symlinks.into_iter())
.collect::<Vec<_>>();
}
paths.extend(workdir_symlinks.iter().map(|symlink| TypedPath::Link {
path: &symlink.src,
target: &symlink.dst,
}));

// Symlink for JDK.
if let Some(jdk_home) = &req.jdk_home {
paths.push(TypedPath::Link {
path: Path::new(".jdk"),
target: jdk_home,
});
}

// The bazel remote execution API specifies that the parent directories for output files and
// output directories should be created before execution completes.
let parent_paths_to_create: HashSet<_> = req
.output_files
.iter()
.chain(req.output_directories.iter())
.filter_map(|rel_path| rel_path.as_ref().parent())
.filter(|parent| !parent.as_os_str().is_empty())
.collect();
paths.extend(parent_paths_to_create.into_iter().map(TypedPath::Dir));

// Finally, create a tree for all of the additional paths, and merge it with the input
// Digest.
let additions = DigestTrie::from_unique_paths(paths, &HashMap::new())?;

store.merge(vec![input_digest, additions.into()]).await
}

/// Prepares the given workdir for use by the given Process.
///
/// Returns true if the executable for the Process was created in the workdir, indicating that
/// `exclusive_spawn` is required.
///
pub async fn prepare_workdir(
workdir_path: PathBuf,
req: &Process,
materialized_input_digest: DirectoryDigest,
store: &Store,
named_caches: &NamedCaches,
immutable_inputs: &ImmutableInputs,
named_caches_prefix: Option<&Path>,
immutable_inputs_prefix: Option<&Path>,
) -> Result<bool, StoreError> {
// Capture argv0 as the executable path so that we can test whether we have created it in the
// sandbox.
let maybe_executable_path = {
@@ -709,89 +735,44 @@ pub async fn prepare_workdir(
if let Some(working_directory) = &req.working_directory {
executable_path = working_directory.as_ref().join(executable_path)
}
Some(executable_path)
Some(workdir_path.join(executable_path))
Contributor
Was this a bug-fix picked up along the way? Afaict this is new over and above the shuffle on the LHS.

Member Author
Not a bug fix: we have a good test for this. I just pushed up the joining of the workdir_path to avoid an unnecessary clone.
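
A condensed before/after of the point being discussed, drawn from the removed and added lines in this hunk (the comments are editorial, not part of the change):

// Before: store the relative path and join + check it inside the spawn_blocking closure,
// which needed its own clone of workdir_path.
Some(executable_path)
// ... later: workdir_path2.join(p).exists()

// After: join against workdir_path once up front, so the existence check can run directly
// (and asynchronously) on the absolute path without another clone.
Some(workdir_path.join(executable_path))
// ... later: tokio::fs::metadata(executable_path).await.is_ok()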

} else {
None
}
};

// Start with async materialization of input snapshots, followed by synchronous materialization
// of other configured inputs. Note that we don't do this in parallel, as that might cause
// non-determinism when paths overlap: see the method doc.
let store2 = store.clone();
let workdir_path_2 = workdir_path.clone();
let mut mutable_paths = req.output_files.clone();
mutable_paths.extend(req.output_directories.clone());
// Prepare the digest to use, and then materialize it.
in_workunit!("setup_sandbox", Level::Debug, |_workunit| async move {
store2
let complete_input_digest = prepare_workdir_digest(
req,
materialized_input_digest,
store,
named_caches,
Some(immutable_inputs),
named_caches_prefix,
immutable_inputs_prefix,
)
.await?;

let mut mutable_paths = req.output_files.clone();
mutable_paths.extend(req.output_directories.clone());
store
.materialize_directory(
workdir_path_2,
materialized_input_digest,
workdir_path,
complete_input_digest,
&mutable_paths,
Some(immutable_inputs),
Permissions::Writable,
)
.await
})
.await?;

let workdir_path2 = workdir_path.clone();
let output_file_paths = req.output_files.clone();
let output_dir_paths = req.output_directories.clone();
let maybe_jdk_home = req.jdk_home.clone();
let exclusive_spawn = executor
.spawn_blocking(
move || {
if let Some(jdk_home) = maybe_jdk_home {
symlink(jdk_home, workdir_path2.join(".jdk"))
.map_err(|err| format!("Error making JDK symlink for local execution: {err:?}"))?
}

// The bazel remote execution API specifies that the parent directories for output files and
// output directories should be created before execution completes: see the method doc.
let parent_paths_to_create: HashSet<_> = output_file_paths
.iter()
.chain(output_dir_paths.iter())
.map(|relative_path| relative_path.as_ref())
.chain(workdir_symlinks.iter().map(|s| s.src.as_path()))
.filter_map(|rel_path| rel_path.parent())
.map(|parent_relpath| workdir_path2.join(parent_relpath))
.collect();
for path in parent_paths_to_create {
create_dir_all(path.clone()).map_err(|err| {
format!("Error making parent directory {path:?} for local execution: {err:?}")
})?;
}

for workdir_symlink in workdir_symlinks {
let src = workdir_path2.join(&workdir_symlink.src);
symlink(&workdir_symlink.dst, &src).map_err(|err| {
format!(
"Error linking {} -> {} for local execution: {:?}",
src.display(),
workdir_symlink.dst.display(),
err
)
})?;
}
.await?;

let exe_was_materialized = maybe_executable_path
.as_ref()
.map_or(false, |p| workdir_path2.join(p).exists());
if exe_was_materialized {
debug!(
"Obtaining exclusive spawn lock for process since \
we materialized its executable {:?}.",
maybe_executable_path
);
}
let res: Result<_, String> = Ok(exe_was_materialized);
res
},
|e| Err(format!("Directory materialization task failed: {e}")),
)
.await?;
Ok(exclusive_spawn)
if let Some(executable_path) = maybe_executable_path {
Ok(tokio::fs::metadata(executable_path).await.is_ok())
} else {
Ok(false)
}
})
.await
}

///
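The net effect of the local.rs changes above is a split of the sandbox-preparation logic: prepare_workdir_digest builds the complete input Digest (symlinks, JDK link, output parent directories) without touching the filesystem, and prepare_workdir merges and materializes it. A minimal sketch of calling the digest-only entry point, mirroring how prepare_workdir invokes it in this diff (req, store, named_caches, and immutable_inputs stand in for whatever the caller has in scope; a caller without local immutable inputs would pass None for that argument):

// Build the complete sandbox digest without creating anything on disk; the signature is
// the one introduced in this diff.
let complete_input_digest = prepare_workdir_digest(
    &req,
    req.input_digests.inputs.clone(),
    &store,
    &named_caches,
    Some(&immutable_inputs),
    None, // named_caches_prefix: None for local execution
    None, // immutable_inputs_prefix: None for local execution
)
.await?;
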
3 changes: 1 addition & 2 deletions src/rust/engine/process_execution/src/local_tests.rs
@@ -735,8 +735,7 @@ async fn prepare_workdir_exclusive_relative() {
work_dir.path().to_owned(),
&process,
TestDirectory::recursive().directory_digest(),
store,
executor,
&store,
&named_caches,
&immutable_inputs,
None,
3 changes: 1 addition & 2 deletions src/rust/engine/src/intrinsics.rs
@@ -561,8 +561,7 @@ fn interactive_process(
tempdir.path().to_owned(),
&process,
process.input_digests.inputs.clone(),
context.core.store(),
context.core.executor.clone(),
&context.core.store(),
&context.core.named_caches,
&context.core.immutable_inputs,
None,