Stream large blobs to remote cache directly from local cache file #19711
@@ -2,7 +2,6 @@
 // Licensed under the Apache License, Version 2.0 (see LICENSE).
 use std::collections::{BTreeMap, HashSet};
 use std::fmt;
-use std::ops::Range;
 use std::sync::Arc;
 use std::time::{Duration, Instant};

@@ -14,33 +13,36 @@ use hashing::Digest;
 use log::Level;
 use protos::gen::build::bazel::remote::execution::v2 as remexec;
 use remexec::ServerCapabilities;
+use tokio::fs::File;
+use tokio::io::{AsyncSeekExt, AsyncWrite};
 use workunit_store::{in_workunit, ObservationMetric};
 
 use crate::StoreError;
 
 mod reapi;
 #[cfg(test)]
 mod reapi_tests;
 
-pub type ByteSource = Arc<(dyn Fn(Range<usize>) -> Bytes + Send + Sync + 'static)>;
-
 #[async_trait]
 pub trait ByteStoreProvider: Sync + Send + 'static {
-  async fn store_bytes(&self, digest: Digest, bytes: ByteSource) -> Result<(), String>;
+  /// Store the bytes readable from `file` into the remote store
+  async fn store_file(&self, digest: Digest, file: File) -> Result<(), String>;
+
+  /// Store the bytes in `bytes` into the remote store, as an optimisation of `store_file` when the
+  /// bytes are already in memory
+  async fn store_bytes(&self, digest: Digest, bytes: Bytes) -> Result<(), String>;

Review comment:
I think there's a fair argument that there's no need for `store_bytes`. My thinking is that passing […]. Maybe that optimisation is irrelevant when this code does network IO anyway, and it'd be better to just have this trait be […]. Thoughts?

Reply:
AFAICT, you're right that it is one fewer copy currently to pass in `Bytes`. It's possible that we could re-introduce that optimization at some point, which would make the […].
 
   /// Load the data stored (if any) in the remote store for `digest` into `destination`. Returns
   /// true when found, false when not.
   async fn load(
     &self,
     digest: Digest,
     destination: &mut dyn LoadDestination,
   ) -> Result<bool, String>;
 
   /// Return any digests from `digests` that are not (currently) available in the remote store.
   async fn list_missing_digests(
     &self,
     digests: &mut (dyn Iterator<Item = Digest> + Send),
   ) -> Result<HashSet<Digest>, String>;
 
   fn chunk_size_bytes(&self) -> usize;
 }
 
 // TODO: Consider providing `impl Default`, similar to `super::LocalOptions`.
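
For context on the `store_bytes`-vs-`store_file` discussion in the review thread above: if the trait exposed only `store_file`, a caller that already holds the blob in memory would have to spill it to a temporary file before the provider could stream it, which is the extra copy being weighed. A minimal sketch of that fallback, assuming the `tempfile` crate and the trait as defined in this diff (the helper name and its exact shape are hypothetical, not part of the PR):

use std::io::Write;

use bytes::Bytes;
use hashing::Digest;
use tokio::fs::File;
use tokio::io::AsyncSeekExt;

// Hypothetical helper, not part of the PR: emulate `store_bytes` on top of a
// file-only provider. The in-memory bytes are copied into an anonymous
// temporary file so the provider can stream them, i.e. one extra copy plus
// filesystem IO compared with a dedicated `store_bytes` method.
async fn store_bytes_via_file(
  provider: &dyn ByteStoreProvider,
  bytes: Bytes,
) -> Result<(), String> {
  let digest = Digest::of_bytes(&bytes);

  // Spill the bytes to a temp file (blocking IO, kept simple for the sketch).
  let mut tmp = tempfile::tempfile().map_err(|e| e.to_string())?;
  tmp.write_all(&bytes).map_err(|e| e.to_string())?;

  // Hand the provider an async handle, rewound to the start of the copy.
  let mut file = File::from_std(tmp);
  file
    .seek(std::io::SeekFrom::Start(0))
    .await
    .map_err(|e| e.to_string())?;
  provider.store_file(digest, file).await
}
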
@@ -110,74 +112,40 @@ impl ByteStore {
     Ok(ByteStore::new(instance_name, provider))
   }
 
   pub(crate) fn chunk_size_bytes(&self) -> usize {
     self.provider.chunk_size_bytes()
   }
 
-  pub async fn store_buffered<WriteToBuffer, WriteResult>(
-    &self,
-    digest: Digest,
-    write_to_buffer: WriteToBuffer,
-  ) -> Result<(), StoreError>
-  where
-    WriteToBuffer: FnOnce(std::fs::File) -> WriteResult,
-    WriteResult: Future<Output = Result<(), StoreError>>,
-  {
-    let write_buffer = tempfile::tempfile().map_err(|e| {
-      format!("Failed to create a temporary blob upload buffer for {digest:?}: {e}")
-    })?;
-    let read_buffer = write_buffer.try_clone().map_err(|e| {
-      format!("Failed to create a read handle for the temporary upload buffer for {digest:?}: {e}")
-    })?;
-    write_to_buffer(write_buffer).await?;
-
-    // Unsafety: Mmap presents an immutable slice of bytes, but the underlying file that is mapped
-    // could be mutated by another process. We guard against this by creating an anonymous
-    // temporary file and ensuring it is written to and closed via the only other handle to it in
-    // the code just above.
-    let mmap = Arc::new(unsafe {
-      let mapping = memmap::Mmap::map(&read_buffer).map_err(|e| {
-        format!("Failed to memory map the temporary file buffer for {digest:?}: {e}")
-      })?;
-      if let Err(err) = madvise::madvise(
-        mapping.as_ptr(),
-        mapping.len(),
-        madvise::AccessPattern::Sequential,
-      ) {
-        log::warn!(
-          "Failed to madvise(MADV_SEQUENTIAL) for the memory map of the temporary file buffer for \
-          {digest:?}. Continuing with possible reduced performance: {err}",
-          digest = digest,
-          err = err
-        )
-      }
-      Ok(mapping) as Result<memmap::Mmap, String>
-    }?);
-
+  /// Store the bytes readable from `file` into the remote store
+  pub async fn store_file(&self, digest: Digest, file: File) -> Result<(), String> {
     self
-      .store_bytes_source(
-        digest,
-        Arc::new(move |range| Bytes::copy_from_slice(&mmap[range])),
-      )
-      .await?;
-
-    Ok(())
+      .store_tracking("store", digest, || self.provider.store_file(digest, file))
+      .await
   }
 
+  /// Store the bytes in `bytes` into the remote store, as an optimisation of `store_file` when the
+  /// bytes are already in memory
   pub async fn store_bytes(&self, bytes: Bytes) -> Result<(), String> {
     let digest = Digest::of_bytes(&bytes);
     self
-      .store_bytes_source(digest, Arc::new(move |range| bytes.slice(range)))
+      .store_tracking("store_bytes", digest, || {
+        self.provider.store_bytes(digest, bytes)
+      })
       .await
   }
 
-  async fn store_bytes_source(&self, digest: Digest, bytes: ByteSource) -> Result<(), String> {
+  async fn store_tracking<DoStore, Fut>(
+    &self,
+    workunit: &'static str,
+    digest: Digest,
+    do_store: DoStore,
+  ) -> Result<(), String>
+  where
+    DoStore: FnOnce() -> Fut + Send,
+    Fut: Future<Output = Result<(), String>> + Send,
+  {
     in_workunit!(
-      "store_bytes",
+      workunit,
       Level::Trace,
       desc = Some(format!("Storing {digest:?}")),
       |workunit| async move {
-        let result = self.provider.store_bytes(digest, bytes).await;
+        let result = do_store().await;
 
         if result.is_ok() {
           workunit.record_observation(

Review comment:
As the comment in store_lmdb_blob_remote now notes, this code makes the assumption that anything in LMDB is small enough to load into memory. AIUI, LARGE_FILE_SIZE_LIMIT is 512KB, i.e. these blobs should be that large at most. Do you think that's reasonable?

Reply:
Yea, I do think that that is reasonable.
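
To make that assumption concrete: blobs that live in LMDB are bounded by LARGE_FILE_SIZE_LIMIT (per the discussion above, 512KB), so they can be uploaded through the in-memory `store_bytes` path, while anything larger is streamed from its local cache file via `store_file`. A rough sketch of that dispatch, with hypothetical function and parameter names; the actual store_lmdb_blob_remote call sites are not shown in this diff:

use std::path::PathBuf;

use bytes::Bytes;
use hashing::Digest;
use tokio::fs::File;

// Per the discussion above: LMDB-resident blobs are assumed to be at most
// this large, so loading them fully into memory is acceptable.
const LARGE_FILE_SIZE_LIMIT: usize = 512 * 1024;

// Hypothetical dispatch, not the PR's actual code: small blobs (already in
// memory, from LMDB) use `store_bytes`; larger blobs are streamed straight
// from the local cache file with `store_file`.
async fn store_blob_remote(
  remote: &ByteStore,
  digest: Digest,
  lmdb_bytes: Option<Bytes>,
  large_file_path: Option<PathBuf>,
) -> Result<(), String> {
  if let Some(bytes) = lmdb_bytes {
    debug_assert!(bytes.len() <= LARGE_FILE_SIZE_LIMIT);
    remote.store_bytes(bytes).await
  } else if let Some(path) = large_file_path {
    let file = File::open(path)
      .await
      .map_err(|e| format!("Failed to open local blob for {digest:?}: {e}"))?;
    remote.store_file(digest, file).await
  } else {
    Err(format!("No local copy of {digest:?} to upload"))
  }
}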