diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 9a3c10c8acc..48be187a76b 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -59,6 +59,15 @@ dependencies = [ "event-listener", ] +[[package]] +name = "async-oncecell" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e81301a5b23aaf8a2b002c722af894aca1d423ac1b0e17f553e8a2964e441a3" +dependencies = [ + "futures", +] + [[package]] name = "async-stream" version = "0.3.2" @@ -571,17 +580,6 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" -[[package]] -name = "double-checked-cell-async" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f22db4315075554331c4976a70d10d2d07192d1df01e9e68553bb632c3fa157" -dependencies = [ - "futures-util", - "unreachable", - "void", -] - [[package]] name = "either" version = "1.6.1" @@ -607,6 +605,7 @@ dependencies = [ name = "engine" version = "0.0.1" dependencies = [ + "async-oncecell", "async-trait", "async_latch", "async_semaphore", @@ -615,7 +614,6 @@ dependencies = [ "concrete_time", "crossbeam-channel 0.4.4", "derivative", - "double-checked-cell-async", "either", "env_logger", "fnv", @@ -2173,6 +2171,7 @@ name = "process_execution" version = "0.0.1" dependencies = [ "async-lock", + "async-oncecell", "async-trait", "async_semaphore", "bincode", @@ -2180,7 +2179,6 @@ dependencies = [ "cache", "concrete_time", "derivative", - "double-checked-cell-async", "fs", "futures", "grpc_util", @@ -2944,12 +2942,12 @@ dependencies = [ name = "store" version = "0.1.0" dependencies = [ + "async-oncecell", "async-stream", "async-trait", "bytes", "concrete_time", "criterion", - "double-checked-cell-async", "fs", "futures", "glob", @@ -3550,15 +3548,6 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f14ee04d9415b52b3aeab06258a3f07093182b88ba0f9b8d203f211a7a7d41c7" -[[package]] -name = "unreachable" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "382810877fe448991dfc7f0dd6e3ae5d58088fd0ea5e35189655f84e6814fa56" -dependencies = [ - "void", -] - [[package]] name = "untrusted" version = "0.7.1" @@ -3608,12 +3597,6 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" -[[package]] -name = "void" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" - [[package]] name = "walkdir" version = "2.3.1" diff --git a/src/rust/engine/Cargo.toml b/src/rust/engine/Cargo.toml index 84477a28537..9a3c7f2c7b2 100644 --- a/src/rust/engine/Cargo.toml +++ b/src/rust/engine/Cargo.toml @@ -114,7 +114,7 @@ cache = { path = "cache" } concrete_time = { path = "concrete_time" } crossbeam-channel = "0.4" derivative = "2.2" -double-checked-cell-async = "2.0" +async-oncecell = "0.2" either = "1.6" fnv = "1.0.5" fs = { path = "fs" } diff --git a/src/rust/engine/fs/store/Cargo.toml b/src/rust/engine/fs/store/Cargo.toml index 1256ec80822..e8767791120 100644 --- a/src/rust/engine/fs/store/Cargo.toml +++ b/src/rust/engine/fs/store/Cargo.toml @@ -11,7 +11,7 @@ async-trait = "=0.1.42" protos = { path = "../../protos" } bytes = "1.0" concrete_time = { path = "../../concrete_time" } -double-checked-cell-async = "2.0" +async-oncecell = "0.2" grpc_util = { path = "../../grpc_util" } fs = { path = ".." } futures = "0.3" diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index ede5e43b6f2..49544b7f7a4 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -44,9 +44,9 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::{Duration, Instant}; +use async_oncecell::OnceCell; use async_trait::async_trait; use bytes::Bytes; -use double_checked_cell_async::DoubleCheckedCell; use fs::{default_cache_path, DigestEntry, FileContent, FileEntry, Permissions, RelativePath}; use futures::future::{self, BoxFuture, Either, FutureExt, TryFutureExt}; use grpc_util::prost::MessageExt; @@ -235,7 +235,7 @@ impl Store { upload_timeout: Duration, rpc_retries: usize, rpc_concurrency_limit: usize, - capabilities_cell_opt: Option>>, + capabilities_cell_opt: Option>>, batch_api_size_limit: usize, ) -> Result { Ok(Store { diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index b3bdf87e35a..846415b7759 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -6,8 +6,8 @@ use std::ops::Range; use std::sync::Arc; use std::time::{Duration, Instant}; +use async_oncecell::OnceCell; use bytes::{Bytes, BytesMut}; -use double_checked_cell_async::DoubleCheckedCell; use futures::Future; use futures::StreamExt; use grpc_util::retry::{retry_call, status_is_retryable}; @@ -32,7 +32,7 @@ pub struct ByteStore { _rpc_attempts: usize, byte_stream_client: Arc>, cas_client: Arc>, - capabilities_cell: Arc>, + capabilities_cell: Arc>, capabilities_client: Arc>, batch_api_size_limit: usize, } @@ -76,7 +76,7 @@ impl ByteStore { upload_timeout: Duration, rpc_retries: usize, rpc_concurrency_limit: usize, - capabilities_cell_opt: Option>>, + capabilities_cell_opt: Option>>, batch_api_size_limit: usize, ) -> Result { let tls_client_config = if cas_address.starts_with("https://") { @@ -107,8 +107,7 @@ impl ByteStore { _rpc_attempts: rpc_retries + 1, byte_stream_client, cas_client, - capabilities_cell: capabilities_cell_opt - .unwrap_or_else(|| Arc::new(DoubleCheckedCell::new())), + capabilities_cell: capabilities_cell_opt.unwrap_or_else(|| Arc::new(OnceCell::new())), capabilities_client, batch_api_size_limit, }) diff --git a/src/rust/engine/process_execution/Cargo.toml b/src/rust/engine/process_execution/Cargo.toml index 1325a13e3b5..d6c7fb94e2a 100644 --- a/src/rust/engine/process_execution/Cargo.toml +++ b/src/rust/engine/process_execution/Cargo.toml @@ -41,7 +41,7 @@ parking_lot = "0.11" itertools = "0.10" serde = "1.0.104" bincode = "1.2.1" -double-checked-cell-async = "2.0" +async-oncecell = "0.2" rand = "0.8" prost = "0.9" prost-types = "0.9" diff --git a/src/rust/engine/process_execution/src/immutable_inputs.rs b/src/rust/engine/process_execution/src/immutable_inputs.rs index 50cbcf7ab87..74c4666d063 100644 --- a/src/rust/engine/process_execution/src/immutable_inputs.rs +++ b/src/rust/engine/process_execution/src/immutable_inputs.rs @@ -2,8 +2,9 @@ use std::collections::{BTreeMap, HashMap}; use std::path::{Path, PathBuf}; use std::sync::Arc; -use double_checked_cell_async::DoubleCheckedCell; +use async_oncecell::OnceCell; use fs::{Permissions, RelativePath}; +use futures::TryFutureExt; use hashing::Digest; use parking_lot::Mutex; use store::Store; @@ -11,6 +12,47 @@ use tempfile::TempDir; use crate::WorkdirSymlink; +async fn rename_readonly_directory( + src: impl AsRef, + dest: impl AsRef, + map_rename_err: impl Fn(std::io::Error) -> String, +) -> Result<(), String> { + // If you try to rename a read-only directory (mode 0o555) under masOS you get permission + // denied; so we temporarily make the directory writeable by the current process in order to be + // able to rename it without error. + #[cfg(target_os = "macos")] + { + use std::os::unix::fs::PermissionsExt; + tokio::fs::set_permissions(&src, std::fs::Permissions::from_mode(0o755)) + .map_err(|e| { + format!( + "Failed to prepare {src:?} perms for a rename to {dest:?}: {err}", + src = src.as_ref(), + dest = dest.as_ref(), + err = e + ) + }) + .await?; + } + tokio::fs::rename(&src, &dest) + .map_err(map_rename_err) + .await?; + #[cfg(target_os = "macos")] + { + use std::os::unix::fs::PermissionsExt; + tokio::fs::set_permissions(&dest, std::fs::Permissions::from_mode(0o555)) + .map_err(|e| { + format!( + "Failed to seal {dest:?} as read-only: {err}", + dest = dest.as_ref(), + err = e + ) + }) + .await?; + } + Ok(()) +} + /// Holds Digests materialized into a temporary directory, for symlinking into local sandboxes. pub struct ImmutableInputs { store: Store, @@ -18,17 +60,20 @@ pub struct ImmutableInputs { workdir: TempDir, // A map from Digest to the location it has been materialized at. The DoubleCheckedCell allows // for cooperation between threads attempting to create Digests. - contents: Mutex>>>, + contents: Mutex>>>, } impl ImmutableInputs { pub fn new(store: Store, base: &Path) -> Result { - let workdir = TempDir::new_in(base).map_err(|e| { - format!( - "Failed to create temporary directory for immutable inputs: {}", - e - ) - })?; + let workdir = tempfile::Builder::new() + .prefix("immutable_inputs") + .tempdir_in(base) + .map_err(|e| { + format!( + "Failed to create temporary directory for immutable inputs: {}", + e + ) + })?; Ok(Self { store, workdir, @@ -39,28 +84,63 @@ impl ImmutableInputs { /// Returns an absolute Path to immutably consume the given Digest from. async fn path(&self, digest: Digest) -> Result { let cell = self.contents.lock().entry(digest).or_default().clone(); - let value: Result<_, String> = cell + let value = cell .get_or_try_init(async { - let digest_str = digest.hash.to_hex(); - - let path = self.workdir.path().join(digest_str); - if let Ok(meta) = tokio::fs::metadata(&path).await { - // TODO: If this error triggers, it indicates that we have previously checked out this - // directory, either due to a race condition, or due to a previous failure to - // materialize. See https://github.com/pantsbuild/pants/issues/13899 - return Err(format!( - "Destination for immutable digest already exists: {:?}", - meta - )); - } + let chroot = TempDir::new_in(self.workdir.path()).map_err(|e| { + format!( + "Failed to create a temporary directory for materialization of immutable input \ + digest {:?}: {}", + digest, e + ) + })?; self .store - .materialize_directory(path.clone(), digest, Permissions::ReadOnly) + .materialize_directory(chroot.path().to_path_buf(), digest, Permissions::ReadOnly) .await?; - Ok(path) + let src = chroot.into_path(); + let dest = self.workdir.path().join(digest.hash.to_hex()); + rename_readonly_directory(&src, &dest, |e| { + // TODO(John Sirois): This diagnostic is over the top and should be trimmed down once + // we have confidence in the fix. We've had issues with permission denied errors in + // the past though; so all this information is in here to root-cause the issue should + // it persist. + let maybe_collision_metadata = std::fs::metadata(&dest); + let maybe_unwriteable_parent_metadata = dest + .parent() + .ok_or(format!( + "The destination directory for digest {:?} of {:?} has no parent dir.", + &digest, &dest + )) + .map(|p| std::fs::metadata(&p)); + format!( + "Failed to move materialized immutable input for {digest:?} from {src:?} to \ + {dest:?}: {err}\n\ + Parent directory (un-writeable parent dir?) metadata: {parent_metadata:?}\n\ + Destination directory (collision?) metadata: {existing_metadata:?}\n\ + Current immutable check outs (~dup fingerprints / differing sizes?): {contents:?} + ", + digest = digest, + src = src, + dest = &dest, + // If the parent dir is un-writeable, which is unexpected, we will get permission + // denied on the rename. + parent_metadata = maybe_unwriteable_parent_metadata, + // If the destination directory already exists then we have a leaky locking regime or + // broken materialization failure cleanup. + existing_metadata = maybe_collision_metadata, + // Two digests that have different size_bytes but the same fingerprint is a bug in its + // own right, but would lead to making the same `digest_str` accessible via two + // different Digest keys here; so display all the keys and values to be able to spot + // this should it occur. + contents = self.contents.lock(), + err = e + ) + }) + .await?; + Ok::<_, String>(dest) }) - .await; - Ok(value?.clone()) + .await?; + Ok(value.clone()) } /// diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index 4c5a14c9b6a..317a19fd707 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -7,10 +7,10 @@ use std::sync::Arc; use std::time::SystemTime; use std::time::{Duration, Instant}; +use async_oncecell::OnceCell; use async_trait::async_trait; use bytes::Bytes; use concrete_time::TimeSpan; -use double_checked_cell_async::DoubleCheckedCell; use fs::{self, File, PathStat}; use futures::future::{self, BoxFuture, TryFutureExt}; use futures::FutureExt; @@ -104,7 +104,7 @@ pub struct CommandRunner { action_cache_client: Arc>, overall_deadline: Duration, retry_interval_duration: Duration, - capabilities_cell: Arc>, + capabilities_cell: Arc>, capabilities_client: Arc>, } @@ -127,7 +127,7 @@ impl CommandRunner { retry_interval_duration: Duration, execution_concurrency_limit: usize, cache_concurrency_limit: usize, - capabilities_cell_opt: Option>>, + capabilities_cell_opt: Option>>, ) -> Result { let execution_use_tls = execution_address.starts_with("https://"); let store_use_tls = store_address.starts_with("https://"); @@ -177,8 +177,7 @@ impl CommandRunner { platform, overall_deadline, retry_interval_duration, - capabilities_cell: capabilities_cell_opt - .unwrap_or_else(|| Arc::new(DoubleCheckedCell::new())), + capabilities_cell: capabilities_cell_opt.unwrap_or_else(|| Arc::new(OnceCell::new())), capabilities_client, }; diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 5454286362b..27c149a7bee 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -17,8 +17,8 @@ use crate::session::{Session, Sessions}; use crate::tasks::{Rule, Tasks}; use crate::types::Types; +use async_oncecell::OnceCell; use cache::PersistentCache; -use double_checked_cell_async::DoubleCheckedCell; use fs::{safe_create_dir_all_ioerror, GitignoreStyleExcludes, PosixFS}; use graph::{self, EntryId, Graph, InvalidationResult, NodeContext}; use log::info; @@ -133,7 +133,7 @@ impl Core { remoting_opts: &RemotingOptions, remote_store_address: &Option, root_ca_certs: &Option>, - capabilities_cell_opt: Option>>, + capabilities_cell_opt: Option>>, ) -> Result { let local_only = Store::local_only_with_options( executor.clone(), @@ -213,7 +213,7 @@ impl Core { root_ca_certs: &Option>, exec_strategy_opts: &ExecutionStrategyOptions, remoting_opts: &RemotingOptions, - capabilities_cell_opt: Option>>, + capabilities_cell_opt: Option>>, ) -> Result, String> { let remote_caching_used = exec_strategy_opts.remote_cache_read || exec_strategy_opts.remote_cache_write; @@ -363,7 +363,7 @@ impl Core { && remoting_opts.execution_address == remoting_opts.store_address && remoting_opts.execution_headers == remoting_opts.store_headers { - Some(Arc::new(DoubleCheckedCell::new())) + Some(Arc::new(OnceCell::new())) } else { None };