Stream large remote cache downloads directly to disk #18054
Conversation
Awesome, thanks a lot!
> this is a bit of an awkward design, but I wasn't sure of a better one in the presence of retries

Given the desire to do #18048, I think that this design is great: the large file store will move/hardlink from this file into the local store, which, assuming that the temporary directory is chosen well, will mean that this doesn't involve another pass over the data.

> I've plumbed through a configuration parameter for the threshold (although not allowed setting it outside Rust), but maybe it should reuse `IMMUTABLE_FILE_SIZE_LIMIT`, on the assumption that'll tie into #18048 better?
Yea, I think that tying these constants together and renaming them would make sense to help prepare for #18048. cc @thejcannon
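As a rough sketch, tying them together might look something like this (the name and value here are hypothetical; nothing was decided in this thread):

```rust
/// Hypothetical shared constant: blobs at or below this size stay in
/// memory (and in LMDB); larger blobs are streamed to disk on download
/// and, per #18048, would live in the large-file store.
pub const LARGE_FILE_SIZE_THRESHOLD: usize = 512 * 1024; // assumed value
```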
Thanks!
👍 After reflecting for a few days, I realised that a potentially better API is to move the file vs. bytes decision to the parent by having different functions, especially given the different return types involved:

```rust
pub async fn load_bytes(&self, digest: Digest) -> Result<Option<Bytes>, String> { ... }
pub async fn load_file(&self, digest: Digest) -> Result<Option<tokio::fs::File>, String> { ... }
```

Then the caller can do something like:

```rust
if digest.size_bytes <= IMMUTABLE_FILE_SIZE_LIMIT || f_remote.is_some() {
  let bytes = store.load_bytes(digest).await?;
  if let Some(f_remote) = f_remote {
    f_remote(bytes.clone())
  }
  local_store.store_bytes(...).await?;
} else {
  assert!(f_remote.is_none());
  let file = store.load_file(digest).await?;
  local_store.store(...).await?;
}
```

Which seems much nicer than the current clunky approach (and avoids the awkwardness discussed above). I'll try to find a moment to refactor along these lines over the next day.
This reverts commit 90a62b8.
I've got halfway through fixing things up, but ran out of time. I'm on leave for a bit so I'll have to pick this up when I'm back (or someone else can take over, I'm not picky).
I believe that the fetch-depth issue was fixed by some CI config changes on `main`.
Thanks!
Feel free to merge whenever you're ready =)
```rust
      ByteStoreError::Other(msg) => fmt::Display::fmt(msg, f),
    }
  }
}

impl std::error::Error for ByteStoreError {}

/// Places that write the result of a remote `load`
#[async_trait]
trait LoadDestination: AsyncWrite + Send + Sync + Unpin + 'static {
```
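For orientation, here is a hedged sketch of how the trait might be satisfied by the two destinations this PR cares about: a file on disk and an in-memory buffer. It is illustrative rather than the PR's exact code; it assumes the `reset` method debated below, and relies on tokio already implementing `AsyncWrite` for `tokio::fs::File` and `Vec<u8>`:

```rust
use async_trait::async_trait;
use std::io::SeekFrom;
use tokio::io::{AsyncSeekExt, AsyncWrite};

#[async_trait]
trait LoadDestination: AsyncWrite + Send + Sync + Unpin + 'static {
  /// Discard anything written by a failed attempt, so a retry starts clean.
  async fn reset(&mut self) -> std::io::Result<()>;
}

#[async_trait]
impl LoadDestination for tokio::fs::File {
  async fn reset(&mut self) -> std::io::Result<()> {
    // Rewind and truncate so a retry overwrites the partial download.
    self.seek(SeekFrom::Start(0)).await?;
    self.set_len(0).await
  }
}

#[async_trait]
impl LoadDestination for Vec<u8> {
  async fn reset(&mut self) -> std::io::Result<()> {
    // For the in-memory case, dropping the buffered bytes is enough.
    self.clear();
    Ok(())
  }
}
```

With impls along these lines, a retry loop can `reset` the single destination before each attempt instead of constructing a fresh file or buffer per try.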
Nit: it seems like rather than a `reset` method, this could potentially be generic in terms of `new() -> Self` instead, and create a new tempfile or new `Vec`? I think that that would avoid needing to put this in a `Mutex`, because you could create it on the stack before the stream attempt.

But in general: I like this interface.
Yeah, mutating to reinitialise via `reset` is more than a bit awkward, especially with how it forces the `Arc<Mutex<...>>` overhead 😦.

In writing this comment, I discovered that the `load_monomorphic` call can at least just be `&mut dyn LoadDestination`, which makes the implementation of `load` much nicer, and avoids 'infecting' the interface, so I've made that adjustment. (However, due to the `retry_call` in the body, the current REAPI implementation needs to wrap that `&mut` in an `Arc<Mutex<...>>` still, so that it can be cloned for each call. I suspect this can be improved, but I assume it'd require adjusting `retry_call` somehow, and I haven't worked that out yet...)

Having `new() -> Self` would be a possibility, but I don't think it works with my desired next steps for this code: separating the actually-downloading parts of `load_monomorphic` into a separate trait to support swapping out the remote store for #17840. Something like:

```rust
#[async_trait]
pub trait ByteStoreProvider: Sync + Send + 'static {
  async fn store_bytes(&self, digest: Digest, bytes: ByteSource) -> Result<(), String>;
  async fn load(&self, digest: Digest, destination: &mut dyn LoadDestination) -> Result<bool, String>;
}
```

That trait is best used behind a trait object (`Arc<dyn ByteStoreProvider>`), and thus `load` needs to be object-safe/cannot be generic. That is, it cannot be `async fn load<D: LoadDestination>(...)` to allow calling `D::new()` internally.

(Even without that goal, it's nice for compilation time and binary size for the complicated/significant `load_monomorphic` code to be non-generic, so that it's only compiled once rather than monomorphised for each different `LoadDestination` parameter it is used with.)
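To make the object-safety point concrete, here is a hypothetical caller (assumed for illustration, not code from the PR; `Digest` and `ByteStoreProvider` as above) that only works because `load` takes `&mut dyn LoadDestination`:

```rust
use std::sync::Arc;

// Hypothetical: drive a download through the trait object. This compiles
// because `load` is an ordinary, object-safe method; a generic
// `load<D: LoadDestination>` would stop `ByteStoreProvider` from being
// usable as `dyn` at all.
async fn fetch_to_memory(
  provider: Arc<dyn ByteStoreProvider>,
  digest: Digest,
) -> Result<Option<Vec<u8>>, String> {
  let mut buffer = Vec::new();
  let found = provider.load(digest, &mut buffer).await?;
  Ok(found.then_some(buffer))
}
```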
This fixes #17065 by having remote cache loads be able to be streamed to disk. In essence, the remote store now has a `load_file` method in addition to `load_bytes`, and thus the caller can decide to download to a file instead.

This doesn't make progress towards #18048 (this PR doesn't touch the local store at all), but I think it will help with integrating the remote store with that code: in theory the `File` could be provided in a way that can be part of the "large file pool" directly (and indeed, the decision about whether to download to a file or into memory ties into that).

This also does a theoretically unnecessary extra pass over the data (as discussed in #18231) to verify the digest, but I think it'd make sense to do that as a future optimisation, since it'll require refactoring more deeply (down into `sharded_lmdb` and `hashing`, I think) and is best to build on #18153 once that lands.
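As a hedged illustration of the "caller can decide" point above (the names `FileOrBytes` and `LARGE_BLOB_THRESHOLD` are assumed for the sketch, not identifiers from this PR):

```rust
// Hypothetical caller-side sketch: pick the download destination by size.
enum FileOrBytes {
  File(tokio::fs::File),
  Bytes(bytes::Bytes),
}

const LARGE_BLOB_THRESHOLD: usize = 512 * 1024; // assumed value

async fn download(
  remote_store: &ByteStore, // the remote store discussed above
  digest: Digest,
) -> Result<Option<FileOrBytes>, String> {
  if digest.size_bytes <= LARGE_BLOB_THRESHOLD {
    // Small blob: buffer it in memory via `load_bytes`.
    Ok(remote_store.load_bytes(digest).await?.map(FileOrBytes::Bytes))
  } else {
    // Large blob: stream straight to disk via `load_file`, avoiding a
    // whole-blob allocation and setting up the #18048 move/hardlink.
    Ok(remote_store.load_file(digest).await?.map(FileOrBytes::File))
  }
}
```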