Stream large blobs to remote cache directly from local cache file #19711

Merged: 20 commits, Sep 12, 2023
Changes from 8 commits
13 changes: 2 additions & 11 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion src/rust/engine/Cargo.toml
@@ -244,7 +244,6 @@ lmdb-rkv = { git = "https://github.com/pantsbuild/lmdb-rs.git", rev = "6ae7a552a
log = "0.4.17"
madvise = "0.1"
maplit = "1.0.1"
memmap = "0.7"
nails = "0.13"
nix = "0.25"
notify = { git = "https://github.com/pantsbuild/notify", rev = "276af0f3c5f300bfd534941386ba2f3b3a022df7" }
2 changes: 1 addition & 1 deletion src/rust/engine/fs/store/Cargo.toml
@@ -25,7 +25,6 @@ itertools = { workspace = true }
lmdb-rkv = { workspace = true }
log = { workspace = true }
madvise = { workspace = true }
memmap = { workspace = true }
parking_lot = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
@@ -36,6 +35,7 @@ task_executor = { path = "../../task_executor" }
tempfile = { workspace = true }
tokio-rustls = { workspace = true }
tokio = { workspace = true, features = ["fs"] }
tokio-util = { workspace = true, features = ["io"] }
tonic = { workspace = true, features = ["transport", "codegen", "tls", "tls-roots", "prost"] }
tower-service = { workspace = true }
tryfuture = { path = "../../tryfuture" }
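
The new tokio-util dependency with the "io" feature is not used in the hunks shown here; presumably the provider side of the upload needs to adapt an AsyncRead source into a stream of Bytes chunks, which is what tokio_util::io::ReaderStream provides. A minimal sketch of that pattern, under that assumption (the function and its wiring are illustrative, not the actual provider code):

use futures::StreamExt;
use tokio::io::AsyncRead;
use tokio_util::io::ReaderStream;

// Read an AsyncRead source as a sequence of Bytes chunks, the shape a
// chunked (e.g. gRPC ByteStream) upload consumes.
async fn read_in_chunks(
    source: Box<dyn AsyncRead + Send + Unpin>,
) -> Result<Vec<bytes::Bytes>, std::io::Error> {
    let mut stream = ReaderStream::new(source);
    let mut chunks = Vec::new();
    while let Some(chunk) = stream.next().await {
        chunks.push(chunk?);
    }
    Ok(chunks)
}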
54 changes: 21 additions & 33 deletions src/rust/engine/fs/store/src/lib.rs
@@ -806,15 +806,16 @@ impl Store {
remote_store
.clone()
.maybe_upload(digest, async move {
// TODO(John Sirois): Consider allowing configuration of when to buffer large blobs
// to disk to be independent of the remote store wire chunk size.
if digest.size_bytes > remote_store.store.chunk_size_bytes() {
Self::store_large_blob_remote(local, remote_store.store, entry_type, digest)
.await
} else {
Self::store_small_blob_remote(local, remote_store.store, entry_type, digest)
.await
}
match local.load_from_fs(digest).await? {
Some(path) => {
Self::store_fsdb_blob_remote(remote_store.store, digest, path).await?
}
None => {
Self::store_lmdb_blob_remote(local, remote_store.store, entry_type, digest)
Comment on lines +813 to +814
Contributor (Author):
As the comment in store_lmdb_blob_remote now notes, this code makes the assumption that anything in LMDB is small enough to load into memory. AIUI, LARGE_FILE_SIZE_LIMIT is 512KB, i.e. these blobs should be that large at most.

Do you think that's reasonable?

Member:

Yea, I do think that that is reasonable.

.await?
}
};
Ok(())
})
.await
}
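
The thread above leans on how blobs are split between the two local stores. A minimal sketch of that routing, assuming the 512KB LARGE_FILE_SIZE_LIMIT quoted in the discussion (the helper is hypothetical, not the actual pants local-store API):

// 512KB, the value quoted in the review thread above.
const LARGE_FILE_SIZE_LIMIT: usize = 512 * 1024;

// Blobs above the limit live in the FSDB (one file per digest, so they can be
// streamed straight from disk); everything else stays in LMDB and is small
// enough to buffer in memory before upload.
fn stored_in_fsdb(size_bytes: usize) -> bool {
    size_bytes > LARGE_FILE_SIZE_LIMIT
}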
@@ -837,15 +838,17 @@ impl Store {
.boxed()
}

async fn store_small_blob_remote(
async fn store_lmdb_blob_remote(
local: local::ByteStore,
remote: remote::ByteStore,
entry_type: EntryType,
digest: Digest,
) -> Result<(), StoreError> {
// We need to copy the bytes into memory so that they may be used safely in an async
// future. While this unfortunately increases memory consumption, we prioritize
// being able to run `remote.store_bytes()` as async.
// future. While this unfortunately increases memory consumption, we prioritize being
// able to run `remote.store_bytes()` as async. In addition, this is only used for
// blobs in the LMDB store, most of which are small: large blobs end up in the FSDB
// store.
//
// See https://github.com/pantsbuild/pants/pull/9793 for an earlier implementation
// that used `Executor.block_on`, which avoided the clone but was blocking.
@@ -863,31 +866,16 @@ impl Store {
}
}

async fn store_large_blob_remote(
local: local::ByteStore,
async fn store_fsdb_blob_remote(
remote: remote::ByteStore,
entry_type: EntryType,
digest: Digest,
path: PathBuf,
) -> Result<(), StoreError> {
remote
.store_buffered(digest, |mut buffer| async {
let result = local
.load_bytes_with(entry_type, digest, move |bytes| {
buffer.write_all(bytes).map_err(|e| {
format!("Failed to write {entry_type:?} {digest:?} to temporary buffer: {e}")
})
})
.await?;
match result {
None => Err(StoreError::MissingDigest(
format!("Failed to upload {entry_type:?}: Not found in local store",),
digest,
)),
Some(Err(err)) => Err(err.into()),
Some(Ok(())) => Ok(()),
}
})
let file = tokio::fs::File::open(&path)
.await
.map_err(|e| format!("failed to read {digest:?} from {path:?}: {e}"))?;
remote.store(digest, Box::new(file)).await?;
Ok(())
}

///
95 changes: 32 additions & 63 deletions src/rust/engine/fs/store/src/remote.rs
@@ -2,7 +2,6 @@
// Licensed under the Apache License, Version 2.0 (see LICENSE).
use std::collections::{BTreeMap, HashSet};
use std::fmt;
use std::ops::Range;
use std::sync::Arc;
use std::time::{Duration, Instant};

@@ -14,33 +13,37 @@ use hashing::Digest;
use log::Level;
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use remexec::ServerCapabilities;
use tokio::io::{AsyncSeekExt, AsyncWrite};
use tokio::io::{AsyncRead, AsyncSeekExt, AsyncWrite};
use workunit_store::{in_workunit, ObservationMetric};

use crate::StoreError;

mod reapi;
#[cfg(test)]
mod reapi_tests;

pub type ByteSource = Arc<(dyn Fn(Range<usize>) -> Bytes + Send + Sync + 'static)>;
pub type StoreSource = Box<dyn AsyncRead + Send + Sync + Unpin + 'static>;

#[async_trait]
pub trait ByteStoreProvider: Sync + Send + 'static {
async fn store_bytes(&self, digest: Digest, bytes: ByteSource) -> Result<(), String>;
/// Store the bytes readable from `source` into the remote store
async fn store(&self, digest: Digest, source: StoreSource) -> Result<(), String>;

/// Store the bytes in `bytes` into the remote store, as an optimisation of `store` when the bytes
/// are already in memory
async fn store_bytes(&self, digest: Digest, bytes: Bytes) -> Result<(), String>;
Contributor (Author):
I think there's a fair argument that there's no need for store_bytes and everything can go through store, using Cursor::new(bytes) to create an appropriate AsyncRead from Bytes.

My thinking is that passing Bytes in directly saves some memory copies for the batch case, where that object can be splatted into store_bytes_batch and its batch upload request directly, without copying or slicing or anything (whereas using source would require reading it into a separate Bytes).

Maybe that optimisation is irrelevant when this code does network IO anyway, and it'd be better to just have this trait be store, load and list_missing_digests.

Thoughts?

Member (@stuhood, Sep 1, 2023):
AFAICT, you're right that it is one fewer copy currently to pass in Bytes. As mentioned in the comment on store_lmdb_blob_remote though, we used to write in a streaming fashion while blocking a spawn_blocking task (with block_on): that meant that we were copying directly from a MMAP into a protobuf to use with gRPC.

It's possible that we could re-introduce that optimization at some point, which would make the streaming batch store_bytes API superior again. But at the same time, these are fairly small blobs, so the benefits of streaming are definitely reduced.
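
A minimal sketch of the alternative raised above, assuming store_bytes were dropped and in-memory bytes went through store via a Cursor (which tokio treats as an AsyncRead over an in-memory buffer); this is illustrative, not part of the diff:

use std::io::Cursor;

// Wrap already-in-memory bytes so they can be uploaded through the streaming
// `store` entry point instead of the dedicated `store_bytes` path.
async fn store_via_cursor(store: &ByteStore, bytes: Bytes) -> Result<(), String> {
    let digest = Digest::of_bytes(&bytes);
    store.store(digest, Box::new(Cursor::new(bytes))).await
}

As the reply notes, for the batch path this costs an extra copy compared with handing Bytes to the batch upload request directly.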


/// Load the data stored (if any) in the remote store for `digest` into `destination`. Returns
/// true when found, false when not.
async fn load(
&self,
digest: Digest,
destination: &mut dyn LoadDestination,
) -> Result<bool, String>;

/// Return any digests from `digests` that are not (currently) available in the remote store.
async fn list_missing_digests(
&self,
digests: &mut (dyn Iterator<Item = Digest> + Send),
) -> Result<HashSet<Digest>, String>;

fn chunk_size_bytes(&self) -> usize;
}

// TODO: Consider providing `impl Default`, similar to `super::LocalOptions`.
@@ -110,74 +113,40 @@ impl ByteStore {
Ok(ByteStore::new(instance_name, provider))
}

pub(crate) fn chunk_size_bytes(&self) -> usize {
self.provider.chunk_size_bytes()
}

pub async fn store_buffered<WriteToBuffer, WriteResult>(
&self,
digest: Digest,
write_to_buffer: WriteToBuffer,
) -> Result<(), StoreError>
where
WriteToBuffer: FnOnce(std::fs::File) -> WriteResult,
WriteResult: Future<Output = Result<(), StoreError>>,
{
let write_buffer = tempfile::tempfile().map_err(|e| {
format!("Failed to create a temporary blob upload buffer for {digest:?}: {e}")
})?;
let read_buffer = write_buffer.try_clone().map_err(|e| {
format!("Failed to create a read handle for the temporary upload buffer for {digest:?}: {e}")
})?;
write_to_buffer(write_buffer).await?;

// Unsafety: Mmap presents an immutable slice of bytes, but the underlying file that is mapped
// could be mutated by another process. We guard against this by creating an anonymous
// temporary file and ensuring it is written to and closed via the only other handle to it in
// the code just above.
let mmap = Arc::new(unsafe {
let mapping = memmap::Mmap::map(&read_buffer).map_err(|e| {
format!("Failed to memory map the temporary file buffer for {digest:?}: {e}")
})?;
if let Err(err) = madvise::madvise(
mapping.as_ptr(),
mapping.len(),
madvise::AccessPattern::Sequential,
) {
log::warn!(
"Failed to madvise(MADV_SEQUENTIAL) for the memory map of the temporary file buffer for \
{digest:?}. Continuing with possible reduced performance: {err}",
digest = digest,
err = err
)
}
Ok(mapping) as Result<memmap::Mmap, String>
}?);

/// Store the bytes readable from `source` into the remote store
pub async fn store(&self, digest: Digest, source: StoreSource) -> Result<(), String> {
self
.store_bytes_source(
digest,
Arc::new(move |range| Bytes::copy_from_slice(&mmap[range])),
)
.await?;

Ok(())
.store_tracking("store", digest, || self.provider.store(digest, source))
.await
}

/// Store the bytes in `bytes` into the remote store, as an optimisation of `store` when the bytes
/// are already in memory
pub async fn store_bytes(&self, bytes: Bytes) -> Result<(), String> {
let digest = Digest::of_bytes(&bytes);
self
.store_bytes_source(digest, Arc::new(move |range| bytes.slice(range)))
.store_tracking("store_bytes", digest, || {
self.provider.store_bytes(digest, bytes)
})
.await
}

async fn store_bytes_source(&self, digest: Digest, bytes: ByteSource) -> Result<(), String> {
async fn store_tracking<DoStore, Fut>(
&self,
workunit: &'static str,
digest: Digest,
do_store: DoStore,
) -> Result<(), String>
where
DoStore: FnOnce() -> Fut + Send,
Fut: Future<Output = Result<(), String>> + Send,
{
in_workunit!(
"store_bytes",
workunit,
Level::Trace,
desc = Some(format!("Storing {digest:?}")),
|workunit| async move {
let result = self.provider.store_bytes(digest, bytes).await;
let result = do_store().await;

if result.is_ok() {
workunit.record_observation(