From f8cc00b6dcbf3d4233d90813766499f12844fe80 Mon Sep 17 00:00:00 2001 From: Stu Hood Date: Sat, 11 Mar 2023 09:42:56 -0800 Subject: [PATCH] Finish parallelizing materialization of Process inputs (#18469) For a long time, the: 1. symlinks for immutable inputs, named caches, or the JDK 2. parent directories of output paths ... have been created synchronously in `prepare_workdir` after the call to `materialize_directory` for the input digest. As described in the TODO on `prepare_workdir`, that prevented validation of collisions between those paths, and also meant that it was less parallel than it could be. This change switches to using the new symlinks-in-`Digest`s support that @thejcannon added to create a `Digest` that fully describes the sandbox, and then materialize that. --- .../process_execution/docker/src/docker.rs | 3 +- .../process_execution/pe_nailgun/src/lib.rs | 3 +- .../pe_nailgun/src/nailgun_pool.rs | 3 +- .../engine/process_execution/src/local.rs | 251 ++++++++---------- .../process_execution/src/local_tests.rs | 3 +- src/rust/engine/src/intrinsics.rs | 3 +- 6 files changed, 121 insertions(+), 145 deletions(-) diff --git a/src/rust/engine/process_execution/docker/src/docker.rs b/src/rust/engine/process_execution/docker/src/docker.rs index 763b4fac649..302913c8c33 100644 --- a/src/rust/engine/process_execution/docker/src/docker.rs +++ b/src/rust/engine/process_execution/docker/src/docker.rs @@ -398,8 +398,7 @@ impl<'a> process_execution::CommandRunner for CommandRunner<'a> { workdir.path().to_owned(), &req, req.input_digests.inputs.clone(), - self.store.clone(), - self.executor.clone(), + &self.store, &named_caches, &self.immutable_inputs, Some(Path::new(NAMED_CACHES_BASE_PATH_IN_CONTAINER)), diff --git a/src/rust/engine/process_execution/pe_nailgun/src/lib.rs b/src/rust/engine/process_execution/pe_nailgun/src/lib.rs index f3592fce79c..122d18218ee 100644 --- a/src/rust/engine/process_execution/pe_nailgun/src/lib.rs +++ 
b/src/rust/engine/process_execution/pe_nailgun/src/lib.rs @@ -209,8 +209,7 @@ impl process_execution::CommandRunner for CommandRunner { nailgun_process.workdir_path().to_owned(), &client_req, client_req.input_digests.inputs.clone(), - self.store.clone(), - self.executor.clone(), + &self.store, &self.named_caches, &self.immutable_inputs, None, diff --git a/src/rust/engine/process_execution/pe_nailgun/src/nailgun_pool.rs b/src/rust/engine/process_execution/pe_nailgun/src/nailgun_pool.rs index 5ab746265fe..4b05135bffe 100644 --- a/src/rust/engine/process_execution/pe_nailgun/src/nailgun_pool.rs +++ b/src/rust/engine/process_execution/pe_nailgun/src/nailgun_pool.rs @@ -369,8 +369,7 @@ impl NailgunProcess { workdir.path().to_owned(), &startup_options, startup_options.input_digests.inputs.clone(), - store.clone(), - executor.clone(), + store, named_caches, immutable_inputs, None, diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs index 7be9b0dccca..5aedfcef11e 100644 --- a/src/rust/engine/process_execution/src/local.rs +++ b/src/rust/engine/process_execution/src/local.rs @@ -1,15 +1,11 @@ // Copyright 2022 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). 
-use std::collections::{BTreeMap, BTreeSet, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::ffi::OsStr; use std::fmt::{self, Debug}; -use std::fs::create_dir_all; use std::io::Write; use std::ops::Neg; -use std::os::unix::{ - fs::{symlink, OpenOptionsExt}, - process::ExitStatusExt, -}; +use std::os::unix::{fs::OpenOptionsExt, process::ExitStatusExt}; use std::path::{Path, PathBuf}; use std::process::Stdio; use std::str; @@ -19,8 +15,9 @@ use std::time::Instant; use async_trait::async_trait; use bytes::{Bytes, BytesMut}; use fs::{ - self, DirectoryDigest, GlobExpansionConjunction, GlobMatching, PathGlobs, Permissions, - RelativePath, StrictGlobMatching, SymlinkBehavior, EMPTY_DIRECTORY_DIGEST, + self, DigestTrie, DirectoryDigest, GlobExpansionConjunction, GlobMatching, PathGlobs, + Permissions, RelativePath, StrictGlobMatching, SymlinkBehavior, TypedPath, + EMPTY_DIRECTORY_DIGEST, }; use futures::stream::{BoxStream, StreamExt, TryStreamExt}; use futures::{try_join, FutureExt, TryFutureExt}; @@ -28,7 +25,8 @@ use log::{debug, info}; use nails::execution::ExitCode; use shell_quote::bash; use store::{ - ImmutableInputs, OneOffStoreFileByDigest, Snapshot, Store, StoreError, WorkdirSymlink, + ImmutableInputs, OneOffStoreFileByDigest, Snapshot, SnapshotOps, Store, StoreError, + WorkdirSymlink, }; use task_executor::Executor; use tempfile::TempDir; @@ -276,8 +274,7 @@ impl super::CommandRunner for CommandRunner { workdir.path().to_owned(), &req, req.input_digests.inputs.clone(), - self.store.clone(), - self.executor.clone(), + &self.store, &self.named_caches, &self.immutable_inputs, None, @@ -631,52 +628,43 @@ pub fn apply_chroot(chroot_path: &str, req: &mut Process) { } } -/// Prepares the given workdir for use by the given Process. -/// -/// Returns true if the executable for the Process was created in the workdir, indicating that -/// `exclusive_spawn` is required. 
-/// -/// TODO: Both the symlinks for named_caches/immutable_inputs and the empty output directories -/// required by the spec should be created via a synthetic Digest containing SymlinkNodes and -/// the empty output directories. That would: -/// 1. improve validation that nothing we create collides. -/// 2. allow for materialization to safely occur fully in parallel, rather than partially -/// synchronously in the background. -/// -pub async fn prepare_workdir( - workdir_path: PathBuf, +/// Creates a Digest for the entire input sandbox contents of the given Process, including absolute +/// symlinks to immutable inputs, named caches, and JDKs (if configured). +pub async fn prepare_workdir_digest( req: &Process, - materialized_input_digest: DirectoryDigest, - store: Store, - executor: Executor, + input_digest: DirectoryDigest, + store: &Store, named_caches: &NamedCaches, - immutable_inputs: &ImmutableInputs, + immutable_inputs: Option<&ImmutableInputs>, named_caches_prefix: Option<&Path>, immutable_inputs_prefix: Option<&Path>, -) -> Result { - // Collect the symlinks to create for immutable inputs or named caches. - let immutable_inputs_symlinks = { - let symlinks = immutable_inputs - .local_paths(&req.input_digests.immutable_inputs) - .await?; +) -> Result { + let mut paths = Vec::new(); - match immutable_inputs_prefix { - Some(prefix) => symlinks - .into_iter() - .map(|symlink| WorkdirSymlink { - src: symlink.src, - dst: prefix.join( - symlink - .dst - .strip_prefix(immutable_inputs.workdir()) - .unwrap(), - ), - }) - .collect::>(), - None => symlinks, + // Symlinks for immutable inputs and named caches. 
+ let mut workdir_symlinks = Vec::new(); + { + if let Some(immutable_inputs) = immutable_inputs { + let symlinks = immutable_inputs + .local_paths(&req.input_digests.immutable_inputs) + .await?; + + match immutable_inputs_prefix { + Some(prefix) => workdir_symlinks.extend(symlinks.into_iter().map(|symlink| { + WorkdirSymlink { + src: symlink.src, + dst: prefix.join( + symlink + .dst + .strip_prefix(immutable_inputs.workdir()) + .unwrap(), + ), + } + })), + None => workdir_symlinks.extend(symlinks), + } } - }; - let named_caches_symlinks = { + let symlinks = named_caches .paths(&req.append_only_caches) .await @@ -686,21 +674,59 @@ pub async fn prepare_workdir( )) })?; match named_caches_prefix { - Some(prefix) => symlinks - .into_iter() - .map(|symlink| WorkdirSymlink { - src: symlink.src, - dst: prefix.join(symlink.dst.strip_prefix(named_caches.base_path()).unwrap()), - }) - .collect::>(), - None => symlinks, + Some(prefix) => workdir_symlinks.extend(symlinks.into_iter().map(|symlink| WorkdirSymlink { + src: symlink.src, + dst: prefix.join(symlink.dst.strip_prefix(named_caches.base_path()).unwrap()), + })), + None => workdir_symlinks.extend(symlinks), } - }; - let workdir_symlinks = immutable_inputs_symlinks - .into_iter() - .chain(named_caches_symlinks.into_iter()) - .collect::>(); + } + paths.extend(workdir_symlinks.iter().map(|symlink| TypedPath::Link { + path: &symlink.src, + target: &symlink.dst, + })); + + // Symlink for JDK. + if let Some(jdk_home) = &req.jdk_home { + paths.push(TypedPath::Link { + path: Path::new(".jdk"), + target: jdk_home, + }); + } + + // The bazel remote execution API specifies that the parent directories for output files and + // output directories should be created before execution completes. 
+ let parent_paths_to_create: HashSet<_> = req + .output_files + .iter() + .chain(req.output_directories.iter()) + .filter_map(|rel_path| rel_path.as_ref().parent()) + .filter(|parent| !parent.as_os_str().is_empty()) + .collect(); + paths.extend(parent_paths_to_create.into_iter().map(TypedPath::Dir)); + + // Finally, create a tree for all of the additional paths, and merge it with the input + // Digest. + let additions = DigestTrie::from_unique_paths(paths, &HashMap::new())?; + + store.merge(vec![input_digest, additions.into()]).await +} +/// Prepares the given workdir for use by the given Process. +/// +/// Returns true if the executable for the Process was created in the workdir, indicating that +/// `exclusive_spawn` is required. +/// +pub async fn prepare_workdir( + workdir_path: PathBuf, + req: &Process, + materialized_input_digest: DirectoryDigest, + store: &Store, + named_caches: &NamedCaches, + immutable_inputs: &ImmutableInputs, + named_caches_prefix: Option<&Path>, + immutable_inputs_prefix: Option<&Path>, +) -> Result { // Capture argv0 as the executable path so that we can test whether we have created it in the // sandbox. let maybe_executable_path = { @@ -709,89 +735,44 @@ pub async fn prepare_workdir( if let Some(working_directory) = &req.working_directory { executable_path = working_directory.as_ref().join(executable_path) } - Some(executable_path) + Some(workdir_path.join(executable_path)) } else { None } }; - // Start with async materialization of input snapshots, followed by synchronous materialization - // of other configured inputs. Note that we don't do this in parallel, as that might cause - // non-determinism when paths overlap: see the method doc. - let store2 = store.clone(); - let workdir_path_2 = workdir_path.clone(); - let mut mutable_paths = req.output_files.clone(); - mutable_paths.extend(req.output_directories.clone()); + // Prepare the digest to use, and then materialize it. 
in_workunit!("setup_sandbox", Level::Debug, |_workunit| async move { - store2 + let complete_input_digest = prepare_workdir_digest( + req, + materialized_input_digest, + store, + named_caches, + Some(immutable_inputs), + named_caches_prefix, + immutable_inputs_prefix, + ) + .await?; + + let mut mutable_paths = req.output_files.clone(); + mutable_paths.extend(req.output_directories.clone()); + store .materialize_directory( - workdir_path_2, - materialized_input_digest, + workdir_path, + complete_input_digest, &mutable_paths, Some(immutable_inputs), Permissions::Writable, ) - .await - }) - .await?; - - let workdir_path2 = workdir_path.clone(); - let output_file_paths = req.output_files.clone(); - let output_dir_paths = req.output_directories.clone(); - let maybe_jdk_home = req.jdk_home.clone(); - let exclusive_spawn = executor - .spawn_blocking( - move || { - if let Some(jdk_home) = maybe_jdk_home { - symlink(jdk_home, workdir_path2.join(".jdk")) - .map_err(|err| format!("Error making JDK symlink for local execution: {err:?}"))? - } - - // The bazel remote execution API specifies that the parent directories for output files and - // output directories should be created before execution completes: see the method doc. 
- let parent_paths_to_create: HashSet<_> = output_file_paths - .iter() - .chain(output_dir_paths.iter()) - .map(|relative_path| relative_path.as_ref()) - .chain(workdir_symlinks.iter().map(|s| s.src.as_path())) - .filter_map(|rel_path| rel_path.parent()) - .map(|parent_relpath| workdir_path2.join(parent_relpath)) - .collect(); - for path in parent_paths_to_create { - create_dir_all(path.clone()).map_err(|err| { - format!("Error making parent directory {path:?} for local execution: {err:?}") - })?; - } - - for workdir_symlink in workdir_symlinks { - let src = workdir_path2.join(&workdir_symlink.src); - symlink(&workdir_symlink.dst, &src).map_err(|err| { - format!( - "Error linking {} -> {} for local execution: {:?}", - src.display(), - workdir_symlink.dst.display(), - err - ) - })?; - } + .await?; - let exe_was_materialized = maybe_executable_path - .as_ref() - .map_or(false, |p| workdir_path2.join(p).exists()); - if exe_was_materialized { - debug!( - "Obtaining exclusive spawn lock for process since \ - we materialized its executable {:?}.", - maybe_executable_path - ); - } - let res: Result<_, String> = Ok(exe_was_materialized); - res - }, - |e| Err(format!("Directory materialization task failed: {e}")), - ) - .await?; - Ok(exclusive_spawn) + if let Some(executable_path) = maybe_executable_path { + Ok(tokio::fs::metadata(executable_path).await.is_ok()) + } else { + Ok(false) + } + }) + .await } /// diff --git a/src/rust/engine/process_execution/src/local_tests.rs b/src/rust/engine/process_execution/src/local_tests.rs index e6eca2e8719..945fa81d19f 100644 --- a/src/rust/engine/process_execution/src/local_tests.rs +++ b/src/rust/engine/process_execution/src/local_tests.rs @@ -735,8 +735,7 @@ async fn prepare_workdir_exclusive_relative() { work_dir.path().to_owned(), &process, TestDirectory::recursive().directory_digest(), - store, - executor, + &store, &named_caches, &immutable_inputs, None, diff --git a/src/rust/engine/src/intrinsics.rs 
b/src/rust/engine/src/intrinsics.rs index 65a66c0ef85..dca08b37f61 100644 --- a/src/rust/engine/src/intrinsics.rs +++ b/src/rust/engine/src/intrinsics.rs @@ -561,8 +561,7 @@ fn interactive_process( tempdir.path().to_owned(), &process, process.input_digests.inputs.clone(), - context.core.store(), - context.core.executor.clone(), + &context.core.store(), &context.core.named_caches, &context.core.immutable_inputs, None,