diff --git a/src/rust/engine/fs/fs_util/src/main.rs b/src/rust/engine/fs/fs_util/src/main.rs index 50a2321e180..3d567aef111 100644 --- a/src/rust/engine/fs/fs_util/src/main.rs +++ b/src/rust/engine/fs/fs_util/src/main.rs @@ -505,6 +505,9 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> { ("tree", sub_match) => match expect_subcommand(sub_match) { ("materialize", args) => { let destination = PathBuf::from(args.value_of("destination").unwrap()); + // NB: We use `destination` as the root directory, because there is no need to + // memoize a check for whether some other parent directory is hardlinkable. + let destination_root = destination.clone(); let fingerprint = Fingerprint::from_hex_string(args.value_of("fingerprint").unwrap())?; let size_bytes = args .value_of("size_bytes") @@ -522,6 +525,7 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> { store .materialize_directory( destination, + &destination_root, output_digest, false, &BTreeSet::new(), @@ -535,6 +539,9 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> { ("directory", sub_match) => match expect_subcommand(sub_match) { ("materialize", args) => { let destination = PathBuf::from(args.value_of("destination").unwrap()); + // NB: We use `destination` as the root directory, because there is no need to + // memoize a check for whether some other parent directory is hardlinkable. + let destination_root = destination.clone(); let fingerprint = Fingerprint::from_hex_string(args.value_of("fingerprint").unwrap())?; let size_bytes = args .value_of("size_bytes") @@ -546,6 +553,7 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> { store .materialize_directory( destination, + &destination_root, digest, false, &BTreeSet::new(), diff --git a/src/rust/engine/fs/store/benches/store.rs b/src/rust/engine/fs/store/benches/store.rs index 4a8243d14fb..555c6b7b4c0 100644 --- a/src/rust/engine/fs/store/benches/store.rs +++ b/src/rust/engine/fs/store/benches/store.rs @@ -77,6 +77,7 @@ pub fn criterion_benchmark_materialize(c: &mut Criterion) { let _ = executor .block_on(store.materialize_directory( dest, + parent_dest_path, digest.clone(), false, &BTreeSet::new(), diff --git a/src/rust/engine/fs/store/src/immutable_inputs.rs b/src/rust/engine/fs/store/src/immutable_inputs.rs index f916f75c6bc..6e9a8c1960e 100644 --- a/src/rust/engine/fs/store/src/immutable_inputs.rs +++ b/src/rust/engine/fs/store/src/immutable_inputs.rs @@ -105,6 +105,7 @@ impl ImmutableInputs { .store .materialize_directory( dest.clone(), + self.0.workdir.path(), directory_digest, false, &BTreeSet::new(), diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index 240e368691a..b66b7855705 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -43,7 +43,7 @@ use std::fs::OpenOptions; use std::fs::Permissions as FSPermissions; use std::future::Future; use std::io::Write; -use std::os::unix::fs::{MetadataExt, OpenOptionsExt, PermissionsExt}; +use std::os::unix::fs::{OpenOptionsExt, PermissionsExt}; use std::path::{Path, PathBuf}; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; @@ -56,7 +56,7 @@ use fs::{ FileEntry, Link, PathStat, Permissions, RelativePath, SymlinkBehavior, SymlinkEntry, EMPTY_DIRECTORY_DIGEST, }; -use futures::future::{self, BoxFuture, Either, FutureExt}; +use futures::future::{self, BoxFuture, Either, FutureExt, TryFutureExt}; use grpc_util::prost::MessageExt; use hashing::{Digest, Fingerprint}; use local::ByteStore; @@ -388,11 +388,6 @@ impl Store { default_cache_path().join("lmdb_store") } - /// Return the device ID that the local Store is hosted on. - pub fn local_filesystem_device(&self) -> u64 { - self.local.filesystem_device() - } - /// /// Remove a file locally, returning true if it existed, or false otherwise. /// @@ -1183,14 +1178,24 @@ impl Store { /// an existing destination directory, meaning that directory and file creation must be /// idempotent. /// + /// If the destination (more specifically, the given parent directory of the destination, for + /// memoization purposes) is hardlinkable from the local store, and `!force_mutable`, hardlinks + /// may be used for large files which are not listed in `mutable_paths`. + /// pub async fn materialize_directory( &self, destination: PathBuf, + destination_root: &Path, digest: DirectoryDigest, force_mutable: bool, mutable_paths: &BTreeSet, perms: Permissions, ) -> Result<(), StoreError> { + debug_assert!( + destination.starts_with(destination_root), + "The destination root must be a parent directory of the destination." + ); + // Load the DigestTrie for the digest, and convert it into a mapping between a fully qualified // parent path and its children. let mut parent_to_child = HashMap::new(); @@ -1210,41 +1215,21 @@ impl Store { } // Create the root, and determine what filesystem it and the store are on. - let materializing_to_same_filesystem = { - let store_filesystem_device = self.local_filesystem_device(); - self - .local - .executor() - .spawn_blocking( - { - let destination = destination.clone(); - move || { - std::fs::create_dir_all(&destination).map_err(|e| { - format!("Failed to create directory {}: {e}", destination.display()) - })?; - let dest_device = destination - .metadata() - .map_err(|e| { - format!( - "Failed to get metadata for destination {}: {e}", - destination.display() - ) - })? - .dev(); - Ok(dest_device == store_filesystem_device) - } - }, - |e| Err(format!("Directory creation task failed: {e}")), - ) - .await? + let destination_is_hardlinkable = { + let (_, destination_is_hardlinkable) = tokio::try_join!( + tokio::fs::create_dir_all(&destination) + .map_err(|e| format!("Failed to create directory {}: {e}", destination.display())), + self.local.is_hardlinkable_destination(destination_root) + )?; + destination_is_hardlinkable }; self .materialize_directory_children( - destination.clone(), + destination, true, force_mutable, - materializing_to_same_filesystem, + destination_is_hardlinkable, &parent_to_child, &mutable_path_ancestors, perms, diff --git a/src/rust/engine/fs/store/src/local.rs b/src/rust/engine/fs/store/src/local.rs index 0ca9ef16cae..8bec5494576 100644 --- a/src/rust/engine/fs/store/src/local.rs +++ b/src/rust/engine/fs/store/src/local.rs @@ -4,7 +4,6 @@ use super::{EntryType, ShrinkBehavior}; use core::future::Future; use std::collections::{BinaryHeap, HashMap, HashSet}; -use std::os::unix::fs::MetadataExt; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; @@ -21,6 +20,7 @@ use sharded_lmdb::ShardedLmdb; use std::os::unix::fs::PermissionsExt; use task_executor::Executor; use tempfile::Builder; +use tokio::fs::hard_link; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt}; use workunit_store::ObservationMetric; @@ -144,12 +144,16 @@ impl UnderlyingByteStore for ShardedLmdb { } // We shard so there isn't a plethora of entries in one single dir. +// +// TODO: Add Arc'd inner struct to reduce clone costs. #[derive(Debug, Clone)] pub(crate) struct ShardedFSDB { root: PathBuf, executor: Executor, lease_time: Duration, dest_initializer: Arc>>>>, + // A cache of whether destination root directories are hardlinkable from the fsdb. + hardlinkable_destinations: Arc>>>>, } enum VerifiedCopyError { @@ -169,6 +173,57 @@ impl ShardedFSDB { self.root.join(hex.get(0..2).unwrap()).join(hex) } + async fn is_hardlinkable_destination(&self, destination: &Path) -> Result { + let cell = { + let mut cells = self.hardlinkable_destinations.lock(); + if let Some(cell) = cells.get(destination) { + cell.clone() + } else { + let cell = Arc::new(OnceCell::new()); + cells.insert(destination.to_owned(), cell.clone()); + cell + } + }; + + if let Some(res) = cell.get() { + return Ok(*res); + } + + let fsdb = self.clone(); + let dst_parent_dir = destination.to_owned(); + cell + .get_or_try_init(async move { + let src_display = fsdb.root.display().to_string(); + let dst_display = dst_parent_dir.display().to_string(); + tokio::fs::create_dir_all(&dst_parent_dir) + .await + .map_err(|e| format!("Failed to create directory: {e}"))?; + let (src_file, dst_dir) = fsdb + .executor + .spawn_blocking( + move || { + let src_file = Builder::new() + .suffix(".hardlink_canary") + .tempfile_in(&fsdb.root) + .map_err(|e| format!("Failed to create hardlink canary file: {e}"))?; + let dst_dir = Builder::new() + .suffix(".hardlink_canary") + .tempdir_in(dst_parent_dir) + .map_err(|e| format!("Failed to create hardlink canary dir: {e}"))?; + Ok((src_file, dst_dir)) + }, + |e| Err(format!("hardlink canary temp files task failed: {e}")), + ) + .await?; + let dst_file = dst_dir.path().join("hard_link"); + let is_hardlinkable = hard_link(src_file, dst_file).await.is_ok(); + log::debug!("{src_display} -> {dst_display} hardlinkable: {is_hardlinkable}"); + Ok(is_hardlinkable) + }) + .await + .copied() + } + async fn bytes_writer( mut file: tokio::fs::File, bytes: &Bytes, @@ -477,8 +532,6 @@ struct InnerStore { file_lmdb: Result, String>, directory_lmdb: Result, String>, file_fsdb: ShardedFSDB, - executor: task_executor::Executor, - filesystem_device: u64, } impl ByteStore { @@ -501,16 +554,8 @@ impl ByteStore { std::fs::create_dir_all(root) .map_err(|e| format!("Failed to create {}: {e}", root.display()))?; - - let filesystem_device = root - .metadata() - .map_err(|e| { - format!( - "Failed to get metadata for store root {}: {e}", - root.display() - ) - })? - .dev(); + std::fs::create_dir_all(&fsdb_files_root) + .map_err(|e| format!("Failed to create {}: {e}", fsdb_files_root.display()))?; Ok(ByteStore { inner: Arc::new(InnerStore { @@ -531,23 +576,22 @@ impl ByteStore { ) .map(Arc::new), file_fsdb: ShardedFSDB { - executor: executor.clone(), + executor: executor, root: fsdb_files_root, lease_time: options.lease_time, dest_initializer: Arc::new(Mutex::default()), + hardlinkable_destinations: Arc::new(Mutex::default()), }, - executor, - filesystem_device, }), }) } - pub fn executor(&self) -> &task_executor::Executor { - &self.inner.executor - } - - pub fn filesystem_device(&self) -> u64 { - self.inner.filesystem_device + pub async fn is_hardlinkable_destination(&self, destination: &Path) -> Result { + self + .inner + .file_fsdb + .is_hardlinkable_destination(destination) + .await } pub async fn entry_type(&self, fingerprint: Fingerprint) -> Result, String> { diff --git a/src/rust/engine/fs/store/src/tests.rs b/src/rust/engine/fs/store/src/tests.rs index 693b0723938..29ba60a35b2 100644 --- a/src/rust/engine/fs/store/src/tests.rs +++ b/src/rust/engine/fs/store/src/tests.rs @@ -1147,6 +1147,7 @@ async fn materialize_missing_directory() { store .materialize_directory( materialize_dir.path().to_owned(), + materialize_dir.path(), TestDirectory::recursive().directory_digest(), false, &BTreeSet::new(), @@ -1181,6 +1182,7 @@ async fn materialize_directory(perms: Permissions, executable_file: bool) { store .materialize_directory( materialize_dir.path().to_owned(), + materialize_dir.path(), recursive_testdir.directory_digest(), false, &BTreeSet::new(), @@ -1644,11 +1646,13 @@ async fn explicitly_overwrites_already_existing_file() { .directory(&contents_dir) .file(&cas_file) .build(); - let store = new_store(tempfile::tempdir().unwrap(), &cas.address()).await; + let store_dir = tempfile::tempdir().unwrap(); + let store = new_store(store_dir.path(), &cas.address()).await; store .materialize_directory( dir_to_write_to.path().to_owned(), + dir_to_write_to.path(), contents_dir.directory_digest(), false, &BTreeSet::new(), @@ -1661,7 +1665,6 @@ async fn explicitly_overwrites_already_existing_file() { assert_eq!(file_contents, b"abc123".to_vec()); } -#[ignore] // see #17754 #[tokio::test] async fn big_file_immutable_link() { let materialize_dir = TempDir::new().unwrap(); @@ -1724,6 +1727,7 @@ async fn big_file_immutable_link() { store .materialize_directory( materialize_dir.path().to_owned(), + materialize_dir.path(), directory_digest, false, &BTreeSet::from([ diff --git a/src/rust/engine/process_execution/docker/src/docker.rs b/src/rust/engine/process_execution/docker/src/docker.rs index 9679bcc14d5..2167f655290 100644 --- a/src/rust/engine/process_execution/docker/src/docker.rs +++ b/src/rust/engine/process_execution/docker/src/docker.rs @@ -462,6 +462,7 @@ impl<'a> process_execution::CommandRunner for CommandRunner<'a> { // DOCKER-NOTE: The input root will be bind mounted into the container. let exclusive_spawn = prepare_workdir( workdir.path().to_owned(), + &self.work_dir_base, &req, req.input_digests.inputs.clone(), &self.store, diff --git a/src/rust/engine/process_execution/pe_nailgun/src/lib.rs b/src/rust/engine/process_execution/pe_nailgun/src/lib.rs index 122d18218ee..6ea8afbc5df 100644 --- a/src/rust/engine/process_execution/pe_nailgun/src/lib.rs +++ b/src/rust/engine/process_execution/pe_nailgun/src/lib.rs @@ -207,6 +207,7 @@ impl process_execution::CommandRunner for CommandRunner { // Prepare the workdir. let exclusive_spawn = prepare_workdir( nailgun_process.workdir_path().to_owned(), + self.nailgun_pool.workdir_base(), &client_req, client_req.input_digests.inputs.clone(), &self.store, diff --git a/src/rust/engine/process_execution/pe_nailgun/src/nailgun_pool.rs b/src/rust/engine/process_execution/pe_nailgun/src/nailgun_pool.rs index 4b05135bffe..90fad7a9a4f 100644 --- a/src/rust/engine/process_execution/pe_nailgun/src/nailgun_pool.rs +++ b/src/rust/engine/process_execution/pe_nailgun/src/nailgun_pool.rs @@ -77,6 +77,10 @@ impl NailgunPool { } } + pub fn workdir_base(&self) -> &Path { + &self.workdir_base + } + /// /// Given a name and a `Process` configuration, return a port of a nailgun server running /// under that name and configuration. @@ -367,6 +371,7 @@ impl NailgunProcess { // simpler. prepare_workdir( workdir.path().to_owned(), + workdir_base, &startup_options, startup_options.input_digests.inputs.clone(), store, diff --git a/src/rust/engine/process_execution/pe_nailgun/src/tests.rs b/src/rust/engine/process_execution/pe_nailgun/src/tests.rs index 3d131b29668..7148ee9c26c 100644 --- a/src/rust/engine/process_execution/pe_nailgun/src/tests.rs +++ b/src/rust/engine/process_execution/pe_nailgun/src/tests.rs @@ -11,23 +11,24 @@ use workunit_store::WorkunitStore; use crate::NailgunPool; use crate::{NamedCaches, Process}; -fn pool(size: usize) -> (NailgunPool, NamedCaches, ImmutableInputs) { +fn pool(size: usize) -> (NailgunPool, NamedCaches, ImmutableInputs, TempDir) { let _ = WorkunitStore::setup_for_tests(); - let named_caches_dir = TempDir::new().unwrap(); - let store_dir = TempDir::new().unwrap(); + let base_dir = TempDir::new().unwrap(); + let named_caches_dir = base_dir.path().join("named"); + let store_dir = base_dir.path().join("store"); let executor = Executor::new(); - let store = Store::local_only(executor.clone(), store_dir.path()).unwrap(); - let base_dir = std::env::temp_dir(); + let store = Store::local_only(executor.clone(), &store_dir).unwrap(); - let pool = NailgunPool::new(base_dir.clone(), size, store.clone(), executor); + let pool = NailgunPool::new(base_dir.path().to_owned(), size, store.clone(), executor); ( pool, - NamedCaches::new_local(named_caches_dir.path().to_owned()), - ImmutableInputs::new(store, &base_dir).unwrap(), + NamedCaches::new_local(named_caches_dir), + ImmutableInputs::new(store, base_dir.path()).unwrap(), + base_dir, ) } -async fn run(pool: &(NailgunPool, NamedCaches, ImmutableInputs), port: u16) -> PathBuf { +async fn run(pool: &(NailgunPool, NamedCaches, ImmutableInputs, TempDir), port: u16) -> PathBuf { let mut p = pool .0 .acquire( diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs index 7bde6ce71a0..cf26608765b 100644 --- a/src/rust/engine/process_execution/src/local.rs +++ b/src/rust/engine/process_execution/src/local.rs @@ -211,6 +211,7 @@ impl super::CommandRunner for CommandRunner { // Prepare the workdir. let exclusive_spawn = prepare_workdir( workdir.path().to_owned(), + &self.work_dir_base, &req, req.input_digests.inputs.clone(), &self.store, @@ -677,6 +678,7 @@ pub async fn prepare_workdir_digest( /// pub async fn prepare_workdir( workdir_path: PathBuf, + workdir_root_path: &Path, req: &Process, materialized_input_digest: DirectoryDigest, store: &Store, @@ -717,6 +719,7 @@ pub async fn prepare_workdir( store .materialize_directory( workdir_path, + workdir_root_path, complete_input_digest, false, &mutable_paths, diff --git a/src/rust/engine/process_execution/src/local_tests.rs b/src/rust/engine/process_execution/src/local_tests.rs index 9b92d39a69d..8a0fc90d0ed 100644 --- a/src/rust/engine/process_execution/src/local_tests.rs +++ b/src/rust/engine/process_execution/src/local_tests.rs @@ -732,6 +732,7 @@ async fn prepare_workdir_exclusive_relative() { let exclusive_spawn = local::prepare_workdir( work_dir.path().to_owned(), + work_dir.path(), &process, TestDirectory::recursive().directory_digest(), &store, diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index de24a70e225..b8e44e2ab9f 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -372,9 +372,13 @@ async fn main() { .expect("Error executing"); if let Some(output) = args.materialize_output_to { + // NB: We use `output` as the root directory, because there is no need to + // memoize a check for whether some other parent directory is hardlinkable. + let output_root = output.clone(); store .materialize_directory( output, + &output_root, result.output_directory, false, &BTreeSet::new(), diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs index d237fb7327f..e9e0c0061f9 100644 --- a/src/rust/engine/src/externs/interface.rs +++ b/src/rust/engine/src/externs/interface.rs @@ -1660,7 +1660,7 @@ fn write_digest( // Python will have already validated that path_prefix is a relative path. let mut destination = PathBuf::new(); - destination.push(core.build_root.clone()); + destination.push(&core.build_root); destination.push(path_prefix); for subpath in &clear_paths { @@ -1678,6 +1678,7 @@ fn write_digest( .store() .materialize_directory( destination.clone(), + &core.build_root, lifted_digest, true, // Force everything we write to be mutable &BTreeSet::new(), diff --git a/src/rust/engine/src/intrinsics.rs b/src/rust/engine/src/intrinsics.rs index dcdbdcb09ad..d33df568de5 100644 --- a/src/rust/engine/src/intrinsics.rs +++ b/src/rust/engine/src/intrinsics.rs @@ -626,6 +626,7 @@ fn interactive_process( )?; prepare_workdir( tempdir.path().to_owned(), + &context.core.local_execution_root_dir, &process, process.input_digests.inputs.clone(), &context.core.store(),