Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip loading of local cache data when possible #17495

Merged
merged 10 commits into from
Nov 10, 2022
189 changes: 109 additions & 80 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,59 @@ impl RemoteStore {
.await
.map(|&()| ())
}

/// Download the digest to the local byte store from this remote store. The function `f_remote`
/// can be used to validate/transform the bytes.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// can be used to validate/transform the bytes.
/// can be used to validate the bytes.

Bytes is immutable, so the function can only validate.

async fn download_digest_to_local<
FRemote: Fn(Bytes) -> Result<(), String> + Send + Sync + 'static,
Comment on lines +235 to +236
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function was simply extracted from load_bytes_with

>(
&self,
local_store: local::ByteStore,
digest: Digest,
entry_type: EntryType,
f_remote: FRemote,
) -> Result<(), StoreError> {
let remote_store = self.store.clone();
self
.maybe_download(digest, async move {
// TODO: Now that we always copy from the remote store to the local store before executing
// the caller's logic against the local store, `remote::ByteStore::load_bytes_with` no
// longer needs to accept a function.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a pointer to #17065 here? I have some more ideas that I will likely put on that ticket today.

let bytes = retry_call(
remote_store,
|remote_store| async move { remote_store.load_bytes_with(digest, Ok).await },
|err| match err {
ByteStoreError::Grpc(status) => status_is_retryable(status),
_ => false,
},
)
.await
.map_err(|err| match err {
ByteStoreError::Grpc(status) => status_to_str(status),
ByteStoreError::Other(msg) => msg,
})?
.ok_or_else(|| {
StoreError::MissingDigest(
"Was not present in either the local or remote store".to_owned(),
digest,
)
})?;

f_remote(bytes.clone())?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI this clone seems like wasted performance in some cases, but will leave it for a followup

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bytes.clone() is very cheap, because it is reference counted. But yea, I think you're right that it is no longer necessary here.

let stored_digest = local_store
.store_bytes(entry_type, None, bytes, true)
.await?;
if digest == stored_digest {
Ok(())
} else {
Err(StoreError::Unclassified(format!(
"CAS gave wrong digest: expected {:?}, got {:?}",
digest, stored_digest
)))
}
})
.await
}
}

