From c3988999680e9d57ba7c7dfad23b4e3329eee908 Mon Sep 17 00:00:00 2001 From: Joshua Cannon Date: Tue, 28 Mar 2023 19:08:49 -0500 Subject: [PATCH] Only materialize immutable files once per process (#18600) This PR removes `TempImmutableLargeFile` in favor of having the caller provide a writer function which writes to the `tokio::fs::File`, and we wrap the writing in the necessary setup/teardown code. This has a few benefits: - We can use a `OnceCell` to ensure the file is only trying to materialized by one thread - We can cleanup on error now (whoops that was embarrassing) - Eliminates a heisenbug I was seeing where we'd fail to hardlink with "No such file or directory". - I still don't understand the heisenbug, and I'm still worried we'd see it cross-process :cry: All in all, big win --- src/rust/engine/fs/store/src/lib.rs | 40 ++-- src/rust/engine/fs/store/src/local.rs | 197 ++++++++++++------- src/rust/engine/fs/store/src/remote.rs | 6 +- src/rust/engine/fs/store/src/remote_tests.rs | 3 +- 4 files changed, 155 insertions(+), 91 deletions(-) diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index 46c69ef2a6e..4650164a651 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -236,6 +236,19 @@ impl RemoteStore { .map(|&()| ()) } + async fn remote_writer( + remote_store: &remote::ByteStore, + digest: Digest, + file: tokio::fs::File, + ) -> Result { + remote_store.load_file(digest, file).await?.ok_or_else(|| { + StoreError::MissingDigest( + "Was not present in either the local or remote store".to_owned(), + digest, + ) + }) + } + /// Download the digest to the local byte store from this remote store. The function `f_remote` /// can be used to validate the bytes (NB. if provided, the whole value will be buffered into /// memory to provide the `Bytes` argument, and thus `f_remote` should only be used for small digests). @@ -247,31 +260,24 @@ impl RemoteStore { f_remote: Option<&(dyn Fn(Bytes) -> Result<(), String> + Send + Sync + 'static)>, ) -> Result<(), StoreError> { let remote_store = self.store.clone(); - let create_missing = || { - StoreError::MissingDigest( - "Was not present in either the local or remote store".to_owned(), - digest, - ) - }; self .maybe_download(digest, async move { let store_into_fsdb = f_remote.is_none() && ByteStore::should_use_fsdb(entry_type, digest.size_bytes); if store_into_fsdb { - let tempfile = local_store + local_store .get_file_fsdb() - .get_tempfile(digest.hash) + .write_using(digest.hash, |file| { + Self::remote_writer(&remote_store, digest, file) + }) .await?; - remote_store - .load_file(digest, tempfile.open().await?) - .await? - .ok_or_else(create_missing)?; - tempfile.persist().await?; } else { - let bytes = remote_store - .load_bytes(digest) - .await? - .ok_or_else(create_missing)?; + let bytes = remote_store.load_bytes(digest).await?.ok_or_else(|| { + StoreError::MissingDigest( + "Was not present in either the local or remote store".to_owned(), + digest, + ) + })?; if let Some(f_remote) = f_remote { f_remote(bytes.clone())?; } diff --git a/src/rust/engine/fs/store/src/local.rs b/src/rust/engine/fs/store/src/local.rs index b5b7d8631b1..2978607eaa3 100644 --- a/src/rust/engine/fs/store/src/local.rs +++ b/src/rust/engine/fs/store/src/local.rs @@ -2,24 +2,26 @@ // Licensed under the Apache License, Version 2.0 (see LICENSE). use super::{EntryType, ShrinkBehavior}; -use std::collections::{BinaryHeap, HashSet}; -use std::fmt::Debug; +use core::future::Future; +use std::collections::{BinaryHeap, HashMap, HashSet}; use std::os::unix::fs::MetadataExt; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; +use async_oncecell::OnceCell; use async_trait::async_trait; use bytes::Bytes; use futures::future::{self, join_all, try_join, try_join_all}; use hashing::{ async_copy_and_hash, async_verified_copy, AgedFingerprint, Digest, Fingerprint, EMPTY_DIGEST, }; +use parking_lot::Mutex; use sharded_lmdb::ShardedLmdb; use std::os::unix::fs::PermissionsExt; use task_executor::Executor; use tempfile::NamedTempFile; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt}; use workunit_store::ObservationMetric; /// How big a file must be to be stored as a file on disk. @@ -28,28 +30,6 @@ use workunit_store::ObservationMetric; // for somewhere between 2 and 3 uses of the corresponding entry to "break even". const LARGE_FILE_SIZE_LIMIT: usize = 512 * 1024; -#[derive(Debug, Clone)] -pub(crate) struct TempImmutableLargeFile { - tmp_path: PathBuf, - final_path: PathBuf, -} - -impl TempImmutableLargeFile { - pub async fn open(&self) -> tokio::io::Result { - tokio::fs::File::create(self.tmp_path.clone()).await - } - - pub async fn persist(&self) -> Result<(), String> { - tokio::fs::rename(self.tmp_path.clone(), self.final_path.clone()) - .await - .map_err(|e| format!("Error while renaming: {e}."))?; - tokio::fs::set_permissions(&self.final_path, std::fs::Permissions::from_mode(0o555)) - .await - .map_err(|e| e.to_string())?; - Ok(()) - } -} - /// Trait for the underlying storage, which is either a ShardedLMDB or a ShardedFS. #[async_trait] trait UnderlyingByteStore { @@ -169,6 +149,18 @@ pub(crate) struct ShardedFSDB { root: PathBuf, executor: Executor, lease_time: Duration, + dest_initializer: Arc>>>>, +} + +enum VerifiedCopyError { + CopyFailure(String), + DoesntMatch, +} + +impl From for VerifiedCopyError { + fn from(err: String) -> Self { + Self::CopyFailure(err) + } } impl ShardedFSDB { @@ -177,33 +169,100 @@ impl ShardedFSDB { self.root.join(hex.get(0..2).unwrap()).join(hex) } - pub(crate) async fn get_tempfile( + async fn bytes_writer( + mut file: tokio::fs::File, + bytes: &Bytes, + ) -> Result { + file + .write_all(bytes) + .await + .map_err(|e| format!("Failed to write bytes: {e}"))?; + Ok(file) + } + + async fn verified_copier( + mut file: tokio::fs::File, + expected_digest: Digest, + src_is_immutable: bool, + mut reader: R, + ) -> Result + where + R: AsyncRead + Unpin, + { + let matches = async_verified_copy(expected_digest, src_is_immutable, &mut reader, &mut file) + .await + .map_err(|e| VerifiedCopyError::CopyFailure(format!("Failed to copy bytes: {e}")))?; + if matches { + Ok(file) + } else { + Err(VerifiedCopyError::DoesntMatch) + } + } + + pub(crate) async fn write_using( &self, fingerprint: Fingerprint, - ) -> Result { - let dest_path = self.get_path(fingerprint); - tokio::fs::create_dir_all(dest_path.parent().unwrap()) + writer_func: F, + ) -> Result<(), E> + where + F: FnOnce(tokio::fs::File) -> Fut, + Fut: Future>, + // NB: The error type must be convertible from a string + E: std::convert::From, + { + let cell = self + .dest_initializer + .lock() + .entry(fingerprint) + .or_default() + .clone(); + cell + .get_or_try_init(async { + let dest_path = self.get_path(fingerprint); + tokio::fs::create_dir_all(dest_path.parent().unwrap()) + .await + .map_err(|e| format! {"Failed to create local store subdirectory {dest_path:?}: {e}"})?; + + let dest_path2 = dest_path.clone(); + // Make the tempfile in the same dir as the final file so that materializing the final file doesn't + // have to worry about parent dirs. + let named_temp_file = self + .executor + .spawn_blocking( + move || { + NamedTempFile::new_in(dest_path2.parent().unwrap()) + .map_err(|e| format!("Failed to create temp file: {e}")) + }, + |e| Err(format!("temp file creation task failed: {e}")), + ) + .await?; + let (std_file, tmp_path) = named_temp_file + .keep() + .map_err(|e| format!("Failed to keep temp file: {e}"))?; + + match writer_func(std_file.into()).await { + Ok(mut tokio_file) => { + tokio_file + .shutdown() + .await + .map_err(|e| format!("Failed to shutdown {tmp_path:?}: {e}"))?; + + tokio::fs::set_permissions(&tmp_path, std::fs::Permissions::from_mode(0o555)) + .await + .map_err(|e| format!("Failed to set permissions on {:?}: {e}", tmp_path))?; + tokio::fs::rename(tmp_path.clone(), dest_path.clone()) + .await + .map_err(|e| format!("Error while renaming: {e}."))?; + Ok(()) + } + Err(e) => { + let _ = tokio::fs::remove_file(tmp_path).await; + Err(e) + } + } + }) .await - .map_err(|e| format! {"Failed to create local store subdirectory {dest_path:?}: {e}"})?; - - let dest_path2 = dest_path.clone(); - // Make the tempfile in the same dir as the final file so that materializing the final file doesn't - // have to worry about parent dirs. - let named_temp_file = self - .executor - .spawn_blocking( - move || { - NamedTempFile::new_in(dest_path2.parent().unwrap()) - .map_err(|e| format!("Failed to create temp file: {e}")) - }, - |e| Err(format!("temp file creation task failed: {e}")), - ) - .await?; - let (_, tmp_path) = named_temp_file.keep().map_err(|e| e.to_string())?; - Ok(TempImmutableLargeFile { - tmp_path, - final_path: dest_path, - }) + .cloned() } } @@ -262,13 +321,9 @@ impl UnderlyingByteStore for ShardedFSDB { _initial_lease: bool, ) -> Result<(), String> { try_join_all(items.iter().map(|(fingerprint, bytes)| async move { - let tempfile = self.get_tempfile(*fingerprint).await?; - let mut dest = tempfile - .open() - .await - .map_err(|e| format!("Failed to open {tempfile:?}: {e}"))?; - dest.write_all(bytes).await.map_err(|e| e.to_string())?; - tempfile.persist().await?; + self + .write_using(*fingerprint, |file| Self::bytes_writer(file, bytes)) + .await?; Ok::<(), String>(()) })) .await?; @@ -283,20 +338,27 @@ impl UnderlyingByteStore for ShardedFSDB { expected_digest: Digest, src: PathBuf, ) -> Result<(), String> { - let dest = self.get_tempfile(expected_digest.hash).await?; let mut attempts = 0; loop { - let (mut reader, mut writer) = try_join(tokio::fs::File::open(src.clone()), dest.open()) + let reader = tokio::fs::File::open(src.clone()) .await - .map_err(|e| e.to_string())?; - // TODO: Consider using `fclonefileat` on macOS, which would skip actual copying (read+write), and - // instead just require verifying the resulting content after the syscall (read only). - let should_retry = - !async_verified_copy(expected_digest, src_is_immutable, &mut reader, &mut writer) - .await - .map_err(|e| e.to_string())?; + .map_err(|e| format!("Failed to open {src:?}: {e}"))?; + + // TODO: Consider using `fclonefileat` on macOS or checking for same filesystem+rename on Linux, + // which would skip actual copying (read+write), and instead just require verifying the + // resulting content after the syscall (read only). + let copy_result = self + .write_using(expected_digest.hash, |file| { + Self::verified_copier(file, expected_digest, src_is_immutable, reader) + }) + .await; + let should_retry = match copy_result { + Ok(()) => Ok(false), + Err(VerifiedCopyError::CopyFailure(s)) => Err(s), + Err(VerifiedCopyError::DoesntMatch) => Ok(true), + }; - if should_retry { + if should_retry? { attempts += 1; let msg = format!("Input {src:?} changed while reading."); log::debug!("{}", msg); @@ -304,8 +366,6 @@ impl UnderlyingByteStore for ShardedFSDB { return Err(format!("Failed to store {src:?}.")); } } else { - writer.flush().await.map_err(|e| e.to_string())?; - dest.persist().await?; break; } } @@ -460,6 +520,7 @@ impl ByteStore { executor: executor.clone(), root: fsdb_files_root, lease_time: options.lease_time, + dest_initializer: Arc::new(Mutex::default()), }, executor, filesystem_device, diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index da6e186e946..6cb6257d43f 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -493,11 +493,7 @@ impl ByteStore { digest: Digest, file: tokio::fs::File, ) -> Result, String> { - let mut result = self.load(digest, file).await; - if let Ok(Some(ref mut file)) = result { - file.rewind().await.map_err(|e| e.to_string())?; - } - result + self.load(digest, file).await } /// diff --git a/src/rust/engine/fs/store/src/remote_tests.rs b/src/rust/engine/fs/store/src/remote_tests.rs index 726f044d428..e21ff69c124 100644 --- a/src/rust/engine/fs/store/src/remote_tests.rs +++ b/src/rust/engine/fs/store/src/remote_tests.rs @@ -8,7 +8,7 @@ use grpc_util::tls; use hashing::Digest; use mock::StubCAS; use testutil::data::{TestData, TestDirectory}; -use tokio::io::AsyncReadExt; +use tokio::io::{AsyncReadExt, AsyncSeekExt}; use workunit_store::WorkunitStore; use crate::remote::ByteStore; @@ -50,6 +50,7 @@ async fn loads_huge_file_via_temp_file() { .await .unwrap() .unwrap(); + file.rewind().await.unwrap(); let mut buf = String::new(); file.read_to_string(&mut buf).await.unwrap();