Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip loading of local cache data when possible #17495

Merged
merged 10 commits into from
Nov 10, 2022
187 changes: 107 additions & 80 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,59 @@ impl RemoteStore {
.await
.map(|&()| ())
}

/// Download the digest to the local byte store from this remote store. The function `f_remote`
/// can be used to validate the bytes.
async fn download_digest_to_local<
FRemote: Fn(Bytes) -> Result<(), String> + Send + Sync + 'static,
Comment on lines +235 to +236
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function was simply extracted from load_bytes_with

>(
&self,
local_store: local::ByteStore,
digest: Digest,
entry_type: EntryType,
f_remote: FRemote,
) -> Result<(), StoreError> {
let remote_store = self.store.clone();
self
.maybe_download(digest, async move {
// TODO(#17065): Now that we always copy from the remote store to the local store before
// executing the caller's logic against the local store,
// `remote::ByteStore::load_bytes_with` no longer needs to accept a function.
let bytes = retry_call(
remote_store,
|remote_store| async move { remote_store.load_bytes_with(digest, Ok).await },
|err| match err {
ByteStoreError::Grpc(status) => status_is_retryable(status),
_ => false,
},
)
.await
.map_err(|err| match err {
ByteStoreError::Grpc(status) => status_to_str(status),
ByteStoreError::Other(msg) => msg,
})?
.ok_or_else(|| {
StoreError::MissingDigest(
"Was not present in either the local or remote store".to_owned(),
digest,
)
})?;

f_remote(bytes.clone())?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, this clone seems like wasted performance in some cases, but I will leave it for a follow-up.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bytes.clone() is very cheap, because it is reference-counted. But yeah, I think you're right that it is no longer necessary here.

let stored_digest = local_store
.store_bytes(entry_type, None, bytes, true)
.await?;
if digest == stored_digest {
Ok(())
} else {
Err(StoreError::Unclassified(format!(
"CAS gave wrong digest: expected {:?}, got {:?}",
digest, stored_digest
)))
}
})
.await
}
}

