Skip to content

Commit

Permalink
Finish parallelizing materialization of Process inputs (#18469)
Browse files Browse the repository at this point in the history
For a long time, the:
1. symlinks for immutable inputs, named caches, or the JDK
2. parent directories of output paths

... have been created synchronously in `prepare_workdir` before the call
to `materialize_directory` for the input digest.

As described in the TODO on `prepare_workdir`, that prevented validation
of collisions between those paths, and also meant that it was less
parallel than it could be.

This change switches to using the new symlinks-in-`Digest`s support that
@thejcannon added to create a `Digest` that fully describes the sandbox,
and then materialize that.
  • Loading branch information
stuhood authored Mar 11, 2023
1 parent d06bcaa commit f8cc00b
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 145 deletions.
3 changes: 1 addition & 2 deletions src/rust/engine/process_execution/docker/src/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,7 @@ impl<'a> process_execution::CommandRunner for CommandRunner<'a> {
workdir.path().to_owned(),
&req,
req.input_digests.inputs.clone(),
self.store.clone(),
self.executor.clone(),
&self.store,
&named_caches,
&self.immutable_inputs,
Some(Path::new(NAMED_CACHES_BASE_PATH_IN_CONTAINER)),
Expand Down
3 changes: 1 addition & 2 deletions src/rust/engine/process_execution/pe_nailgun/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,7 @@ impl process_execution::CommandRunner for CommandRunner {
nailgun_process.workdir_path().to_owned(),
&client_req,
client_req.input_digests.inputs.clone(),
self.store.clone(),
self.executor.clone(),
&self.store,
&self.named_caches,
&self.immutable_inputs,
None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,7 @@ impl NailgunProcess {
workdir.path().to_owned(),
&startup_options,
startup_options.input_digests.inputs.clone(),
store.clone(),
executor.clone(),
store,
named_caches,
immutable_inputs,
None,
Expand Down
251 changes: 116 additions & 135 deletions src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
// Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ffi::OsStr;
use std::fmt::{self, Debug};
use std::fs::create_dir_all;
use std::io::Write;
use std::ops::Neg;
use std::os::unix::{
fs::{symlink, OpenOptionsExt},
process::ExitStatusExt,
};
use std::os::unix::{fs::OpenOptionsExt, process::ExitStatusExt};
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::str;
Expand All @@ -19,16 +15,18 @@ use std::time::Instant;
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use fs::{
self, DirectoryDigest, GlobExpansionConjunction, GlobMatching, PathGlobs, Permissions,
RelativePath, StrictGlobMatching, SymlinkBehavior, EMPTY_DIRECTORY_DIGEST,
self, DigestTrie, DirectoryDigest, GlobExpansionConjunction, GlobMatching, PathGlobs,
Permissions, RelativePath, StrictGlobMatching, SymlinkBehavior, TypedPath,
EMPTY_DIRECTORY_DIGEST,
};
use futures::stream::{BoxStream, StreamExt, TryStreamExt};
use futures::{try_join, FutureExt, TryFutureExt};
use log::{debug, info};
use nails::execution::ExitCode;
use shell_quote::bash;
use store::{
ImmutableInputs, OneOffStoreFileByDigest, Snapshot, Store, StoreError, WorkdirSymlink,
ImmutableInputs, OneOffStoreFileByDigest, Snapshot, SnapshotOps, Store, StoreError,
WorkdirSymlink,
};
use task_executor::Executor;
use tempfile::TempDir;
Expand Down Expand Up @@ -276,8 +274,7 @@ impl super::CommandRunner for CommandRunner {
workdir.path().to_owned(),
&req,
req.input_digests.inputs.clone(),
self.store.clone(),
self.executor.clone(),
&self.store,
&self.named_caches,
&self.immutable_inputs,
None,
Expand Down Expand Up @@ -631,52 +628,43 @@ pub fn apply_chroot(chroot_path: &str, req: &mut Process) {
}
}

/// Prepares the given workdir for use by the given Process.
///
/// Returns true if the executable for the Process was created in the workdir, indicating that
/// `exclusive_spawn` is required.
///
/// TODO: Both the symlinks for named_caches/immutable_inputs and the empty output directories
/// required by the spec should be created via a synthetic Digest containing SymlinkNodes and
/// the empty output directories. That would:
/// 1. improve validation that nothing we create collides.
/// 2. allow for materialization to safely occur fully in parallel, rather than partially
/// synchronously in the background.
///
pub async fn prepare_workdir(
workdir_path: PathBuf,
/// Creates a Digest for the entire input sandbox contents of the given Process, including absolute
/// symlinks to immutable inputs, named caches, and JDKs (if configured).
pub async fn prepare_workdir_digest(
req: &Process,
materialized_input_digest: DirectoryDigest,
store: Store,
executor: Executor,
input_digest: DirectoryDigest,
store: &Store,
named_caches: &NamedCaches,
immutable_inputs: &ImmutableInputs,
immutable_inputs: Option<&ImmutableInputs>,
named_caches_prefix: Option<&Path>,
immutable_inputs_prefix: Option<&Path>,
) -> Result<bool, StoreError> {
// Collect the symlinks to create for immutable inputs or named caches.
let immutable_inputs_symlinks = {
let symlinks = immutable_inputs
.local_paths(&req.input_digests.immutable_inputs)
.await?;
) -> Result<DirectoryDigest, StoreError> {
let mut paths = Vec::new();

match immutable_inputs_prefix {
Some(prefix) => symlinks
.into_iter()
.map(|symlink| WorkdirSymlink {
src: symlink.src,
dst: prefix.join(
symlink
.dst
.strip_prefix(immutable_inputs.workdir())
.unwrap(),
),
})
.collect::<Vec<_>>(),
None => symlinks,
// Symlinks for immutable inputs and named caches.
let mut workdir_symlinks = Vec::new();
{
if let Some(immutable_inputs) = immutable_inputs {
let symlinks = immutable_inputs
.local_paths(&req.input_digests.immutable_inputs)
.await?;

match immutable_inputs_prefix {
Some(prefix) => workdir_symlinks.extend(symlinks.into_iter().map(|symlink| {
WorkdirSymlink {
src: symlink.src,
dst: prefix.join(
symlink
.dst
.strip_prefix(immutable_inputs.workdir())
.unwrap(),
),
}
})),
None => workdir_symlinks.extend(symlinks),
}
}
};
let named_caches_symlinks = {

let symlinks = named_caches
.paths(&req.append_only_caches)
.await
Expand All @@ -686,21 +674,59 @@ pub async fn prepare_workdir(
))
})?;
match named_caches_prefix {
Some(prefix) => symlinks
.into_iter()
.map(|symlink| WorkdirSymlink {
src: symlink.src,
dst: prefix.join(symlink.dst.strip_prefix(named_caches.base_path()).unwrap()),
})
.collect::<Vec<_>>(),
None => symlinks,
Some(prefix) => workdir_symlinks.extend(symlinks.into_iter().map(|symlink| WorkdirSymlink {
src: symlink.src,
dst: prefix.join(symlink.dst.strip_prefix(named_caches.base_path()).unwrap()),
})),
None => workdir_symlinks.extend(symlinks),
}
};
let workdir_symlinks = immutable_inputs_symlinks
.into_iter()
.chain(named_caches_symlinks.into_iter())
.collect::<Vec<_>>();
}
paths.extend(workdir_symlinks.iter().map(|symlink| TypedPath::Link {
path: &symlink.src,
target: &symlink.dst,
}));

// Symlink for JDK.
if let Some(jdk_home) = &req.jdk_home {
paths.push(TypedPath::Link {
path: Path::new(".jdk"),
target: jdk_home,
});
}

// The bazel remote execution API specifies that the parent directories for output files and
// output directories should be created before execution completes.
let parent_paths_to_create: HashSet<_> = req
.output_files
.iter()
.chain(req.output_directories.iter())
.filter_map(|rel_path| rel_path.as_ref().parent())
.filter(|parent| !parent.as_os_str().is_empty())
.collect();
paths.extend(parent_paths_to_create.into_iter().map(TypedPath::Dir));

// Finally, create a tree for all of the additional paths, and merge it with the input
// Digest.
let additions = DigestTrie::from_unique_paths(paths, &HashMap::new())?;

store.merge(vec![input_digest, additions.into()]).await
}

/// Prepares the given workdir for use by the given Process.
///
/// Returns true if the executable for the Process was created in the workdir, indicating that
/// `exclusive_spawn` is required.
///
pub async fn prepare_workdir(
workdir_path: PathBuf,
req: &Process,
materialized_input_digest: DirectoryDigest,
store: &Store,
named_caches: &NamedCaches,
immutable_inputs: &ImmutableInputs,
named_caches_prefix: Option<&Path>,
immutable_inputs_prefix: Option<&Path>,
) -> Result<bool, StoreError> {
// Capture argv0 as the executable path so that we can test whether we have created it in the
// sandbox.
let maybe_executable_path = {
Expand All @@ -709,89 +735,44 @@ pub async fn prepare_workdir(
if let Some(working_directory) = &req.working_directory {
executable_path = working_directory.as_ref().join(executable_path)
}
Some(executable_path)
Some(workdir_path.join(executable_path))
} else {
None
}
};

// Start with async materialization of input snapshots, followed by synchronous materialization
// of other configured inputs. Note that we don't do this in parallel, as that might cause
// non-determinism when paths overlap: see the method doc.
let store2 = store.clone();
let workdir_path_2 = workdir_path.clone();
let mut mutable_paths = req.output_files.clone();
mutable_paths.extend(req.output_directories.clone());
// Prepare the digest to use, and then materialize it.
in_workunit!("setup_sandbox", Level::Debug, |_workunit| async move {
store2
let complete_input_digest = prepare_workdir_digest(
req,
materialized_input_digest,
store,
named_caches,
Some(immutable_inputs),
named_caches_prefix,
immutable_inputs_prefix,
)
.await?;

let mut mutable_paths = req.output_files.clone();
mutable_paths.extend(req.output_directories.clone());
store
.materialize_directory(
workdir_path_2,
materialized_input_digest,
workdir_path,
complete_input_digest,
&mutable_paths,
Some(immutable_inputs),
Permissions::Writable,
)
.await
})
.await?;

let workdir_path2 = workdir_path.clone();
let output_file_paths = req.output_files.clone();
let output_dir_paths = req.output_directories.clone();
let maybe_jdk_home = req.jdk_home.clone();
let exclusive_spawn = executor
.spawn_blocking(
move || {
if let Some(jdk_home) = maybe_jdk_home {
symlink(jdk_home, workdir_path2.join(".jdk"))
.map_err(|err| format!("Error making JDK symlink for local execution: {err:?}"))?
}

// The bazel remote execution API specifies that the parent directories for output files and
// output directories should be created before execution completes: see the method doc.
let parent_paths_to_create: HashSet<_> = output_file_paths
.iter()
.chain(output_dir_paths.iter())
.map(|relative_path| relative_path.as_ref())
.chain(workdir_symlinks.iter().map(|s| s.src.as_path()))
.filter_map(|rel_path| rel_path.parent())
.map(|parent_relpath| workdir_path2.join(parent_relpath))
.collect();
for path in parent_paths_to_create {
create_dir_all(path.clone()).map_err(|err| {
format!("Error making parent directory {path:?} for local execution: {err:?}")
})?;
}

for workdir_symlink in workdir_symlinks {
let src = workdir_path2.join(&workdir_symlink.src);
symlink(&workdir_symlink.dst, &src).map_err(|err| {
format!(
"Error linking {} -> {} for local execution: {:?}",
src.display(),
workdir_symlink.dst.display(),
err
)
})?;
}
.await?;

let exe_was_materialized = maybe_executable_path
.as_ref()
.map_or(false, |p| workdir_path2.join(p).exists());
if exe_was_materialized {
debug!(
"Obtaining exclusive spawn lock for process since \
we materialized its executable {:?}.",
maybe_executable_path
);
}
let res: Result<_, String> = Ok(exe_was_materialized);
res
},
|e| Err(format!("Directory materialization task failed: {e}")),
)
.await?;
Ok(exclusive_spawn)
if let Some(executable_path) = maybe_executable_path {
Ok(tokio::fs::metadata(executable_path).await.is_ok())
} else {
Ok(false)
}
})
.await
}

///
Expand Down
3 changes: 1 addition & 2 deletions src/rust/engine/process_execution/src/local_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,8 +735,7 @@ async fn prepare_workdir_exclusive_relative() {
work_dir.path().to_owned(),
&process,
TestDirectory::recursive().directory_digest(),
store,
executor,
&store,
&named_caches,
&immutable_inputs,
None,
Expand Down
3 changes: 1 addition & 2 deletions src/rust/engine/src/intrinsics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,7 @@ fn interactive_process(
tempdir.path().to_owned(),
&process,
process.input_digests.inputs.clone(),
context.core.store(),
context.core.executor.clone(),
&context.core.store(),
&context.core.named_caches,
&context.core.immutable_inputs,
None,
Expand Down

0 comments on commit f8cc00b

Please sign in to comment.