Skip to content

Commit

Permalink
Fix immutable inputs DCL bug.
Browse files Browse the repository at this point in the history
Previously we used the double-checked-cell-async crate (See:
https://github.com/chrislearn/double-checked-cell-async/blob/46cd3b04eddddbe279282143fe8a936d5854588c/src/lib.rs#L228-L260),
which performed the second check of the double-checked lock with relaxed
ordering. The relevant code is snipped below and annotated:

```rust
/// A lazily-initialized cell: a value slot, an atomic "initialized" flag, and an async lock
/// that initializers take before writing the slot (see `get_or_try_init`).
pub struct DoubleCheckedCell<T> {
    // The stored value; starts as `None` and is filled in by a successful initializer.
    value: UnsafeCell<Option<T>>,
    // Flipped to `true` only after `value` has been written.
    initialized: AtomicBool,
    // Guards the initialization path; carries no data of its own.
    lock: Mutex<()>,
}

impl<T> DoubleCheckedCell<T> {
    /// Creates an empty, uninitialized cell: no value, flag cleared, lock unlocked.
    pub fn new() -> DoubleCheckedCell<T> {
        DoubleCheckedCell {
            value: UnsafeCell::new(None),
            initialized: AtomicBool::new(false),
            lock: Mutex::new(()),
        }
    }

    /// Returns a reference to the stored value, awaiting `init` to produce it first if the
    /// `initialized` flag is not yet set.
    ///
    /// # Errors
    ///
    /// If `init` resolves to `Err`, that error is returned as-is. Nothing is stored and the
    /// `initialized` flag stays `false`, so a later call will run its own `init`.
    pub async fn get_or_try_init<Fut, E>(&self, init: Fut) -> Result<&T, E>
    where
        Fut: Future<Output = Result<T, E>>
    {
        // 1.) 1st load & check.
        if !self.initialized.load(Ordering::Acquire) {
            // 2.) Lock.
            let _lock = self.lock.lock().await;
            // 3.) 2nd load & check.
            // Note this load uses `Relaxed`, unlike the `Acquire` load in step 1.
            if !self.initialized.load(Ordering::Relaxed) {
                {
                    // 4.) Critical section.
                    // `?` returns early on failure, before anything is written or flagged.
                    let result = init.await?;
                    // NOTE(review): the exclusive borrow relies on `_lock` being held and on
                    // no shared reference having been handed out yet (this function only
                    // returns one once the flag is set) — confirm against the rest of the crate.
                    let value = unsafe { &mut *self.value.get() };
                    value.replace(result);
                }
                // 5.) Store with lock held.
                self.initialized.store(true, Ordering::Release);
            }
        }
        // Reached only after the flag was observed `true` or the value was just stored above.
        let value = unsafe { &*self.value.get() };
        // NOTE(review): `unchecked_unwrap` presumably skips the `None` check; that is only
        // valid because `value` is expected to be `Some` on every path reaching here — confirm.
        Ok(unsafe { value.as_ref().unchecked_unwrap() })
    }
}
```

Per the C++11 memory model used by Rust (See:
https://en.cppreference.com/w/cpp/language/memory_model), this would
seem to indicate the second load could be reordered to occur anywhere
after the 1st load with acquire ordering and anywhere before the store
with release ordering. If that second load was reordered to occur
before the lock was acquired, two threads could enter the critical
section in serial and the second thread would try to materialize to
paths that already were created and marked read-only. Switch to the
async-oncecell crate which performs both loads of the double-checked
lock with acquire ordering, ensuring they are not re-ordered with
respect to the interleaved non-atomics code.

Also fix up the materialization process to be atomic. We now clean up
materialization chroots when materialization fails and only move their
contents to the destination path if the full materialization has
succeeded.

Fixes pantsbuild#13899

[ci skip-build-wheels]
  • Loading branch information
jsirois committed Dec 28, 2021
1 parent aa79833 commit 71a2369
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 61 deletions.
41 changes: 12 additions & 29 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/rust/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ cache = { path = "cache" }
concrete_time = { path = "concrete_time" }
crossbeam-channel = "0.4"
derivative = "2.2"
double-checked-cell-async = "2.0"
async-oncecell = "0.2"
either = "1.6"
fnv = "1.0.5"
fs = { path = "fs" }
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/fs/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async-trait = "=0.1.42"
protos = { path = "../../protos" }
bytes = "1.0"
concrete_time = { path = "../../concrete_time" }
double-checked-cell-async = "2.0"
async-oncecell = "0.2"
grpc_util = { path = "../../grpc_util" }
fs = { path = ".." }
futures = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use async_oncecell::OnceCell;
use async_trait::async_trait;
use bytes::Bytes;
use double_checked_cell_async::DoubleCheckedCell;
use fs::{default_cache_path, DigestEntry, FileContent, FileEntry, Permissions, RelativePath};
use futures::future::{self, BoxFuture, Either, FutureExt, TryFutureExt};
use grpc_util::prost::MessageExt;
Expand Down Expand Up @@ -235,7 +235,7 @@ impl Store {
upload_timeout: Duration,
rpc_retries: usize,
rpc_concurrency_limit: usize,
capabilities_cell_opt: Option<Arc<DoubleCheckedCell<ServerCapabilities>>>,
capabilities_cell_opt: Option<Arc<OnceCell<ServerCapabilities>>>,
batch_api_size_limit: usize,
) -> Result<Store, String> {
Ok(Store {
Expand Down
9 changes: 4 additions & 5 deletions src/rust/engine/fs/store/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::ops::Range;
use std::sync::Arc;
use std::time::{Duration, Instant};

use async_oncecell::OnceCell;
use bytes::{Bytes, BytesMut};
use double_checked_cell_async::DoubleCheckedCell;
use futures::Future;
use futures::StreamExt;
use grpc_util::retry::{retry_call, status_is_retryable};
Expand All @@ -32,7 +32,7 @@ pub struct ByteStore {
_rpc_attempts: usize,
byte_stream_client: Arc<ByteStreamClient<LayeredService>>,
cas_client: Arc<ContentAddressableStorageClient<LayeredService>>,
capabilities_cell: Arc<DoubleCheckedCell<ServerCapabilities>>,
capabilities_cell: Arc<OnceCell<ServerCapabilities>>,
capabilities_client: Arc<CapabilitiesClient<LayeredService>>,
batch_api_size_limit: usize,
}
Expand Down Expand Up @@ -76,7 +76,7 @@ impl ByteStore {
upload_timeout: Duration,
rpc_retries: usize,
rpc_concurrency_limit: usize,
capabilities_cell_opt: Option<Arc<DoubleCheckedCell<ServerCapabilities>>>,
capabilities_cell_opt: Option<Arc<OnceCell<ServerCapabilities>>>,
batch_api_size_limit: usize,
) -> Result<ByteStore, String> {
let tls_client_config = if cas_address.starts_with("https://") {
Expand Down Expand Up @@ -107,8 +107,7 @@ impl ByteStore {
_rpc_attempts: rpc_retries + 1,
byte_stream_client,
cas_client,
capabilities_cell: capabilities_cell_opt
.unwrap_or_else(|| Arc::new(DoubleCheckedCell::new())),
capabilities_cell: capabilities_cell_opt.unwrap_or_else(|| Arc::new(OnceCell::new())),
capabilities_client,
batch_api_size_limit,
})
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/process_execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ parking_lot = "0.11"
itertools = "0.10"
serde = "1.0.104"
bincode = "1.2.1"
double-checked-cell-async = "2.0"
async-oncecell = "0.2"
rand = "0.8"
prost = "0.9"
prost-types = "0.9"
Expand Down
51 changes: 38 additions & 13 deletions src/rust/engine/process_execution/src/immutable_inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ use std::collections::{BTreeMap, HashMap};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use double_checked_cell_async::DoubleCheckedCell;
use async_oncecell::OnceCell;
use fs::{Permissions, RelativePath};
use futures::TryFutureExt;
use hashing::Digest;
use parking_lot::Mutex;
use store::Store;
Expand All @@ -18,17 +19,20 @@ pub struct ImmutableInputs {
workdir: TempDir,
// A map from Digest to the location it has been materialized at. The DoubleCheckedCell allows
// for cooperation between threads attempting to create Digests.
contents: Mutex<HashMap<Digest, Arc<DoubleCheckedCell<PathBuf>>>>,
contents: Mutex<HashMap<Digest, Arc<OnceCell<PathBuf>>>>,
}

impl ImmutableInputs {
pub fn new(store: Store, base: &Path) -> Result<Self, String> {
let workdir = TempDir::new_in(base).map_err(|e| {
format!(
"Failed to create temporary directory for immutable inputs: {}",
e
)
})?;
let workdir = tempfile::Builder::new()
.prefix("immutable_inputs")
.tempdir_in(base)
.map_err(|e| {
format!(
"Failed to create temporary directory for immutable inputs: {}",
e
)
})?;
Ok(Self {
store,
workdir,
Expand All @@ -42,20 +46,41 @@ impl ImmutableInputs {
let value: Result<_, String> = cell
.get_or_try_init(async {
let digest_str = digest.hash.to_hex();

let path = self.workdir.path().join(digest_str);
if let Ok(meta) = tokio::fs::metadata(&path).await {
// TODO: If this error triggers, it indicates that we have previously checked out this
// directory, either due to a race condition, or due to a previous failure to
// materialize. See https://github.com/pantsbuild/pants/issues/13899
// If this error triggers, it indicates that we have previously checked out this
// directory, and OnceCell double checked locking is broken.
// We can remove this sanity-check if we gain confidence that OnceCell does its job.
// See: https://github.com/pantsbuild/pants/issues/13899
return Err(format!(
"Destination for immutable digest already exists: {:?}",
meta
));
}

let chroot = TempDir::new_in(self.workdir.path()).map_err(|e| {
format!(
"Failed to created a temporary directory for materialization of immutable input \
digest {:?}: {}",
digest, e
)
})?;
self
.store
.materialize_directory(path.clone(), digest, Permissions::ReadOnly)
.materialize_directory(chroot.path().to_path_buf(), digest, Permissions::ReadOnly)
.await?;
let materialized_chroot = chroot.into_path();
tokio::fs::rename(&materialized_chroot, &path)
.map_err(|e| {
format!(
"Failed to move materialized chroot for {digest:?} from {chroot:?} to {dest:?}: \
{err}",
digest = digest,
chroot = materialized_chroot,
dest = &path,
err = e
)
})
.await?;
Ok(path)
})
Expand Down
9 changes: 4 additions & 5 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use std::sync::Arc;
use std::time::SystemTime;
use std::time::{Duration, Instant};

use async_oncecell::OnceCell;
use async_trait::async_trait;
use bytes::Bytes;
use concrete_time::TimeSpan;
use double_checked_cell_async::DoubleCheckedCell;
use fs::{self, File, PathStat};
use futures::future::{self, BoxFuture, TryFutureExt};
use futures::FutureExt;
Expand Down Expand Up @@ -104,7 +104,7 @@ pub struct CommandRunner {
action_cache_client: Arc<ActionCacheClient<LayeredService>>,
overall_deadline: Duration,
retry_interval_duration: Duration,
capabilities_cell: Arc<DoubleCheckedCell<ServerCapabilities>>,
capabilities_cell: Arc<OnceCell<ServerCapabilities>>,
capabilities_client: Arc<CapabilitiesClient<LayeredService>>,
}

Expand All @@ -127,7 +127,7 @@ impl CommandRunner {
retry_interval_duration: Duration,
execution_concurrency_limit: usize,
cache_concurrency_limit: usize,
capabilities_cell_opt: Option<Arc<DoubleCheckedCell<ServerCapabilities>>>,
capabilities_cell_opt: Option<Arc<OnceCell<ServerCapabilities>>>,
) -> Result<Self, String> {
let execution_use_tls = execution_address.starts_with("https://");
let store_use_tls = store_address.starts_with("https://");
Expand Down Expand Up @@ -177,8 +177,7 @@ impl CommandRunner {
platform,
overall_deadline,
retry_interval_duration,
capabilities_cell: capabilities_cell_opt
.unwrap_or_else(|| Arc::new(DoubleCheckedCell::new())),
capabilities_cell: capabilities_cell_opt.unwrap_or_else(|| Arc::new(OnceCell::new())),
capabilities_client,
};

Expand Down
8 changes: 4 additions & 4 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use crate::session::{Session, Sessions};
use crate::tasks::{Rule, Tasks};
use crate::types::Types;

use async_oncecell::OnceCell;
use cache::PersistentCache;
use double_checked_cell_async::DoubleCheckedCell;
use fs::{safe_create_dir_all_ioerror, GitignoreStyleExcludes, PosixFS};
use graph::{self, EntryId, Graph, InvalidationResult, NodeContext};
use log::info;
Expand Down Expand Up @@ -133,7 +133,7 @@ impl Core {
remoting_opts: &RemotingOptions,
remote_store_address: &Option<String>,
root_ca_certs: &Option<Vec<u8>>,
capabilities_cell_opt: Option<Arc<DoubleCheckedCell<ServerCapabilities>>>,
capabilities_cell_opt: Option<Arc<OnceCell<ServerCapabilities>>>,
) -> Result<Store, String> {
let local_only = Store::local_only_with_options(
executor.clone(),
Expand Down Expand Up @@ -213,7 +213,7 @@ impl Core {
root_ca_certs: &Option<Vec<u8>>,
exec_strategy_opts: &ExecutionStrategyOptions,
remoting_opts: &RemotingOptions,
capabilities_cell_opt: Option<Arc<DoubleCheckedCell<ServerCapabilities>>>,
capabilities_cell_opt: Option<Arc<OnceCell<ServerCapabilities>>>,
) -> Result<Box<dyn CommandRunner>, String> {
let remote_caching_used =
exec_strategy_opts.remote_cache_read || exec_strategy_opts.remote_cache_write;
Expand Down Expand Up @@ -363,7 +363,7 @@ impl Core {
&& remoting_opts.execution_address == remoting_opts.store_address
&& remoting_opts.execution_headers == remoting_opts.store_headers
{
Some(Arc::new(DoubleCheckedCell::new()))
Some(Arc::new(OnceCell::new()))
} else {
None
};
Expand Down

0 comments on commit 71a2369

Please sign in to comment.