Skip to content

Commit

Permalink
Only download, don't load the bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
Eric-Arellano committed Nov 9, 2022
1 parent 2bba68c commit e5f6a4d
Showing 1 changed file with 66 additions and 43 deletions.
109 changes: 66 additions & 43 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,59 @@ impl RemoteStore {
.await
.map(|&()| ())
}

/// Download the digest to the local byte store from this remote store. The function `f_remote`
/// can be used to validate/transform the bytes.
async fn download_digest_to_local<
FRemote: Fn(Bytes) -> Result<(), String> + Send + Sync + 'static,
>(
&self,
local_store: local::ByteStore,
digest: Digest,
entry_type: EntryType,
f_remote: FRemote,
) -> Result<(), StoreError> {
let remote_store = self.store.clone();
self
.maybe_download(digest, async move {
// TODO: Now that we always copy from the remote store to the local store before executing
// the caller's logic against the local store, `remote::ByteStore::load_bytes_with` no
// longer needs to accept a function.
let bytes = retry_call(
remote_store,
|remote_store| async move { remote_store.load_bytes_with(digest, Ok).await },
|err| match err {
ByteStoreError::Grpc(status) => status_is_retryable(status),
_ => false,
},
)
.await
.map_err(|err| match err {
ByteStoreError::Grpc(status) => status_to_str(status),
ByteStoreError::Other(msg) => msg,
})?
.ok_or_else(|| {
StoreError::MissingDigest(
"Was not present in either the local or remote store".to_owned(),
digest,
)
})?;

f_remote(bytes.clone())?;
let stored_digest = local_store
.store_bytes(entry_type, None, bytes, true)
.await?;
if digest == stored_digest {
Ok(())
} else {
Err(StoreError::Unclassified(format!(
"CAS gave wrong digest: expected {:?}, got {:?}",
digest, stored_digest
)))
}
})
.await
}
}

///
Expand Down Expand Up @@ -674,9 +727,6 @@ impl Store {
f_local: FLocal,
f_remote: FRemote,
) -> Result<T, StoreError> {
let local = self.local.clone();
let maybe_remote = self.remote.clone();

if let Some(bytes_res) = self
.local
.load_bytes_with(entry_type, digest, f_local.clone())
Expand All @@ -685,47 +735,11 @@ impl Store {
return Ok(bytes_res?);
}

let remote = maybe_remote.ok_or_else(|| {
let remote = self.remote.clone().ok_or_else(|| {
StoreError::MissingDigest("Was not present in the local store".to_owned(), digest)
})?;
let remote_store = remote.store.clone();

remote
.maybe_download(digest, async move {
// TODO: Now that we always copy from the remote store to the local store before executing
// the caller's logic against the local store, `remote::ByteStore::load_bytes_with` no
// longer needs to accept a function.
let bytes = retry_call(
remote_store,
|remote_store| async move { remote_store.load_bytes_with(digest, Ok).await },
|err| match err {
ByteStoreError::Grpc(status) => status_is_retryable(status),
_ => false,
},
)
.await
.map_err(|err| match err {
ByteStoreError::Grpc(status) => status_to_str(status),
ByteStoreError::Other(msg) => msg,
})?
.ok_or_else(|| {
StoreError::MissingDigest(
"Was not present in either the local or remote store".to_owned(),
digest,
)
})?;

f_remote(bytes.clone())?;
let stored_digest = local.store_bytes(entry_type, None, bytes, true).await?;
if digest == stored_digest {
Ok(())
} else {
Err(StoreError::Unclassified(format!(
"CAS gave wrong digest: expected {:?}, got {:?}",
digest, stored_digest
)))
}
})
.download_digest_to_local(self.local.clone(), digest, entry_type, f_remote)
.await?;

Ok(
Expand Down Expand Up @@ -979,10 +993,19 @@ impl Store {
.local
.get_missing_digests(EntryType::File, file_digests)
.await?;
if missing_file_digests.is_empty() {
return Ok(());
}

let remote = self.remote.clone().ok_or_else(|| {
StoreError::MissingDigest(
"Was not present in the local store".to_owned(),
*missing_file_digests.iter().next().unwrap(),
)
})?;
for file_digest in missing_file_digests {
if let Err(e) = self
.load_bytes_with(EntryType::File, file_digest, |_| Ok(()), |_| Ok(()))
if let Err(e) = remote
.download_digest_to_local(self.local.clone(), file_digest, EntryType::File, |_| Ok(()))
.await
{
log::debug!("Missing file digest from remote store: {:?}", file_digest);
Expand Down

0 comments on commit e5f6a4d

Please sign in to comment.