Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream large remote cache downloads directly to disk #18054

Merged
merged 9 commits into from
Feb 12, 2023
Merged
2 changes: 2 additions & 0 deletions src/rust/engine/fs/fs_util/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,8 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
top_match
.value_of_t::<usize>("batch-api-size-limit")
.expect("Bad batch-api-size-limit flag"),
// TODO: add parameter
4 * 1024 * 1024,
),
true,
)
Expand Down
75 changes: 47 additions & 28 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use std::fmt::{self, Debug, Display};
use std::fs::hard_link;
use std::fs::OpenOptions;
use std::future::Future;
use std::io::{self, Read, Write};
use std::io::{self, Read, Seek, Write};
use std::os::unix::fs::{symlink, OpenOptionsExt, PermissionsExt};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Weak};
Expand Down Expand Up @@ -238,29 +238,51 @@ impl RemoteStore {

/// Download the digest to the local byte store from this remote store. The function `f_remote`
/// can be used to validate the bytes.
async fn download_digest_to_local<
FRemote: Fn(Bytes) -> Result<(), String> + Send + Sync + 'static,
>(
async fn download_digest_to_local(
&self,
local_store: local::ByteStore,
digest: Digest,
entry_type: EntryType,
f_remote: FRemote,
f_remote: Option<&(dyn Fn(Bytes) -> Result<(), String> + Send + Sync + 'static)>,
) -> Result<(), StoreError> {
let remote_store = self.store.clone();
self
.maybe_download(digest, async move {
let bytes = remote_store.load_bytes(digest).await?.ok_or_else(|| {
StoreError::MissingDigest(
"Was not present in either the local or remote store".to_owned(),
digest,
)
})?;
// if there's a function to call, always just buffer fully into memory
huonw marked this conversation as resolved.
Show resolved Hide resolved
let data = remote_store
.load(digest, f_remote.is_some())
.await?
.ok_or_else(|| {
StoreError::MissingDigest(
"Was not present in either the local or remote store".to_owned(),
digest,
)
})?;

f_remote(bytes.clone())?;
let stored_digest = local_store
.store_bytes(entry_type, None, bytes, true)
.await?;
if let Some(f_remote) = f_remote {
// if there's a function, just slurp the contents back into memory
match data {
Either::Left(ref bytes) => f_remote(bytes.clone())?,
Either::Right(_) => panic!("unexpectedly got file output when using f_remote"),
huonw marked this conversation as resolved.
Show resolved Hide resolved
};
}
let stored_digest = match data {
Either::Left(bytes) => {
local_store
.store_bytes(entry_type, None, bytes, true)
.await?
}
Either::Right(file) => {
let file = file.into_std().await;
local_store
.store(entry_type, true, true, move || {
let mut file = file.try_clone()?;
file.rewind()?;
Ok(file)
})
.await?
}
};
if digest == stored_digest {
Ok(())
} else {
Expand Down Expand Up @@ -373,6 +395,7 @@ impl Store {
rpc_concurrency_limit: usize,
capabilities_cell_opt: Option<Arc<OnceCell<ServerCapabilities>>>,
batch_api_size_limit: usize,
load_in_memory_size_limit: usize,
) -> Result<Store, String> {
Ok(Store {
local: self.local,
Expand All @@ -387,6 +410,7 @@ impl Store {
rpc_concurrency_limit,
capabilities_cell_opt,
batch_api_size_limit,
load_in_memory_size_limit,
)?)),
immutable_inputs_base: self.immutable_inputs_base,
})
Expand Down Expand Up @@ -511,12 +535,7 @@ impl Store {
) -> Result<T, StoreError> {
// No transformation or verification is needed for files.
self
.load_bytes_with(
EntryType::File,
digest,
move |v: &[u8]| Ok(f(v)),
|_: Bytes| Ok(()),
)
.load_bytes_with(EntryType::File, digest, move |v: &[u8]| Ok(f(v)), None)
.await
}

Expand Down Expand Up @@ -691,7 +710,7 @@ impl Store {
},
// Eagerly verify that CAS-returned Directories are canonical, so that we don't write them
// into our local store.
move |bytes: Bytes| {
Some(&move |bytes| {
let directory = remexec::Directory::decode(bytes).map_err(|e| {
format!(
"CAS returned Directory proto for {:?} which was not valid: {:?}",
Expand All @@ -700,7 +719,7 @@ impl Store {
})?;
protos::verify_directory_canonical(digest, &directory)?;
Ok(())
},
}),
)
.await
}
Expand Down Expand Up @@ -728,13 +747,12 @@ impl Store {
async fn load_bytes_with<
T: Send + 'static,
FLocal: Fn(&[u8]) -> Result<T, String> + Clone + Send + Sync + 'static,
FRemote: Fn(Bytes) -> Result<(), String> + Send + Sync + 'static,
>(
&self,
entry_type: EntryType,
digest: Digest,
f_local: FLocal,
f_remote: FRemote,
f_remote: Option<&(dyn Fn(Bytes) -> Result<(), String> + Send + Sync + 'static)>,
) -> Result<T, StoreError> {
if let Some(bytes_res) = self
.local
Expand Down Expand Up @@ -1015,7 +1033,7 @@ impl Store {
.into_iter()
.map(|file_digest| async move {
if let Err(e) = remote
.download_digest_to_local(self.local.clone(), file_digest, EntryType::File, |_| Ok(()))
.download_digest_to_local(self.local.clone(), file_digest, EntryType::File, None)
.await
{
log::debug!("Missing file digest from remote store: {:?}", file_digest);
Expand Down Expand Up @@ -1056,12 +1074,13 @@ impl Store {
return Err("Cannot load Trees from a remote without a remote".to_owned());
};

match remote.store.load_bytes(tree_digest).await? {
Some(b) => {
match remote.store.load(tree_digest, true).await? {
Some(Either::Left(b)) => {
let tree = Tree::decode(b).map_err(|e| format!("protobuf decode error: {:?}", e))?;
let trie = DigestTrie::try_from(tree)?;
Ok(Some(trie.into()))
}
Some(Either::Right(_)) => panic!("unexpectedly got file output"),
None => Ok(None),
}
}
Expand Down
Loading