diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index f347a4d271b..31c5e751519 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -2978,6 +2978,8 @@ dependencies = [ "prost-types", "protos", "rand", + "remote_provider", + "remote_provider_reapi", "sharded_lmdb", "store", "strum", @@ -2992,6 +2994,75 @@ dependencies = [ "workunit_store", ] +[[package]] +name = "remote_provider" +version = "0.0.1" +dependencies = [ + "grpc_util", + "remote_provider_opendal", + "remote_provider_reapi", + "remote_provider_traits", +] + +[[package]] +name = "remote_provider_opendal" +version = "0.0.1" +dependencies = [ + "async-trait", + "bytes", + "futures", + "grpc_util", + "hashing", + "http", + "mock", + "opendal", + "parking_lot 0.12.1", + "prost", + "protos", + "remote_provider_traits", + "tempfile", + "testutil", + "tokio", + "workunit_store", +] + +[[package]] +name = "remote_provider_reapi" +version = "0.0.1" +dependencies = [ + "async-oncecell", + "async-stream", + "async-trait", + "bytes", + "futures", + "grpc_util", + "hashing", + "mock", + "parking_lot 0.12.1", + "protos", + "remote_provider_traits", + "tempfile", + "testutil", + "tokio", + "tokio-util 0.7.8", + "tonic", + "uuid", + "workunit_store", +] + +[[package]] +name = "remote_provider_traits" +version = "0.0.1" +dependencies = [ + "async-oncecell", + "async-trait", + "bytes", + "grpc_util", + "hashing", + "protos", + "tokio", +] + [[package]] name = "reqwest" version = "0.11.18" @@ -3457,6 +3528,7 @@ dependencies = [ "prost", "prost-types", "protos", + "remote_provider", "serde", "serde_derive", "sharded_lmdb", @@ -3628,6 +3700,7 @@ dependencies = [ "hashing", "prost", "protos", + "tempfile", "tokio", ] diff --git a/src/rust/engine/Cargo.toml b/src/rust/engine/Cargo.toml index 4d6b825a1f8..9778210b134 100644 --- a/src/rust/engine/Cargo.toml +++ b/src/rust/engine/Cargo.toml @@ -132,6 +132,10 @@ members = [ "process_execution/pe_nailgun", "process_execution/remote", "process_executor", + "remote_provider", + "remote_provider/remote_provider_opendal", + "remote_provider/remote_provider_reapi", + "remote_provider/remote_provider_traits", "rule_graph", "sharded_lmdb", "task_executor", diff --git a/src/rust/engine/fs/store/Cargo.toml b/src/rust/engine/fs/store/Cargo.toml index 70a1cb227e3..84e19872e2e 100644 --- a/src/rust/engine/fs/store/Cargo.toml +++ b/src/rust/engine/fs/store/Cargo.toml @@ -28,6 +28,7 @@ madvise = { workspace = true } parking_lot = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } +remote_provider = { path = "../../remote_provider" } serde = { workspace = true } serde_derive = { workspace = true } sharded_lmdb = { path = "../../sharded_lmdb" } diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index c95bb5f2ff6..4e80586a661 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -84,10 +84,13 @@ mod local; pub mod local_tests; pub mod remote; -pub use remote::RemoteOptions; #[cfg(test)] mod remote_tests; +// Consumers of this crate shouldn't need to worry about the exact crate structure that comes +// together to make a store. +pub use remote_provider::RemoteOptions; + pub struct LocalOptions { pub files_max_size_bytes: usize, pub directories_max_size_bytes: usize, diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index 1a006d77b93..26edd23ea68 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -1,103 +1,20 @@ // Copyright 2022 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). -use std::collections::{BTreeMap, HashSet}; +use std::collections::HashSet; use std::fmt; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; -use async_oncecell::OnceCell; -use async_trait::async_trait; use bytes::Bytes; use futures::Future; use hashing::Digest; use log::Level; -use protos::gen::build::bazel::remote::execution::v2 as remexec; -use remexec::ServerCapabilities; +use remote_provider::{ + choose_byte_store_provider, ByteStoreProvider, LoadDestination, RemoteOptions, +}; use tokio::fs::File; -use tokio::io::{AsyncSeekExt, AsyncWrite}; use workunit_store::{in_workunit, ObservationMetric}; -mod reapi; -#[cfg(test)] -mod reapi_tests; - -pub mod base_opendal; -#[cfg(test)] -mod base_opendal_tests; - -#[async_trait] -pub trait ByteStoreProvider: Sync + Send + 'static { - /// Store the bytes readable from `file` into the remote store - async fn store_file(&self, digest: Digest, file: File) -> Result<(), String>; - - /// Store the bytes in `bytes` into the remote store, as an optimisation of `store_file` when the - /// bytes are already in memory - async fn store_bytes(&self, digest: Digest, bytes: Bytes) -> Result<(), String>; - - /// Load the data stored (if any) in the remote store for `digest` into `destination`. Returns - /// true when found, false when not. - async fn load( - &self, - digest: Digest, - destination: &mut dyn LoadDestination, - ) -> Result; - - /// Return any digests from `digests` that are not (currently) available in the remote store. - async fn list_missing_digests( - &self, - digests: &mut (dyn Iterator + Send), - ) -> Result, String>; -} - -// TODO: Consider providing `impl Default`, similar to `super::LocalOptions`. -#[derive(Clone)] -pub struct RemoteOptions { - // TODO: this is currently framed for the REAPI provider, with some options used by others, would - // be good to generalise - pub cas_address: String, - pub instance_name: Option, - pub headers: BTreeMap, - pub tls_config: grpc_util::tls::Config, - pub chunk_size_bytes: usize, - pub rpc_timeout: Duration, - pub rpc_retries: usize, - pub rpc_concurrency_limit: usize, - pub capabilities_cell_opt: Option>>, - pub batch_api_size_limit: usize, -} - -// TODO: this is probably better positioned somewhere else -pub const REAPI_ADDRESS_SCHEMAS: [&str; 4] = ["grpc://", "grpcs://", "http://", "https://"]; - -async fn choose_provider(options: RemoteOptions) -> Result, String> { - let address = options.cas_address.clone(); - if REAPI_ADDRESS_SCHEMAS.iter().any(|s| address.starts_with(s)) { - Ok(Arc::new(reapi::Provider::new(options).await?)) - } else if let Some(path) = address.strip_prefix("file://") { - // It's a bit weird to support local "file://" for a 'remote' store... but this is handy for - // testing. - Ok(Arc::new(base_opendal::Provider::fs( - path, - "byte-store".to_owned(), - options, - )?)) - } else if let Some(url) = address.strip_prefix("github-actions-cache+") { - // This is relying on python validating that it was set as `github-actions-cache+https://...` so - // incorrect values could easily slip through here and cause downstream confusion. We're - // intending to change the approach (https://github.com/pantsbuild/pants/issues/19902) so this - // is tolerable for now. - Ok(Arc::new(base_opendal::Provider::github_actions_cache( - url, - "byte-store".to_owned(), - options, - )?)) - } else { - Err(format!( - "Cannot initialise remote byte store provider with address {address}, as the scheme is not supported", - )) - } -} - #[derive(Clone)] pub struct ByteStore { instance_name: Option, @@ -110,29 +27,6 @@ impl fmt::Debug for ByteStore { } } -/// Places that write the result of a remote `load` -#[async_trait] -pub trait LoadDestination: AsyncWrite + Send + Sync + Unpin + 'static { - /// Clear out the writer and start again, if there's been previous contents written - async fn reset(&mut self) -> std::io::Result<()>; -} - -#[async_trait] -impl LoadDestination for tokio::fs::File { - async fn reset(&mut self) -> std::io::Result<()> { - self.rewind().await?; - self.set_len(0).await - } -} - -#[async_trait] -impl LoadDestination for Vec { - async fn reset(&mut self) -> std::io::Result<()> { - self.clear(); - Ok(()) - } -} - impl ByteStore { pub fn new( instance_name: Option, @@ -146,7 +40,7 @@ impl ByteStore { pub async fn from_options(options: RemoteOptions) -> Result { let instance_name = options.instance_name.clone(); - let provider = choose_provider(options).await?; + let provider = choose_byte_store_provider(options).await?; Ok(ByteStore::new(instance_name, provider)) } diff --git a/src/rust/engine/fs/store/src/remote_tests.rs b/src/rust/engine/fs/store/src/remote_tests.rs index 73532f8ae81..677de395b54 100644 --- a/src/rust/engine/fs/store/src/remote_tests.rs +++ b/src/rust/engine/fs/store/src/remote_tests.rs @@ -8,14 +8,16 @@ use bytes::Bytes; use grpc_util::tls; use hashing::{Digest, Fingerprint}; use parking_lot::Mutex; +use remote_provider::{ByteStoreProvider, LoadDestination, RemoteOptions}; use tempfile::TempDir; use testutil::data::TestData; +use testutil::file::mk_tempfile; use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use workunit_store::WorkunitStore; -use crate::remote::{ByteStore, ByteStoreProvider, LoadDestination, RemoteOptions}; -use crate::tests::{mk_tempfile, new_cas}; +use crate::remote::ByteStore; +use crate::tests::new_cas; use crate::MEGABYTES; #[tokio::test] diff --git a/src/rust/engine/fs/store/src/tests.rs b/src/rust/engine/fs/store/src/tests.rs index 2f57d1c79a6..0b25c842082 100644 --- a/src/rust/engine/fs/store/src/tests.rs +++ b/src/rust/engine/fs/store/src/tests.rs @@ -1,7 +1,6 @@ // Copyright 2022 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; -use std::fs::File; use std::io::Read; use std::os::unix::fs::PermissionsExt; use std::path::{Path, PathBuf}; @@ -9,17 +8,16 @@ use std::time::Duration; use tempfile::TempDir; use testutil::data::{TestData, TestDirectory}; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use fs::{ DigestEntry, DirectoryDigest, FileEntry, Link, PathStat, Permissions, RelativePath, EMPTY_DIRECTORY_DIGEST, }; use grpc_util::prost::MessageExt; use grpc_util::tls; -use hashing::{Digest, Fingerprint}; +use hashing::Digest; use mock::{RequestType, StubCAS}; use protos::gen::build::bazel::remote::execution::v2 as remexec; -use tokio::io::{AsyncSeekExt, AsyncWriteExt}; use workunit_store::WorkunitStore; use crate::local::ByteStore; @@ -30,62 +28,12 @@ use crate::{ pub(crate) const STORE_BATCH_API_SIZE_LIMIT: usize = 4 * 1024 * 1024; -pub fn big_file_fingerprint() -> Fingerprint { - Fingerprint::from_hex_string("8dfba0adc29389c63062a68d76b2309b9a2486f1ab610c4720beabbdc273301f") - .unwrap() -} - -pub fn big_file_bytes() -> Bytes { - let mut f = File::open( - PathBuf::from(env!("CARGO_MANIFEST_DIR")) - .join("testdata") - .join("all_the_henries"), - ) - .expect("Error opening all_the_henries"); - let mut bytes = Vec::new(); - f.read_to_end(&mut bytes) - .expect("Error reading all_the_henries"); - Bytes::from(bytes) -} - -pub fn extra_big_file_fingerprint() -> Fingerprint { - Fingerprint::from_hex_string("8ae6924fa104396614b99ce1f6aa3b4d85273ef158191b3784c6dbbdb47055cd") - .unwrap() -} - -pub fn extra_big_file_digest() -> Digest { - Digest::new(extra_big_file_fingerprint(), extra_big_file_bytes().len()) -} - -pub fn extra_big_file_bytes() -> Bytes { - let bfb = big_file_bytes(); - let mut bytes = BytesMut::with_capacity(2 * bfb.len()); - bytes.extend_from_slice(&bfb); - bytes.extend_from_slice(&bfb); - bytes.freeze() -} - pub async fn load_file_bytes(store: &Store, digest: Digest) -> Result { store .load_file_bytes_with(digest, Bytes::copy_from_slice) .await } -pub async fn mk_tempfile(contents: Option<&[u8]>) -> tokio::fs::File { - let file = tokio::task::spawn_blocking(tempfile::tempfile) - .await - .unwrap() - .unwrap(); - let mut file = tokio::fs::File::from_std(file); - - if let Some(contents) = contents { - file.write_all(contents).await.unwrap(); - file.rewind().await.unwrap(); - } - - file -} - /// /// Create a StubCas with a file and a directory inside. /// @@ -896,33 +844,35 @@ async fn does_not_reupload_big_file_already_in_cas() { let dir = TempDir::new().unwrap(); let cas = new_empty_cas(); + let testdata = TestData::double_all_the_henries(); + new_local_store(dir.path()) - .store_file_bytes(extra_big_file_bytes(), false) + .store_file_bytes(testdata.bytes(), false) .await .expect("Error storing file locally"); new_store(dir.path(), &cas.address()) .await - .ensure_remote_has_recursive(vec![extra_big_file_digest()]) + .ensure_remote_has_recursive(vec![testdata.digest()]) .await .expect("Error uploading directory"); assert_eq!(cas.write_message_sizes.lock().len(), 1); assert_eq!( - cas.blobs.lock().get(&extra_big_file_fingerprint()), - Some(&extra_big_file_bytes()) + cas.blobs.lock().get(&testdata.fingerprint()), + Some(&testdata.bytes()) ); new_store(dir.path(), &cas.address()) .await - .ensure_remote_has_recursive(vec![extra_big_file_digest()]) + .ensure_remote_has_recursive(vec![testdata.digest()]) .await .expect("Error uploading directory"); assert_eq!(cas.write_message_sizes.lock().len(), 1); assert_eq!( - cas.blobs.lock().get(&extra_big_file_fingerprint()), - Some(&extra_big_file_bytes()) + cas.blobs.lock().get(&testdata.fingerprint()), + Some(&testdata.bytes()) ); } @@ -1728,8 +1678,9 @@ async fn big_file_immutable_link() { let output_dir = materialize_dir.path().join("output_dir"); let nested_output_file = output_dir.join("file"); - let file_bytes = extra_big_file_bytes(); - let file_digest = extra_big_file_digest(); + let testdata = TestData::double_all_the_henries(); + let file_bytes = testdata.bytes(); + let file_digest = testdata.digest(); let nested_directory = remexec::Directory { files: vec![remexec::FileNode { diff --git a/src/rust/engine/process_execution/remote/Cargo.toml b/src/rust/engine/process_execution/remote/Cargo.toml index ae1a87e95e7..ac5f04ea549 100644 --- a/src/rust/engine/process_execution/remote/Cargo.toml +++ b/src/rust/engine/process_execution/remote/Cargo.toml @@ -37,6 +37,8 @@ strum = { workspace = true } strum_macros = { workspace = true } parking_lot = { workspace = true } opendal = { workspace = true } +remote_provider = { path = "../../remote_provider" } +remote_provider_reapi = { path = "../../remote_provider/remote_provider_reapi" } [dev-dependencies] env_logger = { workspace = true } diff --git a/src/rust/engine/process_execution/remote/src/remote.rs b/src/rust/engine/process_execution/remote/src/remote.rs index 7ba931d3f04..4981f09b880 100644 --- a/src/rust/engine/process_execution/remote/src/remote.rs +++ b/src/rust/engine/process_execution/remote/src/remote.rs @@ -25,7 +25,6 @@ use remexec::{ execution_stage::Value as ExecutionStageValue, Action, Command, ExecuteRequest, ExecuteResponse, ExecutedActionMetadata, ServerCapabilities, WaitExecutionRequest, }; -use tonic::metadata::BinaryMetadataValue; use tonic::{Code, Request, Status}; use concrete_time::TimeSpan; @@ -34,6 +33,7 @@ use grpc_util::headers_to_http_header_map; use grpc_util::prost::MessageExt; use grpc_util::{layered_service, status_to_str, LayeredService}; use hashing::{Digest, Fingerprint}; +use remote_provider_reapi::apply_headers; use store::{Store, StoreError}; use task_executor::Executor; use workunit_store::{ @@ -1051,26 +1051,6 @@ async fn populate_fallible_execution_result_for_timeout( }) } -/// Apply REAPI request metadata header to a `tonic::Request`. -pub(crate) fn apply_headers(mut request: Request, build_id: &str) -> Request { - let reapi_request_metadata = remexec::RequestMetadata { - tool_details: Some(remexec::ToolDetails { - tool_name: "pants".into(), - ..remexec::ToolDetails::default() - }), - tool_invocation_id: build_id.to_string(), - ..remexec::RequestMetadata::default() - }; - - let md = request.metadata_mut(); - md.insert_bin( - "google.devtools.remoteexecution.v1test.requestmetadata-bin", - BinaryMetadataValue::try_from(reapi_request_metadata.to_bytes()).unwrap(), - ); - - request -} - pub async fn store_proto_locally( store: &Store, proto: &P, diff --git a/src/rust/engine/process_execution/remote/src/remote_cache.rs b/src/rust/engine/process_execution/remote/src/remote_cache.rs index 32a48c25f12..605d030a956 100644 --- a/src/rust/engine/process_execution/remote/src/remote_cache.rs +++ b/src/rust/engine/process_execution/remote/src/remote_cache.rs @@ -3,20 +3,19 @@ use std::collections::{BTreeMap, HashSet}; use std::fmt::{self, Debug}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; use async_trait::async_trait; use fs::{directory, DigestTrie, RelativePath, SymlinkBehavior}; use futures::future::{BoxFuture, TryFutureExt}; use futures::FutureExt; -use grpc_util::tls; use hashing::Digest; use parking_lot::Mutex; use protos::gen::build::bazel::remote::execution::v2 as remexec; use protos::require_digest; use remexec::{ActionResult, Command, Tree}; -use store::remote::REAPI_ADDRESS_SCHEMAS; -use store::{RemoteOptions, Store, StoreError}; +use remote_provider::{choose_action_cache_provider, ActionCacheProvider}; +use store::{Store, StoreError}; use workunit_store::{ in_workunit, Level, Metric, ObservationMetric, RunningWorkunit, WorkunitMetadata, }; @@ -28,12 +27,9 @@ use process_execution::{ }; use process_execution::{make_execute_request, EntireExecuteRequest}; -mod base_opendal; -#[cfg(test)] -mod base_opendal_tests; -mod reapi; -#[cfg(test)] -mod reapi_tests; +// Consumers of this crate shouldn't need to worry about the exact crate structure that comes +// together to make a remote cache command runner. +pub use remote_provider::RemoteCacheProviderOptions; #[derive(Clone, Copy, Debug, strum_macros::EnumString)] #[strum(serialize_all = "snake_case")] @@ -43,86 +39,6 @@ pub enum RemoteCacheWarningsBehavior { Backoff, } -/// This `ActionCacheProvider` trait captures the operations required to be able to cache command -/// executions remotely. -#[async_trait] -pub trait ActionCacheProvider: Sync + Send + 'static { - async fn update_action_result( - &self, - action_digest: Digest, - action_result: ActionResult, - ) -> Result<(), String>; - - async fn get_action_result( - &self, - action_digest: Digest, - context: &Context, - ) -> Result, String>; -} - -#[derive(Clone)] -pub struct RemoteCacheProviderOptions { - // TODO: this is currently framed for the REAPI provider, with some options used by others, would - // be good to generalise - // TODO: this is structurally very similar to `RemoteOptions`: maybe they should be the same? (see - // comment in `choose_provider` too) - pub instance_name: Option, - pub action_cache_address: String, - pub root_ca_certs: Option>, - pub mtls_data: Option<(Vec, Vec)>, - pub headers: BTreeMap, - pub concurrency_limit: usize, - pub rpc_timeout: Duration, -} - -async fn choose_provider( - options: RemoteCacheProviderOptions, -) -> Result, String> { - let address = options.action_cache_address.clone(); - - // TODO: we shouldn't need to gin up a whole copy of this struct; it'd be better to have the two - // set of remoting options managed together. - let remote_options = RemoteOptions { - cas_address: address.clone(), - instance_name: options.instance_name.clone(), - headers: options.headers.clone(), - tls_config: tls::Config::new(options.root_ca_certs.clone(), options.mtls_data.clone())?, - rpc_timeout: options.rpc_timeout, - rpc_concurrency_limit: options.concurrency_limit, - // TODO: these should either be passed through or not synthesized here - chunk_size_bytes: 0, - rpc_retries: 0, - capabilities_cell_opt: None, - batch_api_size_limit: 0, - }; - - if REAPI_ADDRESS_SCHEMAS.iter().any(|s| address.starts_with(s)) { - Ok(Arc::new(reapi::Provider::new(options).await?)) - } else if let Some(path) = address.strip_prefix("file://") { - // It's a bit weird to support local "file://" for a 'remote' store... but this is handy for - // testing. - Ok(Arc::new(base_opendal::Provider::fs( - path, - "action-cache".to_owned(), - remote_options, - )?)) - } else if let Some(url) = address.strip_prefix("github-actions-cache+") { - // This is relying on python validating that it was set as `github-actions-cache+https://...` so - // incorrect values could easily slip through here and cause downstream confusion. We're - // intending to change the approach (https://github.com/pantsbuild/pants/issues/19902) so this - // is tolerable for now. - Ok(Arc::new(base_opendal::Provider::github_actions_cache( - url, - "action-cache".to_owned(), - remote_options, - )?)) - } else { - Err(format!( - "Cannot initialise remote action cache provider with address {address}, as the scheme is not supported", - )) - } -} - pub struct RemoteCacheRunnerOptions { pub inner: Arc, pub instance_name: Option, @@ -198,7 +114,7 @@ impl CommandRunner { runner_options: RemoteCacheRunnerOptions, provider_options: RemoteCacheProviderOptions, ) -> Result { - let provider = choose_provider(provider_options).await?; + let provider = choose_action_cache_provider(provider_options).await?; Ok(Self::new(runner_options, provider)) } @@ -645,7 +561,7 @@ async fn check_action_cache( let start = Instant::now(); let response = provider - .get_action_result(action_digest, context) + .get_action_result(action_digest, &context.build_id) .and_then(|action_result| async move { let Some(action_result) = action_result else { return Ok(None); diff --git a/src/rust/engine/process_execution/remote/src/remote_cache/base_opendal.rs b/src/rust/engine/process_execution/remote/src/remote_cache/base_opendal.rs deleted file mode 100644 index 52c9660b150..00000000000 --- a/src/rust/engine/process_execution/remote/src/remote_cache/base_opendal.rs +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright 2023 Pants project contributors (see CONTRIBUTORS.md). -// Licensed under the Apache License, Version 2.0 (see LICENSE). -#![allow(dead_code)] - -use async_trait::async_trait; -use bytes::Bytes; -use grpc_util::prost::MessageExt; -use hashing::Digest; -use prost::Message; -use protos::gen::build::bazel::remote::execution::v2 as remexec; -use remexec::ActionResult; - -use super::ActionCacheProvider; -use process_execution::Context; - -pub use store::remote::{base_opendal::Provider, ByteStoreProvider}; - -#[async_trait] -impl ActionCacheProvider for Provider { - async fn update_action_result( - &self, - action_digest: Digest, - action_result: ActionResult, - ) -> Result<(), String> { - let bytes = action_result.to_bytes(); - self.store_bytes(action_digest, bytes).await - } - async fn get_action_result( - &self, - action_digest: Digest, - _context: &Context, - ) -> Result, String> { - let mut destination = Vec::new(); - - match self - .load_without_validation(action_digest, &mut destination) - .await? - { - false => Ok(None), - true => { - let bytes = Bytes::from(destination); - Ok(Some(ActionResult::decode(bytes).map_err(|e| { - format!("failed to decode action result for digest {action_digest:?}: {e}") - })?)) - } - } - } -} diff --git a/src/rust/engine/process_execution/remote/src/remote_cache_tests.rs b/src/rust/engine/process_execution/remote/src/remote_cache_tests.rs index 51ee7aa9a30..3b59b4a21a6 100644 --- a/src/rust/engine/process_execution/remote/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/remote/src/remote_cache_tests.rs @@ -16,14 +16,13 @@ use grpc_util::tls; use hashing::{Digest, EMPTY_DIGEST}; use mock::StubCAS; use protos::gen::build::bazel::remote::execution::v2 as remexec; +use remote_provider::RemoteCacheProviderOptions; use store::{RemoteOptions, Store}; use testutil::data::{TestData, TestDirectory, TestTree}; use workunit_store::{RunId, RunningWorkunit, WorkunitStore}; use crate::remote::ensure_action_stored_locally; -use crate::remote_cache::{ - RemoteCacheProviderOptions, RemoteCacheRunnerOptions, RemoteCacheWarningsBehavior, -}; +use crate::remote_cache::{RemoteCacheRunnerOptions, RemoteCacheWarningsBehavior}; use process_execution::{ make_execute_request, CacheContentBehavior, CommandRunner as CommandRunnerTrait, Context, EntireExecuteRequest, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope, diff --git a/src/rust/engine/remote_provider/Cargo.toml b/src/rust/engine/remote_provider/Cargo.toml new file mode 100644 index 00000000000..1264fa176b5 --- /dev/null +++ b/src/rust/engine/remote_provider/Cargo.toml @@ -0,0 +1,12 @@ +[package] +version = "0.0.1" +edition = "2021" +name = "remote_provider" +authors = ["Pants Build "] +publish = false + +[dependencies] +grpc_util = { path = "../grpc_util" } +remote_provider_opendal = { path = "./remote_provider_opendal" } +remote_provider_reapi = { path = "./remote_provider_reapi" } +remote_provider_traits = { path = "./remote_provider_traits" } diff --git a/src/rust/engine/remote_provider/remote_provider_opendal/Cargo.toml b/src/rust/engine/remote_provider/remote_provider_opendal/Cargo.toml new file mode 100644 index 00000000000..6737238d40d --- /dev/null +++ b/src/rust/engine/remote_provider/remote_provider_opendal/Cargo.toml @@ -0,0 +1,26 @@ +[package] +version = "0.0.1" +edition = "2021" +name = "remote_provider_opendal" +authors = ["Pants Build "] +publish = false + +[dependencies] +async-trait = { workspace = true } +bytes = { workspace = true } +futures = { workspace = true } +grpc_util = { path = "../../grpc_util" } +hashing = { path = "../../hashing" } +http = { workspace = true } +parking_lot = { workspace = true } +prost = { workspace = true } +protos = { path = "../../protos" } +opendal = { workspace = true } +remote_provider_traits = { path = "../remote_provider_traits" } +tokio = { workspace = true } +workunit_store = { path = "../../workunit_store" } + +[dev-dependencies] +mock = { path = "../../testutil/mock" } +tempfile = { workspace = true } +testutil = { path = "../../testutil" } diff --git a/src/rust/engine/process_execution/remote/src/remote_cache/base_opendal_tests.rs b/src/rust/engine/remote_provider/remote_provider_opendal/src/action_cache_tests.rs similarity index 88% rename from src/rust/engine/process_execution/remote/src/remote_cache/base_opendal_tests.rs rename to src/rust/engine/remote_provider/remote_provider_opendal/src/action_cache_tests.rs index 38edeed3ac8..5af8b14deb6 100644 --- a/src/rust/engine/process_execution/remote/src/remote_cache/base_opendal_tests.rs +++ b/src/rust/engine/remote_provider/remote_provider_opendal/src/action_cache_tests.rs @@ -9,13 +9,11 @@ use grpc_util::prost::MessageExt; use grpc_util::tls; use hashing::Digest; use opendal::services::Memory; -use process_execution::Context; use prost::Message; use protos::gen::build::bazel::remote::execution::v2 as remexec; -use store::remote::RemoteOptions; +use remote_provider_traits::{ActionCacheProvider, RemoteOptions}; -use super::base_opendal::Provider; -use super::ActionCacheProvider; +use super::Provider; const BASE: &str = "opendal-testing-base"; @@ -69,9 +67,7 @@ async fn get_action_result_existing() { write_test_data(&provider, action_digest, action_result.clone()).await; assert_eq!( - provider - .get_action_result(action_digest, &Context::default()) - .await, + provider.get_action_result(action_digest, "").await, Ok(Some(action_result)) ); } @@ -83,9 +79,7 @@ async fn get_action_result_missing() { let action_digest = Digest::of_bytes(b"update_action_cache test"); assert_eq!( - provider - .get_action_result(action_digest, &Context::default()) - .await, + provider.get_action_result(action_digest, "").await, Ok(None) ); } diff --git a/src/rust/engine/fs/store/src/remote/base_opendal_tests.rs b/src/rust/engine/remote_provider/remote_provider_opendal/src/byte_store_tests.rs similarity index 85% rename from src/rust/engine/fs/store/src/remote/base_opendal_tests.rs rename to src/rust/engine/remote_provider/remote_provider_opendal/src/byte_store_tests.rs index d24e810fd4e..d23e623ac22 100644 --- a/src/rust/engine/fs/store/src/remote/base_opendal_tests.rs +++ b/src/rust/engine/remote_provider/remote_provider_opendal/src/byte_store_tests.rs @@ -5,19 +5,17 @@ use std::time::Duration; use bytes::Bytes; use grpc_util::tls; -use hashing::{Digest, Fingerprint}; use opendal::services::Memory; +use remote_provider_traits::{ByteStoreProvider, RemoteOptions}; use testutil::data::TestData; +use testutil::file::mk_tempfile; -use crate::tests::{big_file_bytes, big_file_fingerprint, mk_tempfile}; - -use super::base_opendal::Provider; -use super::{ByteStoreProvider, RemoteOptions}; +use crate::Provider; const BASE: &str = "opendal-testing-base"; -fn test_path_fingerprint(fingerprint: Fingerprint) -> String { - let fingerprint = fingerprint.to_string(); +fn test_path(data: &TestData) -> String { + let fingerprint = data.fingerprint().to_string(); format!( "{}/{}/{}/{}", BASE, @@ -26,9 +24,6 @@ fn test_path_fingerprint(fingerprint: Fingerprint) -> String { fingerprint ) } -fn test_path(data: &TestData) -> String { - test_path_fingerprint(data.fingerprint()) -} fn remote_options() -> RemoteOptions { RemoteOptions { cas_address: "".to_owned(), @@ -156,13 +151,9 @@ async fn load_without_validation_missing() { assert!(destination.is_empty()) } -async fn assert_store(provider: &Provider, fingerprint: Fingerprint, bytes: Bytes) { - let result = provider - .operator - .read(&test_path_fingerprint(fingerprint)) - .await - .unwrap(); - assert_eq!(result, bytes); +async fn assert_store(provider: &Provider, testdata: &TestData) { + let result = provider.operator.read(&test_path(testdata)).await.unwrap(); + assert_eq!(result, testdata.bytes()); } #[tokio::test] @@ -175,7 +166,7 @@ async fn store_bytes_data() { .await .unwrap(); - assert_store(&provider, testdata.fingerprint(), testdata.bytes()).await; + assert_store(&provider, &testdata).await; } #[tokio::test] @@ -208,25 +199,26 @@ async fn store_file_one_chunk() { ) .await .unwrap(); - assert_store(&provider, testdata.fingerprint(), testdata.bytes()).await; + assert_store(&provider, &testdata).await; } #[tokio::test] async fn store_file_multiple_chunks() { + let testdata = TestData::all_the_henries(); let provider = new_provider(); - let all_the_henries = big_file_bytes(); // Our current chunk size is the tokio::io::copy default (8KiB at // the time of writing). - assert!(all_the_henries.len() > 8 * 1024); - let fingerprint = big_file_fingerprint(); - let digest = Digest::new(fingerprint, all_the_henries.len()); + assert!(testdata.len() > 8 * 1024); provider - .store_file(digest, mk_tempfile(Some(&all_the_henries)).await) + .store_file( + testdata.digest(), + mk_tempfile(Some(&testdata.bytes())).await, + ) .await .unwrap(); - assert_store(&provider, fingerprint, all_the_henries).await; + assert_store(&provider, &testdata).await; } #[tokio::test] diff --git a/src/rust/engine/fs/store/src/remote/base_opendal.rs b/src/rust/engine/remote_provider/remote_provider_opendal/src/lib.rs old mode 100755 new mode 100644 similarity index 82% rename from src/rust/engine/fs/store/src/remote/base_opendal.rs rename to src/rust/engine/remote_provider/remote_provider_opendal/src/lib.rs index 79c85131efd..9fd358d37b3 --- a/src/rust/engine/fs/store/src/remote/base_opendal.rs +++ b/src/rust/engine/remote_provider/remote_provider_opendal/src/lib.rs @@ -1,6 +1,29 @@ // Copyright 2023 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). -#![allow(dead_code)] + +#![deny(warnings)] +// Enable all clippy lints except for many of the pedantic ones. It's a shame this needs to be copied and pasted across crates, but there doesn't appear to be a way to include inner attributes from a common source. +#![deny( + clippy::all, + clippy::default_trait_access, + clippy::expl_impl_clone_on_copy, + clippy::if_not_else, + clippy::needless_continue, + clippy::unseparated_literal_suffix, + clippy::used_underscore_binding +)] +// It is often more clear to show that nothing is being moved. +#![allow(clippy::match_ref_pats)] +// Subjective style. +#![allow( + clippy::len_without_is_empty, + clippy::redundant_field_names, + clippy::too_many_arguments +)] +// Default isn't as big a deal as people seem to think it is. +#![allow(clippy::new_without_default, clippy::new_ret_no_self)] +// Arc can be more clear than needing to grok Orderings: +#![allow(clippy::mutex_atomic)] use std::collections::HashSet; use std::time::Instant; @@ -8,14 +31,25 @@ use std::time::Instant; use async_trait::async_trait; use bytes::Bytes; use futures::future; +use grpc_util::prost::MessageExt; use hashing::{async_verified_copy, Digest, Fingerprint, EMPTY_DIGEST}; use http::header::AUTHORIZATION; use opendal::layers::{ConcurrentLimitLayer, RetryLayer, TimeoutLayer}; use opendal::{Builder, Operator}; +use prost::Message; +use protos::gen::build::bazel::remote::execution::v2 as remexec; +use remexec::ActionResult; use tokio::fs::File; use workunit_store::ObservationMetric; -use super::{ByteStoreProvider, LoadDestination, RemoteOptions}; +use remote_provider_traits::{ + ActionCacheProvider, ByteStoreProvider, LoadDestination, RemoteOptions, +}; + +#[cfg(test)] +mod action_cache_tests; +#[cfg(test)] +mod byte_store_tests; const GITHUB_ACTIONS_CACHE_VERSION: &str = "pants-1"; @@ -26,9 +60,7 @@ pub enum LoadMode { } pub struct Provider { - /// This is public for easier testing of the action cache provider - // TODO: move all the providers into a single crate so that the pub isn't necessary - pub operator: Operator, + operator: Operator, base_path: String, } @@ -277,3 +309,35 @@ impl ByteStoreProvider for Provider { Ok(existences.into_iter().flatten().collect()) } } + +#[async_trait] +impl ActionCacheProvider for Provider { + async fn update_action_result( + &self, + action_digest: Digest, + action_result: ActionResult, + ) -> Result<(), String> { + let bytes = action_result.to_bytes(); + self.store_bytes(action_digest, bytes).await + } + async fn get_action_result( + &self, + action_digest: Digest, + _build_id: &str, + ) -> Result, String> { + let mut destination = Vec::new(); + + match self + .load_without_validation(action_digest, &mut destination) + .await? + { + false => Ok(None), + true => { + let bytes = Bytes::from(destination); + Ok(Some(ActionResult::decode(bytes).map_err(|e| { + format!("failed to decode action result for digest {action_digest:?}: {e}") + })?)) + } + } + } +} diff --git a/src/rust/engine/remote_provider/remote_provider_reapi/Cargo.toml b/src/rust/engine/remote_provider/remote_provider_reapi/Cargo.toml new file mode 100644 index 00000000000..3f28caa04ec --- /dev/null +++ b/src/rust/engine/remote_provider/remote_provider_reapi/Cargo.toml @@ -0,0 +1,28 @@ +[package] +version = "0.0.1" +edition = "2021" +name = "remote_provider_reapi" +authors = ["Pants Build "] +publish = false + +[dependencies] +async-oncecell = { workspace = true } +async-stream = { workspace = true } +async-trait = { workspace = true } +bytes = { workspace = true } +futures = { workspace = true } +grpc_util = { path = "../../grpc_util" } +hashing = { path = "../../hashing" } +parking_lot = { workspace = true } +protos = { path = "../../protos" } +remote_provider_traits = { path = "../remote_provider_traits" } +tokio = { workspace = true } +tokio-util = { workspace = true, features = ["io"] } +tonic = { workspace = true } +uuid = { workspace = true, features = ["v4"] } +workunit_store = { path = "../../workunit_store" } + +[dev-dependencies] +mock = { path = "../../testutil/mock" } +tempfile = { workspace = true } +testutil = { path = "../../testutil" } diff --git a/src/rust/engine/process_execution/remote/src/remote_cache/reapi.rs b/src/rust/engine/remote_provider/remote_provider_reapi/src/action_cache.rs similarity index 93% rename from src/rust/engine/process_execution/remote/src/remote_cache/reapi.rs rename to src/rust/engine/remote_provider/remote_provider_reapi/src/action_cache.rs index 9f5e5fa4551..fea62175cf0 100644 --- a/src/rust/engine/process_execution/remote/src/remote_cache/reapi.rs +++ b/src/rust/engine/remote_provider/remote_provider_reapi/src/action_cache.rs @@ -10,13 +10,12 @@ use hashing::Digest; use protos::gen::build::bazel::remote::execution::v2 as remexec; use remexec::action_cache_client::ActionCacheClient; use remexec::ActionResult; +use remote_provider_traits::{ActionCacheProvider, RemoteCacheProviderOptions}; use workunit_store::Metric; -use crate::remote::apply_headers; -use process_execution::Context; use tonic::{Code, Request}; -use super::{ActionCacheProvider, RemoteCacheProviderOptions}; +use crate::apply_headers; pub struct Provider { instance_name: Option, @@ -98,7 +97,7 @@ impl ActionCacheProvider for Provider { async fn get_action_result( &self, action_digest: Digest, - context: &Context, + build_id: &str, ) -> Result, String> { let client = self.action_cache_client.as_ref().clone(); let response = retry_call( @@ -109,7 +108,7 @@ impl ActionCacheProvider for Provider { instance_name: self.instance_name.clone().unwrap_or_default(), ..remexec::GetActionResultRequest::default() }; - let request = apply_headers(Request::new(request), &context.build_id); + let request = apply_headers(Request::new(request), build_id); async move { client.get_action_result(request).await } }, status_is_retryable, diff --git a/src/rust/engine/process_execution/remote/src/remote_cache/reapi_tests.rs b/src/rust/engine/remote_provider/remote_provider_reapi/src/action_cache_tests.rs similarity index 89% rename from src/rust/engine/process_execution/remote/src/remote_cache/reapi_tests.rs rename to src/rust/engine/remote_provider/remote_provider_reapi/src/action_cache_tests.rs index fa6ab7c6071..429f3395a8f 100644 --- a/src/rust/engine/process_execution/remote/src/remote_cache/reapi_tests.rs +++ b/src/rust/engine/remote_provider/remote_provider_reapi/src/action_cache_tests.rs @@ -4,10 +4,10 @@ use std::{collections::BTreeMap, time::Duration}; use hashing::Digest; use mock::StubCAS; -use process_execution::Context; use protos::gen::build::bazel::remote::execution::v2 as remexec; +use remote_provider_traits::{ActionCacheProvider, RemoteCacheProviderOptions}; -use super::{reapi::Provider, ActionCacheProvider, RemoteCacheProviderOptions}; +use super::action_cache::Provider; async fn new_provider(cas: &StubCAS) -> Provider { Provider::new(RemoteCacheProviderOptions { @@ -40,9 +40,7 @@ async fn get_action_result_existing() { .insert(action_digest.hash, action_result.clone()); assert_eq!( - provider - .get_action_result(action_digest, &Context::default()) - .await, + provider.get_action_result(action_digest, "").await, Ok(Some(action_result)) ); } @@ -55,9 +53,7 @@ async fn get_action_result_missing() { let action_digest = Digest::of_bytes(b"update_action_cache test"); assert_eq!( - provider - .get_action_result(action_digest, &Context::default()) - .await, + provider.get_action_result(action_digest, "").await, Ok(None) ); } @@ -70,7 +66,7 @@ async fn get_action_result_grpc_error() { let action_digest = Digest::of_bytes(b"get_action_result_grpc_error test"); let error = provider - .get_action_result(action_digest, &Context::default()) + .get_action_result(action_digest, "") .await .expect_err("Want err"); diff --git a/src/rust/engine/fs/store/src/remote/reapi.rs b/src/rust/engine/remote_provider/remote_provider_reapi/src/byte_store.rs old mode 100755 new mode 100644 similarity index 99% rename from src/rust/engine/fs/store/src/remote/reapi.rs rename to src/rust/engine/remote_provider/remote_provider_reapi/src/byte_store.rs index d78a368ca27..6ee5e3a75bd --- a/src/rust/engine/fs/store/src/remote/reapi.rs +++ b/src/rust/engine/remote_provider/remote_provider_reapi/src/byte_store.rs @@ -29,9 +29,7 @@ use tokio::sync::Mutex; use tonic::{Code, Request, Status}; use workunit_store::{Metric, ObservationMetric}; -use crate::RemoteOptions; - -use super::{ByteStoreProvider, LoadDestination}; +use remote_provider_traits::{ByteStoreProvider, LoadDestination, RemoteOptions}; pub struct Provider { instance_name: Option, diff --git a/src/rust/engine/fs/store/src/remote/reapi_tests.rs b/src/rust/engine/remote_provider/remote_provider_reapi/src/byte_store_tests.rs similarity index 84% rename from src/rust/engine/fs/store/src/remote/reapi_tests.rs rename to src/rust/engine/remote_provider/remote_provider_reapi/src/byte_store_tests.rs index 23fc279709a..443eb0d979f 100644 --- a/src/rust/engine/fs/store/src/remote/reapi_tests.rs +++ b/src/rust/engine/remote_provider/remote_provider_reapi/src/byte_store_tests.rs @@ -5,17 +5,19 @@ use std::time::Duration; use bytes::Bytes; use grpc_util::tls; -use hashing::{Digest, Fingerprint}; use mock::{RequestType, StubCAS}; use tempfile::TempDir; use testutil::data::TestData; +use testutil::file::mk_tempfile; use tokio::fs::File; +use workunit_store::WorkunitStore; -use crate::remote::{ByteStoreProvider, RemoteOptions}; -use crate::tests::{big_file_bytes, big_file_fingerprint, mk_tempfile, new_cas}; -use crate::MEGABYTES; +use remote_provider_traits::{ByteStoreProvider, RemoteOptions}; -use super::reapi::Provider; +use crate::byte_store::Provider; + +const MEGABYTES: usize = 1024 * 1024; +const STORE_BATCH_API_SIZE_LIMIT: usize = 4 * MEGABYTES; fn remote_options( cas_address: String, @@ -40,15 +42,19 @@ async fn new_provider(cas: &StubCAS) -> Provider { Provider::new(remote_options( cas.address(), 10 * MEGABYTES, - crate::tests::STORE_BATCH_API_SIZE_LIMIT, + STORE_BATCH_API_SIZE_LIMIT, )) .await .unwrap() } async fn load_test(chunk_size: usize) { + let _ = WorkunitStore::setup_for_tests(); let testdata = TestData::roland(); - let cas = new_cas(chunk_size); + let cas = StubCAS::builder() + .chunk_size_bytes(chunk_size) + .file(&testdata) + .build(); let provider = new_provider(&cas).await; let mut destination = Vec::new(); @@ -146,15 +152,9 @@ async fn load_existing_wrong_digest_error() { ) } -fn assert_cas_store( - cas: &StubCAS, - fingerprint: Fingerprint, - bytes: Bytes, - chunks: usize, - chunk_size: usize, -) { +fn assert_cas_store(cas: &StubCAS, testdata: &TestData, chunks: usize, chunk_size: usize) { let blobs = cas.blobs.lock(); - assert_eq!(blobs.get(&fingerprint), Some(&bytes)); + assert_eq!(blobs.get(&testdata.fingerprint()), Some(&testdata.bytes())); let write_message_sizes = cas.write_message_sizes.lock(); assert_eq!(write_message_sizes.len(), chunks); @@ -182,10 +182,12 @@ async fn store_file_one_chunk() { .await .unwrap(); - assert_cas_store(&cas, testdata.fingerprint(), testdata.bytes(), 1, 1024) + assert_cas_store(&cas, &testdata, 1, 1024) } #[tokio::test] async fn store_file_multiple_chunks() { + let testdata = TestData::all_the_henries(); + let cas = StubCAS::empty(); let chunk_size = 10 * 1024; let provider = Provider::new(remote_options( @@ -196,16 +198,15 @@ async fn store_file_multiple_chunks() { .await .unwrap(); - let all_the_henries = big_file_bytes(); - let fingerprint = big_file_fingerprint(); - let digest = Digest::new(fingerprint, all_the_henries.len()); - provider - .store_file(digest, mk_tempfile(Some(&all_the_henries)).await) + .store_file( + testdata.digest(), + mk_tempfile(Some(&testdata.bytes())).await, + ) .await .unwrap(); - assert_cas_store(&cas, fingerprint, all_the_henries, 98, chunk_size) + assert_cas_store(&cas, &testdata, 98, chunk_size) } #[tokio::test] @@ -222,7 +223,7 @@ async fn store_file_empty_file() { .await .unwrap(); - assert_cas_store(&cas, testdata.fingerprint(), testdata.bytes(), 1, 1024) + assert_cas_store(&cas, &testdata, 1, 1024) } #[tokio::test] @@ -256,7 +257,7 @@ async fn store_file_connection_error() { let provider = Provider::new(remote_options( "http://doesnotexist.example".to_owned(), 10 * MEGABYTES, - crate::tests::STORE_BATCH_API_SIZE_LIMIT, + STORE_BATCH_API_SIZE_LIMIT, )) .await .unwrap(); @@ -307,10 +308,12 @@ async fn store_bytes_one_chunk() { .await .unwrap(); - assert_cas_store(&cas, testdata.fingerprint(), testdata.bytes(), 1, 1024) + assert_cas_store(&cas, &testdata, 1, 1024) } #[tokio::test] async fn store_bytes_multiple_chunks() { + let testdata = TestData::all_the_henries(); + let cas = StubCAS::empty(); let chunk_size = 10 * 1024; let provider = Provider::new(remote_options( @@ -321,16 +324,12 @@ async fn store_bytes_multiple_chunks() { .await .unwrap(); - let all_the_henries = big_file_bytes(); - let fingerprint = big_file_fingerprint(); - let digest = Digest::new(fingerprint, all_the_henries.len()); - provider - .store_bytes(digest, all_the_henries.clone()) + .store_bytes(testdata.digest(), testdata.bytes()) .await .unwrap(); - assert_cas_store(&cas, fingerprint, all_the_henries, 98, chunk_size) + assert_cas_store(&cas, &testdata, 98, chunk_size) } #[tokio::test] @@ -344,7 +343,7 @@ async fn store_bytes_empty_file() { .await .unwrap(); - assert_cas_store(&cas, testdata.fingerprint(), testdata.bytes(), 1, 1024) + assert_cas_store(&cas, &testdata, 1, 1024) } #[tokio::test] @@ -374,6 +373,7 @@ async fn store_bytes_batch_grpc_error() { #[tokio::test] async fn store_bytes_write_stream_grpc_error() { + let testdata = TestData::all_the_henries(); let cas = StubCAS::cas_always_errors(); let chunk_size = 10 * 1024; let provider = Provider::new(remote_options( @@ -384,12 +384,8 @@ async fn store_bytes_write_stream_grpc_error() { .await .unwrap(); - let all_the_henries = big_file_bytes(); - let fingerprint = big_file_fingerprint(); - let digest = Digest::new(fingerprint, all_the_henries.len()); - let error = provider - .store_bytes(digest, all_the_henries) + .store_bytes(testdata.digest(), testdata.bytes()) .await .expect_err("Want err"); assert!( @@ -410,7 +406,7 @@ async fn store_bytes_connection_error() { let provider = Provider::new(remote_options( "http://doesnotexist.example".to_owned(), 10 * MEGABYTES, - crate::tests::STORE_BATCH_API_SIZE_LIMIT, + STORE_BATCH_API_SIZE_LIMIT, )) .await .unwrap(); @@ -427,13 +423,15 @@ async fn store_bytes_connection_error() { #[tokio::test] async fn list_missing_digests_none_missing() { - let cas = new_cas(1024); + let testdata = TestData::roland(); + let _ = WorkunitStore::setup_for_tests(); + let cas = StubCAS::builder().file(&testdata).build(); let provider = new_provider(&cas).await; assert_eq!( provider - .list_missing_digests(&mut vec![TestData::roland().digest()].into_iter()) + .list_missing_digests(&mut vec![testdata.digest()].into_iter()) .await, Ok(HashSet::new()) ) diff --git a/src/rust/engine/remote_provider/remote_provider_reapi/src/lib.rs b/src/rust/engine/remote_provider/remote_provider_reapi/src/lib.rs new file mode 100644 index 00000000000..255962f5716 --- /dev/null +++ b/src/rust/engine/remote_provider/remote_provider_reapi/src/lib.rs @@ -0,0 +1,59 @@ +// Copyright 2023 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). + +#![deny(warnings)] +// Enable all clippy lints except for many of the pedantic ones. It's a shame this needs to be copied and pasted across crates, but there doesn't appear to be a way to include inner attributes from a common source. +#![deny( + clippy::all, + clippy::default_trait_access, + clippy::expl_impl_clone_on_copy, + clippy::if_not_else, + clippy::needless_continue, + clippy::unseparated_literal_suffix, + clippy::used_underscore_binding +)] +// It is often more clear to show that nothing is being moved. +#![allow(clippy::match_ref_pats)] +// Subjective style. +#![allow( + clippy::len_without_is_empty, + clippy::redundant_field_names, + clippy::too_many_arguments +)] +// Default isn't as big a deal as people seem to think it is. +#![allow(clippy::new_without_default, clippy::new_ret_no_self)] +// Arc can be more clear than needing to grok Orderings: +#![allow(clippy::mutex_atomic)] + +use protos::gen::build::bazel::remote::execution::v2 as remexec; +use tonic::metadata::BinaryMetadataValue; +use tonic::Request; + +use grpc_util::prost::MessageExt; + +pub mod action_cache; +#[cfg(test)] +pub mod action_cache_tests; +pub mod byte_store; +#[cfg(test)] +pub mod byte_store_tests; + +/// Apply REAPI request metadata header to a `tonic::Request`. +pub fn apply_headers(mut request: Request, build_id: &str) -> Request { + let reapi_request_metadata = remexec::RequestMetadata { + tool_details: Some(remexec::ToolDetails { + tool_name: "pants".into(), + ..remexec::ToolDetails::default() + }), + tool_invocation_id: build_id.to_string(), + ..remexec::RequestMetadata::default() + }; + + let md = request.metadata_mut(); + md.insert_bin( + "google.devtools.remoteexecution.v1test.requestmetadata-bin", + BinaryMetadataValue::try_from(reapi_request_metadata.to_bytes()).unwrap(), + ); + + request +} diff --git a/src/rust/engine/remote_provider/remote_provider_traits/Cargo.toml b/src/rust/engine/remote_provider/remote_provider_traits/Cargo.toml new file mode 100644 index 00000000000..d8fddb5d83c --- /dev/null +++ b/src/rust/engine/remote_provider/remote_provider_traits/Cargo.toml @@ -0,0 +1,15 @@ +[package] +version = "0.0.1" +edition = "2021" +name = "remote_provider_traits" +authors = ["Pants Build "] +publish = false + +[dependencies] +async-oncecell = { workspace = true } +async-trait = { workspace = true } +bytes = { workspace = true } +grpc_util = { path = "../../grpc_util" } +hashing = { path = "../../hashing" } +protos = { path = "../../protos" } +tokio = { workspace = true, features = ["fs"] } diff --git a/src/rust/engine/remote_provider/remote_provider_traits/src/lib.rs b/src/rust/engine/remote_provider/remote_provider_traits/src/lib.rs new file mode 100644 index 00000000000..f0587804ece --- /dev/null +++ b/src/rust/engine/remote_provider/remote_provider_traits/src/lib.rs @@ -0,0 +1,135 @@ +// Copyright 2023 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). + +#![deny(warnings)] +// Enable all clippy lints except for many of the pedantic ones. It's a shame this needs to be copied and pasted across crates, but there doesn't appear to be a way to include inner attributes from a common source. +#![deny( + clippy::all, + clippy::default_trait_access, + clippy::expl_impl_clone_on_copy, + clippy::if_not_else, + clippy::needless_continue, + clippy::unseparated_literal_suffix, + clippy::used_underscore_binding +)] +// It is often more clear to show that nothing is being moved. +#![allow(clippy::match_ref_pats)] +// Subjective style. +#![allow( + clippy::len_without_is_empty, + clippy::redundant_field_names, + clippy::too_many_arguments +)] +// Default isn't as big a deal as people seem to think it is. +#![allow(clippy::new_without_default, clippy::new_ret_no_self)] +// Arc can be more clear than needing to grok Orderings: +#![allow(clippy::mutex_atomic)] + +use std::collections::{BTreeMap, HashSet}; +use std::sync::Arc; +use std::time::Duration; + +use async_oncecell::OnceCell; +use async_trait::async_trait; +use bytes::Bytes; +use hashing::Digest; +use protos::gen::build::bazel::remote::execution::v2 as remexec; +use remexec::{ActionResult, ServerCapabilities}; +use tokio::fs::File; +use tokio::io::{AsyncSeekExt, AsyncWrite}; + +// TODO: Consider providing `impl Default`, similar to `remote::LocalOptions`. +#[derive(Clone)] +pub struct RemoteOptions { + // TODO: this is currently framed for the REAPI provider, with some options used by others, would + // be good to generalise + pub cas_address: String, + pub instance_name: Option, + pub headers: BTreeMap, + pub tls_config: grpc_util::tls::Config, + pub chunk_size_bytes: usize, + pub rpc_timeout: Duration, + pub rpc_retries: usize, + pub rpc_concurrency_limit: usize, + pub capabilities_cell_opt: Option>>, + pub batch_api_size_limit: usize, +} + +#[derive(Clone)] +pub struct RemoteCacheProviderOptions { + // TODO: this is currently framed for the REAPI provider, with some options used by others, would + // be good to generalise + // TODO: this is structurally very similar to `RemoteOptions`: maybe they should be the same? (see + // comment in `choose_action_cache_provider` too) + pub instance_name: Option, + pub action_cache_address: String, + pub root_ca_certs: Option>, + pub mtls_data: Option<(Vec, Vec)>, + pub headers: BTreeMap, + pub concurrency_limit: usize, + pub rpc_timeout: Duration, +} + +#[async_trait] +pub trait ByteStoreProvider: Sync + Send + 'static { + /// Store the bytes readable from `file` into the remote store + async fn store_file(&self, digest: Digest, file: File) -> Result<(), String>; + + /// Store the bytes in `bytes` into the remote store, as an optimisation of `store_file` when the + /// bytes are already in memory + async fn store_bytes(&self, digest: Digest, bytes: Bytes) -> Result<(), String>; + + /// Load the data stored (if any) in the remote store for `digest` into `destination`. Returns + /// true when found, false when not. + async fn load( + &self, + digest: Digest, + destination: &mut dyn LoadDestination, + ) -> Result; + + /// Return any digests from `digests` that are not (currently) available in the remote store. + async fn list_missing_digests( + &self, + digests: &mut (dyn Iterator + Send), + ) -> Result, String>; +} + +/// Places that write the result of a remote `load` +#[async_trait] +pub trait LoadDestination: AsyncWrite + Send + Sync + Unpin + 'static { + /// Clear out the writer and start again, if there's been previous contents written + async fn reset(&mut self) -> std::io::Result<()>; +} + +#[async_trait] +impl LoadDestination for tokio::fs::File { + async fn reset(&mut self) -> std::io::Result<()> { + self.rewind().await?; + self.set_len(0).await + } +} + +#[async_trait] +impl LoadDestination for Vec { + async fn reset(&mut self) -> std::io::Result<()> { + self.clear(); + Ok(()) + } +} + +/// This `ActionCacheProvider` trait captures the operations required to be able to cache command +/// executions remotely. +#[async_trait] +pub trait ActionCacheProvider: Sync + Send + 'static { + async fn update_action_result( + &self, + action_digest: Digest, + action_result: ActionResult, + ) -> Result<(), String>; + + async fn get_action_result( + &self, + action_digest: Digest, + build_id: &str, + ) -> Result, String>; +} diff --git a/src/rust/engine/remote_provider/src/lib.rs b/src/rust/engine/remote_provider/src/lib.rs new file mode 100644 index 00000000000..556c9648cfd --- /dev/null +++ b/src/rust/engine/remote_provider/src/lib.rs @@ -0,0 +1,128 @@ +// Copyright 2023 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). + +#![deny(warnings)] +// Enable all clippy lints except for many of the pedantic ones. It's a shame this needs to be copied and pasted across crates, but there doesn't appear to be a way to include inner attributes from a common source. +#![deny( + clippy::all, + clippy::default_trait_access, + clippy::expl_impl_clone_on_copy, + clippy::if_not_else, + clippy::needless_continue, + clippy::unseparated_literal_suffix, + clippy::used_underscore_binding +)] +// It is often more clear to show that nothing is being moved. +#![allow(clippy::match_ref_pats)] +// Subjective style. +#![allow( + clippy::len_without_is_empty, + clippy::redundant_field_names, + clippy::too_many_arguments +)] +// Default isn't as big a deal as people seem to think it is. +#![allow(clippy::new_without_default, clippy::new_ret_no_self)] +// Arc can be more clear than needing to grok Orderings: +#![allow(clippy::mutex_atomic)] + +use std::sync::Arc; + +// Re-export these so that consumers don't have to know about the exact arrangement of underlying +// crates. +pub use remote_provider_traits::{ + ActionCacheProvider, ByteStoreProvider, LoadDestination, RemoteCacheProviderOptions, + RemoteOptions, +}; + +const REAPI_ADDRESS_SCHEMAS: [&str; 4] = ["grpc://", "grpcs://", "http://", "https://"]; + +// TODO(#19902): a unified view of choosing a provider would be nice +pub async fn choose_byte_store_provider( + options: RemoteOptions, +) -> Result, String> { + let address = options.cas_address.clone(); + if REAPI_ADDRESS_SCHEMAS.iter().any(|s| address.starts_with(s)) { + Ok(Arc::new( + remote_provider_reapi::byte_store::Provider::new(options).await?, + )) + } else if let Some(path) = address.strip_prefix("file://") { + // It's a bit weird to support local "file://" for a 'remote' store... but this is handy for + // testing. + Ok(Arc::new(remote_provider_opendal::Provider::fs( + path, + "byte-store".to_owned(), + options, + )?)) + } else if let Some(url) = address.strip_prefix("github-actions-cache+") { + // This is relying on python validating that it was set as `github-actions-cache+https://...` so + // incorrect values could easily slip through here and cause downstream confusion. We're + // intending to change the approach (https://github.com/pantsbuild/pants/issues/19902) so this + // is tolerable for now. + Ok(Arc::new( + remote_provider_opendal::Provider::github_actions_cache( + url, + "byte-store".to_owned(), + options, + )?, + )) + } else { + Err(format!( + "Cannot initialise remote byte store provider with address {address}, as the scheme is not supported", + )) + } +} + +pub async fn choose_action_cache_provider( + options: RemoteCacheProviderOptions, +) -> Result, String> { + let address = options.action_cache_address.clone(); + + // TODO: we shouldn't need to gin up a whole copy of this struct; it'd be better to have the two + // set of remoting options managed together. + let remote_options = RemoteOptions { + cas_address: address.clone(), + instance_name: options.instance_name.clone(), + headers: options.headers.clone(), + tls_config: grpc_util::tls::Config::new( + options.root_ca_certs.clone(), + options.mtls_data.clone(), + )?, + rpc_timeout: options.rpc_timeout, + rpc_concurrency_limit: options.concurrency_limit, + // TODO: these should either be passed through or not synthesized here + chunk_size_bytes: 0, + rpc_retries: 0, + capabilities_cell_opt: None, + batch_api_size_limit: 0, + }; + + if REAPI_ADDRESS_SCHEMAS.iter().any(|s| address.starts_with(s)) { + Ok(Arc::new( + remote_provider_reapi::action_cache::Provider::new(options).await?, + )) + } else if let Some(path) = address.strip_prefix("file://") { + // It's a bit weird to support local "file://" for a 'remote' store... but this is handy for + // testing. + Ok(Arc::new(remote_provider_opendal::Provider::fs( + path, + "action-cache".to_owned(), + remote_options, + )?)) + } else if let Some(url) = address.strip_prefix("github-actions-cache+") { + // This is relying on python validating that it was set as `github-actions-cache+https://...` so + // incorrect values could easily slip through here and cause downstream confusion. We're + // intending to change the approach (https://github.com/pantsbuild/pants/issues/19902) so this + // is tolerable for now. + Ok(Arc::new( + remote_provider_opendal::Provider::github_actions_cache( + url, + "action-cache".to_owned(), + remote_options, + )?, + )) + } else { + Err(format!( + "Cannot initialise remote action cache provider with address {address}, as the scheme is not supported", + )) + } +} diff --git a/src/rust/engine/testutil/Cargo.toml b/src/rust/engine/testutil/Cargo.toml index a20038c64af..e7551cb3813 100644 --- a/src/rust/engine/testutil/Cargo.toml +++ b/src/rust/engine/testutil/Cargo.toml @@ -13,4 +13,5 @@ grpc_util = { path = "../grpc_util" } hashing = { path = "../hashing" } prost = { workspace = true } protos = { path = "../protos" } +tempfile = { workspace = true } tokio = { workspace = true } diff --git a/src/rust/engine/fs/store/testdata/all_the_henries b/src/rust/engine/testutil/src/all_the_henries.txt similarity index 100% rename from src/rust/engine/fs/store/testdata/all_the_henries rename to src/rust/engine/testutil/src/all_the_henries.txt diff --git a/src/rust/engine/testutil/src/data.rs b/src/rust/engine/testutil/src/data.rs index 556958e0721..63167bd67f1 100644 --- a/src/rust/engine/testutil/src/data.rs +++ b/src/rust/engine/testutil/src/data.rs @@ -33,6 +33,17 @@ impl TestData { ) } + pub fn all_the_henries() -> TestData { + TestData::new(include_str!("./all_the_henries.txt")) + } + + pub fn double_all_the_henries() -> TestData { + let big_file = TestData::all_the_henries(); + TestData { + string: big_file.string.repeat(2), + } + } + pub fn new(s: &str) -> TestData { TestData { string: s.to_owned(), diff --git a/src/rust/engine/testutil/src/file.rs b/src/rust/engine/testutil/src/file.rs index 3a8ed4dc455..d96ee69f2a4 100644 --- a/src/rust/engine/testutil/src/file.rs +++ b/src/rust/engine/testutil/src/file.rs @@ -4,6 +4,8 @@ use std::io::Read; use std::os::unix::fs::PermissionsExt; use std::path::Path; +use tokio::io::{AsyncSeekExt, AsyncWriteExt}; + pub fn list_dir(path: &Path) -> Vec { let mut v: Vec<_> = std::fs::read_dir(path) .unwrap_or_else(|err| panic!("Listing dir {path:?}: {err:?}")) @@ -32,3 +34,18 @@ pub fn is_executable(path: &Path) -> bool { .map(|meta| meta.permissions().mode() & 0o100 == 0o100) .unwrap_or(false) } + +pub async fn mk_tempfile(contents: Option<&[u8]>) -> tokio::fs::File { + let file = tokio::task::spawn_blocking(tempfile::tempfile) + .await + .unwrap() + .unwrap(); + let mut file = tokio::fs::File::from_std(file); + + if let Some(contents) = contents { + file.write_all(contents).await.unwrap(); + file.rewind().await.unwrap(); + } + + file +}