Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge "byte store" and "action cache" provider options #20115

Merged
merged 8 commits into from
Oct 31, 2023
13 changes: 6 additions & 7 deletions src/rust/engine/fs/brfs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use log::{debug, error, warn};
use parking_lot::Mutex;
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use protos::require_digest;
use store::{RemoteOptions, Store, StoreError};
use store::{RemoteStoreOptions, Store, StoreError};
use tokio::signal::unix::{signal, SignalKind};
use tokio::task;
use tokio_stream::wrappers::SignalStream;
Expand Down Expand Up @@ -792,18 +792,17 @@ async fn main() {
Store::local_only(runtime.clone(), store_path).expect("Error making local store.");
let store = match args.value_of("server-address") {
Some(address) => local_only_store
.into_with_remote(RemoteOptions {
cas_address: address.to_owned(),
.into_with_remote(RemoteStoreOptions {
store_address: address.to_owned(),
instance_name: args.value_of("remote-instance-name").map(str::to_owned),
tls_config,
headers,
chunk_size_bytes: 4 * 1024 * 1024,
rpc_timeout: std::time::Duration::from_secs(5 * 60),
rpc_retries: 1,
rpc_concurrency_limit: args
timeout: std::time::Duration::from_secs(5 * 60),
retries: 1,
concurrency_limit: args
.value_of_t::<usize>("rpc-concurrency-limit")
.expect("Bad rpc-concurrency-limit flag"),
capabilities_cell_opt: None,
batch_api_size_limit: args
.value_of_t::<usize>("batch-api-size-limit")
.expect("Bad batch-api-size-limit flag"),
Expand Down
13 changes: 6 additions & 7 deletions src/rust/engine/fs/fs_util/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use protos::require_digest;
use serde_derive::Serialize;
use std::collections::{BTreeMap, BTreeSet};
use store::{
RemoteOptions, Snapshot, SnapshotOps, Store, StoreError, StoreFileByDigest, SubsetParams,
RemoteStoreOptions, Snapshot, SnapshotOps, Store, StoreError, StoreFileByDigest, SubsetParams,
UploadSummary,
};
use workunit_store::WorkunitStore;
Expand Down Expand Up @@ -413,8 +413,8 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {

(
local_only
.into_with_remote(RemoteOptions {
cas_address: cas_address.to_owned(),
.into_with_remote(RemoteStoreOptions {
store_address: cas_address.to_owned(),
instance_name: top_match
.value_of("remote-instance-name")
.map(str::to_owned),
Expand All @@ -426,14 +426,13 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
//
// Make fs_util have a very long deadline (because it's not configurable,
// like it is inside pants).
rpc_timeout: Duration::from_secs(30 * 60),
rpc_retries: top_match
timeout: Duration::from_secs(30 * 60),
retries: top_match
.value_of_t::<usize>("rpc-attempts")
.expect("Bad rpc-attempts flag"),
rpc_concurrency_limit: top_match
concurrency_limit: top_match
.value_of_t::<usize>("rpc-concurrency-limit")
.expect("Bad rpc-concurrency-limit flag"),
capabilities_cell_opt: None,
batch_api_size_limit: top_match
.value_of_t::<usize>("batch-api-size-limit")
.expect("Bad batch-api-size-limit flag"),
Expand Down
7 changes: 5 additions & 2 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ mod remote_tests;

// Consumers of this crate shouldn't need to worry about the exact crate structure that comes
// together to make a store.
pub use remote_provider::RemoteOptions;
pub use remote_provider::RemoteStoreOptions;

pub struct LocalOptions {
pub files_max_size_bytes: usize,
Expand Down Expand Up @@ -379,7 +379,10 @@ impl Store {
/// Add remote storage to a Store. If it is missing a value which it tries to load, it will
/// attempt to back-fill its local storage from the remote storage.
///
pub async fn into_with_remote(self, remote_options: RemoteOptions) -> Result<Store, String> {
pub async fn into_with_remote(
self,
remote_options: RemoteStoreOptions,
) -> Result<Store, String> {
Ok(Store {
local: self.local,
remote: Some(RemoteStore::new(
Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/fs/store/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures::Future;
use hashing::Digest;
use log::Level;
use remote_provider::{
choose_byte_store_provider, ByteStoreProvider, LoadDestination, RemoteOptions,
choose_byte_store_provider, ByteStoreProvider, LoadDestination, RemoteStoreOptions,
};
use tokio::fs::File;
use workunit_store::{in_workunit, ObservationMetric};
Expand Down Expand Up @@ -38,7 +38,7 @@ impl ByteStore {
}
}

pub async fn from_options(options: RemoteOptions) -> Result<ByteStore, String> {
pub async fn from_options(options: RemoteStoreOptions) -> Result<ByteStore, String> {
let instance_name = options.instance_name.clone();
let provider = choose_byte_store_provider(options).await?;
Ok(ByteStore::new(instance_name, provider))
Expand Down
24 changes: 11 additions & 13 deletions src/rust/engine/fs/store/src/remote_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use bytes::Bytes;
use grpc_util::tls;
use hashing::{Digest, Fingerprint};
use parking_lot::Mutex;
use remote_provider::{ByteStoreProvider, LoadDestination, RemoteOptions};
use remote_provider::{ByteStoreProvider, LoadDestination, RemoteStoreOptions};
use tempfile::TempDir;
use testutil::data::TestData;
use testutil::file::mk_tempfile;
Expand All @@ -29,16 +29,15 @@ async fn smoke_test_from_options_reapi_provider() {

let cas = new_cas(10);

let store = ByteStore::from_options(RemoteOptions {
cas_address: cas.address(),
let store = ByteStore::from_options(RemoteStoreOptions {
store_address: cas.address(),
instance_name: None,
tls_config: tls::Config::default(),
headers: BTreeMap::new(),
chunk_size_bytes: 10 * MEGABYTES,
rpc_timeout: Duration::from_secs(5),
rpc_retries: 1,
rpc_concurrency_limit: 256,
capabilities_cell_opt: None,
timeout: Duration::from_secs(5),
retries: 1,
concurrency_limit: 256,
batch_api_size_limit: crate::tests::STORE_BATCH_API_SIZE_LIMIT,
})
.await
Expand Down Expand Up @@ -78,16 +77,15 @@ async fn smoke_test_from_options_file_provider() {
let _ = WorkunitStore::setup_for_tests();
let dir = TempDir::new().unwrap();

let store = ByteStore::from_options(RemoteOptions {
cas_address: format!("file://{}", dir.path().display()),
let store = ByteStore::from_options(RemoteStoreOptions {
store_address: format!("file://{}", dir.path().display()),
instance_name: None,
tls_config: tls::Config::default(),
headers: BTreeMap::new(),
chunk_size_bytes: 10 * MEGABYTES,
rpc_timeout: Duration::from_secs(5),
rpc_retries: 1,
rpc_concurrency_limit: 256,
capabilities_cell_opt: None,
timeout: Duration::from_secs(5),
retries: 1,
concurrency_limit: 256,
batch_api_size_limit: crate::tests::STORE_BATCH_API_SIZE_LIMIT,
})
.await
Expand Down
17 changes: 8 additions & 9 deletions src/rust/engine/fs/store/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use workunit_store::WorkunitStore;

use crate::local::ByteStore;
use crate::{
EntryType, FileContent, RemoteOptions, Snapshot, Store, StoreError, StoreFileByDigest,
EntryType, FileContent, RemoteStoreOptions, Snapshot, Store, StoreError, StoreFileByDigest,
UploadSummary, MEGABYTES,
};

Expand Down Expand Up @@ -59,20 +59,19 @@ fn new_local_store<P: AsRef<Path>>(dir: P) -> Store {
}

fn remote_options(
cas_address: String,
store_address: String,
instance_name: Option<String>,
headers: BTreeMap<String, String>,
) -> RemoteOptions {
RemoteOptions {
cas_address,
) -> RemoteStoreOptions {
RemoteStoreOptions {
store_address,
instance_name,
tls_config: tls::Config::default(),
headers,
chunk_size_bytes: 10 * MEGABYTES,
rpc_timeout: Duration::from_secs(1),
rpc_retries: 1,
rpc_concurrency_limit: 256,
capabilities_cell_opt: None,
timeout: Duration::from_secs(1),
retries: 1,
concurrency_limit: 256,
batch_api_size_limit: STORE_BATCH_API_SIZE_LIMIT,
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/rust/engine/process_execution/remote/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ impl CommandRunner {
overall_deadline: Duration,
retry_interval_duration: Duration,
execution_concurrency_limit: usize,
capabilities_cell_opt: Option<Arc<OnceCell<ServerCapabilities>>>,
) -> Result<Self, String> {
let needs_tls = execution_address.starts_with("https://");

Expand Down Expand Up @@ -201,7 +200,7 @@ impl CommandRunner {
executor,
overall_deadline,
retry_interval_duration,
capabilities_cell: capabilities_cell_opt.unwrap_or_else(|| Arc::new(OnceCell::new())),
capabilities_cell: Arc::new(OnceCell::new()),
capabilities_client,
};

Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/process_execution/remote/src/remote_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use process_execution::{make_execute_request, EntireExecuteRequest};

// Consumers of this crate shouldn't need to worry about the exact crate structure that comes
// together to make a remote cache command runner.
pub use remote_provider::RemoteCacheProviderOptions;
pub use remote_provider::RemoteStoreOptions;

#[derive(Clone, Copy, Debug, strum_macros::EnumString)]
#[strum(serialize_all = "snake_case")]
Expand Down Expand Up @@ -112,7 +112,7 @@ impl CommandRunner {

pub async fn from_provider_options(
runner_options: RemoteCacheRunnerOptions,
provider_options: RemoteCacheProviderOptions,
provider_options: RemoteStoreOptions,
) -> Result<Self, String> {
let provider = choose_action_cache_provider(provider_options).await?;
Ok(Self::new(runner_options, provider))
Expand Down
32 changes: 18 additions & 14 deletions src/rust/engine/process_execution/remote/src/remote_cache_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use grpc_util::tls;
use hashing::{Digest, EMPTY_DIGEST};
use mock::StubCAS;
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use remote_provider::RemoteCacheProviderOptions;
use store::{RemoteOptions, Store};
use store::{RemoteStoreOptions, Store};
use testutil::data::{TestData, TestDirectory, TestTree};
use workunit_store::{RunId, RunningWorkunit, WorkunitStore};

Expand Down Expand Up @@ -106,16 +105,15 @@ impl StoreSetup {
let store_dir = store_temp_dir.path().join("store_dir");
let store = Store::local_only(executor.clone(), store_dir)
.unwrap()
.into_with_remote(RemoteOptions {
cas_address: cas.address(),
.into_with_remote(RemoteStoreOptions {
store_address: cas.address(),
instance_name: None,
tls_config: tls::Config::default(),
headers: BTreeMap::new(),
chunk_size_bytes: 10 * 1024 * 1024,
rpc_timeout: Duration::from_secs(1),
rpc_retries: 1,
rpc_concurrency_limit: 256,
capabilities_cell_opt: None,
timeout: Duration::from_secs(1),
retries: 1,
concurrency_limit: 256,
batch_api_size_limit: 4 * 1024 * 1024,
})
.await
Expand Down Expand Up @@ -161,13 +159,16 @@ async fn create_cached_runner(
cache_content_behavior,
append_only_caches_base_path: None,
},
RemoteCacheProviderOptions {
RemoteStoreOptions {
instance_name: None,
action_cache_address: store_setup.cas.address(),
store_address: store_setup.cas.address(),
tls_config: tls::Config::default(),
headers: BTreeMap::default(),
concurrency_limit: 256,
rpc_timeout: CACHE_READ_TIMEOUT,
timeout: CACHE_READ_TIMEOUT,
retries: 0,
batch_api_size_limit: 0,
chunk_size_bytes: 0,
},
)
.await
Expand Down Expand Up @@ -761,13 +762,16 @@ async fn make_action_result_basic() {
cache_content_behavior: CacheContentBehavior::Defer,
append_only_caches_base_path: None,
},
RemoteCacheProviderOptions {
RemoteStoreOptions {
instance_name: None,
action_cache_address: cas.address(),
store_address: cas.address(),
tls_config: tls::Config::default(),
headers: BTreeMap::default(),
concurrency_limit: 256,
rpc_timeout: CACHE_READ_TIMEOUT,
timeout: CACHE_READ_TIMEOUT,
retries: 0,
batch_api_size_limit: 0,
chunk_size_bytes: 0,
},
)
.await
Expand Down
20 changes: 7 additions & 13 deletions src/rust/engine/process_execution/remote/src/remote_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use prost::Message;
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use protos::gen::google::longrunning::Operation;
use remexec::{execution_stage::Value as ExecutionStageValue, ExecutedActionMetadata};
use store::{RemoteOptions, SnapshotOps, Store, StoreError};
use store::{RemoteStoreOptions, SnapshotOps, Store, StoreError};
use tempfile::TempDir;
use testutil::data::{TestData, TestDirectory, TestTree};
use testutil::{owned_string_vec, relative_paths};
Expand Down Expand Up @@ -1268,17 +1268,16 @@ async fn server_sending_triggering_timeout_with_deadline_exceeded() {
assert!(result.stdout().contains("user timeout"));
}

fn remote_options_for_cas(cas: &mock::StubCAS) -> RemoteOptions {
RemoteOptions {
cas_address: cas.address(),
fn remote_options_for_cas(cas: &mock::StubCAS) -> RemoteStoreOptions {
RemoteStoreOptions {
store_address: cas.address(),
instance_name: None,
tls_config: tls::Config::default(),
headers: BTreeMap::new(),
chunk_size_bytes: 10 * 1024 * 1024,
rpc_timeout: Duration::from_secs(1),
rpc_retries: 1,
rpc_concurrency_limit: STORE_CONCURRENCY_LIMIT,
capabilities_cell_opt: None,
timeout: Duration::from_secs(1),
retries: 1,
concurrency_limit: STORE_CONCURRENCY_LIMIT,
batch_api_size_limit: STORE_BATCH_API_SIZE_LIMIT,
}
}
Expand Down Expand Up @@ -1338,7 +1337,6 @@ async fn sends_headers() {
OVERALL_DEADLINE_SECS,
RETRY_INTERVAL,
EXEC_CONCURRENCY_LIMIT,
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -1498,7 +1496,6 @@ async fn ensure_inline_stdio_is_stored() {
OVERALL_DEADLINE_SECS,
RETRY_INTERVAL,
EXEC_CONCURRENCY_LIMIT,
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -1875,7 +1872,6 @@ async fn execute_missing_file_uploads_if_known() {
OVERALL_DEADLINE_SECS,
RETRY_INTERVAL,
EXEC_CONCURRENCY_LIMIT,
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -1927,7 +1923,6 @@ async fn execute_missing_file_errors_if_unknown() {
OVERALL_DEADLINE_SECS,
RETRY_INTERVAL,
EXEC_CONCURRENCY_LIMIT,
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -2639,7 +2634,6 @@ async fn create_command_runner(
OVERALL_DEADLINE_SECS,
RETRY_INTERVAL,
EXEC_CONCURRENCY_LIMIT,
None,
)
.await
.expect("Failed to make command runner");
Expand Down
Loading