Skip to content

Commit

Permalink
Skip loading of local cache data when possible (Cherry-pick of pantsb…
Browse files Browse the repository at this point in the history
…uild#17495)

Closes pantsbuild#17448.
# Conflicts:
#	src/rust/engine/fs/store/src/lib.rs
  • Loading branch information
Eric-Arellano committed Nov 10, 2022
1 parent f8d576b commit 5c4e307
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 97 deletions.
187 changes: 107 additions & 80 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,59 @@ impl RemoteStore {
.await
.map(|&()| ())
}

/// Copy the content identified by `digest` from this remote store into `local_store`.
///
/// The fetched bytes are handed to `f_remote` before they are written locally, so a caller can
/// reject them by returning an `Err` from that function.
///
/// # Errors
///
/// Returns an error when the remote load fails (gRPC failures are retried only while
/// `status_is_retryable` reports them as retryable), a `StoreError::MissingDigest` when the
/// remote store has nothing for `digest`, the error produced by `f_remote`, any error from the
/// local write, or a `StoreError::Unclassified` when the digest reported by the local store
/// differs from `digest`.
async fn download_digest_to_local<
  FRemote: Fn(Bytes) -> Result<(), String> + Send + Sync + 'static,
>(
  &self,
  local_store: local::ByteStore,
  digest: Digest,
  entry_type: EntryType,
  f_remote: FRemote,
) -> Result<(), StoreError> {
  let remote_byte_store = self.store.clone();
  self
    .maybe_download(digest, async move {
      // TODO(#17065): The caller's logic always runs against the local store after the copy
      // from the remote store, so `remote::ByteStore::load_bytes_with` no longer needs to
      // accept a function.
      let load_outcome = retry_call(
        remote_byte_store,
        |byte_store| async move { byte_store.load_bytes_with(digest, Ok).await },
        |err| match err {
          ByteStoreError::Grpc(status) => status_is_retryable(status),
          _ => false,
        },
      )
      .await;

      // Flatten both remote error variants into a message; `?` converts it for the caller.
      let maybe_bytes = load_outcome.map_err(|err| match err {
        ByteStoreError::Grpc(status) => status_to_str(status),
        ByteStoreError::Other(msg) => msg,
      })?;
      let bytes = maybe_bytes.ok_or_else(|| {
        StoreError::MissingDigest(
          "Was not present in either the local or remote store".to_owned(),
          digest,
        )
      })?;

      // Let the caller validate the bytes before anything is written locally.
      f_remote(bytes.clone())?;

      let stored_digest = local_store
        .store_bytes(entry_type, None, bytes, true)
        .await?;
      // The local store reports the digest of what it wrote; it must match the request.
      if digest != stored_digest {
        return Err(StoreError::Unclassified(format!(
          "CAS gave wrong digest: expected {:?}, got {:?}",
          digest, stored_digest
        )));
      }
      Ok(())
    })
    .await
}
}