///
Expand Down Expand Up @@ -674,9 +727,6 @@ impl Store {
f_local: FLocal,
f_remote: FRemote,
) -> Result<T, StoreError> {
let local = self.local.clone();
let maybe_remote = self.remote.clone();

if let Some(bytes_res) = self
.local
.load_bytes_with(entry_type, digest, f_local.clone())
Expand All @@ -685,47 +735,11 @@ impl Store {
return Ok(bytes_res?);
}

let remote = maybe_remote.ok_or_else(|| {
let remote = self.remote.clone().ok_or_else(|| {
StoreError::MissingDigest("Was not present in the local store".to_owned(), digest)
})?;
let remote_store = remote.store.clone();

remote
.maybe_download(digest, async move {
// TODO: Now that we always copy from the remote store to the local store before executing
// the caller's logic against the local store, `remote::ByteStore::load_bytes_with` no
// longer needs to accept a function.
let bytes = retry_call(
remote_store,
|remote_store| async move { remote_store.load_bytes_with(digest, Ok).await },
|err| match err {
ByteStoreError::Grpc(status) => status_is_retryable(status),
_ => false,
},
)
.await
.map_err(|err| match err {
ByteStoreError::Grpc(status) => status_to_str(status),
ByteStoreError::Other(msg) => msg,
})?
.ok_or_else(|| {
StoreError::MissingDigest(
"Was not present in either the local or remote store".to_owned(),
digest,
)
})?;

f_remote(bytes.clone())?;
let stored_digest = local.store_bytes(entry_type, None, bytes, true).await?;
if digest == stored_digest {
Ok(())
} else {
Err(StoreError::Unclassified(format!(
"CAS gave wrong digest: expected {:?}, got {:?}",
digest, stored_digest
)))
}
})
.download_digest_to_local(self.local.clone(), digest, entry_type, f_remote)
.await?;

Ok(
Expand Down Expand Up @@ -951,55 +965,68 @@ impl Store {
Ok(missing.is_empty())
}

///
/// Ensure that a directory is locally loadable, which will download it from the Remote store as
/// a side effect (if one is configured).
///
pub async fn ensure_local_has_recursive_directory(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was inlined into the new function ensure_local_has_files

/// Ensure that the files are locally loadable. This will download them from the remote store as
/// a side effect, if one is configured.
pub async fn ensure_downloaded(
&self,
dir_digest: DirectoryDigest,
mut file_digests: HashSet<Digest>,
directory_digests: HashSet<DirectoryDigest>,
) -> Result<(), StoreError> {
let mut file_digests = Vec::new();
self
.load_digest_trie(dir_digest)
.await?
.walk(SymlinkBehavior::Aware, &mut |_, entry| match entry {
directory::Entry::File(f) => file_digests.push(f.digest()),
directory::Entry::Symlink(_) | directory::Entry::Directory(_) => (),
});
let file_digests_from_directories =
future::try_join_all(directory_digests.into_iter().map(|dir_digest| async move {
let mut file_digests_for_dir = Vec::new();
let trie = self.load_digest_trie(dir_digest).await?;
trie.walk(SymlinkBehavior::Aware, &mut |_, entry| match entry {
directory::Entry::File(f) => file_digests_for_dir.push(f.digest()),
directory::Entry::Symlink(_) | directory::Entry::Directory(_) => (),
});
// Also ensure that the directory trie is persisted to disk, not only its file entries.
self.record_digest_trie(trie, true).await?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stuhood this sets initial_lease to true. I wasn't sure what that was about

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... me neither. I'm pretty sure that we should remove that parameter, but haven't spent the time to confirm.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment here to explain why we do this (or in the method signature) would be great.

Ok::<_, StoreError>(file_digests_for_dir)
}))
.await?;
file_digests.extend(file_digests_from_directories.into_iter().flatten());

let missing_file_digests = self
.local
.get_missing_digests(EntryType::File, file_digests)
.await?;
if missing_file_digests.is_empty() {
return Ok(());
}
Comment on lines +990 to +996
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is new


let remote = &self.remote.clone().ok_or_else(|| {
StoreError::MissingDigest(
"Was not present in the local store".to_owned(),
*missing_file_digests.iter().next().unwrap(),
)
})?;
let _ = future::try_join_all(
file_digests
missing_file_digests
.into_iter()
.map(|file_digest| self.ensure_local_has_file(file_digest))
.collect::<Vec<_>>(),
.map(|file_digest| async move {
if let Err(e) = remote
.download_digest_to_local(self.local.clone(), file_digest, EntryType::File, |_| Ok(()))
.await
Comment on lines -974 to +1010
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the key change. We now only download, don't load.

{
log::debug!("Missing file digest from remote store: {:?}", file_digest);
in_workunit!(
"missing_file_counter",
Level::Trace,
|workunit| async move {
workunit.increment_counter(Metric::RemoteStoreMissingDigest, 1);
},
)
.await;
return Err(e);
}
Ok(())
}),
)
.await?;
Ok(())
}

/// Ensure that a file is locally loadable, which will download it from the Remote store as
/// a side effect (if one is configured). Called only with the Digest of a File.
pub async fn ensure_local_has_file(&self, file_digest: Digest) -> Result<(), StoreError> {
if let Err(e) = self
.load_bytes_with(EntryType::File, file_digest, |_| Ok(()), |_| Ok(()))
.await
{
log::debug!("Missing file digest from remote store: {:?}", file_digest);
in_workunit!(
"missing_file_counter",
Level::Trace,
|workunit| async move {
workunit.increment_counter(Metric::RemoteStoreMissingDigest, 1);
},
)
.await;
Err(e)
} else {
Ok(())
}
}

/// Load a REv2 Tree from a remote CAS _without_ persisting the embedded Directory protos in
/// the local store. Tree is used by the REv2 protocol as an optimization for encoding the
/// Directory protos that comprise the output directories from a remote execution
Expand Down
7 changes: 5 additions & 2 deletions src/rust/engine/fs/store/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs::File;
use std::io::Read;
use std::os::unix::fs::PermissionsExt;
Expand Down Expand Up @@ -219,7 +219,10 @@ async fn load_recursive_directory() {
.build();

new_store(dir.path(), &cas.address())
.ensure_local_has_recursive_directory(recursive_testdir_digest.clone())
.ensure_downloaded(
HashSet::new(),
HashSet::from([recursive_testdir_digest.clone()]),
)
.await
.expect("Downloading recursive directory should have succeeded.");

Expand Down
21 changes: 6 additions & 15 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#[macro_use]
extern crate derivative;

use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::convert::{TryFrom, TryInto};
use std::fmt::{self, Debug, Display};
use std::path::PathBuf;
Expand Down Expand Up @@ -833,20 +833,11 @@ pub(crate) async fn check_cache_content(
match cache_content_behavior {
CacheContentBehavior::Fetch => {
let response = response.clone();
let fetch_result = in_workunit!(
"eager_fetch_action_cache",
Level::Trace,
|_workunit| async move {
try_join_all(vec![
store.ensure_local_has_file(response.stdout_digest).boxed(),
store.ensure_local_has_file(response.stderr_digest).boxed(),
store
.ensure_local_has_recursive_directory(response.output_directory)
.boxed(),
])
.await
}
)
let fetch_result = in_workunit!("eager_fetch_action_cache", Level::Trace, |_workunit| store
.ensure_downloaded(
HashSet::from([response.stdout_digest, response.stderr_digest]),
HashSet::from([response.output_directory])
))
.await;
match fetch_result {
Err(StoreError::MissingDigest { .. }) => Ok(false),
Expand Down