Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix "failed to create hardlink" error due to multiple mounts on the same device (Cherry-pick of #19894) #19910

Merged
merged 1 commit into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/rust/engine/fs/fs_util/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,9 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
("tree", sub_match) => match expect_subcommand(sub_match) {
("materialize", args) => {
let destination = PathBuf::from(args.value_of("destination").unwrap());
// NB: We use `destination` as the root directory, because there is no need to
// memoize a check for whether some other parent directory is hardlinkable.
let destination_root = destination.clone();
let fingerprint = Fingerprint::from_hex_string(args.value_of("fingerprint").unwrap())?;
let size_bytes = args
.value_of("size_bytes")
Expand All @@ -522,6 +525,7 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
store
.materialize_directory(
destination,
&destination_root,
output_digest,
false,
&BTreeSet::new(),
Expand All @@ -535,6 +539,9 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
("directory", sub_match) => match expect_subcommand(sub_match) {
("materialize", args) => {
let destination = PathBuf::from(args.value_of("destination").unwrap());
// NB: We use `destination` as the root directory, because there is no need to
// memoize a check for whether some other parent directory is hardlinkable.
let destination_root = destination.clone();
let fingerprint = Fingerprint::from_hex_string(args.value_of("fingerprint").unwrap())?;
let size_bytes = args
.value_of("size_bytes")
Expand All @@ -546,6 +553,7 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
store
.materialize_directory(
destination,
&destination_root,
digest,
false,
&BTreeSet::new(),
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/fs/store/benches/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ pub fn criterion_benchmark_materialize(c: &mut Criterion) {
let _ = executor
.block_on(store.materialize_directory(
dest,
parent_dest_path,
digest.clone(),
false,
&BTreeSet::new(),
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/fs/store/src/immutable_inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl ImmutableInputs {
.store
.materialize_directory(
dest.clone(),
self.0.workdir.path(),
directory_digest,
false,
&BTreeSet::new(),
Expand Down
57 changes: 21 additions & 36 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use std::fs::OpenOptions;
use std::fs::Permissions as FSPermissions;
use std::future::Future;
use std::io::Write;
use std::os::unix::fs::{MetadataExt, OpenOptionsExt, PermissionsExt};
use std::os::unix::fs::{OpenOptionsExt, PermissionsExt};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
Expand All @@ -56,7 +56,7 @@ use fs::{
FileEntry, Link, PathStat, Permissions, RelativePath, SymlinkBehavior, SymlinkEntry,
EMPTY_DIRECTORY_DIGEST,
};
use futures::future::{self, BoxFuture, Either, FutureExt};
use futures::future::{self, BoxFuture, Either, FutureExt, TryFutureExt};
use grpc_util::prost::MessageExt;
use hashing::{Digest, Fingerprint};
use local::ByteStore;
Expand Down Expand Up @@ -388,11 +388,6 @@ impl Store {
default_cache_path().join("lmdb_store")
}

/// Return the device ID that the local Store is hosted on.
pub fn local_filesystem_device(&self) -> u64 {
self.local.filesystem_device()
}

///
/// Remove a file locally, returning true if it existed, or false otherwise.
///
Expand Down Expand Up @@ -1183,14 +1178,24 @@ impl Store {
/// an existing destination directory, meaning that directory and file creation must be
/// idempotent.
///
/// If the destination (more specifically, the given parent directory of the destination, for
/// memoization purposes) is hardlinkable from the local store, and `!force_mutable`, hardlinks
/// may be used for large files which are not listed in `mutable_paths`.
///
pub async fn materialize_directory(
&self,
destination: PathBuf,
destination_root: &Path,
digest: DirectoryDigest,
force_mutable: bool,
mutable_paths: &BTreeSet<RelativePath>,
perms: Permissions,
) -> Result<(), StoreError> {
debug_assert!(
destination.starts_with(destination_root),
"The destination root must be a parent directory of the destination."
);

// Load the DigestTrie for the digest, and convert it into a mapping between a fully qualified
// parent path and its children.
let mut parent_to_child = HashMap::new();
Expand All @@ -1210,41 +1215,21 @@ impl Store {
}

// Create the root, and determine what filesystem it and the store are on.
let materializing_to_same_filesystem = {
let store_filesystem_device = self.local_filesystem_device();
self
.local
.executor()
.spawn_blocking(
{
let destination = destination.clone();
move || {
std::fs::create_dir_all(&destination).map_err(|e| {
format!("Failed to create directory {}: {e}", destination.display())
})?;
let dest_device = destination
.metadata()
.map_err(|e| {
format!(
"Failed to get metadata for destination {}: {e}",
destination.display()
)
})?
.dev();
Ok(dest_device == store_filesystem_device)
}
},
|e| Err(format!("Directory creation task failed: {e}")),
)
.await?
let destination_is_hardlinkable = {
let (_, destination_is_hardlinkable) = tokio::try_join!(
tokio::fs::create_dir_all(&destination)
.map_err(|e| format!("Failed to create directory {}: {e}", destination.display())),
self.local.is_hardlinkable_destination(destination_root)
)?;
destination_is_hardlinkable
};

self
.materialize_directory_children(
destination.clone(),
destination,
true,
force_mutable,
materializing_to_same_filesystem,
destination_is_hardlinkable,
&parent_to_child,
&mutable_path_ancestors,
perms,
Expand Down
88 changes: 66 additions & 22 deletions src/rust/engine/fs/store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use super::{EntryType, ShrinkBehavior};

use core::future::Future;
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
Expand All @@ -21,6 +20,7 @@ use sharded_lmdb::ShardedLmdb;
use std::os::unix::fs::PermissionsExt;
use task_executor::Executor;
use tempfile::Builder;
use tokio::fs::hard_link;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use workunit_store::ObservationMetric;

Expand Down Expand Up @@ -144,12 +144,16 @@ impl UnderlyingByteStore for ShardedLmdb {
}

// We shard so there isn't a plethora of entries in one single dir.
//
// TODO: Add Arc'd inner struct to reduce clone costs.
#[derive(Debug, Clone)]
pub(crate) struct ShardedFSDB {
root: PathBuf,
executor: Executor,
lease_time: Duration,
dest_initializer: Arc<Mutex<HashMap<Fingerprint, Arc<OnceCell<()>>>>>,
// A cache of whether destination root directories are hardlinkable from the fsdb.
hardlinkable_destinations: Arc<Mutex<HashMap<PathBuf, Arc<OnceCell<bool>>>>>,
}

enum VerifiedCopyError {
Expand All @@ -169,6 +173,57 @@ impl ShardedFSDB {
self.root.join(hex.get(0..2).unwrap()).join(hex)
}

/// Returns whether files stored in this FSDB can be hardlinked into `destination`.
///
/// Rather than comparing device IDs (which, per the PR this lands in, misdiagnoses
/// multiple mounts of the same device), this probes empirically: it creates a canary
/// temp file inside the store root and attempts to hardlink it into a canary temp dir
/// under `destination`. The boolean result is memoized per destination path in
/// `hardlinkable_destinations`, so callers should pass a stable parent/root directory.
///
/// Errors are returned for failures to set up the probe (directory/temp-file creation,
/// task failure); a failing `hard_link` call itself is not an error — it simply means
/// the destination is not hardlinkable.
async fn is_hardlinkable_destination(&self, destination: &Path) -> Result<bool, String> {
  // Get (or insert) the memoization cell for this destination while holding the map
  // lock only briefly; the potentially slow probe runs outside the lock, serialized
  // per-destination by the OnceCell.
  let cell = {
    let mut cells = self.hardlinkable_destinations.lock();
    if let Some(cell) = cells.get(destination) {
      cell.clone()
    } else {
      let cell = Arc::new(OnceCell::new());
      cells.insert(destination.to_owned(), cell.clone());
      cell
    }
  };

  // Fast path: a previous probe for this destination already completed.
  if let Some(res) = cell.get() {
    return Ok(*res);
  }

  let fsdb = self.clone();
  let dst_parent_dir = destination.to_owned();
  cell
    .get_or_try_init(async move {
      let src_display = fsdb.root.display().to_string();
      let dst_display = dst_parent_dir.display().to_string();
      // The destination may not exist yet: create it before placing the canary dir.
      tokio::fs::create_dir_all(&dst_parent_dir)
        .await
        .map_err(|e| format!("Failed to create directory: {e}"))?;
      // Temp file/dir creation is blocking filesystem work, so run it on the
      // executor's blocking pool rather than on the async runtime threads.
      let (src_file, dst_dir) = fsdb
        .executor
        .spawn_blocking(
          move || {
            let src_file = Builder::new()
              .suffix(".hardlink_canary")
              .tempfile_in(&fsdb.root)
              .map_err(|e| format!("Failed to create hardlink canary file: {e}"))?;
            let dst_dir = Builder::new()
              .suffix(".hardlink_canary")
              .tempdir_in(dst_parent_dir)
              .map_err(|e| format!("Failed to create hardlink canary dir: {e}"))?;
            Ok((src_file, dst_dir))
          },
          |e| Err(format!("hardlink canary temp files task failed: {e}")),
        )
        .await?;
      let dst_file = dst_dir.path().join("hard_link");
      // The actual probe: hardlink success implies the store root and the destination
      // can share files via hardlinks, regardless of how mounts expose the device.
      let is_hardlinkable = hard_link(src_file, dst_file).await.is_ok();
      log::debug!("{src_display} -> {dst_display} hardlinkable: {is_hardlinkable}");
      Ok(is_hardlinkable)
      // NB: the canary temp file and temp dir are cleaned up when dropped here.
    })
    .await
    .copied()
}

async fn bytes_writer(
mut file: tokio::fs::File,
bytes: &Bytes,
Expand Down Expand Up @@ -477,8 +532,6 @@ struct InnerStore {
file_lmdb: Result<Arc<ShardedLmdb>, String>,
directory_lmdb: Result<Arc<ShardedLmdb>, String>,
file_fsdb: ShardedFSDB,
executor: task_executor::Executor,
filesystem_device: u64,
}

impl ByteStore {
Expand All @@ -501,16 +554,8 @@ impl ByteStore {

std::fs::create_dir_all(root)
.map_err(|e| format!("Failed to create {}: {e}", root.display()))?;

let filesystem_device = root
.metadata()
.map_err(|e| {
format!(
"Failed to get metadata for store root {}: {e}",
root.display()
)
})?
.dev();
std::fs::create_dir_all(&fsdb_files_root)
.map_err(|e| format!("Failed to create {}: {e}", fsdb_files_root.display()))?;

Ok(ByteStore {
inner: Arc::new(InnerStore {
Expand All @@ -531,23 +576,22 @@ impl ByteStore {
)
.map(Arc::new),
file_fsdb: ShardedFSDB {
executor: executor.clone(),
executor: executor,
root: fsdb_files_root,
lease_time: options.lease_time,
dest_initializer: Arc::new(Mutex::default()),
hardlinkable_destinations: Arc::new(Mutex::default()),
},
executor,
filesystem_device,
}),
})
}

pub fn executor(&self) -> &task_executor::Executor {
&self.inner.executor
}

pub fn filesystem_device(&self) -> u64 {
self.inner.filesystem_device
pub async fn is_hardlinkable_destination(&self, destination: &Path) -> Result<bool, String> {
self
.inner
.file_fsdb
.is_hardlinkable_destination(destination)
.await
}

pub async fn entry_type(&self, fingerprint: Fingerprint) -> Result<Option<EntryType>, String> {
Expand Down
8 changes: 6 additions & 2 deletions src/rust/engine/fs/store/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1147,6 +1147,7 @@ async fn materialize_missing_directory() {
store
.materialize_directory(
materialize_dir.path().to_owned(),
materialize_dir.path(),
TestDirectory::recursive().directory_digest(),
false,
&BTreeSet::new(),
Expand Down Expand Up @@ -1181,6 +1182,7 @@ async fn materialize_directory(perms: Permissions, executable_file: bool) {
store
.materialize_directory(
materialize_dir.path().to_owned(),
materialize_dir.path(),
recursive_testdir.directory_digest(),
false,
&BTreeSet::new(),
Expand Down Expand Up @@ -1644,11 +1646,13 @@ async fn explicitly_overwrites_already_existing_file() {
.directory(&contents_dir)
.file(&cas_file)
.build();
let store = new_store(tempfile::tempdir().unwrap(), &cas.address()).await;
let store_dir = tempfile::tempdir().unwrap();
let store = new_store(store_dir.path(), &cas.address()).await;

store
.materialize_directory(
dir_to_write_to.path().to_owned(),
dir_to_write_to.path(),
contents_dir.directory_digest(),
false,
&BTreeSet::new(),
Expand All @@ -1661,7 +1665,6 @@ async fn explicitly_overwrites_already_existing_file() {
assert_eq!(file_contents, b"abc123".to_vec());
}

#[ignore] // see #17754
#[tokio::test]
async fn big_file_immutable_link() {
let materialize_dir = TempDir::new().unwrap();
Expand Down Expand Up @@ -1724,6 +1727,7 @@ async fn big_file_immutable_link() {
store
.materialize_directory(
materialize_dir.path().to_owned(),
materialize_dir.path(),
directory_digest,
false,
&BTreeSet::from([
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/process_execution/docker/src/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ impl<'a> process_execution::CommandRunner for CommandRunner<'a> {
// DOCKER-NOTE: The input root will be bind mounted into the container.
let exclusive_spawn = prepare_workdir(
workdir.path().to_owned(),
&self.work_dir_base,
&req,
req.input_digests.inputs.clone(),
&self.store,
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/process_execution/pe_nailgun/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ impl process_execution::CommandRunner for CommandRunner {
// Prepare the workdir.
let exclusive_spawn = prepare_workdir(
nailgun_process.workdir_path().to_owned(),
self.nailgun_pool.workdir_base(),
&client_req,
client_req.input_digests.inputs.clone(),
&self.store,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ impl NailgunPool {
}
}

/// Returns the base directory under which this pool's nailgun process workdirs live.
///
/// Exposed so callers (e.g. `prepare_workdir`) can pass a stable parent directory for
/// memoized hardlinkability checks — presumably the per-process workdirs are created
/// beneath this path; confirm against `NailgunPool`'s construction site.
pub fn workdir_base(&self) -> &Path {
  &self.workdir_base
}

///
/// Given a name and a `Process` configuration, return a port of a nailgun server running
/// under that name and configuration.
Expand Down Expand Up @@ -367,6 +371,7 @@ impl NailgunProcess {
// simpler.
prepare_workdir(
workdir.path().to_owned(),
workdir_base,
&startup_options,
startup_options.input_digests.inputs.clone(),
store,
Expand Down
Loading