Only materialize immutable files once per process (#18600)
This PR removes `TempImmutableLargeFile` in favor of having the caller
provide a writer function which writes to the `tokio::fs::File`, and we
wrap the writing in the necessary setup/teardown code.

This has a few benefits:
- We can use a `OnceCell` to ensure the file is only materialized by one
thread (see the sketch below)
- We can clean up on error now (whoops, that was embarrassing)
- Eliminates a heisenbug I was seeing where we'd fail to hardlink with
"No such file or directory".
- I still don't understand the heisenbug, and I'm still worried we'd see
it cross-process :cry:

All in all, a big win.
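
For illustration only, here is a minimal, self-contained sketch of the once-per-process pattern, not the actual implementation: it assumes `tokio::sync::OnceCell` and a plain `String` key in place of the `async_oncecell::OnceCell` and `Fingerprint` used in this PR, and the `FsDb` type, its fields, and the temp-file naming are hypothetical.

```rust
// Sketch only; assumes tokio = { version = "1", features = ["full"] }.
use std::collections::HashMap;
use std::future::Future;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};

use tokio::io::AsyncWriteExt;
use tokio::sync::OnceCell;

/// Hypothetical stand-in for the sharded FS store: one `OnceCell` per key
/// ensures the destination file is materialized at most once per process.
#[derive(Default)]
struct FsDb {
    root: PathBuf,
    dest_initializer: Mutex<HashMap<String, Arc<OnceCell<()>>>>,
}

impl FsDb {
    async fn write_using<F, Fut>(&self, key: &str, writer_func: F) -> Result<(), String>
    where
        F: FnOnce(tokio::fs::File) -> Fut,
        Fut: Future<Output = Result<tokio::fs::File, String>>,
    {
        // Concurrent callers for the same key share one cell, so only the first
        // caller actually runs the write; everyone else awaits its result.
        let cell = self
            .dest_initializer
            .lock()
            .unwrap()
            .entry(key.to_owned())
            .or_default()
            .clone();
        cell.get_or_try_init(|| async move {
            let dest_path = self.root.join(key);
            // Write to a sibling temp path, then rename into place, so readers
            // never observe a partially written file.
            let tmp_path = self.root.join(format!("{key}.tmp"));
            let file = tokio::fs::File::create(&tmp_path)
                .await
                .map_err(|e| format!("Failed to create {tmp_path:?}: {e}"))?;
            match writer_func(file).await {
                Ok(mut file) => {
                    file.shutdown()
                        .await
                        .map_err(|e| format!("Failed to flush {tmp_path:?}: {e}"))?;
                    tokio::fs::rename(&tmp_path, &dest_path)
                        .await
                        .map_err(|e| format!("Error while renaming: {e}"))?;
                    Ok(())
                }
                Err(e) => {
                    // Setup/teardown lives here, so a failed writer cleans up
                    // its partial temp file instead of leaking it.
                    let _ = tokio::fs::remove_file(&tmp_path).await;
                    Err(e)
                }
            }
        })
        .await
        .map(|_| ())
    }
}
```

A caller hands in only the write itself, e.g. `db.write_using("abc123", |file| async move { /* stream bytes into `file` */ Ok(file) }).await`, so temp-file creation, rename, and error cleanup stay in one place.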
thejcannon authored Mar 29, 2023
1 parent abeabac commit c398899
Showing 4 changed files with 155 additions and 91 deletions.
40 changes: 23 additions & 17 deletions src/rust/engine/fs/store/src/lib.rs
@@ -236,6 +236,19 @@ impl RemoteStore {
.map(|&()| ())
}

async fn remote_writer(
remote_store: &remote::ByteStore,
digest: Digest,
file: tokio::fs::File,
) -> Result<tokio::fs::File, StoreError> {
remote_store.load_file(digest, file).await?.ok_or_else(|| {
StoreError::MissingDigest(
"Was not present in either the local or remote store".to_owned(),
digest,
)
})
}

/// Download the digest to the local byte store from this remote store. The function `f_remote`
/// can be used to validate the bytes (NB. if provided, the whole value will be buffered into
/// memory to provide the `Bytes` argument, and thus `f_remote` should only be used for small digests).
@@ -247,31 +260,24 @@ impl RemoteStore {
f_remote: Option<&(dyn Fn(Bytes) -> Result<(), String> + Send + Sync + 'static)>,
) -> Result<(), StoreError> {
let remote_store = self.store.clone();
let create_missing = || {
StoreError::MissingDigest(
"Was not present in either the local or remote store".to_owned(),
digest,
)
};
self
.maybe_download(digest, async move {
let store_into_fsdb =
f_remote.is_none() && ByteStore::should_use_fsdb(entry_type, digest.size_bytes);
if store_into_fsdb {
let tempfile = local_store
local_store
.get_file_fsdb()
.get_tempfile(digest.hash)
.write_using(digest.hash, |file| {
Self::remote_writer(&remote_store, digest, file)
})
.await?;
remote_store
.load_file(digest, tempfile.open().await?)
.await?
.ok_or_else(create_missing)?;
tempfile.persist().await?;
} else {
let bytes = remote_store
.load_bytes(digest)
.await?
.ok_or_else(create_missing)?;
let bytes = remote_store.load_bytes(digest).await?.ok_or_else(|| {
StoreError::MissingDigest(
"Was not present in either the local or remote store".to_owned(),
digest,
)
})?;
if let Some(f_remote) = f_remote {
f_remote(bytes.clone())?;
}
197 changes: 129 additions & 68 deletions src/rust/engine/fs/store/src/local.rs
@@ -2,24 +2,26 @@
// Licensed under the Apache License, Version 2.0 (see LICENSE).
use super::{EntryType, ShrinkBehavior};

use std::collections::{BinaryHeap, HashSet};
use std::fmt::Debug;
use core::future::Future;
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::os::unix::fs::MetadataExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use async_oncecell::OnceCell;
use async_trait::async_trait;
use bytes::Bytes;
use futures::future::{self, join_all, try_join, try_join_all};
use hashing::{
async_copy_and_hash, async_verified_copy, AgedFingerprint, Digest, Fingerprint, EMPTY_DIGEST,
};
use parking_lot::Mutex;
use sharded_lmdb::ShardedLmdb;
use std::os::unix::fs::PermissionsExt;
use task_executor::Executor;
use tempfile::NamedTempFile;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use workunit_store::ObservationMetric;

/// How big a file must be to be stored as a file on disk.
@@ -28,28 +30,6 @@ use workunit_store::ObservationMetric;
// for somewhere between 2 and 3 uses of the corresponding entry to "break even".
const LARGE_FILE_SIZE_LIMIT: usize = 512 * 1024;

#[derive(Debug, Clone)]
pub(crate) struct TempImmutableLargeFile {
tmp_path: PathBuf,
final_path: PathBuf,
}

impl TempImmutableLargeFile {
pub async fn open(&self) -> tokio::io::Result<tokio::fs::File> {
tokio::fs::File::create(self.tmp_path.clone()).await
}

pub async fn persist(&self) -> Result<(), String> {
tokio::fs::rename(self.tmp_path.clone(), self.final_path.clone())
.await
.map_err(|e| format!("Error while renaming: {e}."))?;
tokio::fs::set_permissions(&self.final_path, std::fs::Permissions::from_mode(0o555))
.await
.map_err(|e| e.to_string())?;
Ok(())
}
}

/// Trait for the underlying storage, which is either a ShardedLMDB or a ShardedFS.
#[async_trait]
trait UnderlyingByteStore {
@@ -169,6 +149,18 @@ pub(crate) struct ShardedFSDB {
root: PathBuf,
executor: Executor,
lease_time: Duration,
dest_initializer: Arc<Mutex<HashMap<Fingerprint, Arc<OnceCell<()>>>>>,
}

enum VerifiedCopyError {
CopyFailure(String),
DoesntMatch,
}

impl From<String> for VerifiedCopyError {
fn from(err: String) -> Self {
Self::CopyFailure(err)
}
}

impl ShardedFSDB {
@@ -177,33 +169,100 @@ impl ShardedFSDB {
self.root.join(hex.get(0..2).unwrap()).join(hex)
}

pub(crate) async fn get_tempfile(
async fn bytes_writer(
mut file: tokio::fs::File,
bytes: &Bytes,
) -> Result<tokio::fs::File, String> {
file
.write_all(bytes)
.await
.map_err(|e| format!("Failed to write bytes: {e}"))?;
Ok(file)
}

async fn verified_copier<R>(
mut file: tokio::fs::File,
expected_digest: Digest,
src_is_immutable: bool,
mut reader: R,
) -> Result<tokio::fs::File, VerifiedCopyError>
where
R: AsyncRead + Unpin,
{
let matches = async_verified_copy(expected_digest, src_is_immutable, &mut reader, &mut file)
.await
.map_err(|e| VerifiedCopyError::CopyFailure(format!("Failed to copy bytes: {e}")))?;
if matches {
Ok(file)
} else {
Err(VerifiedCopyError::DoesntMatch)
}
}

pub(crate) async fn write_using<E, F, Fut>(
&self,
fingerprint: Fingerprint,
) -> Result<TempImmutableLargeFile, String> {
let dest_path = self.get_path(fingerprint);
tokio::fs::create_dir_all(dest_path.parent().unwrap())
writer_func: F,
) -> Result<(), E>
where
F: FnOnce(tokio::fs::File) -> Fut,
Fut: Future<Output = Result<tokio::fs::File, E>>,
// NB: The error type must be convertible from a string
E: std::convert::From<std::string::String>,
{
let cell = self
.dest_initializer
.lock()
.entry(fingerprint)
.or_default()
.clone();
cell
.get_or_try_init(async {
let dest_path = self.get_path(fingerprint);
tokio::fs::create_dir_all(dest_path.parent().unwrap())
.await
.map_err(|e| format! {"Failed to create local store subdirectory {dest_path:?}: {e}"})?;

let dest_path2 = dest_path.clone();
// Make the tempfile in the same dir as the final file so that materializing the final file doesn't
// have to worry about parent dirs.
let named_temp_file = self
.executor
.spawn_blocking(
move || {
NamedTempFile::new_in(dest_path2.parent().unwrap())
.map_err(|e| format!("Failed to create temp file: {e}"))
},
|e| Err(format!("temp file creation task failed: {e}")),
)
.await?;
let (std_file, tmp_path) = named_temp_file
.keep()
.map_err(|e| format!("Failed to keep temp file: {e}"))?;

match writer_func(std_file.into()).await {
Ok(mut tokio_file) => {
tokio_file
.shutdown()
.await
.map_err(|e| format!("Failed to shutdown {tmp_path:?}: {e}"))?;

tokio::fs::set_permissions(&tmp_path, std::fs::Permissions::from_mode(0o555))
.await
.map_err(|e| format!("Failed to set permissions on {:?}: {e}", tmp_path))?;
tokio::fs::rename(tmp_path.clone(), dest_path.clone())
.await
.map_err(|e| format!("Error while renaming: {e}."))?;
Ok(())
}
Err(e) => {
let _ = tokio::fs::remove_file(tmp_path).await;
Err(e)
}
}
})
.await
.map_err(|e| format! {"Failed to create local store subdirectory {dest_path:?}: {e}"})?;

let dest_path2 = dest_path.clone();
// Make the tempfile in the same dir as the final file so that materializing the final file doesn't
// have to worry about parent dirs.
let named_temp_file = self
.executor
.spawn_blocking(
move || {
NamedTempFile::new_in(dest_path2.parent().unwrap())
.map_err(|e| format!("Failed to create temp file: {e}"))
},
|e| Err(format!("temp file creation task failed: {e}")),
)
.await?;
let (_, tmp_path) = named_temp_file.keep().map_err(|e| e.to_string())?;
Ok(TempImmutableLargeFile {
tmp_path,
final_path: dest_path,
})
.cloned()
}
}

@@ -262,13 +321,9 @@ impl UnderlyingByteStore for ShardedFSDB {
_initial_lease: bool,
) -> Result<(), String> {
try_join_all(items.iter().map(|(fingerprint, bytes)| async move {
let tempfile = self.get_tempfile(*fingerprint).await?;
let mut dest = tempfile
.open()
.await
.map_err(|e| format!("Failed to open {tempfile:?}: {e}"))?;
dest.write_all(bytes).await.map_err(|e| e.to_string())?;
tempfile.persist().await?;
self
.write_using(*fingerprint, |file| Self::bytes_writer(file, bytes))
.await?;
Ok::<(), String>(())
}))
.await?;
@@ -283,29 +338,34 @@ impl UnderlyingByteStore for ShardedFSDB {
expected_digest: Digest,
src: PathBuf,
) -> Result<(), String> {
let dest = self.get_tempfile(expected_digest.hash).await?;
let mut attempts = 0;
loop {
let (mut reader, mut writer) = try_join(tokio::fs::File::open(src.clone()), dest.open())
let reader = tokio::fs::File::open(src.clone())
.await
.map_err(|e| e.to_string())?;
// TODO: Consider using `fclonefileat` on macOS, which would skip actual copying (read+write), and
// instead just require verifying the resulting content after the syscall (read only).
let should_retry =
!async_verified_copy(expected_digest, src_is_immutable, &mut reader, &mut writer)
.await
.map_err(|e| e.to_string())?;
.map_err(|e| format!("Failed to open {src:?}: {e}"))?;

// TODO: Consider using `fclonefileat` on macOS or checking for same filesystem+rename on Linux,
// which would skip actual copying (read+write), and instead just require verifying the
// resulting content after the syscall (read only).
let copy_result = self
.write_using(expected_digest.hash, |file| {
Self::verified_copier(file, expected_digest, src_is_immutable, reader)
})
.await;
let should_retry = match copy_result {
Ok(()) => Ok(false),
Err(VerifiedCopyError::CopyFailure(s)) => Err(s),
Err(VerifiedCopyError::DoesntMatch) => Ok(true),
};

if should_retry {
if should_retry? {
attempts += 1;
let msg = format!("Input {src:?} changed while reading.");
log::debug!("{}", msg);
if attempts > 10 {
return Err(format!("Failed to store {src:?}."));
}
} else {
writer.flush().await.map_err(|e| e.to_string())?;
dest.persist().await?;
break;
}
}
@@ -460,6 +520,7 @@ impl ByteStore {
executor: executor.clone(),
root: fsdb_files_root,
lease_time: options.lease_time,
dest_initializer: Arc::new(Mutex::default()),
},
executor,
filesystem_device,
6 changes: 1 addition & 5 deletions src/rust/engine/fs/store/src/remote.rs
@@ -493,11 +493,7 @@ impl ByteStore {
digest: Digest,
file: tokio::fs::File,
) -> Result<Option<tokio::fs::File>, String> {
let mut result = self.load(digest, file).await;
if let Ok(Some(ref mut file)) = result {
file.rewind().await.map_err(|e| e.to_string())?;
}
result
self.load(digest, file).await
}

///
3 changes: 2 additions & 1 deletion src/rust/engine/fs/store/src/remote_tests.rs
@@ -8,7 +8,7 @@ use grpc_util::tls;
use hashing::Digest;
use mock::StubCAS;
use testutil::data::{TestData, TestDirectory};
use tokio::io::AsyncReadExt;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use workunit_store::WorkunitStore;

use crate::remote::ByteStore;
Expand Down Expand Up @@ -50,6 +50,7 @@ async fn loads_huge_file_via_temp_file() {
.await
.unwrap()
.unwrap();
file.rewind().await.unwrap();

let mut buf = String::new();
file.read_to_string(&mut buf).await.unwrap();
