Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare remote::remote_cache::CommandRunner for other providers #19459

Merged
merged 4 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 51 additions & 28 deletions src/rust/engine/process_execution/remote/src/remote_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use process_execution::{
use process_execution::{make_execute_request, EntireExecuteRequest};

mod reapi;
#[cfg(test)]
mod reapi_tests;

#[derive(Clone, Copy, Debug, strum_macros::EnumString)]
#[strum(serialize_all = "snake_case")]
Expand All @@ -39,7 +41,7 @@ pub enum RemoteCacheWarningsBehavior {
/// This `ActionCacheProvider` trait captures the operations required to be able to cache command
/// executions remotely.
#[async_trait]
trait ActionCacheProvider: Sync + Send + 'static {
pub trait ActionCacheProvider: Sync + Send + 'static {
async fn update_action_result(
&self,
action_digest: Digest,
Expand All @@ -53,6 +55,29 @@ trait ActionCacheProvider: Sync + Send + 'static {
) -> Result<Option<ActionResult>, String>;
}

#[derive(Clone)]
pub struct RemoteCacheProviderOptions {
pub instance_name: Option<String>,
pub action_cache_address: String,
pub root_ca_certs: Option<Vec<u8>>,
pub headers: BTreeMap<String, String>,
pub concurrency_limit: usize,
pub rpc_timeout: Duration,
}

pub struct RemoteCacheRunnerOptions {
pub inner: Arc<dyn process_execution::CommandRunner>,
pub instance_name: Option<String>,
pub process_cache_namespace: Option<String>,
pub executor: task_executor::Executor,
pub store: Store,
pub cache_read: bool,
pub cache_write: bool,
pub warnings_behavior: RemoteCacheWarningsBehavior,
pub cache_content_behavior: CacheContentBehavior,
pub append_only_caches_base_path: Option<String>,
}

/// This `CommandRunner` implementation caches results remotely using the Action Cache service
/// of the Remote Execution API.
///
Expand Down Expand Up @@ -80,32 +105,21 @@ pub struct CommandRunner {

impl CommandRunner {
pub fn new(
inner: Arc<dyn process_execution::CommandRunner>,
instance_name: Option<String>,
process_cache_namespace: Option<String>,
executor: task_executor::Executor,
store: Store,
action_cache_address: &str,
root_ca_certs: Option<Vec<u8>>,
headers: BTreeMap<String, String>,
cache_read: bool,
cache_write: bool,
warnings_behavior: RemoteCacheWarningsBehavior,
cache_content_behavior: CacheContentBehavior,
concurrency_limit: usize,
rpc_timeout: Duration,
append_only_caches_base_path: Option<String>,
) -> Result<Self, String> {
let provider = Arc::new(reapi::Provider::new(
instance_name.clone(),
action_cache_address,
root_ca_certs,
headers,
concurrency_limit,
rpc_timeout,
)?);

Ok(CommandRunner {
RemoteCacheRunnerOptions {
inner,
instance_name,
process_cache_namespace,
executor,
store,
cache_read,
cache_write,
warnings_behavior,
cache_content_behavior,
append_only_caches_base_path,
}: RemoteCacheRunnerOptions,
provider: Arc<dyn ActionCacheProvider + 'static>,
) -> Self {
CommandRunner {
inner,
instance_name,
process_cache_namespace,
Expand All @@ -119,7 +133,16 @@ impl CommandRunner {
warnings_behavior,
read_errors_counter: Arc::new(Mutex::new(BTreeMap::new())),
write_errors_counter: Arc::new(Mutex::new(BTreeMap::new())),
})
}
}

pub fn from_provider_options(
runner_options: RemoteCacheRunnerOptions,
provider_options: RemoteCacheProviderOptions,
) -> Result<Self, String> {
let provider = Arc::new(reapi::Provider::new(provider_options)?);

Ok(Self::new(runner_options, provider))
}

/// Create a REAPI `Tree` protobuf for an output directory by traversing down from a Pants
Expand Down
20 changes: 10 additions & 10 deletions src/rust/engine/process_execution/remote/src/remote_cache/reapi.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
// Copyright 2023 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).
use std::collections::BTreeMap;
use std::convert::TryInto;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use grpc_util::retry::{retry_call, status_is_retryable};
Expand All @@ -18,7 +16,7 @@ use crate::remote::apply_headers;
use process_execution::Context;
use tonic::{Code, Request};

use super::ActionCacheProvider;
use super::{ActionCacheProvider, RemoteCacheProviderOptions};

pub struct Provider {
instance_name: Option<String>,
Expand All @@ -27,12 +25,14 @@ pub struct Provider {

impl Provider {
pub fn new(
instance_name: Option<String>,
action_cache_address: &str,
root_ca_certs: Option<Vec<u8>>,
mut headers: BTreeMap<String, String>,
concurrency_limit: usize,
rpc_timeout: Duration,
RemoteCacheProviderOptions {
instance_name,
action_cache_address,
root_ca_certs,
mut headers,
concurrency_limit,
rpc_timeout,
}: RemoteCacheProviderOptions,
) -> Result<Self, String> {
let tls_client_config = if action_cache_address.starts_with("https://") {
Some(grpc_util::tls::Config::new_without_mtls(root_ca_certs).try_into()?)
Expand All @@ -41,7 +41,7 @@ impl Provider {
};

let endpoint = grpc_util::create_endpoint(
action_cache_address,
&action_cache_address,
tls_client_config.as_ref(),
&mut headers,
)?;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2023 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).
use std::{collections::BTreeMap, time::Duration};

use hashing::Digest;
use mock::StubCAS;
use process_execution::Context;
use protos::gen::build::bazel::remote::execution::v2 as remexec;

use super::{reapi::Provider, ActionCacheProvider, RemoteCacheProviderOptions};

fn new_provider(cas: &StubCAS) -> Provider {
Provider::new(RemoteCacheProviderOptions {
instance_name: None,
action_cache_address: cas.address(),
root_ca_certs: None,
headers: BTreeMap::new(),
concurrency_limit: 256,
rpc_timeout: Duration::from_secs(2),
})
.unwrap()
}

#[tokio::test]
async fn get_action_result_existing() {
let cas = StubCAS::empty();
let provider = new_provider(&cas);

let action_digest = Digest::of_bytes(b"get_action_cache test");
let action_result = remexec::ActionResult {
exit_code: 123,
..Default::default()
};
cas
.action_cache
.action_map
.lock()
.insert(action_digest.hash, action_result.clone());

assert_eq!(
provider
.get_action_result(action_digest, &Context::default())
.await,
Ok(Some(action_result))
);
}

#[tokio::test]
async fn get_action_result_missing() {
let cas = StubCAS::empty();
let provider = new_provider(&cas);

let action_digest = Digest::of_bytes(b"update_action_cache test");

assert_eq!(
provider
.get_action_result(action_digest, &Context::default())
.await,
Ok(None)
);
}

#[tokio::test]
async fn get_action_result_grpc_error() {
let cas = StubCAS::builder().ac_always_errors().build();
let provider = new_provider(&cas);

let action_digest = Digest::of_bytes(b"get_action_result_grpc_error test");

let error = provider
.get_action_result(action_digest, &Context::default())
.await
.expect_err("Want err");

assert!(
error.contains("unavailable"),
"Bad error message, got: {error}"
);
}

#[tokio::test]
async fn update_action_cache() {
let cas = StubCAS::empty();
let provider = new_provider(&cas);

let action_digest = Digest::of_bytes(b"update_action_cache test");
let action_result = remexec::ActionResult {
exit_code: 123,
..Default::default()
};

provider
.update_action_result(action_digest, action_result.clone())
.await
.unwrap();

assert_eq!(
cas.action_cache.action_map.lock()[&action_digest.hash],
action_result
);
}

#[tokio::test]
async fn update_action_cache_grpc_error() {
let cas = StubCAS::builder().ac_always_errors().build();
let provider = new_provider(&cas);

let action_digest = Digest::of_bytes(b"update_action_cache_grpc_error test");
let action_result = remexec::ActionResult {
exit_code: 123,
..Default::default()
};

let error = provider
.update_action_result(action_digest, action_result.clone())
.await
.expect_err("Want err");

assert!(
error.contains("unavailable"),
"Bad error message, got: {error}"
);
}
78 changes: 45 additions & 33 deletions src/rust/engine/process_execution/remote/src/remote_cache_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use testutil::data::{TestData, TestDirectory, TestTree};
use workunit_store::{RunId, RunningWorkunit, WorkunitStore};

use crate::remote::ensure_action_stored_locally;
use crate::remote_cache::RemoteCacheWarningsBehavior;
use crate::remote_cache::{
RemoteCacheProviderOptions, RemoteCacheRunnerOptions, RemoteCacheWarningsBehavior,
};
use process_execution::{
make_execute_request, CacheContentBehavior, CommandRunner as CommandRunnerTrait, Context,
EntireExecuteRequest, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope,
Expand Down Expand Up @@ -146,22 +148,27 @@ fn create_cached_runner(
cache_content_behavior: CacheContentBehavior,
) -> Box<dyn CommandRunnerTrait> {
Box::new(
crate::remote_cache::CommandRunner::new(
local.into(),
None,
None,
store_setup.executor.clone(),
store_setup.store.clone(),
&store_setup.cas.address(),
None,
BTreeMap::default(),
true,
true,
RemoteCacheWarningsBehavior::FirstOnly,
cache_content_behavior,
256,
CACHE_READ_TIMEOUT,
None,
crate::remote_cache::CommandRunner::from_provider_options(
RemoteCacheRunnerOptions {
inner: local.into(),
instance_name: None,
process_cache_namespace: None,
executor: store_setup.executor.clone(),
store: store_setup.store.clone(),
cache_read: true,
cache_write: true,
warnings_behavior: RemoteCacheWarningsBehavior::FirstOnly,
cache_content_behavior,
append_only_caches_base_path: None,
},
RemoteCacheProviderOptions {
instance_name: None,
action_cache_address: store_setup.cas.address(),
root_ca_certs: None,
headers: BTreeMap::default(),
concurrency_limit: 256,
rpc_timeout: CACHE_READ_TIMEOUT,
},
)
.expect("caching command runner"),
)
Expand Down Expand Up @@ -731,22 +738,27 @@ async fn make_action_result_basic() {

let mock_command_runner = Arc::new(MockCommandRunner);
let cas = StubCAS::builder().build();
let runner = crate::remote_cache::CommandRunner::new(
mock_command_runner.clone(),
None,
None,
executor.clone(),
store.clone(),
&cas.address(),
None,
BTreeMap::default(),
true,
true,
RemoteCacheWarningsBehavior::FirstOnly,
CacheContentBehavior::Defer,
256,
CACHE_READ_TIMEOUT,
None,
let runner = crate::remote_cache::CommandRunner::from_provider_options(
RemoteCacheRunnerOptions {
inner: mock_command_runner.clone(),
instance_name: None,
process_cache_namespace: None,
executor: executor.clone(),
store: store.clone(),
cache_read: true,
cache_write: true,
warnings_behavior: RemoteCacheWarningsBehavior::FirstOnly,
cache_content_behavior: CacheContentBehavior::Defer,
append_only_caches_base_path: None,
},
RemoteCacheProviderOptions {
instance_name: None,
action_cache_address: cas.address(),
root_ca_certs: None,
headers: BTreeMap::default(),
concurrency_limit: 256,
rpc_timeout: CACHE_READ_TIMEOUT,
},
)
.expect("caching command runner");

Expand Down
Loading