Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix immutable inputs DCL bug. #14016

Merged
merged 3 commits into from
Jan 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 12 additions & 29 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/rust/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ cache = { path = "cache" }
concrete_time = { path = "concrete_time" }
crossbeam-channel = "0.4"
derivative = "2.2"
double-checked-cell-async = "2.0"
async-oncecell = "0.2"
either = "1.6"
fnv = "1.0.5"
fs = { path = "fs" }
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/fs/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async-trait = "=0.1.42"
protos = { path = "../../protos" }
bytes = "1.0"
concrete_time = { path = "../../concrete_time" }
double-checked-cell-async = "2.0"
async-oncecell = "0.2"
grpc_util = { path = "../../grpc_util" }
fs = { path = ".." }
futures = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use async_oncecell::OnceCell;
use async_trait::async_trait;
use bytes::Bytes;
use double_checked_cell_async::DoubleCheckedCell;
use fs::{default_cache_path, DigestEntry, FileContent, FileEntry, Permissions, RelativePath};
use futures::future::{self, BoxFuture, Either, FutureExt, TryFutureExt};
use grpc_util::prost::MessageExt;
Expand Down Expand Up @@ -235,7 +235,7 @@ impl Store {
upload_timeout: Duration,
rpc_retries: usize,
rpc_concurrency_limit: usize,
capabilities_cell_opt: Option<Arc<DoubleCheckedCell<ServerCapabilities>>>,
capabilities_cell_opt: Option<Arc<OnceCell<ServerCapabilities>>>,
batch_api_size_limit: usize,
) -> Result<Store, String> {
Ok(Store {
Expand Down
9 changes: 4 additions & 5 deletions src/rust/engine/fs/store/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::ops::Range;
use std::sync::Arc;
use std::time::{Duration, Instant};

use async_oncecell::OnceCell;
use bytes::{Bytes, BytesMut};
use double_checked_cell_async::DoubleCheckedCell;
use futures::Future;
use futures::StreamExt;
use grpc_util::retry::{retry_call, status_is_retryable};
Expand All @@ -32,7 +32,7 @@ pub struct ByteStore {
_rpc_attempts: usize,
byte_stream_client: Arc<ByteStreamClient<LayeredService>>,
cas_client: Arc<ContentAddressableStorageClient<LayeredService>>,
capabilities_cell: Arc<DoubleCheckedCell<ServerCapabilities>>,
capabilities_cell: Arc<OnceCell<ServerCapabilities>>,
capabilities_client: Arc<CapabilitiesClient<LayeredService>>,
batch_api_size_limit: usize,
}
Expand Down Expand Up @@ -76,7 +76,7 @@ impl ByteStore {
upload_timeout: Duration,
rpc_retries: usize,
rpc_concurrency_limit: usize,
capabilities_cell_opt: Option<Arc<DoubleCheckedCell<ServerCapabilities>>>,
capabilities_cell_opt: Option<Arc<OnceCell<ServerCapabilities>>>,
batch_api_size_limit: usize,
) -> Result<ByteStore, String> {
let tls_client_config = if cas_address.starts_with("https://") {
Expand Down Expand Up @@ -107,8 +107,7 @@ impl ByteStore {
_rpc_attempts: rpc_retries + 1,
byte_stream_client,
cas_client,
capabilities_cell: capabilities_cell_opt
.unwrap_or_else(|| Arc::new(DoubleCheckedCell::new())),
capabilities_cell: capabilities_cell_opt.unwrap_or_else(|| Arc::new(OnceCell::new())),
capabilities_client,
batch_api_size_limit,
})
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/process_execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ parking_lot = "0.11"
itertools = "0.10"
serde = "1.0.104"
bincode = "1.2.1"
double-checked-cell-async = "2.0"
async-oncecell = "0.2"
rand = "0.8"
prost = "0.9"
prost-types = "0.9"
Expand Down
130 changes: 105 additions & 25 deletions src/rust/engine/process_execution/src/immutable_inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,78 @@ use std::collections::{BTreeMap, HashMap};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use double_checked_cell_async::DoubleCheckedCell;
use async_oncecell::OnceCell;
use fs::{Permissions, RelativePath};
use futures::TryFutureExt;
use hashing::Digest;
use parking_lot::Mutex;
use store::Store;
use tempfile::TempDir;

use crate::WorkdirSymlink;

async fn rename_readonly_directory(
src: impl AsRef<Path>,
dest: impl AsRef<Path>,
map_rename_err: impl Fn(std::io::Error) -> String,
) -> Result<(), String> {
Comment on lines +15 to +19
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we end up needing to do this for any other materialized directories, we might be able to bake it into materialize_directory itself, before the root ends up marked readonly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, maybe. The materialize and move case that this use has would need a callback or an expclict parameter for a final dest.

// If you try to rename a read-only directory (mode 0o555) under masOS you get permission
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// If you try to rename a read-only directory (mode 0o555) under masOS you get permission
// If you try to rename a read-only directory (mode 0o555) under macOS you get permission

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to save a tree since this is a comment and you've proved its understandable despite typos.

// denied; so we temporarily make the directory writeable by the current process in order to be
// able to rename it without error.
#[cfg(target_os = "macos")]
{
use std::os::unix::fs::PermissionsExt;
tokio::fs::set_permissions(&src, std::fs::Permissions::from_mode(0o755))
.map_err(|e| {
format!(
"Failed to prepare {src:?} perms for a rename to {dest:?}: {err}",
src = src.as_ref(),
dest = dest.as_ref(),
err = e
)
})
.await?;
}
tokio::fs::rename(&src, &dest)
.map_err(map_rename_err)
.await?;
#[cfg(target_os = "macos")]
{
use std::os::unix::fs::PermissionsExt;
tokio::fs::set_permissions(&dest, std::fs::Permissions::from_mode(0o555))
.map_err(|e| {
format!(
"Failed to seal {dest:?} as read-only: {err}",
dest = dest.as_ref(),
err = e
)
})
.await?;
}
Ok(())
}

/// Holds Digests materialized into a temporary directory, for symlinking into local sandboxes.
pub struct ImmutableInputs {
store: Store,
// The TempDir that digests are materialized in.
workdir: TempDir,
// A map from Digest to the location it has been materialized at. The DoubleCheckedCell allows
// for cooperation between threads attempting to create Digests.
contents: Mutex<HashMap<Digest, Arc<DoubleCheckedCell<PathBuf>>>>,
contents: Mutex<HashMap<Digest, Arc<OnceCell<PathBuf>>>>,
}

impl ImmutableInputs {
pub fn new(store: Store, base: &Path) -> Result<Self, String> {
let workdir = TempDir::new_in(base).map_err(|e| {
format!(
"Failed to create temporary directory for immutable inputs: {}",
e
)
})?;
let workdir = tempfile::Builder::new()
.prefix("immutable_inputs")
.tempdir_in(base)
.map_err(|e| {
format!(
"Failed to create temporary directory for immutable inputs: {}",
e
)
})?;
Ok(Self {
store,
workdir,
Expand All @@ -39,28 +84,63 @@ impl ImmutableInputs {
/// Returns an absolute Path to immutably consume the given Digest from.
async fn path(&self, digest: Digest) -> Result<PathBuf, String> {
let cell = self.contents.lock().entry(digest).or_default().clone();
let value: Result<_, String> = cell
let value = cell
.get_or_try_init(async {
let digest_str = digest.hash.to_hex();

let path = self.workdir.path().join(digest_str);
if let Ok(meta) = tokio::fs::metadata(&path).await {
// TODO: If this error triggers, it indicates that we have previously checked out this
// directory, either due to a race condition, or due to a previous failure to
// materialize. See https://github.com/pantsbuild/pants/issues/13899
return Err(format!(
"Destination for immutable digest already exists: {:?}",
meta
));
}
let chroot = TempDir::new_in(self.workdir.path()).map_err(|e| {
format!(
"Failed to create a temporary directory for materialization of immutable input \
digest {:?}: {}",
digest, e
)
})?;
self
.store
.materialize_directory(path.clone(), digest, Permissions::ReadOnly)
.materialize_directory(chroot.path().to_path_buf(), digest, Permissions::ReadOnly)
.await?;
Ok(path)
let src = chroot.into_path();
let dest = self.workdir.path().join(digest.hash.to_hex());
rename_readonly_directory(&src, &dest, |e| {
// TODO(John Sirois): This diagnostic is over the top and should be trimmed down once
// we have confidence in the fix. We've had issues with permission denied errors in
// the past though; so all this information is in here to root-cause the issue should
// it persist.
let maybe_collision_metadata = std::fs::metadata(&dest);
let maybe_unwriteable_parent_metadata = dest
.parent()
.ok_or(format!(
"The destination directory for digest {:?} of {:?} has no parent dir.",
&digest, &dest
))
.map(|p| std::fs::metadata(&p));
format!(
"Failed to move materialized immutable input for {digest:?} from {src:?} to \
{dest:?}: {err}\n\
Parent directory (un-writeable parent dir?) metadata: {parent_metadata:?}\n\
Destination directory (collision?) metadata: {existing_metadata:?}\n\
Current immutable check outs (~dup fingerprints / differing sizes?): {contents:?}
",
digest = digest,
src = src,
dest = &dest,
// If the parent dir is un-writeable, which is unexpected, we will get permission
// denied on the rename.
parent_metadata = maybe_unwriteable_parent_metadata,
// If the destination directory already exists then we have a leaky locking regime or
// broken materialization failure cleanup.
existing_metadata = maybe_collision_metadata,
// Two digests that have different size_bytes but the same fingerprint is a bug in its
// own right, but would lead to making the same `digest_str` accessible via two
// different Digest keys here; so display all the keys and values to be able to spot
// this should it occur.
contents = self.contents.lock(),
err = e
)
})
.await?;
Ok::<_, String>(dest)
})
.await;
Ok(value?.clone())
.await?;
Ok(value.clone())
}

///
Expand Down
Loading