///
Expand Down Expand Up @@ -666,9 +719,6 @@ impl Store {
f_local: FLocal,
f_remote: FRemote,
) -> Result<T, StoreError> {
let local = self.local.clone();
let maybe_remote = self.remote.clone();

if let Some(bytes_res) = self
.local
.load_bytes_with(entry_type, digest, f_local.clone())
Expand All @@ -677,47 +727,11 @@ impl Store {
return Ok(bytes_res?);
}

let remote = maybe_remote.ok_or_else(|| {
let remote = self.remote.clone().ok_or_else(|| {
StoreError::MissingDigest("Was not present in the local store".to_owned(), digest)
})?;
let remote_store = remote.store.clone();

remote
.maybe_download(digest, async move {
// TODO: Now that we always copy from the remote store to the local store before executing
// the caller's logic against the local store, `remote::ByteStore::load_bytes_with` no
// longer needs to accept a function.
let bytes = retry_call(
remote_store,
|remote_store| async move { remote_store.load_bytes_with(digest, Ok).await },
|err| match err {
ByteStoreError::Grpc(status) => status_is_retryable(status),
_ => false,
},
)
.await
.map_err(|err| match err {
ByteStoreError::Grpc(status) => status_to_str(status),
ByteStoreError::Other(msg) => msg,
})?
.ok_or_else(|| {
StoreError::MissingDigest(
"Was not present in either the local or remote store".to_owned(),
digest,
)
})?;

f_remote(bytes.clone())?;
let stored_digest = local.store_bytes(entry_type, None, bytes, true).await?;
if digest == stored_digest {
Ok(())
} else {
Err(StoreError::Unclassified(format!(
"CAS gave wrong digest: expected {:?}, got {:?}",
digest, stored_digest
)))
}
})
.download_digest_to_local(self.local.clone(), digest, entry_type, f_remote)
.await?;

Ok(
Expand Down Expand Up @@ -943,55 +957,68 @@ impl Store {
Ok(missing.is_empty())
}

///
/// Ensure that a directory is locally loadable, which will download it from the Remote store as
/// a side effect (if one is configured).
///
pub async fn ensure_local_has_recursive_directory(
/// Ensure that the files are locally loadable. This will download them from the remote store as
/// a side effect, if one is configured.
pub async fn ensure_downloaded(
&self,
dir_digest: DirectoryDigest,
mut file_digests: HashSet<Digest>,
directory_digests: HashSet<DirectoryDigest>,
) -> Result<(), StoreError> {
let mut file_digests = Vec::new();
self
.load_digest_trie(dir_digest)
.await?
.walk(&mut |_, entry| match entry {
directory::Entry::File(f) => file_digests.push(f.digest()),
directory::Entry::Directory(_) => (),
});
let file_digests_from_directories =
future::try_join_all(directory_digests.into_iter().map(|dir_digest| async move {
let mut file_digests_for_dir = Vec::new();
let trie = self.load_digest_trie(dir_digest).await?;
trie.walk(&mut |_, entry| match entry {
directory::Entry::File(f) => file_digests_for_dir.push(f.digest()),
directory::Entry::Directory(_) => (),
});
// Also ensure that the directory trie is persisted to disk, not only its file entries.
self.record_digest_trie(trie, true).await?;
Ok::<_, StoreError>(file_digests_for_dir)
}))
.await?;
file_digests.extend(file_digests_from_directories.into_iter().flatten());

let missing_file_digests = self
.local
.get_missing_digests(EntryType::File, file_digests)
.await?;
if missing_file_digests.is_empty() {
return Ok(());
}

let remote = &self.remote.clone().ok_or_else(|| {
StoreError::MissingDigest(
"Was not present in the local store".to_owned(),
*missing_file_digests.iter().next().unwrap(),
)
})?;
let _ = future::try_join_all(
file_digests
missing_file_digests
.into_iter()
.map(|file_digest| self.ensure_local_has_file(file_digest))
.collect::<Vec<_>>(),
.map(|file_digest| async move {
if let Err(e) = remote
.download_digest_to_local(self.local.clone(), file_digest, EntryType::File, |_| Ok(()))
.await
{
log::debug!("Missing file digest from remote store: {:?}", file_digest);
in_workunit!(
"missing_file_counter",
Level::Trace,
|workunit| async move {
workunit.increment_counter(Metric::RemoteStoreMissingDigest, 1);
},
)
.await;
return Err(e);
}
Ok(())
}),
)
.await?;
Ok(())
}

/// Check that the file identified by `file_digest` can be loaded, going through
/// `load_bytes_with` with `EntryType::File` and no-op callbacks for both the local and the
/// remote bytes. Must only be called with the Digest of a File.
///
/// On failure, a debug message is logged and the `RemoteStoreMissingDigest` counter is
/// incremented inside a trace-level workunit before the original error is returned.
pub async fn ensure_local_has_file(&self, file_digest: Digest) -> Result<(), StoreError> {
  let load_result = self
    .load_bytes_with(EntryType::File, file_digest, |_| Ok(()), |_| Ok(()))
    .await;

  match load_result {
    Ok(_) => Ok(()),
    Err(e) => {
      log::debug!("Missing file digest from remote store: {:?}", file_digest);
      in_workunit!(
        "missing_file_counter",
        Level::Trace,
        |workunit| async move {
          workunit.increment_counter(Metric::RemoteStoreMissingDigest, 1);
        },
      )
      .await;
      Err(e)
    }
  }
}

/// Load a REv2 Tree from a remote CAS _without_ persisting the embedded Directory protos in
/// the local store. Tree is used by the REv2 protocol as an optimization for encoding the
/// Directory protos that comprise the output directories from a remote execution
Expand Down
7 changes: 5 additions & 2 deletions src/rust/engine/fs/store/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs::File;
use std::io::Read;
use std::os::unix::fs::PermissionsExt;
Expand Down Expand Up @@ -219,7 +219,10 @@ async fn load_recursive_directory() {
.build();

new_store(dir.path(), &cas.address())
.ensure_local_has_recursive_directory(recursive_testdir_digest.clone())
.ensure_downloaded(
HashSet::new(),
HashSet::from([recursive_testdir_digest.clone()]),
)
.await
.expect("Downloading recursive directory should have succeeded.");

Expand Down
21 changes: 6 additions & 15 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#[macro_use]
extern crate derivative;

use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::convert::{TryFrom, TryInto};
use std::fmt::{self, Debug, Display};
use std::path::PathBuf;
Expand Down Expand Up @@ -833,20 +833,11 @@ pub(crate) async fn check_cache_content(
match cache_content_behavior {
CacheContentBehavior::Fetch => {
let response = response.clone();
let fetch_result = in_workunit!(
"eager_fetch_action_cache",
Level::Trace,
|_workunit| async move {
try_join_all(vec![
store.ensure_local_has_file(response.stdout_digest).boxed(),
store.ensure_local_has_file(response.stderr_digest).boxed(),
store
.ensure_local_has_recursive_directory(response.output_directory)
.boxed(),
])
.await
}
)
let fetch_result = in_workunit!("eager_fetch_action_cache", Level::Trace, |_workunit| store
.ensure_downloaded(
HashSet::from([response.stdout_digest, response.stderr_digest]),
HashSet::from([response.output_directory])
))
.await;
match fetch_result {
Err(StoreError::MissingDigest { .. }) => Ok(false),
Expand Down

0 comments on commit 5c4e307

Please sign in to comment.