///
Expand Down Expand Up @@ -674,9 +727,6 @@ impl Store {
f_local: FLocal,
f_remote: FRemote,
) -> Result<T, StoreError> {
let local = self.local.clone();
let maybe_remote = self.remote.clone();

if let Some(bytes_res) = self
.local
.load_bytes_with(entry_type, digest, f_local.clone())
Expand All @@ -685,47 +735,11 @@ impl Store {
return Ok(bytes_res?);
}

let remote = maybe_remote.ok_or_else(|| {
let remote = self.remote.clone().ok_or_else(|| {
StoreError::MissingDigest("Was not present in the local store".to_owned(), digest)
})?;
let remote_store = remote.store.clone();

remote
.maybe_download(digest, async move {
// TODO: Now that we always copy from the remote store to the local store before executing
// the caller's logic against the local store, `remote::ByteStore::load_bytes_with` no
// longer needs to accept a function.
let bytes = retry_call(
remote_store,
|remote_store| async move { remote_store.load_bytes_with(digest, Ok).await },
|err| match err {
ByteStoreError::Grpc(status) => status_is_retryable(status),
_ => false,
},
)
.await
.map_err(|err| match err {
ByteStoreError::Grpc(status) => status_to_str(status),
ByteStoreError::Other(msg) => msg,
})?
.ok_or_else(|| {
StoreError::MissingDigest(
"Was not present in either the local or remote store".to_owned(),
digest,
)
})?;

f_remote(bytes.clone())?;
let stored_digest = local.store_bytes(entry_type, None, bytes, true).await?;
if digest == stored_digest {
Ok(())
} else {
Err(StoreError::Unclassified(format!(
"CAS gave wrong digest: expected {:?}, got {:?}",
digest, stored_digest
)))
}
})
.download_digest_to_local(self.local.clone(), digest, entry_type, f_remote)
.await?;

Ok(
Expand Down Expand Up @@ -951,55 +965,70 @@ impl Store {
Ok(missing.is_empty())
}

///
/// Ensure that a directory is locally loadable, which will download it from the Remote store as
/// a sideeffect (if one is configured).
///
pub async fn ensure_local_has_recursive_directory(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was inlined into the new function ensure_local_has_files

/// Ensure that the files are locally loadable. This will download them from the remote store as
/// a side effect, if one is configured.
pub async fn ensure_local_has_files(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rename to: ensure_local_has_digests, ensure_local_has, ensure_downloaded, etc.

Somewhat implicit in here is that we also want to guarantee that the Directory protos have been persisted locally... so this method should probably also call record_digest_trie on each loaded tree to ensure that they exist locally as well. It's likely a bug that we weren't doing that before.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the idea that at the end of callling download_digest_to_local on every missing file digest, we then also use a future::try_join_all that calls ensure_directory_digest_persisted on every directory digest inputted?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we then also use a future::try_join_all that calls ensure_directory_digest_persisted on every directory digest inputted?

Yea, either at the beginning or the end is fine. It's also possible to use try_join! to do it concurrently.

But you should be able to use record_digest_trie rather than ensure_directory_digest_persisted, because you will have already called load_digest_trie at the beginning of the method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stuhood I want to make sure I got it right why it's safe to use record_digest_trie before we hit the remote store. That function is only about storing the directory entries, which don't need to come from the remote? We only download file entries from the remote?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That function is only about storing the directory entries, which don't need to come from the remote? We only download file entries from the remote?

Right: record_digest_trie only stores the Directory entries. Unlike files, Directory entries may be in one of three places: 1. memory, 2. local, 3. remote... record_digest_trie ensures that they are actually on disk locally, rather than potentially only in memory or remote.

&self,
dir_digest: DirectoryDigest,
mut file_digests: HashSet<Digest>,
directory_digests: HashSet<DirectoryDigest>,
) -> Result<(), StoreError> {
let mut file_digests = Vec::new();
self
.load_digest_trie(dir_digest)
.await?
.walk(SymlinkBehavior::Aware, &mut |_, entry| match entry {
directory::Entry::File(f) => file_digests.push(f.digest()),
directory::Entry::Symlink(_) | directory::Entry::Directory(_) => (),
});
let file_digests_from_directories =
future::try_join_all(directory_digests.into_iter().map(|dir_digest| async move {
let mut maybe_file_digest = None;
self
.load_digest_trie(dir_digest)
.await?
.walk(SymlinkBehavior::Aware, &mut |_, entry| match entry {
directory::Entry::File(f) => {
maybe_file_digest = Some(f.digest());
}
directory::Entry::Symlink(_) | directory::Entry::Directory(_) => (),
});
Ok::<_, StoreError>(maybe_file_digest)
}))
.await?;
file_digests.extend(file_digests_from_directories.into_iter().flatten());

let missing_file_digests = self
.local
.get_missing_digests(EntryType::File, file_digests)
.await?;
if missing_file_digests.is_empty() {
return Ok(());
}
Comment on lines +990 to +996
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is new


let remote = &self.remote.clone().ok_or_else(|| {
StoreError::MissingDigest(
"Was not present in the local store".to_owned(),
*missing_file_digests.iter().next().unwrap(),
)
})?;
let _ = future::try_join_all(
file_digests
missing_file_digests
.into_iter()
.map(|file_digest| self.ensure_local_has_file(file_digest))
.collect::<Vec<_>>(),
.map(|file_digest| async move {
if let Err(e) = remote
.download_digest_to_local(self.local.clone(), file_digest, EntryType::File, |_| Ok(()))
.await
Comment on lines -974 to +1010
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the key change. We now only download, don't load.

{
log::debug!("Missing file digest from remote store: {:?}", file_digest);
in_workunit!(
"missing_file_counter",
Level::Trace,
|workunit| async move {
workunit.increment_counter(Metric::RemoteStoreMissingDigest, 1);
},
)
.await;
return Err(e);
}
Ok(())
}),
)
.await?;
Ok(())
}

/// Ensure that a file is locally loadable, which will download it from the Remote store as
/// a side effect (if one is configured). Called only with the Digest of a File.
pub async fn ensure_local_has_file(&self, file_digest: Digest) -> Result<(), StoreError> {
if let Err(e) = self
.load_bytes_with(EntryType::File, file_digest, |_| Ok(()), |_| Ok(()))
.await
{
log::debug!("Missing file digest from remote store: {:?}", file_digest);
in_workunit!(
"missing_file_counter",
Level::Trace,
|workunit| async move {
workunit.increment_counter(Metric::RemoteStoreMissingDigest, 1);
},
)
.await;
Err(e)
} else {
Ok(())
}
}

/// Load a REv2 Tree from a remote CAS _without_ persisting the embedded Directory protos in
/// the local store. Tree is used by the REv2 protocol as an optimization for encoding the
/// the Directory protos that comprise the output directories from a remote execution
Expand Down
7 changes: 5 additions & 2 deletions src/rust/engine/fs/store/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs::File;
use std::io::Read;
use std::os::unix::fs::PermissionsExt;
Expand Down Expand Up @@ -219,7 +219,10 @@ async fn load_recursive_directory() {
.build();

new_store(dir.path(), &cas.address())
.ensure_local_has_recursive_directory(recursive_testdir_digest.clone())
.ensure_local_has_files(
HashSet::new(),
HashSet::from([recursive_testdir_digest.clone().collect()]),
)
.await
.expect("Downloading recursive directory should have succeeded.");

Expand Down
21 changes: 6 additions & 15 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#[macro_use]
extern crate derivative;

use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::convert::{TryFrom, TryInto};
use std::fmt::{self, Debug, Display};
use std::path::PathBuf;
Expand Down Expand Up @@ -833,20 +833,11 @@ pub(crate) async fn check_cache_content(
match cache_content_behavior {
CacheContentBehavior::Fetch => {
let response = response.clone();
let fetch_result = in_workunit!(
"eager_fetch_action_cache",
Level::Trace,
|_workunit| async move {
try_join_all(vec![
store.ensure_local_has_file(response.stdout_digest).boxed(),
store.ensure_local_has_file(response.stderr_digest).boxed(),
store
.ensure_local_has_recursive_directory(response.output_directory)
.boxed(),
])
.await
}
)
let fetch_result = in_workunit!("eager_fetch_action_cache", Level::Trace, |_workunit| store
.ensure_local_has_files(
HashSet::from([response.stdout_digest, response.stderr_digest]),
HashSet::from([response.output_directory])
))
.await;
match fetch_result {
Err(StoreError::MissingDigest { .. }) => Ok(false),
Expand Down