diff --git a/src/python/pants/engine/internals/scheduler.py b/src/python/pants/engine/internals/scheduler.py index 579f7922876..ecdaa88a0da 100644 --- a/src/python/pants/engine/internals/scheduler.py +++ b/src/python/pants/engine/internals/scheduler.py @@ -209,6 +209,7 @@ def __init__( local_parallelism=execution_options.process_execution_local_parallelism, local_enable_nailgun=execution_options.process_execution_local_enable_nailgun, remote_parallelism=execution_options.process_execution_remote_parallelism, + docker_strategy=execution_options.docker_strategy.value, child_max_memory=execution_options.process_total_child_memory_usage or 0, child_default_memory=execution_options.process_per_child_memory_usage, graceful_shutdown_timeout=execution_options.process_execution_graceful_shutdown_timeout, diff --git a/src/python/pants/engine/internals/scheduler_test_base.py b/src/python/pants/engine/internals/scheduler_test_base.py index d1f368ed2c2..174a831e754 100644 --- a/src/python/pants/engine/internals/scheduler_test_base.py +++ b/src/python/pants/engine/internals/scheduler_test_base.py @@ -2,12 +2,17 @@ # Licensed under the Apache License, Version 2.0 (see LICENSE). import os +from dataclasses import replace from pathlib import Path from pants.engine.internals.native_engine import PyExecutor from pants.engine.internals.scheduler import Scheduler, SchedulerSession from pants.engine.unions import UnionMembership -from pants.option.global_options import DEFAULT_EXECUTION_OPTIONS, DEFAULT_LOCAL_STORE_OPTIONS +from pants.option.global_options import ( + DEFAULT_EXECUTION_OPTIONS, + DEFAULT_LOCAL_STORE_OPTIONS, + DockerStrategy, +) from pants.util.contextutil import temporary_file_path from pants.util.dirutil import safe_mkdtemp from pants.util.logging import LogLevel @@ -35,6 +40,7 @@ def mk_scheduler( local_execution_root_dir = os.path.realpath(safe_mkdtemp()) named_caches_dir = os.path.realpath(safe_mkdtemp()) + execution_options = replace(DEFAULT_EXECUTION_OPTIONS, docker_strategy=DockerStrategy.mount) scheduler = Scheduler( ignore_patterns=[], use_gitignore=False, @@ -45,7 +51,7 @@ def mk_scheduler( rules=rules, union_membership=UnionMembership({}), executor=self._executor, - execution_options=DEFAULT_EXECUTION_OPTIONS, + execution_options=execution_options, local_store_options=DEFAULT_LOCAL_STORE_OPTIONS, include_trace_on_error=include_trace_on_error, ) diff --git a/src/python/pants/engine/rules_test.py b/src/python/pants/engine/rules_test.py index 15848fda14d..36e28e2e5fa 100644 --- a/src/python/pants/engine/rules_test.py +++ b/src/python/pants/engine/rules_test.py @@ -2,7 +2,7 @@ # Licensed under the Apache License, Version 2.0 (see LICENSE). 
import re -from dataclasses import dataclass +from dataclasses import dataclass, replace from enum import Enum from pathlib import Path from textwrap import dedent @@ -28,7 +28,11 @@ rule_helper, ) from pants.engine.unions import UnionMembership -from pants.option.global_options import DEFAULT_EXECUTION_OPTIONS, DEFAULT_LOCAL_STORE_OPTIONS +from pants.option.global_options import ( + DEFAULT_EXECUTION_OPTIONS, + DEFAULT_LOCAL_STORE_OPTIONS, + DockerStrategy, +) from pants.testutil.rule_runner import MockGet, run_rule_with_mocks from pants.util.enums import match from pants.util.logging import LogLevel @@ -36,6 +40,7 @@ def create_scheduler(rules, validate=True): """Create a Scheduler.""" + execution_options = replace(DEFAULT_EXECUTION_OPTIONS, docker_strategy=DockerStrategy.mount) return Scheduler( ignore_patterns=[], use_gitignore=False, @@ -46,7 +51,7 @@ def create_scheduler(rules, validate=True): rules=rules, union_membership=UnionMembership({}), executor=PyExecutor(core_threads=2, max_threads=4), - execution_options=DEFAULT_EXECUTION_OPTIONS, + execution_options=execution_options, local_store_options=DEFAULT_LOCAL_STORE_OPTIONS, validate_reachability=validate, ) diff --git a/src/python/pants/init/engine_initializer.py b/src/python/pants/init/engine_initializer.py index 0f3a2a82737..8437328eb5e 100644 --- a/src/python/pants/init/engine_initializer.py +++ b/src/python/pants/init/engine_initializer.py @@ -42,7 +42,6 @@ from pants.init import specs_calculator from pants.init.bootstrap_scheduler import BootstrapStatus from pants.option.global_options import ( - DEFAULT_EXECUTION_OPTIONS, DynamicRemoteOptions, ExecutionOptions, GlobalOptions, @@ -237,8 +236,6 @@ def setup_graph_extended( union_membership: UnionMembership registered_target_types = RegisteredTargetTypes.create(build_configuration.target_types) - execution_options = execution_options or DEFAULT_EXECUTION_OPTIONS - @rule def parser_singleton() -> Parser: return Parser( diff --git a/src/python/pants/option/global_options.py b/src/python/pants/option/global_options.py index 379bc702e0c..8fd2deb32d3 100644 --- a/src/python/pants/option/global_options.py +++ b/src/python/pants/option/global_options.py @@ -27,6 +27,7 @@ from pants.engine.env_vars import CompleteEnvironmentVars from pants.engine.fs import FileContent from pants.engine.internals.native_engine import PyExecutor +from pants.engine.platform import Platform from pants.option.custom_types import memory_size from pants.option.errors import OptionsError from pants.option.option_types import ( @@ -130,6 +131,14 @@ class KeepSandboxes(Enum): never = "never" +class DockerStrategy(Enum): + """An enum for the global option `docker_strategy`.""" + + auto = "auto" + mount = "mount" + pipe = "pipe" + + @enum.unique class AuthPluginState(Enum): OK = "ok" @@ -491,6 +500,8 @@ class ExecutionOptions: process_execution_graceful_shutdown_timeout: int cache_content_behavior: CacheContentBehavior + docker_strategy: DockerStrategy + process_total_child_memory_usage: int | None process_per_child_memory_usage: int @@ -535,6 +546,7 @@ def from_options( process_execution_cache_namespace=bootstrap_options.process_execution_cache_namespace, process_execution_graceful_shutdown_timeout=bootstrap_options.process_execution_graceful_shutdown_timeout, process_execution_local_enable_nailgun=bootstrap_options.process_execution_local_enable_nailgun, + docker_strategy=GlobalOptions.resolve_docker_strategy(bootstrap_options), cache_content_behavior=bootstrap_options.cache_content_behavior, 
process_total_child_memory_usage=bootstrap_options.process_total_child_memory_usage, process_per_child_memory_usage=bootstrap_options.process_per_child_memory_usage, @@ -620,6 +632,7 @@ def from_options(cls, options: OptionValueContainer) -> LocalStoreOptions: keep_sandboxes=KeepSandboxes.never, local_cache=True, cache_content_behavior=CacheContentBehavior.fetch, + docker_strategy=DockerStrategy.auto, process_execution_local_enable_nailgun=True, process_execution_graceful_shutdown_timeout=3, # Remote store setup. @@ -1042,7 +1055,11 @@ class BootstrapOptions: help=softwrap( """ The maximum number of threads to use to execute `@rule` logic. Defaults to - a small multiple of `--rule-threads-core`. + `16 * --rule-threads-core`. + + Note that setting too low a `--rule-threads-max` value can lead to deadlocks: Pants + uses blocking operations internally for a few use cases, and if the pool of blocking + threads is exhausted, those use cases will wait. """ ), ) @@ -1185,6 +1202,24 @@ class BootstrapOptions: """ ), ) + process_execution_docker_strategy = EnumOption( + default=DEFAULT_EXECUTION_OPTIONS.docker_strategy, + help=softwrap( + """ + The strategy used to provide inputs to Docker containers when the `docker_environment` + target is in use. + + The `mount` strategy provides inputs via bind mounts. The `pipe` strategy provides + inputs by tar-pipe'ing them into the container. + + The default value of `auto` will choose the fastest known-consistent strategy for the + platform that Pants is running on, which generally means using the `pipe` strategy when + Docker is implemented via virtualization (on macOS and Windows in particular). See + https://github.com/docker/roadmap/issues/7 for more information on macOS filesystem + virtualization status. + """ + ), + ) cache_content_behavior = EnumOption( advanced=True, default=DEFAULT_EXECUTION_OPTIONS.cache_content_behavior, @@ -1823,10 +1858,22 @@ def validate_remote_headers(opt_name: str) -> None: @staticmethod def create_py_executor(bootstrap_options: OptionValueContainer) -> PyExecutor: + # NB: See the `--rule-threads-max` option help for a warning on setting this too low. + # + # This value is chosen somewhat arbitrarily, but has a few concerns at play: + # * When set too low, tasks using `Executor::spawn_blocking` on the Rust side can deadlock + # when too many blocking operations already running. + # * Higher thread counts mean less bounded access to the LMDB store (which is the primary user + # of blocking tasks), which is good up to a point, but then begins to increase kernel time + # as many threads are blocked waiting for IO and locks. + # + # The value 16 was chosen to avoid deadlocks with all current `spawn_blocking` calls, and based + # on the observation that performance drops off by 2-3% points (on my machine!) when multiples + # of 32 and 64. 
rule_threads_max = ( bootstrap_options.rule_threads_max if bootstrap_options.rule_threads_max - else 4 * bootstrap_options.rule_threads_core + else 16 * bootstrap_options.rule_threads_core ) return PyExecutor( core_threads=bootstrap_options.rule_threads_core, max_threads=rule_threads_max @@ -1853,6 +1900,19 @@ def resolve_keep_sandboxes( else: raise TypeError(f"Unexpected option value for `keep_sandboxes`: {resolved_value}") + @staticmethod + def resolve_docker_strategy( + bootstrap_options: OptionValueContainer, + ) -> DockerStrategy: + strategy = cast(DockerStrategy, bootstrap_options.process_execution_docker_strategy) + if strategy == DockerStrategy.auto: + return ( + DockerStrategy.pipe + if Platform.create_for_localhost().is_macos + else DockerStrategy.mount + ) + return strategy + @staticmethod def compute_pants_ignore(buildroot, global_options): """Computes the merged value of the `--pants-ignore` flag. diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 84be4b9824d..7f95b6d908a 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -2362,6 +2362,7 @@ dependencies = [ "concrete_time", "deepsize", "derivative", + "env_logger", "fs", "futures", "grpc_util", @@ -2375,6 +2376,7 @@ dependencies = [ "nails", "nix", "once_cell", + "os_pipe", "parking_lot 0.12.1", "prost", "prost-types", @@ -2514,7 +2516,7 @@ dependencies = [ "cfg-if 1.0.0", "indoc", "libc", - "parking_lot 0.11.2", + "parking_lot 0.12.1", "pyo3-build-config 0.16.6", "pyo3-ffi", "pyo3-macros", @@ -3233,6 +3235,7 @@ dependencies = [ "serde", "serde_derive", "sharded_lmdb", + "tar", "task_executor", "tempfile", "testutil", @@ -3339,6 +3342,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "tar" +version = "0.4.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b55807c0344e1e6c04d7c965f5289c39a8d94ae23ed5c0b57aabac549f871c6" +dependencies = [ + "filetime", + "libc", +] + [[package]] name = "target-lexicon" version = "0.12.3" diff --git a/src/rust/engine/fs/store/Cargo.toml b/src/rust/engine/fs/store/Cargo.toml index e66610b2a51..a854623c63e 100644 --- a/src/rust/engine/fs/store/Cargo.toml +++ b/src/rust/engine/fs/store/Cargo.toml @@ -32,6 +32,7 @@ prost-types = "0.9" serde = "1.0" serde_derive = "1.0" sharded_lmdb = { path = "../../sharded_lmdb" } +tar = { version = "0.4", default-features = false } task_executor = { path = "../../task_executor" } tempfile = "3" tokio-rustls = "0.23" diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index b18bec879e3..12456a6e1f0 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -1593,6 +1593,113 @@ impl Store { res.boxed() } + /// Writes the given DirectoryDigest to the given Write destination as a tar file. + /// + /// The tar file will contain contents that are as close as possible to what would be produced on + /// disk by a call to `materialize_directory`. 
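+  ///
+  /// Header metadata (mtime, uid, and gid) is fixed, and the mode is reduced to 0o755 or 0o644
+  /// based on each file's execute bit, so that a given digest always produces the same byte
+  /// stream.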
+ pub async fn directory_as_tar_file( + &self, + digest: DirectoryDigest, + dest: impl std::io::Write + Send + 'static, + ) -> Result<(), StoreError> { + let entries = self.entries_for_directory(digest).await?; + let executor = self.local.executor().clone(); + let store = self.clone(); + self + .local + .executor() + .spawn_blocking( + move || { + let header = |entry_type: tar::EntryType, size: u64, execute_bit: bool| -> tar::Header { + // See https://github.com/alexcrichton/tar-rs/blob/f4f439ca0cd3a984d2a66fb8e42f6e2307876afd/src/header.rs#L750-L772 + // for notes on these values. + let mut header = tar::Header::new_gnu(); + header.set_entry_type(entry_type); + header.set_size(size); + header.set_mtime(1153704088); + header.set_uid(0); + header.set_gid(0); + header.set_mode(if execute_bit { 0o755 } else { 0o644 }); + header + }; + let mut appended_directories = HashSet::new(); + let mut append_directory_all = + |tar: &mut tar::Builder<_>, directory: &Path| -> Result<(), std::io::Error> { + let mut to_append_stack = Vec::new(); + // Find which parent directories need to be appended. + for ancestor in directory.ancestors() { + if ancestor.as_os_str().is_empty() || appended_directories.contains(ancestor) { + break; + } + appended_directories.insert(ancestor.to_owned()); + to_append_stack.push(ancestor); + } + + // Then append them top to bottom. + for to_append in to_append_stack.iter().rev() { + tar.append_data( + &mut header(tar::EntryType::Directory, 0, true), + to_append, + &mut std::io::empty(), + )?; + } + + Ok(()) + }; + + let mut tar = tar::Builder::new(dest); + tar.mode(tar::HeaderMode::Deterministic); + let tar = Arc::new(Mutex::new(tar)); + for entry in entries { + match entry { + DigestEntry::File(f) => { + let tar = tar.clone(); + if let Some(parent) = f.path.parent() { + append_directory_all(&mut tar.lock(), parent)?; + } + executor.block_on(store.load_file_bytes_with(f.digest, move |bytes| { + tar.lock().append_data( + &mut header( + tar::EntryType::Regular, + f.digest.size_bytes as u64, + f.is_executable, + ), + &f.path, + std::io::Cursor::new(bytes), + ) + }))??; + } + DigestEntry::Symlink(s) => { + let mut tar = tar.lock(); + if let Some(parent) = s.path.parent() { + append_directory_all(&mut tar, parent)?; + } + tar.append_link( + &mut header(tar::EntryType::Symlink, 0, false), + &s.path, + &s.target, + )?; + } + DigestEntry::EmptyDirectory(d) => { + append_directory_all(&mut tar.lock(), &d)?; + } + } + } + + tar.lock().finish()?; + + Ok(()) + }, + |je| { + Err(StoreError::Unclassified(format!( + "Tar file writing task failed: {je}" + ))) + }, + ) + .await?; + Ok(()) + } + pub fn all_local_digests(&self, entry_type: EntryType) -> Result, String> { self.local.all_digests(entry_type) } diff --git a/src/rust/engine/fs/store/src/snapshot_tests.rs b/src/rust/engine/fs/store/src/snapshot_tests.rs index 0a42a1f7649..8935be27009 100644 --- a/src/rust/engine/fs/store/src/snapshot_tests.rs +++ b/src/rust/engine/fs/store/src/snapshot_tests.rs @@ -174,6 +174,46 @@ async fn snapshot_recursive_directories_including_empty() { ); } +#[tokio::test] +async fn snapshot_tar_roundtrip() { + let (store, dir, posix_fs, digester) = setup(); + + let cats = PathBuf::from("cats"); + let roland = cats.join("roland"); + std::fs::create_dir_all(dir.path().join(cats)).unwrap(); + make_file(&dir.path().join(&roland), STR.as_bytes(), 0o600); + + let path_stats = expand_all_sorted(posix_fs).await; + let original_snapshot = Snapshot::from_path_stats(digester, path_stats) + .await + .unwrap(); + + // 
Materialize the Snapshot via tar, and confirm that capturing it results in the same digest. + let (_, dir, posix_fs, digester) = setup(); + let mut tar = std::process::Command::new("tar") + .args(["-xf", "-"]) + .stdin(std::process::Stdio::piped()) + .current_dir(&dir) + .spawn() + .unwrap(); + store + .directory_as_tar_file( + original_snapshot.tree.clone().into(), + tar.stdin.take().unwrap(), + ) + .await + .unwrap(); + + assert_eq!(0, tar.wait().unwrap().code().unwrap()); + + let path_stats = expand_all_sorted(posix_fs).await; + let snapshot = Snapshot::from_path_stats(digester, path_stats) + .await + .unwrap(); + + assert_eq!(original_snapshot.digest, snapshot.digest); +} + #[tokio::test] async fn merge_directories_two_files() { let (store, _, _, _) = setup(); diff --git a/src/rust/engine/process_execution/Cargo.toml b/src/rust/engine/process_execution/Cargo.toml index e86ea68230f..eee7cf6d50d 100644 --- a/src/rust/engine/process_execution/Cargo.toml +++ b/src/rust/engine/process_execution/Cargo.toml @@ -43,18 +43,20 @@ lazy_static = "1" parking_lot = "0.12" itertools = "0.10" serde = "1.0.136" -bincode = "1.3.3" async-oncecell = "0.2" +bincode = "1.3.3" once_cell = "1.15" -rand = "0.8" +os_pipe = "1.0" prost = "0.9" prost-types = "0.9" +rand = "0.8" strum = "0.24" strum_macros = "0.24" tonic = { version = "0.6", features = ["transport", "codegen", "tls", "tls-roots", "prost"] } tryfuture = { path = "../tryfuture" } [dev-dependencies] +env_logger = "0.9.0" maplit = "1.0.1" mock = { path = "../testutil/mock" } parking_lot = "0.12" diff --git a/src/rust/engine/process_execution/src/cache_tests.rs b/src/rust/engine/process_execution/src/cache_tests.rs index 583c3e32aa7..5fc97f460ac 100644 --- a/src/rust/engine/process_execution/src/cache_tests.rs +++ b/src/rust/engine/process_execution/src/cache_tests.rs @@ -32,7 +32,7 @@ fn create_local_runner() -> (Box, Store, TempDir) { store.clone(), runtime, base_dir.path().to_owned(), - NamedCaches::new(named_cache_dir), + NamedCaches::new(named_cache_dir).unwrap(), ImmutableInputs::new(store.clone(), base_dir.path()).unwrap(), KeepSandboxes::Never, )); diff --git a/src/rust/engine/process_execution/src/docker.rs b/src/rust/engine/process_execution/src/docker.rs index 7fc600539a2..c9e967d075d 100644 --- a/src/rust/engine/process_execution/src/docker.rs +++ b/src/rust/engine/process_execution/src/docker.rs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0 (see LICENSE). use std::collections::BTreeMap; use std::fmt; +use std::os::fd::{AsRawFd, FromRawFd}; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -18,13 +19,16 @@ use log::Level; use nails::execution::ExitCode; use once_cell::sync::Lazy; use parking_lot::Mutex; -use store::{ImmutableInputs, Store}; +use tokio::io::AsyncWriteExt; + +use fs::DirectoryDigest; +use store::{ImmutableInputs, Store, StoreError}; use task_executor::Executor; use workunit_store::{in_workunit, Metric, RunningWorkunit}; use crate::local::{ - apply_chroot, create_sandbox, prepare_workdir, setup_run_sh_script, CapturedWorkdir, ChildOutput, - KeepSandboxes, + apply_chroot, create_sandbox, prepare_workdir, prepare_workdir_digest, setup_run_sh_script, + CapturedWorkdir, ChildOutput, KeepSandboxes, }; use crate::{ Context, FallibleProcessResultWithPlatform, NamedCaches, Platform, Process, ProcessError, @@ -41,6 +45,18 @@ pub static IMAGE_PULL_CACHE: Lazy = Lazy::new(ImagePullCache::ne /// Process-wide Docker connection. 
pub static DOCKER: Lazy = Lazy::new(DockerOnceCell::new); +#[derive(Copy, Clone, Debug, Eq, PartialEq, strum_macros::EnumString)] +#[strum(serialize_all = "snake_case")] +pub enum DockerStrategy { + /// Inputs are written to a bind mount on the Docker host. This strategy is not completely + /// reliable in the case of virtualization (such as is used by Docker on macOS or Windows), + /// because bind-mount propagation can have latency or bugs. See #18162. + Mount, + /// Inputs are piped into an `exec` of `tar`, such that they are extracted inside the + /// container. + Pipe, +} + /// `CommandRunner` that executes processes using a local Docker client. pub struct CommandRunner<'a> { store: Store, @@ -50,6 +66,7 @@ pub struct CommandRunner<'a> { named_caches: NamedCaches, immutable_inputs: ImmutableInputs, keep_sandboxes: KeepSandboxes, + strategy: DockerStrategy, container_cache: ContainerCache<'a>, } @@ -286,6 +303,7 @@ impl<'a> CommandRunner<'a> { named_caches: NamedCaches, immutable_inputs: ImmutableInputs, keep_sandboxes: KeepSandboxes, + strategy: DockerStrategy, ) -> Result { let container_cache = ContainerCache::new( docker, @@ -303,6 +321,7 @@ impl<'a> CommandRunner<'a> { named_caches, immutable_inputs, keep_sandboxes, + strategy, container_cache, }) } @@ -362,20 +381,64 @@ impl<'a> super::CommandRunner for CommandRunner<'a> { &sandbox_path_in_container ); + // Obtain ID of the base container in which to run the execution for this process. + let container_id = { + let image = match &req.execution_strategy { + ProcessExecutionStrategy::Docker(image) => Ok(image), + pes => Err(format!( + "The Docker execution strategy was: {pes:?}, but the Docker CommandRunner was used." + )), + }?; + + self + .container_cache + .container_id_for_image(image, &req.platform, &context.build_id) + .await? + }; + // Prepare the workdir. - // DOCKER-NOTE: The input root will be bind mounted into the container. - let exclusive_spawn = prepare_workdir( - workdir.path().to_owned(), - &req, - req.input_digests.input_files.clone(), - self.store.clone(), - self.executor.clone(), - &self.named_caches, - &self.immutable_inputs, - Some(Path::new(NAMED_CACHES_BASE_PATH_IN_CONTAINER)), - Some(Path::new(IMMUTABLE_INPUTS_BASE_PATH_IN_CONTAINER)), - ) - .await?; + match self.strategy { + DockerStrategy::Mount => { + prepare_workdir( + workdir.path().to_owned(), + &req, + req.input_digests.input_files.clone(), + &self.store, + &self.named_caches, + &self.immutable_inputs, + Some(Path::new(NAMED_CACHES_BASE_PATH_IN_CONTAINER)), + Some(Path::new(IMMUTABLE_INPUTS_BASE_PATH_IN_CONTAINER)), + ) + .await?; + } + DockerStrategy::Pipe => { + // NB: For now, we continue to materialize files into a bind mount, but we do so from + // within the container. This allows us to capture outputs without an additional + // tar-pipe, but if it causes issues (outputs capture races similar to the input race + // on #18162), we'll need to adjust accordingly. 
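+        // `prepare_workdir_digest` merges the named-cache symlinks, the JDK symlink (if any), and
+        // the parent directories of declared outputs into the input digest, so that the single
+        // tar stream piped below carries both the inputs and the sandbox scaffolding.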
+ let inputs_digest = prepare_workdir_digest( + &req, + req.input_digests.input_files.clone(), + &self.store, + &self.named_caches, + None, + Some(Path::new(NAMED_CACHES_BASE_PATH_IN_CONTAINER)), + None, + ) + .await?; + + let docker = self.docker.get().await?; + pipe_inputs( + docker, + &self.store, + &self.executor, + &container_id, + &sandbox_path_in_container, + inputs_digest, + ) + .await?; + } + } workunit.increment_counter(Metric::DockerExecutionRequests, 1); @@ -386,8 +449,9 @@ impl<'a> super::CommandRunner for CommandRunner<'a> { self.store.clone(), self.executor.clone(), workdir.path().to_owned(), - sandbox_path_in_container, - exclusive_spawn, + (container_id, sandbox_path_in_container), + // Processes exec'd in Docker never require exclusive spawning. + false, req.platform, ) .map_err(|msg| { @@ -435,13 +499,13 @@ impl<'a> super::CommandRunner for CommandRunner<'a> { #[async_trait] impl<'a> CapturedWorkdir for CommandRunner<'a> { - type WorkdirToken = String; + type WorkdirToken = (String, String); async fn run_in_workdir<'s, 'c, 'w, 'r>( &'s self, - context: &'c Context, + _context: &'c Context, _workdir_path: &'w Path, - sandbox_path_in_container: Self::WorkdirToken, + (container_id, sandbox_path_in_container): Self::WorkdirToken, req: Process, _exclusive_spawn: bool, ) -> Result>, String> { @@ -463,17 +527,6 @@ impl<'a> CapturedWorkdir for CommandRunner<'a> { format!("Unable to convert working directory due to non UTF-8 characters: {s:?}") })?; - let image = match req.execution_strategy { - ProcessExecutionStrategy::Docker(image) => Ok(image), - _ => Err("The Docker execution strategy was not set on the Process, but the Docker CommandRunner was used.") - }?; - - // Obtain ID of the base container in which to run the execution for this process. - let container_id = self - .container_cache - .container_id_for_image(&image, &req.platform, &context.build_id) - .await?; - let config = bollard::exec::CreateExecOptions { env: Some(env), cmd: Some(req.argv), @@ -544,6 +597,110 @@ impl<'a> CapturedWorkdir for CommandRunner<'a> { } } +async fn pipe_inputs( + docker: &Docker, + store: &Store, + executor: &Executor, + container_id: &str, + working_dir: &str, + inputs_digest: DirectoryDigest, +) -> Result<(), ProcessError> { + let config = bollard::exec::CreateExecOptions { + cmd: Some(vec!["tar", "-xf", "-"]), + working_dir: Some(working_dir), + attach_stdin: Some(true), + attach_stdout: Some(true), + attach_stderr: Some(true), + ..bollard::exec::CreateExecOptions::default() + }; + + let exec = docker + .create_exec(container_id, config) + .await + .map_err(|err| format!("Failed to create Docker execution in container: {err:?}"))?; + + let exec_result = docker + .start_exec(&exec.id, None) + .await + .map_err(|err| format!("Failed to start Docker execution `{}`: {:?}", &exec.id, err))?; + let StartExecResults::Attached { mut output, mut input } = exec_result else { + panic!("Unexpected value returned from start_exec: {exec_result:?}"); + }; + + // Spawn tasks to write the input tar file. + let input_tasks = { + // NB: Because this takes a blocking `Write` instance, we copy from an intermediate pipe. 
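+    // One spawned task streams the tar archive into the pipe's write end via
+    // `directory_as_tar_file`, while a second task copies from the read end into the container's
+    // attached stdin and then shuts it down to signal EOF to `tar -xf -` in the container.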
+ let (pipe_reader, pipe_writer) = os_pipe::pipe() + .map_err(|e| ProcessError::Unclassified(format!("Failed to create pipe: {e}")))?; + let store = store.clone(); + let writer = executor.spawn( + async move { + store + .directory_as_tar_file(inputs_digest, pipe_writer) + .await + }, + |e| Err(format!("Writer task failed: {e}").into()), + ); + let copier = executor.spawn( + async move { + let mut reader = unsafe { tokio::fs::File::from_raw_fd(pipe_reader.as_raw_fd()) }; + tokio::io::copy(&mut reader, &mut input) + .await + .map_err(|e| format!("Failed to copy tar file inputs: {e}"))?; + input + .shutdown() + .await + .map_err(|e| format!("Failed to flush tar file inputs: {e}")) + }, + |e| Err(format!("Copier task failed: {e}")), + ); + futures::future::try_join(writer, copier.map_err(StoreError::from)) + }; + + // Read output from the execution. + let mut output_stdio = Vec::new(); + while let Some(output_msg_res) = output.next().await { + let output_msg = output_msg_res + .map_err(|e| format!("Failed to read container output while streaming inputs: {e}"))?; + match output_msg { + LogOutput::StdOut { message } | LogOutput::StdErr { message } => output_stdio.push(message), + _ => { + // TODO: Is this going to echo stdin back to us as well? + } + } + } + + input_tasks.await?; + + let exec_metadata = docker.inspect_exec(&exec.id).await.map_err(|err| { + format!( + "Failed to inspect Docker execution `{}`: {:?}", + &exec.id, err + ) + })?; + + let status_code = exec_metadata.exit_code.ok_or_else(|| { + format!( + "Inspected execution `{}` for exit status but status was missing.", + &exec.id + ) + })?; + + if status_code == 0 { + Ok(()) + } else { + let last_lines = output_stdio + .iter() + .rev() + .take(5) + .rev() + .map(|l| String::from_utf8_lossy(l)) + .collect::>() + .join(""); + Err(format!("Failed to pipe inputs to container. Trailing output:\n{last_lines:?}").into()) + } +} + /// Caches running containers so that build actions can be invoked by running "executions" /// within those cached containers. 
struct ContainerCache<'a> { diff --git a/src/rust/engine/process_execution/src/docker_tests.rs b/src/rust/engine/process_execution/src/docker_tests.rs index 6c74b756290..c66e19a9f10 100644 --- a/src/rust/engine/process_execution/src/docker_tests.rs +++ b/src/rust/engine/process_execution/src/docker_tests.rs @@ -16,8 +16,9 @@ use testutil::data::{TestData, TestDirectory}; use testutil::{owned_string_vec, relative_paths}; use workunit_store::{RunningWorkunit, WorkunitStore}; -use super::docker::SANDBOX_BASE_PATH_IN_CONTAINER; -use crate::docker::{DockerOnceCell, ImagePullCache}; +use crate::docker::{ + DockerOnceCell, DockerStrategy, ImagePullCache, SANDBOX_BASE_PATH_IN_CONTAINER, +}; use crate::local::KeepSandboxes; use crate::local_tests::named_caches_and_immutable_inputs; use crate::{ @@ -102,9 +103,7 @@ async fn runner_errors_if_docker_image_not_set() { .await .unwrap_err(); if let ProcessError::Unclassified(msg) = &err { - assert!( - msg.contains("The Docker execution strategy was not set on the Process, but the Docker CommandRunner was used") - ); + assert!(msg.ends_with(", but the Docker CommandRunner was used.")); } else { panic!("unexpected value: {err:?}") } @@ -174,7 +173,7 @@ fn extract_env( exclude_keys: &[&str], ) -> Result, String> { let content = - String::from_utf8(content).map_err(|_| "Invalid UTF-8 in env output".to_string())?; + String::from_utf8(content).map_err(|e| format!("Invalid UTF-8 in env output: {e}"))?; let result = content .split('\n') .filter(|line| !line.is_empty()) @@ -190,6 +189,18 @@ fn extract_env( Ok(result) } +fn extract_ls(content: Vec) -> Result, String> { + let content = + String::from_utf8(content).map_err(|e| format!("Invalid UTF-8 in ls output: {e}"))?; + let mut items: Vec<_> = content + .split('\n') + .filter(|line| !line.is_empty()) + .map(|line| line.to_owned()) + .collect(); + items.sort(); + Ok(items) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] #[cfg(unix)] async fn env() { @@ -442,6 +453,8 @@ async fn output_overlapping_file_and_dir() { async fn append_only_cache_created() { skip_if_no_docker_available_in_macos_ci!(); + let _logger = env_logger::try_init(); + let name = "geo"; let dest_base = ".cache"; let cache_name = CacheName::new(name.to_owned()).unwrap(); @@ -495,6 +508,7 @@ async fn test_chroot_placeholder() { .docker(IMAGE.to_owned()), work_root.clone(), KeepSandboxes::Always, + DockerStrategy::Mount, &mut workunit, None, None, @@ -508,6 +522,53 @@ async fn test_chroot_placeholder() { assert!(got_env.get(&"PATH".to_string()).unwrap().ends_with("/bin")); } +#[tokio::test(flavor = "multi_thread")] +async fn test_piped_inputs() { + skip_if_no_docker_available_in_macos_ci!(); + + let (_, mut workunit) = WorkunitStore::setup_for_tests(); + + let store_dir = TempDir::new().unwrap(); + let executor = task_executor::Executor::new(); + let store = Store::local_only(executor.clone(), store_dir.path()).unwrap(); + + store + .store_file_bytes(TestData::roland().bytes(), false) + .await + .expect("Error saving file bytes"); + store + .record_directory(&TestDirectory::containing_roland().directory(), true) + .await + .expect("Error saving directory"); + + let work_tmpdir = TempDir::new().unwrap(); + + let mut process = Process::new(vec!["/bin/ls".to_owned()]).docker(IMAGE.to_owned()); + process.input_digests = InputDigests::new( + &store, + TestDirectory::containing_roland().directory_digest(), + BTreeMap::default(), + BTreeSet::default(), + ) + .await + .unwrap(); + + let result = run_command_via_docker_in_dir( + process, + 
work_tmpdir.path().to_owned(), + KeepSandboxes::Always, + DockerStrategy::Pipe, + &mut workunit, + Some(store), + Some(executor), + ) + .await + .unwrap(); + + let got_ls = extract_ls(result.stdout_bytes).unwrap(); + assert_eq!(vec!["roland.ext"], got_ls); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn all_containing_directories_for_outputs_are_created() { skip_if_no_docker_available_in_macos_ci!(); @@ -578,7 +639,7 @@ async fn timeout() { ]; let mut process = Process::new(argv).docker(IMAGE.to_owned()); - process.timeout = Some(Duration::from_millis(500)); + process.timeout = Some(Duration::from_secs(2)); process.description = "sleepy-cat".to_string(); let result = run_command_via_docker(process).await.unwrap(); @@ -634,6 +695,7 @@ async fn working_directory() { process, work_dir.path().to_owned(), KeepSandboxes::Never, + DockerStrategy::Mount, &mut workunit, Some(store), Some(executor), @@ -703,6 +765,7 @@ async fn immutable_inputs() { process, work_dir.path().to_owned(), KeepSandboxes::Never, + DockerStrategy::Mount, &mut workunit, Some(store), Some(executor), @@ -723,6 +786,7 @@ async fn run_command_via_docker_in_dir( mut req: Process, dir: PathBuf, cleanup: KeepSandboxes, + strategy: DockerStrategy, workunit: &mut RunningWorkunit, store: Option, executor: Option, @@ -746,6 +810,7 @@ async fn run_command_via_docker_in_dir( named_caches, immutable_inputs, cleanup, + strategy, )?; let result: Result<_, ProcessError> = async { let original = runner.run(Context::default(), workunit, req).await?; @@ -775,6 +840,7 @@ async fn run_command_via_docker(req: Process) -> Result, named_caches_prefix: Option<&Path>, immutable_inputs_prefix: Option<&Path>, -) -> Result { - // Collect the symlinks to create for immutable inputs or named caches. - let immutable_inputs_symlinks = { - let symlinks = immutable_inputs - .local_paths(&req.input_digests.immutable_inputs) - .await?; +) -> Result { + let mut paths = Vec::new(); - match immutable_inputs_prefix { - Some(prefix) => symlinks - .into_iter() - .map(|symlink| WorkdirSymlink { - src: symlink.src, - dst: prefix.join( - symlink - .dst - .strip_prefix(immutable_inputs.workdir()) - .unwrap(), - ), - }) - .collect::>(), - None => symlinks, + // Symlinks for immutable inputs and named caches. 
+ let mut workdir_symlinks = Vec::new(); + { + if let Some(immutable_inputs) = immutable_inputs { + let symlinks = immutable_inputs + .local_paths(&req.input_digests.immutable_inputs) + .await?; + + match immutable_inputs_prefix { + Some(prefix) => workdir_symlinks.extend(symlinks.into_iter().map(|symlink| { + WorkdirSymlink { + src: symlink.src, + dst: prefix.join( + symlink + .dst + .strip_prefix(immutable_inputs.workdir()) + .unwrap(), + ), + } + })), + None => workdir_symlinks.extend(symlinks), + } } - }; - let named_caches_symlinks = { + let symlinks = named_caches .local_paths(&req.append_only_caches) .map_err(|err| { @@ -676,21 +664,59 @@ pub async fn prepare_workdir( )) })?; match named_caches_prefix { - Some(prefix) => symlinks - .into_iter() - .map(|symlink| WorkdirSymlink { - src: symlink.src, - dst: prefix.join(symlink.dst.strip_prefix(named_caches.base_dir()).unwrap()), - }) - .collect::>(), - None => symlinks, + Some(prefix) => workdir_symlinks.extend(symlinks.into_iter().map(|symlink| WorkdirSymlink { + src: symlink.src, + dst: prefix.join(symlink.dst.strip_prefix(named_caches.base_dir()).unwrap()), + })), + None => workdir_symlinks.extend(symlinks), } - }; - let workdir_symlinks = immutable_inputs_symlinks - .into_iter() - .chain(named_caches_symlinks.into_iter()) - .collect::>(); + } + paths.extend(workdir_symlinks.iter().map(|symlink| TypedPath::Link { + path: &symlink.src, + target: &symlink.dst, + })); + + // Symlink for JDK. + if let Some(jdk_home) = &req.jdk_home { + paths.push(TypedPath::Link { + path: Path::new(".jdk"), + target: jdk_home, + }); + } + // The bazel remote execution API specifies that the parent directories for output files and + // output directories should be created before execution completes. + let parent_paths_to_create: HashSet<_> = req + .output_files + .iter() + .chain(req.output_directories.iter()) + .filter_map(|rel_path| rel_path.as_ref().parent()) + .filter(|parent| !parent.as_os_str().is_empty()) + .collect(); + paths.extend(parent_paths_to_create.into_iter().map(TypedPath::Dir)); + + // Finally, create a tree for all of the additional paths, and merge it with the input + // Digest. + let additions = DigestTrie::from_unique_paths(paths, &HashMap::new())?; + + store.merge(vec![input_digest, additions.into()]).await +} + +/// Prepares the given workdir for use by the given Process. +/// +/// Returns true if the executable for the Process was created in the workdir, indicating that +/// `exclusive_spawn` is required. +/// +pub async fn prepare_workdir( + workdir_path: PathBuf, + req: &Process, + materialized_input_digest: DirectoryDigest, + store: &Store, + named_caches: &NamedCaches, + immutable_inputs: &ImmutableInputs, + named_caches_prefix: Option<&Path>, + immutable_inputs_prefix: Option<&Path>, +) -> Result { // Capture argv0 as the executable path so that we can test whether we have created it in the // sandbox. let maybe_executable_path = { @@ -699,89 +725,44 @@ pub async fn prepare_workdir( if let Some(working_directory) = &req.working_directory { executable_path = working_directory.as_ref().join(executable_path) } - Some(executable_path) + Some(workdir_path.join(executable_path)) } else { None } }; - // Start with async materialization of input snapshots, followed by synchronous materialization - // of other configured inputs. Note that we don't do this in parallel, as that might cause - // non-determinism when paths overlap: see the method doc. 
- let store2 = store.clone(); - let workdir_path_2 = workdir_path.clone(); - let mut mutable_paths = req.output_files.clone(); - mutable_paths.extend(req.output_directories.clone()); + // Prepare the digest to use, and then materialize it. in_workunit!("setup_sandbox", Level::Debug, |_workunit| async move { - store2 + let complete_input_digest = prepare_workdir_digest( + req, + materialized_input_digest, + store, + named_caches, + Some(immutable_inputs), + named_caches_prefix, + immutable_inputs_prefix, + ) + .await?; + + let mut mutable_paths = req.output_files.clone(); + mutable_paths.extend(req.output_directories.clone()); + store .materialize_directory( - workdir_path_2, - materialized_input_digest, + workdir_path, + complete_input_digest, &mutable_paths, Some(immutable_inputs), Permissions::Writable, ) - .await - }) - .await?; - - let workdir_path2 = workdir_path.clone(); - let output_file_paths = req.output_files.clone(); - let output_dir_paths = req.output_directories.clone(); - let maybe_jdk_home = req.jdk_home.clone(); - let exclusive_spawn = executor - .spawn_blocking( - move || { - if let Some(jdk_home) = maybe_jdk_home { - symlink(jdk_home, workdir_path2.join(".jdk")) - .map_err(|err| format!("Error making JDK symlink for local execution: {err:?}"))? - } - - // The bazel remote execution API specifies that the parent directories for output files and - // output directories should be created before execution completes: see the method doc. - let parent_paths_to_create: HashSet<_> = output_file_paths - .iter() - .chain(output_dir_paths.iter()) - .map(|relative_path| relative_path.as_ref()) - .chain(workdir_symlinks.iter().map(|s| s.src.as_path())) - .filter_map(|rel_path| rel_path.parent()) - .map(|parent_relpath| workdir_path2.join(parent_relpath)) - .collect(); - for path in parent_paths_to_create { - create_dir_all(path.clone()).map_err(|err| { - format!("Error making parent directory {path:?} for local execution: {err:?}") - })?; - } - - for workdir_symlink in workdir_symlinks { - let src = workdir_path2.join(&workdir_symlink.src); - symlink(&workdir_symlink.dst, &src).map_err(|err| { - format!( - "Error linking {} -> {} for local execution: {:?}", - src.display(), - workdir_symlink.dst.display(), - err - ) - })?; - } + .await?; - let exe_was_materialized = maybe_executable_path - .as_ref() - .map_or(false, |p| workdir_path2.join(p).exists()); - if exe_was_materialized { - debug!( - "Obtaining exclusive spawn lock for process since \ - we materialized its executable {:?}.", - maybe_executable_path - ); - } - let res: Result<_, String> = Ok(exe_was_materialized); - res - }, - |e| Err(format!("Directory materialization task failed: {e}")), - ) - .await?; - Ok(exclusive_spawn) + if let Some(executable_path) = maybe_executable_path { + Ok(tokio::fs::metadata(executable_path).await.is_ok()) + } else { + Ok(false) + } + }) + .await } /// diff --git a/src/rust/engine/process_execution/src/local_tests.rs b/src/rust/engine/process_execution/src/local_tests.rs index 7d82fcb79db..d733ca4d3ce 100644 --- a/src/rust/engine/process_execution/src/local_tests.rs +++ b/src/rust/engine/process_execution/src/local_tests.rs @@ -747,8 +747,7 @@ async fn prepare_workdir_exclusive_relative() { work_dir.path().to_owned(), &process, TestDirectory::recursive().directory_digest(), - store, - executor, + &store, &named_caches, &immutable_inputs, None, @@ -769,7 +768,7 @@ pub(crate) fn named_caches_and_immutable_inputs( ( root, - NamedCaches::new(named_cache_dir), + 
NamedCaches::new(named_cache_dir).unwrap(), ImmutableInputs::new(store, &root_path).unwrap(), ) } diff --git a/src/rust/engine/process_execution/src/nailgun/mod.rs b/src/rust/engine/process_execution/src/nailgun/mod.rs index a14b28fb02a..1f536d46bd3 100644 --- a/src/rust/engine/process_execution/src/nailgun/mod.rs +++ b/src/rust/engine/process_execution/src/nailgun/mod.rs @@ -185,8 +185,7 @@ impl super::CommandRunner for CommandRunner { nailgun_process.workdir_path().to_owned(), &client_req, client_req.input_digests.input_files.clone(), - self.store.clone(), - self.executor.clone(), + &self.store, &self.named_caches, &self.immutable_inputs, None, diff --git a/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs b/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs index 2bfc6acd6f4..f64ab822776 100644 --- a/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs +++ b/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs @@ -369,8 +369,7 @@ impl NailgunProcess { workdir.path().to_owned(), &startup_options, startup_options.input_digests.input_files.clone(), - store.clone(), - executor.clone(), + store, named_caches, immutable_inputs, None, diff --git a/src/rust/engine/process_execution/src/nailgun/tests.rs b/src/rust/engine/process_execution/src/nailgun/tests.rs index fea836db485..5d5bc9faa58 100644 --- a/src/rust/engine/process_execution/src/nailgun/tests.rs +++ b/src/rust/engine/process_execution/src/nailgun/tests.rs @@ -22,7 +22,7 @@ fn pool(size: usize) -> (NailgunPool, NamedCaches, ImmutableInputs) { let pool = NailgunPool::new(base_dir.clone(), size, store.clone(), executor); ( pool, - NamedCaches::new(named_caches_dir.path().to_owned()), + NamedCaches::new(named_caches_dir.path().to_owned()).unwrap(), ImmutableInputs::new(store, &base_dir).unwrap(), ) } diff --git a/src/rust/engine/process_execution/src/named_caches.rs b/src/rust/engine/process_execution/src/named_caches.rs index 630029fadac..bdf29431ffb 100644 --- a/src/rust/engine/process_execution/src/named_caches.rs +++ b/src/rust/engine/process_execution/src/named_caches.rs @@ -6,7 +6,7 @@ use std::path::{Path, PathBuf}; use deepsize::DeepSizeOf; use serde::Serialize; -use fs::{default_cache_path, safe_create_dir_all_ioerror, RelativePath}; +use fs::{default_cache_path, safe_create_dir_all, RelativePath}; use store::WorkdirSymlink; #[derive(Clone, Debug, DeepSizeOf, Eq, PartialEq, Hash, PartialOrd, Ord, Serialize)] @@ -41,8 +41,9 @@ pub struct NamedCaches { } impl NamedCaches { - pub fn new(local_base: PathBuf) -> NamedCaches { - NamedCaches { local_base } + pub fn new(local_base: PathBuf) -> Result { + safe_create_dir_all(&local_base)?; + Ok(Self { local_base }) } pub fn base_dir(&self) -> &Path { @@ -70,13 +71,7 @@ impl NamedCaches { .collect::>(); for symlink in &symlinks { - safe_create_dir_all_ioerror(&symlink.dst).map_err(|err| { - format!( - "Error creating directory {}: {:?}", - symlink.dst.display(), - err - ) - })? 
+ safe_create_dir_all(&symlink.dst)?; } Ok(symlinks) diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index 990a6c333e7..7801d0e0ac2 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -348,7 +348,8 @@ async fn main() { args .named_cache_path .unwrap_or_else(NamedCaches::default_path), - ), + ) + .unwrap(), ImmutableInputs::new(store.clone(), &workdir).unwrap(), KeepSandboxes::Never, )) as Box, diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 07a26f5a71a..1cc36a75973 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -114,6 +114,7 @@ pub struct ExecutionStrategyOptions { pub local_enable_nailgun: bool, pub remote_cache_read: bool, pub remote_cache_write: bool, + pub docker_strategy: docker::DockerStrategy, pub child_max_memory: usize, pub child_default_memory: usize, pub graceful_shutdown_timeout: Duration, @@ -248,6 +249,7 @@ impl Core { named_caches.clone(), immutable_inputs.clone(), exec_strategy_opts.local_keep_sandboxes, + exec_strategy_opts.docker_strategy, )?); let runner = Box::new(SwitchedCommandRunner::new(docker_runner, runner, |req| { matches!(req.execution_strategy, ProcessExecutionStrategy::Docker(_)) @@ -535,7 +537,7 @@ impl Core { }; let immutable_inputs = ImmutableInputs::new(store.clone(), &local_execution_root_dir)?; - let named_caches = NamedCaches::new(named_caches_dir); + let named_caches = NamedCaches::new(named_caches_dir)?; let command_runners = Self::make_command_runners( &full_store, &store, diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs index 53ddaa16aff..95d81ce4106 100644 --- a/src/rust/engine/src/externs/interface.rs +++ b/src/rust/engine/src/externs/interface.rs @@ -255,30 +255,31 @@ impl PyExecutionStrategyOptions { fn __new__( local_parallelism: usize, remote_parallelism: usize, - local_keep_sandboxes: String, + local_keep_sandboxes: &str, local_cache: bool, local_enable_nailgun: bool, remote_cache_read: bool, remote_cache_write: bool, + docker_strategy: &str, child_default_memory: usize, child_max_memory: usize, graceful_shutdown_timeout: usize, - ) -> Self { - Self(ExecutionStrategyOptions { + ) -> PyO3Result { + Ok(Self(ExecutionStrategyOptions { local_parallelism, remote_parallelism, - local_keep_sandboxes: process_execution::local::KeepSandboxes::from_str( - &local_keep_sandboxes, - ) - .unwrap(), + local_keep_sandboxes: process_execution::local::KeepSandboxes::from_str(local_keep_sandboxes) + .map_err(|e| PyException::new_err(format!("{e}")))?, local_cache, local_enable_nailgun, remote_cache_read, remote_cache_write, + docker_strategy: process_execution::docker::DockerStrategy::from_str(docker_strategy) + .map_err(|e| PyException::new_err(format!("{e}")))?, child_default_memory, child_max_memory, - graceful_shutdown_timeout: Duration::from_secs(graceful_shutdown_timeout.try_into().unwrap()), - }) + graceful_shutdown_timeout: Duration::from_secs(graceful_shutdown_timeout.try_into()?), + })) } } diff --git a/src/rust/engine/src/intrinsics.rs b/src/rust/engine/src/intrinsics.rs index 6b20264c8b6..815ff328574 100644 --- a/src/rust/engine/src/intrinsics.rs +++ b/src/rust/engine/src/intrinsics.rs @@ -564,8 +564,7 @@ fn interactive_process( tempdir.path().to_owned(), &process, process.input_digests.input_files.clone(), - context.core.store(), - context.core.executor.clone(), + &context.core.store(), &context.core.named_caches, 
&context.core.immutable_inputs, None,
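For readers outside the Pants codebase, here is a minimal sketch of the tar-pipe technique that the `pipe` strategy relies on, using the `docker` CLI and the `tar` crate rather than bollard. The container id, file name, and contents are hypothetical, and real code would stream entries from a content-addressed store rather than an in-memory buffer:

```rust
use std::process::{Command, Stdio};

fn pipe_inputs_via_cli(container_id: &str) -> std::io::Result<()> {
    // `docker exec -i` keeps stdin open so that `tar -xf -` can read the streamed archive.
    // Extraction happens in the container's default working directory.
    let mut child = Command::new("docker")
        .args(["exec", "-i", container_id, "tar", "-xf", "-"])
        .stdin(Stdio::piped())
        .spawn()?;

    {
        // Build a tar stream directly into the child's stdin, with fixed metadata (as in
        // `directory_as_tar_file`) so that the archive is deterministic.
        let stdin = child.stdin.take().expect("stdin was piped");
        let mut tar = tar::Builder::new(stdin);

        let data = b"hello from the host";
        let mut header = tar::Header::new_gnu();
        header.set_entry_type(tar::EntryType::Regular);
        header.set_size(data.len() as u64);
        header.set_mode(0o644);
        header.set_mtime(0);
        header.set_uid(0);
        header.set_gid(0);
        // `append_data` fills in the path and checksum for us.
        tar.append_data(&mut header, "greeting.txt", &data[..])?;
        tar.finish()?;
        // Dropping the builder closes stdin, signalling EOF to `tar -xf -`.
    }

    let status = child.wait()?;
    assert!(status.success(), "tar exited with {status}");
    Ok(())
}
```

Closing the writer (and with it the child's stdin) is what lets `tar -xf -` terminate; the bollard-based `pipe_inputs` above does the same thing via `input.shutdown()` after the copy completes.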