Skip to content

Commit

Permalink
Add timeout to remote store calls, and adjust name of cache timeout. (#18695)
Browse files Browse the repository at this point in the history

#16196 moved the cache read timeout down to the network layer, which
made it much more accurate. But cache lookups also involve a number of
calls to a remote `Store`, which did not have their own timeout.

This change adds an RPC timeout for `Store` accesses
(`--remote-store-rpc-timeout-millis`) to allow for retries of tar-pitted
remote store RPCs, and renames the `--remote-cache-read-timeout-millis`
option to `--remote-cache-rpc-timeout-millis` to make it clear that it
applies to all cache operations (including writes).
  • Loading branch information
stuhood authored Apr 6, 2023
1 parent 0689381 commit 317a09d
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 32 deletions.
4 changes: 2 additions & 2 deletions src/python/pants/engine/internals/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,14 @@ def __init__(
root_ca_certs_path=execution_options.remote_ca_certs_path,
store_headers=execution_options.remote_store_headers,
store_chunk_bytes=execution_options.remote_store_chunk_bytes,
store_chunk_upload_timeout=execution_options.remote_store_chunk_upload_timeout_seconds,
store_rpc_retries=execution_options.remote_store_rpc_retries,
store_rpc_concurrency=execution_options.remote_store_rpc_concurrency,
store_rpc_timeout_millis=execution_options.remote_store_rpc_timeout_millis,
store_batch_api_size_limit=execution_options.remote_store_batch_api_size_limit,
cache_warnings_behavior=execution_options.remote_cache_warnings.value,
cache_content_behavior=execution_options.cache_content_behavior.value,
cache_rpc_concurrency=execution_options.remote_cache_rpc_concurrency,
cache_read_timeout_millis=execution_options.remote_cache_read_timeout_millis,
cache_rpc_timeout_millis=execution_options.remote_cache_rpc_timeout_millis,
execution_headers=execution_options.remote_execution_headers,
execution_overall_deadline_secs=execution_options.remote_execution_overall_deadline_secs,
execution_rpc_concurrency=execution_options.remote_execution_rpc_concurrency,
Expand Down
40 changes: 31 additions & 9 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,14 +501,14 @@ class ExecutionOptions:
remote_store_address: str | None
remote_store_headers: dict[str, str]
remote_store_chunk_bytes: Any
remote_store_chunk_upload_timeout_seconds: int
remote_store_rpc_retries: int
remote_store_rpc_concurrency: int
remote_store_batch_api_size_limit: int
remote_store_rpc_timeout_millis: int

remote_cache_warnings: RemoteCacheWarningsBehavior
remote_cache_rpc_concurrency: int
remote_cache_read_timeout_millis: int
remote_cache_rpc_timeout_millis: int

remote_execution_address: str | None
remote_execution_headers: dict[str, str]
Expand All @@ -523,6 +523,15 @@ def from_options(
bootstrap_options: OptionValueContainer,
dynamic_remote_options: DynamicRemoteOptions,
) -> ExecutionOptions:
remote_cache_rpc_timeout_millis = resolve_conflicting_options(
old_option="remote_cache_read_timeout_millis",
new_option="remote_cache_rpc_timeout_millis",
old_scope="",
new_scope="",
old_container=bootstrap_options,
new_container=bootstrap_options,
)

return cls(
# Remote execution strategy.
remote_execution=dynamic_remote_options.execution,
Expand All @@ -546,14 +555,14 @@ def from_options(
remote_store_address=dynamic_remote_options.store_address,
remote_store_headers=dynamic_remote_options.store_headers,
remote_store_chunk_bytes=bootstrap_options.remote_store_chunk_bytes,
remote_store_chunk_upload_timeout_seconds=bootstrap_options.remote_store_chunk_upload_timeout_seconds,
remote_store_rpc_retries=bootstrap_options.remote_store_rpc_retries,
remote_store_rpc_concurrency=dynamic_remote_options.store_rpc_concurrency,
remote_store_batch_api_size_limit=bootstrap_options.remote_store_batch_api_size_limit,
remote_store_rpc_timeout_millis=bootstrap_options.remote_store_rpc_timeout_millis,
# Remote cache setup.
remote_cache_warnings=bootstrap_options.remote_cache_warnings,
remote_cache_rpc_concurrency=dynamic_remote_options.cache_rpc_concurrency,
remote_cache_read_timeout_millis=bootstrap_options.remote_cache_read_timeout_millis,
remote_cache_rpc_timeout_millis=remote_cache_rpc_timeout_millis,
# Remote execution setup.
remote_execution_address=dynamic_remote_options.execution_address,
remote_execution_headers=dynamic_remote_options.execution_headers,
Expand Down Expand Up @@ -632,14 +641,14 @@ def from_options(cls, options: OptionValueContainer) -> LocalStoreOptions:
"user-agent": f"pants/{VERSION}",
},
remote_store_chunk_bytes=1024 * 1024,
remote_store_chunk_upload_timeout_seconds=60,
remote_store_rpc_retries=2,
remote_store_rpc_concurrency=128,
remote_store_batch_api_size_limit=4194304,
remote_store_rpc_timeout_millis=30000,
# Remote cache setup.
remote_cache_warnings=RemoteCacheWarningsBehavior.backoff,
remote_cache_rpc_concurrency=128,
remote_cache_read_timeout_millis=1500,
remote_cache_rpc_timeout_millis=1500,
# Remote execution setup.
remote_execution_address=None,
remote_execution_headers={
Expand Down Expand Up @@ -1172,7 +1181,6 @@ class BootstrapOptions:
)
process_cleanup = BoolOption(
default=(DEFAULT_EXECUTION_OPTIONS.keep_sandboxes == KeepSandboxes.never),
deprecation_start_version="2.15.0.dev1",
removal_version="3.0.0.dev0",
removal_hint="Use the `keep_sandboxes` option instead.",
help=softwrap(
Expand Down Expand Up @@ -1443,7 +1451,9 @@ class BootstrapOptions:
)
remote_store_chunk_upload_timeout_seconds = IntOption(
advanced=True,
default=DEFAULT_EXECUTION_OPTIONS.remote_store_chunk_upload_timeout_seconds,
default=60,
removal_version="2.19.0.dev0",
removal_hint="Unused: use the `remote_store_rpc_timeout_millis` option instead.",
help="Timeout (in seconds) for uploads of individual chunks to the remote file store.",
)
remote_store_rpc_retries = IntOption(
Expand All @@ -1456,6 +1466,11 @@ class BootstrapOptions:
default=DEFAULT_EXECUTION_OPTIONS.remote_store_rpc_concurrency,
help="The number of concurrent requests allowed to the remote store service.",
)
remote_store_rpc_timeout_millis = IntOption(
advanced=True,
default=DEFAULT_EXECUTION_OPTIONS.remote_store_rpc_timeout_millis,
help="Timeout value for remote store RPCs (not including streaming requests) in milliseconds.",
)
remote_store_batch_api_size_limit = IntOption(
advanced=True,
default=DEFAULT_EXECUTION_OPTIONS.remote_store_batch_api_size_limit,
Expand All @@ -1480,9 +1495,16 @@ class BootstrapOptions:
)
remote_cache_read_timeout_millis = IntOption(
advanced=True,
default=DEFAULT_EXECUTION_OPTIONS.remote_cache_read_timeout_millis,
default=DEFAULT_EXECUTION_OPTIONS.remote_cache_rpc_timeout_millis,
removal_version="2.19.0.dev0",
removal_hint="Use the `remote_cache_rpc_timeout_millis` option instead.",
help="Timeout value for remote cache lookups in milliseconds.",
)
remote_cache_rpc_timeout_millis = IntOption(
advanced=True,
default=DEFAULT_EXECUTION_OPTIONS.remote_cache_rpc_timeout_millis,
help="Timeout value for remote cache RPCs in milliseconds.",
)
remote_execution_address = StrOption(
advanced=True,
default=cast(str, DEFAULT_EXECUTION_OPTIONS.remote_execution_address),
Expand Down
5 changes: 1 addition & 4 deletions src/rust/engine/fs/fs_util/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,10 +423,7 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
// leave this hanging forever.
//
// Make fs_util have a very long deadline (because it's not configurable,
// like it is inside pants) until we switch to Tower (where we can more
// carefully control specific components of timeouts).
//
// See https://github.com/pantsbuild/pants/pull/6433 for more context.
// like it is inside pants).
Duration::from_secs(30 * 60),
top_match
.value_of_t::<usize>("rpc-attempts")
Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ impl Store {
tls_config: grpc_util::tls::Config,
headers: BTreeMap<String, String>,
chunk_size_bytes: usize,
upload_timeout: Duration,
rpc_timeout: Duration,
rpc_retries: usize,
rpc_concurrency_limit: usize,
capabilities_cell_opt: Option<Arc<OnceCell<ServerCapabilities>>>,
Expand All @@ -399,7 +399,7 @@ impl Store {
tls_config,
headers,
chunk_size_bytes,
upload_timeout,
rpc_timeout,
rpc_retries,
rpc_concurrency_limit,
capabilities_cell_opt,
Expand Down
8 changes: 3 additions & 5 deletions src/rust/engine/fs/store/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,14 @@ use remexec::{
use tokio::io::{AsyncSeekExt, AsyncWrite, AsyncWriteExt};
use tokio::sync::Mutex;
use tonic::{Code, Request, Status};
use workunit_store::{in_workunit, ObservationMetric};
use workunit_store::{in_workunit, Metric, ObservationMetric};

use crate::StoreError;

#[derive(Clone)]
pub struct ByteStore {
instance_name: Option<String>,
chunk_size_bytes: usize,
_upload_timeout: Duration,
_rpc_attempts: usize,
byte_stream_client: Arc<ByteStreamClient<LayeredService>>,
cas_client: Arc<ContentAddressableStorageClient<LayeredService>>,
Expand Down Expand Up @@ -131,7 +130,7 @@ impl ByteStore {
tls_config: grpc_util::tls::Config,
mut headers: BTreeMap<String, String>,
chunk_size_bytes: usize,
upload_timeout: Duration,
rpc_timeout: Duration,
rpc_retries: usize,
rpc_concurrency_limit: usize,
capabilities_cell_opt: Option<Arc<OnceCell<ServerCapabilities>>>,
Expand All @@ -150,7 +149,7 @@ impl ByteStore {
tonic::transport::Channel::balance_list(vec![endpoint].into_iter()),
rpc_concurrency_limit,
http_headers,
None,
Some((rpc_timeout, Metric::RemoteStoreRequestTimeouts)),
);

let byte_stream_client = Arc::new(ByteStreamClient::new(channel.clone()));
Expand All @@ -162,7 +161,6 @@ impl ByteStore {
Ok(ByteStore {
instance_name,
chunk_size_bytes,
_upload_timeout: upload_timeout,
_rpc_attempts: rpc_retries + 1,
byte_stream_client,
cas_client,
Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/process_execution/remote/src/remote_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl CommandRunner {
warnings_behavior: RemoteCacheWarningsBehavior,
cache_content_behavior: CacheContentBehavior,
concurrency_limit: usize,
read_timeout: Duration,
rpc_timeout: Duration,
append_only_caches_base_path: Option<String>,
) -> Result<Self, String> {
let tls_client_config = if action_cache_address.starts_with("https://") {
Expand All @@ -99,7 +99,7 @@ impl CommandRunner {
tonic::transport::Channel::balance_list(vec![endpoint].into_iter()),
concurrency_limit,
http_headers,
Some((read_timeout, Metric::RemoteCacheRequestTimeouts)),
Some((rpc_timeout, Metric::RemoteCacheRequestTimeouts)),
);
let action_cache_client = Arc::new(ActionCacheClient::new(channel));

Expand Down
8 changes: 4 additions & 4 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,14 @@ pub struct RemotingOptions {
pub root_ca_certs_path: Option<PathBuf>,
pub store_headers: BTreeMap<String, String>,
pub store_chunk_bytes: usize,
pub store_chunk_upload_timeout: Duration,
pub store_rpc_retries: usize,
pub store_rpc_concurrency: usize,
pub store_rpc_timeout: Duration,
pub store_batch_api_size_limit: usize,
pub cache_warnings_behavior: RemoteCacheWarningsBehavior,
pub cache_content_behavior: CacheContentBehavior,
pub cache_rpc_concurrency: usize,
pub cache_read_timeout: Duration,
pub cache_rpc_timeout: Duration,
pub execution_headers: BTreeMap<String, String>,
pub execution_overall_deadline: Duration,
pub execution_rpc_concurrency: usize,
Expand Down Expand Up @@ -169,7 +169,7 @@ impl Core {
grpc_util::tls::Config::new_without_mtls(root_ca_certs.clone()),
remoting_opts.store_headers.clone(),
remoting_opts.store_chunk_bytes,
remoting_opts.store_chunk_upload_timeout,
remoting_opts.store_rpc_timeout,
remoting_opts.store_rpc_retries,
remoting_opts.store_rpc_concurrency,
capabilities_cell_opt,
Expand Down Expand Up @@ -337,7 +337,7 @@ impl Core {
remoting_opts.cache_warnings_behavior,
remoting_opts.cache_content_behavior,
remoting_opts.cache_rpc_concurrency,
remoting_opts.cache_read_timeout,
remoting_opts.cache_rpc_timeout,
remoting_opts.append_only_caches_base_path.clone(),
)?);
}
Expand Down
8 changes: 4 additions & 4 deletions src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,14 +299,14 @@ impl PyRemotingOptions {
root_ca_certs_path: Option<PathBuf>,
store_headers: BTreeMap<String, String>,
store_chunk_bytes: usize,
store_chunk_upload_timeout: u64,
store_rpc_retries: usize,
store_rpc_concurrency: usize,
store_rpc_timeout_millis: u64,
store_batch_api_size_limit: usize,
cache_warnings_behavior: String,
cache_content_behavior: String,
cache_rpc_concurrency: usize,
cache_read_timeout_millis: u64,
cache_rpc_timeout_millis: u64,
execution_headers: BTreeMap<String, String>,
execution_overall_deadline_secs: u64,
execution_rpc_concurrency: usize,
Expand All @@ -321,15 +321,15 @@ impl PyRemotingOptions {
root_ca_certs_path,
store_headers,
store_chunk_bytes,
store_chunk_upload_timeout: Duration::from_secs(store_chunk_upload_timeout),
store_rpc_retries,
store_rpc_concurrency,
store_rpc_timeout: Duration::from_millis(store_rpc_timeout_millis),
store_batch_api_size_limit,
cache_warnings_behavior: RemoteCacheWarningsBehavior::from_str(&cache_warnings_behavior)
.unwrap(),
cache_content_behavior: CacheContentBehavior::from_str(&cache_content_behavior).unwrap(),
cache_rpc_concurrency,
cache_read_timeout: Duration::from_millis(cache_read_timeout_millis),
cache_rpc_timeout: Duration::from_millis(cache_rpc_timeout_millis),
execution_headers,
execution_overall_deadline: Duration::from_secs(execution_overall_deadline_secs),
execution_rpc_concurrency,
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/workunit_store/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub enum Metric {
RemoteExecutionSuccess,
RemoteExecutionTimeouts,
RemoteStoreMissingDigest,
RemoteStoreRequestTimeouts,
/// Number of times that we backtracked due to missing digests.
BacktrackAttempts,
DockerExecutionRequests,
Expand Down

0 comments on commit 317a09d

Please sign in to comment.