diff --git a/src/rust/engine/process_execution/remote/src/remote_cache.rs b/src/rust/engine/process_execution/remote/src/remote_cache.rs index f62fe39435a..460e96d8af7 100644 --- a/src/rust/engine/process_execution/remote/src/remote_cache.rs +++ b/src/rust/engine/process_execution/remote/src/remote_cache.rs @@ -27,6 +27,8 @@ use process_execution::{ use process_execution::{make_execute_request, EntireExecuteRequest}; mod reapi; +#[cfg(test)] +mod reapi_tests; #[derive(Clone, Copy, Debug, strum_macros::EnumString)] #[strum(serialize_all = "snake_case")] @@ -39,7 +41,7 @@ pub enum RemoteCacheWarningsBehavior { /// This `ActionCacheProvider` trait captures the operations required to be able to cache command /// executions remotely. #[async_trait] -trait ActionCacheProvider: Sync + Send + 'static { +pub trait ActionCacheProvider: Sync + Send + 'static { async fn update_action_result( &self, action_digest: Digest, @@ -53,6 +55,29 @@ trait ActionCacheProvider: Sync + Send + 'static { ) -> Result, String>; } +#[derive(Clone)] +pub struct RemoteCacheProviderOptions { + pub instance_name: Option, + pub action_cache_address: String, + pub root_ca_certs: Option>, + pub headers: BTreeMap, + pub concurrency_limit: usize, + pub rpc_timeout: Duration, +} + +pub struct RemoteCacheRunnerOptions { + pub inner: Arc, + pub instance_name: Option, + pub process_cache_namespace: Option, + pub executor: task_executor::Executor, + pub store: Store, + pub cache_read: bool, + pub cache_write: bool, + pub warnings_behavior: RemoteCacheWarningsBehavior, + pub cache_content_behavior: CacheContentBehavior, + pub append_only_caches_base_path: Option, +} + /// This `CommandRunner` implementation caches results remotely using the Action Cache service /// of the Remote Execution API. /// @@ -80,32 +105,21 @@ pub struct CommandRunner { impl CommandRunner { pub fn new( - inner: Arc, - instance_name: Option, - process_cache_namespace: Option, - executor: task_executor::Executor, - store: Store, - action_cache_address: &str, - root_ca_certs: Option>, - headers: BTreeMap, - cache_read: bool, - cache_write: bool, - warnings_behavior: RemoteCacheWarningsBehavior, - cache_content_behavior: CacheContentBehavior, - concurrency_limit: usize, - rpc_timeout: Duration, - append_only_caches_base_path: Option, - ) -> Result { - let provider = Arc::new(reapi::Provider::new( - instance_name.clone(), - action_cache_address, - root_ca_certs, - headers, - concurrency_limit, - rpc_timeout, - )?); - - Ok(CommandRunner { + RemoteCacheRunnerOptions { + inner, + instance_name, + process_cache_namespace, + executor, + store, + cache_read, + cache_write, + warnings_behavior, + cache_content_behavior, + append_only_caches_base_path, + }: RemoteCacheRunnerOptions, + provider: Arc, + ) -> Self { + CommandRunner { inner, instance_name, process_cache_namespace, @@ -119,7 +133,16 @@ impl CommandRunner { warnings_behavior, read_errors_counter: Arc::new(Mutex::new(BTreeMap::new())), write_errors_counter: Arc::new(Mutex::new(BTreeMap::new())), - }) + } + } + + pub fn from_provider_options( + runner_options: RemoteCacheRunnerOptions, + provider_options: RemoteCacheProviderOptions, + ) -> Result { + let provider = Arc::new(reapi::Provider::new(provider_options)?); + + Ok(Self::new(runner_options, provider)) } /// Create a REAPI `Tree` protobuf for an output directory by traversing down from a Pants diff --git a/src/rust/engine/process_execution/remote/src/remote_cache/reapi.rs b/src/rust/engine/process_execution/remote/src/remote_cache/reapi.rs index 84d8c28d1dd..b7a7cf425ab 100644 --- a/src/rust/engine/process_execution/remote/src/remote_cache/reapi.rs +++ b/src/rust/engine/process_execution/remote/src/remote_cache/reapi.rs @@ -1,9 +1,7 @@ // Copyright 2023 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). -use std::collections::BTreeMap; use std::convert::TryInto; use std::sync::Arc; -use std::time::Duration; use async_trait::async_trait; use grpc_util::retry::{retry_call, status_is_retryable}; @@ -18,7 +16,7 @@ use crate::remote::apply_headers; use process_execution::Context; use tonic::{Code, Request}; -use super::ActionCacheProvider; +use super::{ActionCacheProvider, RemoteCacheProviderOptions}; pub struct Provider { instance_name: Option, @@ -27,12 +25,14 @@ pub struct Provider { impl Provider { pub fn new( - instance_name: Option, - action_cache_address: &str, - root_ca_certs: Option>, - mut headers: BTreeMap, - concurrency_limit: usize, - rpc_timeout: Duration, + RemoteCacheProviderOptions { + instance_name, + action_cache_address, + root_ca_certs, + mut headers, + concurrency_limit, + rpc_timeout, + }: RemoteCacheProviderOptions, ) -> Result { let tls_client_config = if action_cache_address.starts_with("https://") { Some(grpc_util::tls::Config::new_without_mtls(root_ca_certs).try_into()?) @@ -41,7 +41,7 @@ impl Provider { }; let endpoint = grpc_util::create_endpoint( - action_cache_address, + &action_cache_address, tls_client_config.as_ref(), &mut headers, )?; diff --git a/src/rust/engine/process_execution/remote/src/remote_cache/reapi_tests.rs b/src/rust/engine/process_execution/remote/src/remote_cache/reapi_tests.rs new file mode 100644 index 00000000000..d2c95abe0a4 --- /dev/null +++ b/src/rust/engine/process_execution/remote/src/remote_cache/reapi_tests.rs @@ -0,0 +1,123 @@ +// Copyright 2023 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). +use std::{collections::BTreeMap, time::Duration}; + +use hashing::Digest; +use mock::StubCAS; +use process_execution::Context; +use protos::gen::build::bazel::remote::execution::v2 as remexec; + +use super::{reapi::Provider, ActionCacheProvider, RemoteCacheProviderOptions}; + +fn new_provider(cas: &StubCAS) -> Provider { + Provider::new(RemoteCacheProviderOptions { + instance_name: None, + action_cache_address: cas.address(), + root_ca_certs: None, + headers: BTreeMap::new(), + concurrency_limit: 256, + rpc_timeout: Duration::from_secs(2), + }) + .unwrap() +} + +#[tokio::test] +async fn get_action_result_existing() { + let cas = StubCAS::empty(); + let provider = new_provider(&cas); + + let action_digest = Digest::of_bytes(b"get_action_cache test"); + let action_result = remexec::ActionResult { + exit_code: 123, + ..Default::default() + }; + cas + .action_cache + .action_map + .lock() + .insert(action_digest.hash, action_result.clone()); + + assert_eq!( + provider + .get_action_result(action_digest, &Context::default()) + .await, + Ok(Some(action_result)) + ); +} + +#[tokio::test] +async fn get_action_result_missing() { + let cas = StubCAS::empty(); + let provider = new_provider(&cas); + + let action_digest = Digest::of_bytes(b"update_action_cache test"); + + assert_eq!( + provider + .get_action_result(action_digest, &Context::default()) + .await, + Ok(None) + ); +} + +#[tokio::test] +async fn get_action_result_grpc_error() { + let cas = StubCAS::builder().ac_always_errors().build(); + let provider = new_provider(&cas); + + let action_digest = Digest::of_bytes(b"get_action_result_grpc_error test"); + + let error = provider + .get_action_result(action_digest, &Context::default()) + .await + .expect_err("Want err"); + + assert!( + error.contains("unavailable"), + "Bad error message, got: {error}" + ); +} + +#[tokio::test] +async fn update_action_cache() { + let cas = StubCAS::empty(); + let provider = new_provider(&cas); + + let action_digest = Digest::of_bytes(b"update_action_cache test"); + let action_result = remexec::ActionResult { + exit_code: 123, + ..Default::default() + }; + + provider + .update_action_result(action_digest, action_result.clone()) + .await + .unwrap(); + + assert_eq!( + cas.action_cache.action_map.lock()[&action_digest.hash], + action_result + ); +} + +#[tokio::test] +async fn update_action_cache_grpc_error() { + let cas = StubCAS::builder().ac_always_errors().build(); + let provider = new_provider(&cas); + + let action_digest = Digest::of_bytes(b"update_action_cache_grpc_error test"); + let action_result = remexec::ActionResult { + exit_code: 123, + ..Default::default() + }; + + let error = provider + .update_action_result(action_digest, action_result.clone()) + .await + .expect_err("Want err"); + + assert!( + error.contains("unavailable"), + "Bad error message, got: {error}" + ); +} diff --git a/src/rust/engine/process_execution/remote/src/remote_cache_tests.rs b/src/rust/engine/process_execution/remote/src/remote_cache_tests.rs index f81f0135e3e..45ceea028b1 100644 --- a/src/rust/engine/process_execution/remote/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/remote/src/remote_cache_tests.rs @@ -21,7 +21,9 @@ use testutil::data::{TestData, TestDirectory, TestTree}; use workunit_store::{RunId, RunningWorkunit, WorkunitStore}; use crate::remote::ensure_action_stored_locally; -use crate::remote_cache::RemoteCacheWarningsBehavior; +use crate::remote_cache::{ + RemoteCacheProviderOptions, RemoteCacheRunnerOptions, RemoteCacheWarningsBehavior, +}; use process_execution::{ make_execute_request, CacheContentBehavior, CommandRunner as CommandRunnerTrait, Context, EntireExecuteRequest, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope, @@ -146,22 +148,27 @@ fn create_cached_runner( cache_content_behavior: CacheContentBehavior, ) -> Box { Box::new( - crate::remote_cache::CommandRunner::new( - local.into(), - None, - None, - store_setup.executor.clone(), - store_setup.store.clone(), - &store_setup.cas.address(), - None, - BTreeMap::default(), - true, - true, - RemoteCacheWarningsBehavior::FirstOnly, - cache_content_behavior, - 256, - CACHE_READ_TIMEOUT, - None, + crate::remote_cache::CommandRunner::from_provider_options( + RemoteCacheRunnerOptions { + inner: local.into(), + instance_name: None, + process_cache_namespace: None, + executor: store_setup.executor.clone(), + store: store_setup.store.clone(), + cache_read: true, + cache_write: true, + warnings_behavior: RemoteCacheWarningsBehavior::FirstOnly, + cache_content_behavior, + append_only_caches_base_path: None, + }, + RemoteCacheProviderOptions { + instance_name: None, + action_cache_address: store_setup.cas.address(), + root_ca_certs: None, + headers: BTreeMap::default(), + concurrency_limit: 256, + rpc_timeout: CACHE_READ_TIMEOUT, + }, ) .expect("caching command runner"), ) @@ -731,22 +738,27 @@ async fn make_action_result_basic() { let mock_command_runner = Arc::new(MockCommandRunner); let cas = StubCAS::builder().build(); - let runner = crate::remote_cache::CommandRunner::new( - mock_command_runner.clone(), - None, - None, - executor.clone(), - store.clone(), - &cas.address(), - None, - BTreeMap::default(), - true, - true, - RemoteCacheWarningsBehavior::FirstOnly, - CacheContentBehavior::Defer, - 256, - CACHE_READ_TIMEOUT, - None, + let runner = crate::remote_cache::CommandRunner::from_provider_options( + RemoteCacheRunnerOptions { + inner: mock_command_runner.clone(), + instance_name: None, + process_cache_namespace: None, + executor: executor.clone(), + store: store.clone(), + cache_read: true, + cache_write: true, + warnings_behavior: RemoteCacheWarningsBehavior::FirstOnly, + cache_content_behavior: CacheContentBehavior::Defer, + append_only_caches_base_path: None, + }, + RemoteCacheProviderOptions { + instance_name: None, + action_cache_address: cas.address(), + root_ca_certs: None, + headers: BTreeMap::default(), + concurrency_limit: 256, + rpc_timeout: CACHE_READ_TIMEOUT, + }, ) .expect("caching command runner"); diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index 11aeacfeb10..19e1fcabb9f 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -44,6 +44,7 @@ use prost::Message; use protos::gen::build::bazel::remote::execution::v2::{Action, Command}; use protos::gen::buildbarn::cas::UncachedActionResult; use protos::require_digest; +use remote::remote_cache::{RemoteCacheProviderOptions, RemoteCacheRunnerOptions}; use store::{ImmutableInputs, RemoteOptions, Store}; use workunit_store::{in_workunit, Level, WorkunitStore}; @@ -316,24 +317,29 @@ async fn main() { let command_runner_box: Box = { Box::new( - remote::remote_cache::CommandRunner::new( - Arc::new(remote_runner), - process_metadata.instance_name.clone(), - process_metadata.cache_key_gen_version.clone(), - executor, - store.clone(), - &address, - root_ca_certs, - headers, - true, - true, - remote::remote_cache::RemoteCacheWarningsBehavior::Backoff, - CacheContentBehavior::Defer, - args.cache_rpc_concurrency, - Duration::from_secs(2), - args - .named_cache_path - .map(|p| p.to_string_lossy().to_string()), + remote::remote_cache::CommandRunner::from_provider_options( + RemoteCacheRunnerOptions { + inner: Arc::new(remote_runner), + instance_name: process_metadata.instance_name.clone(), + process_cache_namespace: process_metadata.cache_key_gen_version.clone(), + executor, + store: store.clone(), + cache_read: true, + cache_write: true, + warnings_behavior: remote::remote_cache::RemoteCacheWarningsBehavior::Backoff, + cache_content_behavior: CacheContentBehavior::Defer, + append_only_caches_base_path: args + .named_cache_path + .map(|p| p.to_string_lossy().to_string()), + }, + RemoteCacheProviderOptions { + instance_name: process_metadata.instance_name.clone(), + action_cache_address: address, + root_ca_certs, + headers, + concurrency_limit: args.cache_rpc_concurrency, + rpc_timeout: Duration::from_secs(2), + }, ) .expect("Failed to make remote cache command runner"), ) diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 3f5cd7855b8..ad021fb03fd 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -33,7 +33,9 @@ use process_execution::{ }; use protos::gen::build::bazel::remote::execution::v2::ServerCapabilities; use regex::Regex; -use remote::remote_cache::RemoteCacheWarningsBehavior; +use remote::remote_cache::{ + RemoteCacheProviderOptions, RemoteCacheRunnerOptions, RemoteCacheWarningsBehavior, +}; use remote::{self, remote_cache}; use rule_graph::RuleGraph; use store::{self, ImmutableInputs, RemoteOptions, Store}; @@ -323,22 +325,27 @@ impl Core { local_cache_write: bool, ) -> Result, String> { if remote_cache_read || remote_cache_write { - runner = Arc::new(remote_cache::CommandRunner::new( - runner, - instance_name, - process_cache_namespace.clone(), - executor.clone(), - full_store.clone(), - remoting_opts.store_address.as_ref().unwrap(), - root_ca_certs.clone(), - remoting_opts.store_headers.clone(), - remote_cache_read, - remote_cache_write, - remoting_opts.cache_warnings_behavior, - remoting_opts.cache_content_behavior, - remoting_opts.cache_rpc_concurrency, - remoting_opts.cache_rpc_timeout, - remoting_opts.append_only_caches_base_path.clone(), + runner = Arc::new(remote_cache::CommandRunner::from_provider_options( + RemoteCacheRunnerOptions { + inner: runner, + instance_name: instance_name.clone(), + process_cache_namespace: process_cache_namespace.clone(), + executor: executor.clone(), + store: full_store.clone(), + cache_read: remote_cache_read, + cache_write: remote_cache_write, + warnings_behavior: remoting_opts.cache_warnings_behavior, + cache_content_behavior: remoting_opts.cache_content_behavior, + append_only_caches_base_path: remoting_opts.append_only_caches_base_path.clone(), + }, + RemoteCacheProviderOptions { + instance_name, + action_cache_address: remoting_opts.store_address.clone().unwrap(), + root_ca_certs: root_ca_certs.clone(), + headers: remoting_opts.store_headers.clone(), + concurrency_limit: remoting_opts.cache_rpc_concurrency, + rpc_timeout: remoting_opts.cache_rpc_timeout, + }, )?); } diff --git a/src/rust/engine/testutil/mock/src/action_cache_service.rs b/src/rust/engine/testutil/mock/src/action_cache_service.rs index 218ab2e9c1b..85eb63b17cd 100644 --- a/src/rust/engine/testutil/mock/src/action_cache_service.rs +++ b/src/rust/engine/testutil/mock/src/action_cache_service.rs @@ -111,6 +111,10 @@ impl ActionCache for ActionCacheResponder { let request = request.into_inner(); + if self.always_errors.load(Ordering::SeqCst) { + return Err(Status::unavailable("unavailable".to_owned())); + } + let action_digest: Digest = match require_digest(request.action_digest.as_ref()) { Ok(digest) => digest, Err(_) => {