Skip to content

Commit

Permalink
Split remote providers into their own crates (#19958)
Browse files Browse the repository at this point in the history
This does a no-functionality-change refactoring of the remote _store_
providers (backed by REAPI and OpenDAL). This splits the concept of a
remote provider out into its own internal "surface" crate
`remote_provider`, plus three crates that are (mostly)
implementation details of that one:

- `remote_provider/remote_provider_traits` for the traits (and structs!)
that the various providers have to implement
- `remote_provider/remote_provider_reapi` for the gRPC/REAPI-backed
provider
- `remote_provider/remote_provider_opendal` for the OpenDAL-backed
provider

They're arranged like this:

```mermaid
graph BT
   remote("process_execution/remote (existing)")
   store("fs/store (existing)")
   traits("remote_provider_traits (new)")
   grpc("remote_provider_reapi (new)")
   opendal("remote_provider_opendal (new)")
   selector("remote_provider (new)")
   grpc --> traits
   opendal --> traits
   selector --> grpc
   selector --> opendal
   remote -- for remote execution --> grpc
   remote --> selector
   store --> selector
```

Theoretically the new crates other than `remote_provider` are an
implementation detail... except there's a helper in
`remote_provider_reapi` that's used for the remote _execution_ in
`process_execution/remote`, in addition to the byte store and action
cache, hence the dependency there.


This is one point of #19902,
following up on
#19827 (comment).

The commits are individually reviewable, although the overall PR view
gives a slightly more useful view of the overall file renames for some
files.
  • Loading branch information
huonw authored Oct 5, 2023
1 parent b17171f commit 76f8103
Show file tree
Hide file tree
Showing 30 changed files with 689 additions and 439 deletions.
73 changes: 73 additions & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/rust/engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ members = [
"process_execution/pe_nailgun",
"process_execution/remote",
"process_executor",
"remote_provider",
"remote_provider/remote_provider_opendal",
"remote_provider/remote_provider_reapi",
"remote_provider/remote_provider_traits",
"rule_graph",
"sharded_lmdb",
"task_executor",
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/fs/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ madvise = { workspace = true }
parking_lot = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
remote_provider = { path = "../../remote_provider" }
serde = { workspace = true }
serde_derive = { workspace = true }
sharded_lmdb = { path = "../../sharded_lmdb" }
Expand Down
5 changes: 4 additions & 1 deletion src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,13 @@ mod local;
pub mod local_tests;

pub mod remote;
pub use remote::RemoteOptions;
#[cfg(test)]
mod remote_tests;

// Consumers of this crate shouldn't need to worry about the exact crate structure that comes
// together to make a store.
pub use remote_provider::RemoteOptions;

pub struct LocalOptions {
pub files_max_size_bytes: usize,
pub directories_max_size_bytes: usize,
Expand Down
118 changes: 6 additions & 112 deletions src/rust/engine/fs/store/src/remote.rs
Original file line number Diff line number Diff line change
@@ -1,103 +1,20 @@
// Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).
use std::collections::{BTreeMap, HashSet};
use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Instant;

use async_oncecell::OnceCell;
use async_trait::async_trait;
use bytes::Bytes;
use futures::Future;
use hashing::Digest;
use log::Level;
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use remexec::ServerCapabilities;
use remote_provider::{
choose_byte_store_provider, ByteStoreProvider, LoadDestination, RemoteOptions,
};
use tokio::fs::File;
use tokio::io::{AsyncSeekExt, AsyncWrite};
use workunit_store::{in_workunit, ObservationMetric};

mod reapi;
#[cfg(test)]
mod reapi_tests;

pub mod base_opendal;
#[cfg(test)]
mod base_opendal_tests;

/// The operations a remote byte store backend provides: storing data from a file or from memory,
/// loading the data for a digest, and reporting which digests are missing.
///
/// Every method reports failure as a `String` message.
#[async_trait]
pub trait ByteStoreProvider: Sync + Send + 'static {
/// Store the bytes readable from `file` into the remote store
async fn store_file(&self, digest: Digest, file: File) -> Result<(), String>;

/// Store the bytes in `bytes` into the remote store, as an optimisation of `store_file` when the
/// bytes are already in memory
async fn store_bytes(&self, digest: Digest, bytes: Bytes) -> Result<(), String>;

/// Load the data stored (if any) in the remote store for `digest` into `destination`. Returns
/// true when found, false when not.
async fn load(
&self,
digest: Digest,
destination: &mut dyn LoadDestination,
) -> Result<bool, String>;

/// Return any digests from `digests` that are not (currently) available in the remote store.
async fn list_missing_digests(
&self,
digests: &mut (dyn Iterator<Item = Digest> + Send),
) -> Result<HashSet<Digest>, String>;
}

// TODO: Consider providing `impl Default`, similar to `super::LocalOptions`.
/// Options used to construct a remote byte store provider (see `choose_provider`).
#[derive(Clone)]
pub struct RemoteOptions {
// TODO: this is currently framed for the REAPI provider, with some options used by others, would
// be good to generalise
/// Address of the remote store; `choose_provider` selects the provider from its leading
/// scheme/prefix.
pub cas_address: String,
/// Also used as the `instance_name` of a `ByteStore` built via `ByteStore::from_options`.
pub instance_name: Option<String>,
// NOTE(review): presumably request headers sent to the remote store — confirm against the
// providers.
pub headers: BTreeMap<String, String>,
pub tls_config: grpc_util::tls::Config,
pub chunk_size_bytes: usize,
// NOTE(review): the three `rpc_*` fields look like per-request timeout, retry count and
// concurrency cap — confirm against the providers.
pub rpc_timeout: Duration,
pub rpc_retries: usize,
pub rpc_concurrency_limit: usize,
/// Optional, shareable (`Arc`) once-initialised cell holding `ServerCapabilities`.
pub capabilities_cell_opt: Option<Arc<OnceCell<ServerCapabilities>>>,
pub batch_api_size_limit: usize,
}

// TODO: this is probably better positioned somewhere else
/// Address prefixes for which `choose_provider` selects the REAPI provider.
pub const REAPI_ADDRESS_SCHEMAS: [&str; 4] = ["grpc://", "grpcs://", "http://", "https://"];

/// Build the byte store provider that matches the prefix of `options.cas_address`.
///
/// - Any prefix in `REAPI_ADDRESS_SCHEMAS` selects `reapi::Provider`.
/// - `file://` selects `base_opendal::Provider::fs`, rooted at the remainder of the address.
/// - `github-actions-cache+` selects `base_opendal::Provider::github_actions_cache`, pointed at the
///   remainder of the address.
///
/// Returns an error message if the address matches none of these, or if constructing the selected
/// provider fails.
async fn choose_provider(options: RemoteOptions) -> Result<Arc<dyn ByteStoreProvider>, String> {
    // Take a copy of the address so `options` can be moved into whichever provider is chosen.
    let address = options.cas_address.clone();

    let is_reapi = REAPI_ADDRESS_SCHEMAS
        .iter()
        .any(|schema| address.starts_with(schema));
    if is_reapi {
        let provider = reapi::Provider::new(options).await?;
        return Ok(Arc::new(provider));
    }

    if let Some(path) = address.strip_prefix("file://") {
        // It's a bit weird to support local "file://" for a 'remote' store... but this is handy for
        // testing.
        let provider = base_opendal::Provider::fs(path, "byte-store".to_owned(), options)?;
        return Ok(Arc::new(provider));
    }

    if let Some(url) = address.strip_prefix("github-actions-cache+") {
        // This is relying on python validating that it was set as `github-actions-cache+https://...` so
        // incorrect values could easily slip through here and cause downstream confusion. We're
        // intending to change the approach (https://github.com/pantsbuild/pants/issues/19902) so this
        // is tolerable for now.
        let provider =
            base_opendal::Provider::github_actions_cache(url, "byte-store".to_owned(), options)?;
        return Ok(Arc::new(provider));
    }

    Err(format!(
        "Cannot initialise remote byte store provider with address {address}, as the scheme is not supported",
    ))
}

#[derive(Clone)]
pub struct ByteStore {
instance_name: Option<String>,
Expand All @@ -110,29 +27,6 @@ impl fmt::Debug for ByteStore {
}
}

/// Places that write the result of a remote `load`
///
/// A destination is an `AsyncWrite` sink that can also be emptied again via `reset`, so that
/// writing can start over from scratch.
#[async_trait]
pub trait LoadDestination: AsyncWrite + Send + Sync + Unpin + 'static {
/// Clear out the writer and start again, if there's been previous contents written
async fn reset(&mut self) -> std::io::Result<()>;
}

#[async_trait]
impl LoadDestination for tokio::fs::File {
    /// Reset the file: seek back to the start, then truncate it to zero length.
    async fn reset(&mut self) -> std::io::Result<()> {
        self.rewind().await?;
        self.set_len(0).await?;
        Ok(())
    }
}

#[async_trait]
impl LoadDestination for Vec<u8> {
    /// Reset the buffer by dropping everything written so far; this cannot fail.
    async fn reset(&mut self) -> std::io::Result<()> {
        self.truncate(0);
        Ok(())
    }
}

impl ByteStore {
pub fn new(
instance_name: Option<String>,
Expand All @@ -146,7 +40,7 @@ impl ByteStore {

pub async fn from_options(options: RemoteOptions) -> Result<ByteStore, String> {
let instance_name = options.instance_name.clone();
let provider = choose_provider(options).await?;
let provider = choose_byte_store_provider(options).await?;
Ok(ByteStore::new(instance_name, provider))
}

Expand Down
6 changes: 4 additions & 2 deletions src/rust/engine/fs/store/src/remote_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ use bytes::Bytes;
use grpc_util::tls;
use hashing::{Digest, Fingerprint};
use parking_lot::Mutex;
use remote_provider::{ByteStoreProvider, LoadDestination, RemoteOptions};
use tempfile::TempDir;
use testutil::data::TestData;
use testutil::file::mk_tempfile;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use workunit_store::WorkunitStore;

use crate::remote::{ByteStore, ByteStoreProvider, LoadDestination, RemoteOptions};
use crate::tests::{mk_tempfile, new_cas};
use crate::remote::ByteStore;
use crate::tests::new_cas;
use crate::MEGABYTES;

#[tokio::test]
Expand Down
Loading

0 comments on commit 76f8103

Please sign in to comment.