Skip to content

Commit

Permalink
Fix "failed to create hardlink" error due to multiple mounts on the same device (#19894)
Browse files Browse the repository at this point in the history

As described in #18757, the "check that source and dest are on same
device" strategy that was introduced in #18153 to decide whether we
could hardlink when materializing files was not robust when faced with
the same device being mounted in multiple locations.

This change moves to a "create a canary" strategy for deciding when
hardlinking between two destinations is legal. Hardlinks are canaried
and memoized on a per-destination-root basis, so this strategy might
actually be slightly cheaper than the previous one.

Fixes #18757.
  • Loading branch information
stuhood committed Sep 22, 2023
1 parent 95e1c59 commit c878d3b
Show file tree
Hide file tree
Showing 15 changed files with 131 additions and 70 deletions.
8 changes: 8 additions & 0 deletions src/rust/engine/fs/fs_util/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,9 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
("tree", sub_match) => match expect_subcommand(sub_match) {
("materialize", args) => {
let destination = PathBuf::from(args.value_of("destination").unwrap());
// NB: We use `destination` as the root directory, because there is no need to
// memoize a check for whether some other parent directory is hardlinkable.
let destination_root = destination.clone();
let fingerprint = Fingerprint::from_hex_string(args.value_of("fingerprint").unwrap())?;
let size_bytes = args
.value_of("size_bytes")
Expand All @@ -522,6 +525,7 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
store
.materialize_directory(
destination,
&destination_root,
output_digest,
false,
&BTreeSet::new(),
Expand All @@ -535,6 +539,9 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
("directory", sub_match) => match expect_subcommand(sub_match) {
("materialize", args) => {
let destination = PathBuf::from(args.value_of("destination").unwrap());
// NB: We use `destination` as the root directory, because there is no need to
// memoize a check for whether some other parent directory is hardlinkable.
let destination_root = destination.clone();
let fingerprint = Fingerprint::from_hex_string(args.value_of("fingerprint").unwrap())?;
let size_bytes = args
.value_of("size_bytes")
Expand All @@ -546,6 +553,7 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
store
.materialize_directory(
destination,
&destination_root,
digest,
false,
&BTreeSet::new(),
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/fs/store/benches/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub fn criterion_benchmark_materialize(c: &mut Criterion) {
let _ = executor
.block_on(store.materialize_directory(
dest,
parent_dest_path,
digest.clone(),
false,
&BTreeSet::new(),
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/fs/store/src/immutable_inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl ImmutableInputs {
.store
.materialize_directory(
dest.clone(),
self.0.workdir.path(),
directory_digest,
false,
&BTreeSet::new(),
Expand Down
57 changes: 21 additions & 36 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use std::fs::OpenOptions;
use std::fs::Permissions as FSPermissions;
use std::future::Future;
use std::io::Write;
use std::os::unix::fs::{MetadataExt, OpenOptionsExt, PermissionsExt};
use std::os::unix::fs::{OpenOptionsExt, PermissionsExt};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
Expand All @@ -56,7 +56,7 @@ use fs::{
FileEntry, Link, PathStat, Permissions, RelativePath, SymlinkBehavior, SymlinkEntry,
EMPTY_DIRECTORY_DIGEST,
};
use futures::future::{self, BoxFuture, Either, FutureExt};
use futures::future::{self, BoxFuture, Either, FutureExt, TryFutureExt};
use grpc_util::prost::MessageExt;
use hashing::{Digest, Fingerprint};
use local::ByteStore;
Expand Down Expand Up @@ -388,11 +388,6 @@ impl Store {
default_cache_path().join("lmdb_store")
}

/// Return the device ID that the local Store is hosted on.
pub fn local_filesystem_device(&self) -> u64 {
self.local.filesystem_device()
}

///
/// Remove a file locally, returning true if it existed, or false otherwise.
///
Expand Down Expand Up @@ -1183,14 +1178,24 @@ impl Store {
/// an existing destination directory, meaning that directory and file creation must be
/// idempotent.
///
/// If the destination (more specifically, the given parent directory of the destination, for
/// memoization purposes) is hardlinkable from the local store, and `!force_mutable`, hardlinks
/// may be used for large files which are not listed in `mutable_paths`.
///
pub async fn materialize_directory(
&self,
destination: PathBuf,
destination_root: &Path,
digest: DirectoryDigest,
force_mutable: bool,
mutable_paths: &BTreeSet<RelativePath>,
perms: Permissions,
) -> Result<(), StoreError> {
debug_assert!(
destination.starts_with(destination_root),
"The destination root must be a parent directory of the destination."
);

// Load the DigestTrie for the digest, and convert it into a mapping between a fully qualified
// parent path and its children.
let mut parent_to_child = HashMap::new();
Expand All @@ -1210,41 +1215,21 @@ impl Store {
}

// Create the root, and determine what filesystem it and the store are on.
let materializing_to_same_filesystem = {
let store_filesystem_device = self.local_filesystem_device();
self
.local
.executor()
.spawn_blocking(
{
let destination = destination.clone();
move || {
std::fs::create_dir_all(&destination).map_err(|e| {
format!("Failed to create directory {}: {e}", destination.display())
})?;
let dest_device = destination
.metadata()
.map_err(|e| {
format!(
"Failed to get metadata for destination {}: {e}",
destination.display()
)
})?
.dev();
Ok(dest_device == store_filesystem_device)
}
},
|e| Err(format!("Directory creation task failed: {e}")),
)
.await?
let destination_is_hardlinkable = {
let (_, destination_is_hardlinkable) = tokio::try_join!(
tokio::fs::create_dir_all(&destination)
.map_err(|e| format!("Failed to create directory {}: {e}", destination.display())),
self.local.is_hardlinkable_destination(destination_root)
)?;
destination_is_hardlinkable
};

self
.materialize_directory_children(
destination.clone(),
destination,
true,
force_mutable,
materializing_to_same_filesystem,
destination_is_hardlinkable,
&parent_to_child,
&mutable_path_ancestors,
perms,
Expand Down
88 changes: 66 additions & 22 deletions src/rust/engine/fs/store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use super::{EntryType, ShrinkBehavior};

use core::future::Future;
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
Expand All @@ -21,6 +20,7 @@ use sharded_lmdb::ShardedLmdb;
use std::os::unix::fs::PermissionsExt;
use task_executor::Executor;
use tempfile::Builder;
use tokio::fs::hard_link;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use workunit_store::ObservationMetric;

Expand Down Expand Up @@ -144,12 +144,16 @@ impl UnderlyingByteStore for ShardedLmdb {
}

// We shard so there isn't a plethora of entries in one single dir.
//
// TODO: Add Arc'd inner struct to reduce clone costs.
#[derive(Debug, Clone)]
pub(crate) struct ShardedFSDB {
    // Root of the sharded store: entries live under a two-hex-char subdirectory of this
    // path (see `get_path`), and hardlink canary files are created directly beneath it.
    root: PathBuf,
    // Used to run blocking filesystem work (temp file/dir creation) off the async runtime.
    executor: Executor,
    // Lease duration applied to stored entries. NOTE(review): no usage is visible in this
    // excerpt — confirm semantics against the lease-extension code paths.
    lease_time: Duration,
    // Per-fingerprint once-cells. NOTE(review): presumably guards each destination entry so
    // it is initialized at most once under concurrency — confirm against the write paths.
    dest_initializer: Arc<Mutex<HashMap<Fingerprint, Arc<OnceCell<()>>>>>,
    // A cache of whether destination root directories are hardlinkable from the fsdb.
    hardlinkable_destinations: Arc<Mutex<HashMap<PathBuf, Arc<OnceCell<bool>>>>>,
}

enum VerifiedCopyError {
Expand All @@ -169,6 +173,57 @@ impl ShardedFSDB {
self.root.join(hex.get(0..2).unwrap()).join(hex)
}

/// Checks — and memoizes, keyed on `destination` — whether files in this store can be
/// hardlinked into the given destination directory.
///
/// Rather than comparing filesystem device ids (which is not robust when one device is
/// mounted in multiple locations), this creates a "canary": a temporary file under the
/// store root and an attempted hardlink to it under `destination`. The boolean outcome is
/// cached in `hardlinkable_destinations`, so the probe runs at most once per destination.
async fn is_hardlinkable_destination(&self, destination: &Path) -> Result<bool, String> {
    // Get-or-create the memoization cell for this destination. The entry API does a
    // single map lookup under the lock (rather than a `get` followed by an `insert`),
    // and the lock is dropped before any await point.
    let cell = {
        let mut cells = self.hardlinkable_destinations.lock();
        cells
            .entry(destination.to_owned())
            .or_insert_with(|| Arc::new(OnceCell::new()))
            .clone()
    };

    // Fast path: if the canary already ran for this destination, skip cloning `self`
    // and constructing the probe future below.
    if let Some(res) = cell.get() {
        return Ok(*res);
    }

    let fsdb = self.clone();
    let dst_parent_dir = destination.to_owned();
    cell
        .get_or_try_init(async move {
            let src_display = fsdb.root.display().to_string();
            let dst_display = dst_parent_dir.display().to_string();
            tokio::fs::create_dir_all(&dst_parent_dir)
                .await
                .map_err(|e| format!("Failed to create directory: {e}"))?;
            // Temp file/dir creation is blocking filesystem I/O: run it on the
            // blocking pool rather than on the async runtime.
            let (src_file, dst_dir) = fsdb
                .executor
                .spawn_blocking(
                    move || {
                        let src_file = Builder::new()
                            .suffix(".hardlink_canary")
                            .tempfile_in(&fsdb.root)
                            .map_err(|e| format!("Failed to create hardlink canary file: {e}"))?;
                        let dst_dir = Builder::new()
                            .suffix(".hardlink_canary")
                            .tempdir_in(dst_parent_dir)
                            .map_err(|e| format!("Failed to create hardlink canary dir: {e}"))?;
                        Ok((src_file, dst_dir))
                    },
                    |e| Err(format!("hardlink canary temp files task failed: {e}")),
                )
                .await?;
            let dst_file = dst_dir.path().join("hard_link");
            // The canary itself: hardlink success/failure is the answer. The temp file
            // and temp dir delete themselves on drop, cleaning up the probe.
            let is_hardlinkable = hard_link(src_file, dst_file).await.is_ok();
            log::debug!("{src_display} -> {dst_display} hardlinkable: {is_hardlinkable}");
            Ok(is_hardlinkable)
        })
        .await
        .copied()
}

async fn bytes_writer(
mut file: tokio::fs::File,
bytes: &Bytes,
Expand Down Expand Up @@ -477,8 +532,6 @@ struct InnerStore {
file_lmdb: Result<Arc<ShardedLmdb>, String>,
directory_lmdb: Result<Arc<ShardedLmdb>, String>,
file_fsdb: ShardedFSDB,
executor: task_executor::Executor,
filesystem_device: u64,
}

impl ByteStore {
Expand All @@ -501,16 +554,8 @@ impl ByteStore {

std::fs::create_dir_all(root)
.map_err(|e| format!("Failed to create {}: {e}", root.display()))?;

let filesystem_device = root
.metadata()
.map_err(|e| {
format!(
"Failed to get metadata for store root {}: {e}",
root.display()
)
})?
.dev();
std::fs::create_dir_all(&fsdb_files_root)
.map_err(|e| format!("Failed to create {}: {e}", fsdb_files_root.display()))?;

Ok(ByteStore {
inner: Arc::new(InnerStore {
Expand All @@ -531,23 +576,22 @@ impl ByteStore {
)
.map(Arc::new),
file_fsdb: ShardedFSDB {
executor: executor.clone(),
executor: executor,
root: fsdb_files_root,
lease_time: options.lease_time,
dest_initializer: Arc::new(Mutex::default()),
hardlinkable_destinations: Arc::new(Mutex::default()),
},
executor,
filesystem_device,
}),
})
}

pub fn executor(&self) -> &task_executor::Executor {
&self.inner.executor
}

pub fn filesystem_device(&self) -> u64 {
self.inner.filesystem_device
/// Returns whether files in this store can be hardlinked into `destination`, delegating
/// to the file fsdb, which memoizes the answer per destination.
pub async fn is_hardlinkable_destination(&self, destination: &Path) -> Result<bool, String> {
    let fsdb = &self.inner.file_fsdb;
    fsdb.is_hardlinkable_destination(destination).await
}

pub async fn entry_type(&self, fingerprint: Fingerprint) -> Result<Option<EntryType>, String> {
Expand Down
8 changes: 6 additions & 2 deletions src/rust/engine/fs/store/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1147,6 +1147,7 @@ async fn materialize_missing_directory() {
store
.materialize_directory(
materialize_dir.path().to_owned(),
materialize_dir.path(),
TestDirectory::recursive().directory_digest(),
false,
&BTreeSet::new(),
Expand Down Expand Up @@ -1181,6 +1182,7 @@ async fn materialize_directory(perms: Permissions, executable_file: bool) {
store
.materialize_directory(
materialize_dir.path().to_owned(),
materialize_dir.path(),
recursive_testdir.directory_digest(),
false,
&BTreeSet::new(),
Expand Down Expand Up @@ -1644,11 +1646,13 @@ async fn explicitly_overwrites_already_existing_file() {
.directory(&contents_dir)
.file(&cas_file)
.build();
let store = new_store(tempfile::tempdir().unwrap(), &cas.address()).await;
let store_dir = tempfile::tempdir().unwrap();
let store = new_store(store_dir.path(), &cas.address()).await;

store
.materialize_directory(
dir_to_write_to.path().to_owned(),
dir_to_write_to.path(),
contents_dir.directory_digest(),
false,
&BTreeSet::new(),
Expand All @@ -1661,7 +1665,6 @@ async fn explicitly_overwrites_already_existing_file() {
assert_eq!(file_contents, b"abc123".to_vec());
}

#[ignore] // see #17754
#[tokio::test]
async fn big_file_immutable_link() {
let materialize_dir = TempDir::new().unwrap();
Expand Down Expand Up @@ -1724,6 +1727,7 @@ async fn big_file_immutable_link() {
store
.materialize_directory(
materialize_dir.path().to_owned(),
materialize_dir.path(),
directory_digest,
false,
&BTreeSet::from([
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/process_execution/docker/src/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ impl<'a> process_execution::CommandRunner for CommandRunner<'a> {
// DOCKER-NOTE: The input root will be bind mounted into the container.
let exclusive_spawn = prepare_workdir(
workdir.path().to_owned(),
&self.work_dir_base,
&req,
req.input_digests.inputs.clone(),
&self.store,
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/process_execution/pe_nailgun/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ impl process_execution::CommandRunner for CommandRunner {
// Prepare the workdir.
let exclusive_spawn = prepare_workdir(
nailgun_process.workdir_path().to_owned(),
self.nailgun_pool.workdir_base(),
&client_req,
client_req.input_digests.inputs.clone(),
&self.store,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ impl NailgunPool {
}
}

/// Returns the configured base directory for nailgun workdirs.
pub fn workdir_base(&self) -> &Path {
    self.workdir_base.as_path()
}

///
/// Given a name and a `Process` configuration, return a port of a nailgun server running
/// under that name and configuration.
Expand Down Expand Up @@ -367,6 +371,7 @@ impl NailgunProcess {
// simpler.
prepare_workdir(
workdir.path().to_owned(),
workdir_base,
&startup_options,
startup_options.input_digests.inputs.clone(),
store,
Expand Down
Loading

0 comments on commit c878d3b

Please sign in to comment.