From 1ea9d055f09a0c7c2b881a398bae5a39e0d55b9b Mon Sep 17 00:00:00 2001 From: Russell Cohen Date: Tue, 13 Feb 2024 15:19:07 -0500 Subject: [PATCH] Change timeout settings to merge them when configuration is merged (#3405) ## Motivation and Context For context, see https://github.com/smithy-lang/smithy-rs/discussions/3408 ## Description - During `invoke`, load all timeout configs and merge them via a custom loader. - Fix config bag bugs that prevented using a Stored type that differed from `T`. - Add new e2e and codegen integration test validating that timeout settings are properly merged. - Add fallback for an empty timeout config being equivalent to `TimeoutConfig::disabled`. ## Checklist - [x] I have updated `CHANGELOG.next.toml` if I made changes to the smithy-rs codegen or runtime crates - [x] I have updated `CHANGELOG.next.toml` if I made changes to the AWS SDK, generated SDK code, or SDK runtime crates ---- _By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice._ --------- Co-authored-by: John DiSanti --- CHANGELOG.next.toml | 24 +- aws/rust-runtime/aws-config/src/lib.rs | 22 +- .../amazon/smithy/rustsdk/TestUtil.kt | 2 +- .../rustsdk/TimeoutConfigMergingTest.kt | 172 ++++++++++++ .../s3/tests/service_timeout_overrides.rs | 62 +++++ .../ResiliencyConfigCustomization.kt | 20 +- .../src/client/orchestrator.rs | 12 +- .../aws-smithy-types/src/config_bag.rs | 17 +- rust-runtime/aws-smithy-types/src/timeout.rs | 260 +++++++++++++++--- 9 files changed, 537 insertions(+), 54 deletions(-) create mode 100644 aws/sdk-codegen/src/test/kotlin/software/amazon/smithy/rustsdk/TimeoutConfigMergingTest.kt diff --git a/CHANGELOG.next.toml b/CHANGELOG.next.toml index 17f6a7aea8..ddcc1dd99f 100644 --- a/CHANGELOG.next.toml +++ b/CHANGELOG.next.toml @@ -37,14 +37,14 @@ author = "rcoh" [[smithy-rs]] message = "Added impl `Display` to Enums." -references = ["smithy-rs#3336","smithy-rs#3391"] -meta = { "breaking" = false, "tada" = false, "bug" = false , "target" = "client" } +references = ["smithy-rs#3336", "smithy-rs#3391"] +meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client" } author = "iampkmone" [[aws-sdk-rust]] message = "Added impl `Display` to Enums." references = ["smithy-rs#3336", "smithy-rs#3391"] -meta = { "breaking" = false, "tada" = false, "bug" = false} +meta = { "breaking" = false, "tada" = false, "bug" = false } author = "iampkmone" [[aws-sdk-rust]] @@ -104,3 +104,21 @@ message = "Retain the SSO token cache between calls to `provide_credentials` whe references = ["smithy-rs#3387"] meta = { "breaking" = false, "bug" = true, "tada" = false } author = "jdisanti" + +[[smithy-rs]] +message = """Fix bug where timeout settings where not merged properly. This will add a default connect timeout of 3.1s seconds for most clients. + +[**For more details see the long-form changelog discussion**](https://github.com/smithy-lang/smithy-rs/discussions/3408).""" + +references = ["smithy-rs#3405", "smithy-rs#3400", "smithy-rs#3258"] +meta = { "bug" = true, "breaking" = true, tada = false, target = "client" } +author = "rcoh" + +[[aws-sdk-rust]] +message = """Fix bug where timeout settings where not merged properly. This will add a default connect timeout of 3.1s seconds for most clients. + +[**For more details see the long-form changelog discussion**](https://github.com/smithy-lang/smithy-rs/discussions/3408).""" + +references = ["smithy-rs#3405", "smithy-rs#3400", "smithy-rs#3258"] +meta = { "bug" = true, "breaking" = true, tada = false } +author = "rcoh" diff --git a/aws/rust-runtime/aws-config/src/lib.rs b/aws/rust-runtime/aws-config/src/lib.rs index afb0f9ea2c..96152efae1 100644 --- a/aws/rust-runtime/aws-config/src/lib.rs +++ b/aws/rust-runtime/aws-config/src/lib.rs @@ -314,6 +314,12 @@ mod loader { /// Override the timeout config used to build [`SdkConfig`]. /// + /// This will be merged with timeouts coming from the timeout information provider, which + /// currently includes a default `CONNECT` timeout of `3.1s`. + /// + /// If you want to disable timeouts, use [`TimeoutConfig::disabled`]. If you want to disable + /// a specific timeout, use `TimeoutConfig::set_(None)`. + /// /// **Note: This only sets timeouts for calls to AWS services.** Timeouts for the credentials /// provider chain are configured separately. /// @@ -728,14 +734,14 @@ mod loader { .await }; - let timeout_config = if let Some(timeout_config) = self.timeout_config { - timeout_config - } else { - timeout_config::default_provider() - .configure(&conf) - .timeout_config() - .await - }; + let base_config = timeout_config::default_provider() + .configure(&conf) + .timeout_config() + .await; + let mut timeout_config = self + .timeout_config + .unwrap_or_else(|| TimeoutConfig::builder().build()); + timeout_config.take_defaults_from(&base_config); let credentials_provider = match self.credentials_provider { CredentialsProviderOption::Set(provider) => Some(provider), diff --git a/aws/sdk-codegen/src/test/kotlin/software/amazon/smithy/rustsdk/TestUtil.kt b/aws/sdk-codegen/src/test/kotlin/software/amazon/smithy/rustsdk/TestUtil.kt index 0da78519c0..5cc0056022 100644 --- a/aws/sdk-codegen/src/test/kotlin/software/amazon/smithy/rustsdk/TestUtil.kt +++ b/aws/sdk-codegen/src/test/kotlin/software/amazon/smithy/rustsdk/TestUtil.kt @@ -51,7 +51,7 @@ fun awsSdkIntegrationTest( fun awsIntegrationTestParams() = IntegrationTestParams( - cargoCommand = "cargo test --features test-util behavior-version-latest", + cargoCommand = "cargo test --features test-util,behavior-version-latest --tests --lib", runtimeConfig = AwsTestRuntimeConfig, additionalSettings = ObjectNode.builder().withMember( diff --git a/aws/sdk-codegen/src/test/kotlin/software/amazon/smithy/rustsdk/TimeoutConfigMergingTest.kt b/aws/sdk-codegen/src/test/kotlin/software/amazon/smithy/rustsdk/TimeoutConfigMergingTest.kt new file mode 100644 index 0000000000..f7e8778dcd --- /dev/null +++ b/aws/sdk-codegen/src/test/kotlin/software/amazon/smithy/rustsdk/TimeoutConfigMergingTest.kt @@ -0,0 +1,172 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.smithy.rustsdk + +import SdkCodegenIntegrationTest +import org.junit.jupiter.api.Test +import software.amazon.smithy.rust.codegen.core.rustlang.Attribute +import software.amazon.smithy.rust.codegen.core.rustlang.rustTemplate +import software.amazon.smithy.rust.codegen.core.rustlang.writable +import software.amazon.smithy.rust.codegen.core.testutil.integrationTest + +class TimeoutConfigMergingTest { + @Test + fun testTimeoutSettingsProperlyMerged() { + awsSdkIntegrationTest(SdkCodegenIntegrationTest.model) { ctx, crate -> + val name = ctx.moduleUseName() + crate.integrationTest("timeout_settings_properly_merged") { + rustTemplate( + """ + + use $name::Client; + use aws_smithy_runtime::test_util::capture_test_logs::capture_test_logs; + use aws_smithy_runtime_api::box_error::BoxError; + use aws_smithy_runtime_api::client::interceptors::context::BeforeTransmitInterceptorContextRef; + use aws_smithy_runtime_api::client::interceptors::Intercept; + use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents; + use aws_smithy_runtime::client::http::test_util::infallible_client_fn; + use aws_smithy_types::config_bag::ConfigBag; + use aws_smithy_types::timeout::TimeoutConfig; + use aws_smithy_types::body::SdkBody; + use aws_types::SdkConfig; + use std::sync::Arc; + use std::sync::Mutex; + use std::time::Duration; + + ##[derive(Debug, Clone)] + struct CaptureConfigInterceptor { + timeout_config: Arc>>, + } + + impl Intercept for CaptureConfigInterceptor { + fn name(&self) -> &'static str { + "capture config interceptor" + } + + fn read_before_attempt( + &self, + _context: &BeforeTransmitInterceptorContextRef<'_>, + _runtime_components: &RuntimeComponents, + cfg: &mut ConfigBag, + ) -> Result<(), BoxError> { + *self.timeout_config.lock().unwrap() = cfg.load::().cloned(); + Ok(()) + } + } + #{tokio_test} + async fn test_all_timeouts() { + let (_logs, _guard) = capture_test_logs(); + let connect_timeout = Duration::from_secs(1); + let read_timeout = Duration::from_secs(2); + let operation_attempt = Duration::from_secs(3); + let operation = Duration::from_secs(4); + let http_client = infallible_client_fn(|_req| http::Response::builder().body(SdkBody::empty()).unwrap()); + let sdk_config = SdkConfig::builder() + .timeout_config( + TimeoutConfig::builder() + .connect_timeout(connect_timeout) + .build(), + ) + .http_client(http_client) + .build(); + let client_config = $name::config::Builder::from(&sdk_config) + .timeout_config(TimeoutConfig::builder().read_timeout(read_timeout).build()) + .build(); + let client = Client::from_conf(client_config); + let interceptor = CaptureConfigInterceptor { + timeout_config: Default::default(), + }; + let _err = client + .some_operation() + .customize() + .config_override( + $name::Config::builder().timeout_config( + TimeoutConfig::builder() + .operation_attempt_timeout(operation_attempt) + .operation_timeout(operation) + .build(), + ), + ) + .interceptor(interceptor.clone()) + .send() + .await; + let _ = dbg!(_err); + assert_eq!( + interceptor + .timeout_config + .lock() + .unwrap() + .as_ref() + .expect("timeout config not set"), + &TimeoutConfig::builder() + .operation_timeout(operation) + .operation_attempt_timeout(operation_attempt) + .read_timeout(read_timeout) + .connect_timeout(connect_timeout) + .build(), + "full set of timeouts set from all three sources." + ); + + // disable timeouts + let _err = client + .some_operation() + .customize() + .config_override( + $name::Config::builder().timeout_config( + TimeoutConfig::disabled(), + ), + ) + .interceptor(interceptor.clone()) + .send() + .await; + let _ = dbg!(_err); + assert_eq!( + interceptor + .timeout_config + .lock() + .unwrap() + .as_ref() + .expect("timeout config not set"), + &TimeoutConfig::disabled(), + "timeouts disabled by config override" + ); + + // override one field + let _err = client + .some_operation() + .customize() + .config_override( + $name::Config::builder().timeout_config( + TimeoutConfig::builder().read_timeout(Duration::from_secs(10)).build(), + ), + ) + .interceptor(interceptor.clone()) + .send() + .await; + let _ = dbg!(_err); + assert_eq!( + interceptor + .timeout_config + .lock() + .unwrap() + .as_ref() + .expect("timeout config not set"), + &TimeoutConfig::builder() + .read_timeout(Duration::from_secs(10)) + .connect_timeout(connect_timeout) + .disable_operation_attempt_timeout() + .disable_operation_timeout() + .build(), + "read timeout overridden" + ); + } + """, + "tokio_test" to writable { Attribute.TokioTest.render(this) }, + ) + } + } + } +} diff --git a/aws/sdk/integration-tests/s3/tests/service_timeout_overrides.rs b/aws/sdk/integration-tests/s3/tests/service_timeout_overrides.rs index 40a4fb0578..e2b91f94c0 100644 --- a/aws/sdk/integration-tests/s3/tests/service_timeout_overrides.rs +++ b/aws/sdk/integration-tests/s3/tests/service_timeout_overrides.rs @@ -5,10 +5,13 @@ use aws_credential_types::provider::SharedCredentialsProvider; use aws_credential_types::Credentials; +use aws_smithy_async::assert_elapsed; use aws_smithy_async::rt::sleep::{SharedAsyncSleep, TokioSleep}; use aws_smithy_runtime::client::http::test_util::NeverClient; use aws_smithy_runtime::test_util::capture_test_logs::capture_test_logs; +use aws_smithy_runtime_api::client::behavior_version::BehaviorVersion; use aws_smithy_runtime_api::client::result::SdkError; +use aws_smithy_types::retry::RetryConfig; use aws_smithy_types::timeout::TimeoutConfig; use aws_types::region::Region; use aws_types::SdkConfig; @@ -23,9 +26,11 @@ async fn timeouts_can_be_set_by_service() { .credentials_provider(SharedCredentialsProvider::new(Credentials::for_tests())) .region(Region::from_static("us-east-1")) .sleep_impl(SharedAsyncSleep::new(TokioSleep::new())) + .retry_config(RetryConfig::disabled()) .timeout_config( TimeoutConfig::builder() .operation_timeout(Duration::from_secs(5)) + .read_timeout(Duration::from_secs(1)) .build(), ) .http_client(NeverClient::new()) @@ -42,6 +47,7 @@ async fn timeouts_can_be_set_by_service() { .build(), ) .build(); + let client = aws_sdk_s3::Client::from_conf(config); let start = Instant::now(); let err = client @@ -60,3 +66,59 @@ async fn timeouts_can_be_set_by_service() { // it's shorter than the 5 second timeout if the test is broken assert!(start.elapsed() < Duration::from_millis(500)); } + +/// Ensures that a default timeout from aws-config is still persisted even if an operation_timeout +/// is set. +#[tokio::test] +async fn default_connect_timeout_set() { + let (_guard, _) = capture_test_logs(); + let sdk_config = aws_config::defaults(BehaviorVersion::latest()) + .test_credentials() + .region(Region::from_static("us-east-1")) + .timeout_config( + TimeoutConfig::builder() + .operation_timeout(Duration::from_secs(5)) + .build(), + ) + .retry_config(RetryConfig::disabled()) + // ip that + .endpoint_url( + // Emulate a connect timeout error by hitting an unroutable IP + "http://172.255.255.0:18104", + ) + .load() + .await; + assert_eq!( + sdk_config.timeout_config(), + Some( + &TimeoutConfig::builder() + .connect_timeout(Duration::from_millis(3100)) + .operation_timeout(Duration::from_secs(5)) + .build() + ) + ); + let config = aws_sdk_s3::config::Builder::from(&sdk_config) + .timeout_config( + TimeoutConfig::builder() + .operation_attempt_timeout(Duration::from_secs(4)) + .build(), + ) + .build(); + + let client = aws_sdk_s3::Client::from_conf(config); + let start = Instant::now(); + let err = client + .get_object() + .key("foo") + .bucket("bar") + .send() + .await + .expect_err("unroutable IP should timeout"); + assert!( + matches!(err, SdkError::DispatchFailure { .. }), + "expected DispatchFailure got {}", + err + ); + // ensure that of the three timeouts, the one we hit is connect timeout. + assert_elapsed!(start, Duration::from_millis(3100)); +} diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customizations/ResiliencyConfigCustomization.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customizations/ResiliencyConfigCustomization.kt index 8b6153b9a6..37866def1b 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customizations/ResiliencyConfigCustomization.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/customizations/ResiliencyConfigCustomization.kt @@ -213,7 +213,11 @@ class ResiliencyConfigCustomization(codegenContext: ClientCodegenContext) : Conf self } - /// Set the timeout_config for the builder + /// Set the timeout_config for the builder. + /// + /// Setting this to `None` has no effect if another source of configuration has set timeouts. If you + /// are attempting to disable timeouts, use [`TimeoutConfig::disabled`](#{TimeoutConfig}::disabled) + /// /// /// ## Examples /// @@ -237,10 +241,22 @@ class ResiliencyConfigCustomization(codegenContext: ClientCodegenContext) : Conf *codegenScope, ) + // A timeout config can be set from SdkConfig. We want to merge that with a timeout config set here. + // Ideally, we would actually preserve `SdkConfig` as a separate layer (probably by converting it into + // its own runtime plugin). In the short term, this functionality accomplishes that for + // timeout configs. rustTemplate( """ pub fn set_timeout_config(&mut self, timeout_config: #{Option}<#{TimeoutConfig}>) -> &mut Self { - timeout_config.map(|t| self.config.store_put(t)); + // passing None has no impact. + let Some(mut timeout_config) = timeout_config else { + return self + }; + + if let Some(base) = self.config.load::<#{TimeoutConfig}>() { + timeout_config.take_defaults_from(base); + } + self.config.store_put(timeout_config); self } """, diff --git a/rust-runtime/aws-smithy-runtime/src/client/orchestrator.rs b/rust-runtime/aws-smithy-runtime/src/client/orchestrator.rs index ef64869157..680c5df235 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/orchestrator.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/orchestrator.rs @@ -30,7 +30,7 @@ use aws_smithy_runtime_api::client::ser_de::{ use aws_smithy_types::body::SdkBody; use aws_smithy_types::byte_stream::ByteStream; use aws_smithy_types::config_bag::ConfigBag; -use aws_smithy_types::timeout::TimeoutConfig; +use aws_smithy_types::timeout::{MergeTimeoutConfig, TimeoutConfig}; use std::mem; use tracing::{debug, debug_span, instrument, trace, Instrument}; @@ -193,6 +193,16 @@ fn apply_configuration( .merge_from(&operation_rc_builder) .build()?; + // In an ideal world, we'd simply update `cfg.load` to behave this way. Unfortunately, we can't + // do that without a breaking change. By overwriting the value in the config bag with a merged + // version, we can achieve a very similar behavior. `MergeTimeoutConfig` + let resolved_timeout_config = cfg.load::(); + tracing::debug!( + "timeout settings for this operation: {:?}", + resolved_timeout_config + ); + cfg.interceptor_state().store_put(resolved_timeout_config); + components.validate_final_config(cfg)?; Ok(components) } diff --git a/rust-runtime/aws-smithy-types/src/config_bag.rs b/rust-runtime/aws-smithy-types/src/config_bag.rs index 7936524468..6ca13c0f80 100644 --- a/rust-runtime/aws-smithy-types/src/config_bag.rs +++ b/rust-runtime/aws-smithy-types/src/config_bag.rs @@ -264,9 +264,10 @@ impl CloneableLayer { where T::StoredType: Clone, { - self.0 - .props - .insert(TypeId::of::(), TypeErasedBox::new_with_clone(value)); + self.0.props.insert( + TypeId::of::(), + TypeErasedBox::new_with_clone(value), + ); self } @@ -320,7 +321,7 @@ impl CloneableLayer { { self.0 .props - .entry(TypeId::of::()) + .entry(TypeId::of::()) .or_insert_with(|| TypeErasedBox::new_with_clone(T::StoredType::default())) .downcast_mut() .expect("typechecked") @@ -371,7 +372,7 @@ impl Layer { /// Inserts `value` into the layer directly fn put_directly(&mut self, value: T::StoredType) -> &mut Self { self.props - .insert(TypeId::of::(), TypeErasedBox::new(value)); + .insert(TypeId::of::(), TypeErasedBox::new(value)); self } @@ -490,14 +491,14 @@ impl Layer { /// Retrieves the value of type `T` from this layer if exists fn get(&self) -> Option<&T::StoredType> { self.props - .get(&TypeId::of::()) + .get(&TypeId::of::()) .map(|t| t.downcast_ref().expect("typechecked")) } /// Returns a mutable reference to `T` if it is stored in this layer fn get_mut(&mut self) -> Option<&mut T::StoredType> { self.props - .get_mut(&TypeId::of::()) + .get_mut(&TypeId::of::()) .map(|t| t.downcast_mut().expect("typechecked")) } @@ -508,7 +509,7 @@ impl Layer { T::StoredType: Default, { self.props - .entry(TypeId::of::()) + .entry(TypeId::of::()) .or_insert_with(|| TypeErasedBox::new(T::StoredType::default())) .downcast_mut() .expect("typechecked") diff --git a/rust-runtime/aws-smithy-types/src/timeout.rs b/rust-runtime/aws-smithy-types/src/timeout.rs index 160895ceb1..437da99e13 100644 --- a/rust-runtime/aws-smithy-types/src/timeout.rs +++ b/rust-runtime/aws-smithy-types/src/timeout.rs @@ -6,17 +6,65 @@ //! This module defines types that describe timeouts that can be applied to various stages of the //! Smithy networking stack. -use crate::config_bag::{Storable, StoreReplace}; +use crate::config_bag::value::Value; +use crate::config_bag::{ItemIter, Storable, Store, StoreReplace}; use std::time::Duration; +#[derive(Clone, Debug, PartialEq, Copy)] +enum CanDisable { + Disabled, + Unset, + Set(T), +} + +impl CanDisable { + fn none_implies_disabled(value: Option) -> Self { + match value { + Some(t) => CanDisable::Set(t), + None => CanDisable::Disabled, + } + } + + fn is_some(&self) -> bool { + matches!(self, CanDisable::Set(_)) + } + + fn value(self) -> Option { + match self { + CanDisable::Set(v) => Some(v), + _ => None, + } + } + + fn merge_from_lower_priority(self, other: Self) -> Self { + match (self, other) { + // if we are unset. take the value from the other + (CanDisable::Unset, value) => value, + (us, _) => us, + } + } +} + +impl From for CanDisable { + fn from(value: T) -> Self { + Self::Set(value) + } +} + +impl Default for CanDisable { + fn default() -> Self { + Self::Unset + } +} + /// Builder for [`TimeoutConfig`]. #[non_exhaustive] #[derive(Clone, Debug, Default)] pub struct TimeoutConfigBuilder { - connect_timeout: Option, - read_timeout: Option, - operation_timeout: Option, - operation_attempt_timeout: Option, + connect_timeout: CanDisable, + read_timeout: CanDisable, + operation_timeout: CanDisable, + operation_attempt_timeout: CanDisable, } impl TimeoutConfigBuilder { @@ -29,15 +77,23 @@ impl TimeoutConfigBuilder { /// /// The connect timeout is a limit on the amount of time it takes to initiate a socket connection. pub fn connect_timeout(mut self, connect_timeout: Duration) -> Self { - self.connect_timeout = Some(connect_timeout); + self.connect_timeout = connect_timeout.into(); self } /// Sets the connect timeout. /// + /// If `None` is passed, this will explicitly disable the connection timeout. + /// /// The connect timeout is a limit on the amount of time it takes to initiate a socket connection. pub fn set_connect_timeout(&mut self, connect_timeout: Option) -> &mut Self { - self.connect_timeout = connect_timeout; + self.connect_timeout = CanDisable::none_implies_disabled(connect_timeout); + self + } + + /// Disables the connect timeout + pub fn disable_connect_timeout(mut self) -> Self { + self.connect_timeout = CanDisable::Disabled; self } @@ -46,16 +102,24 @@ impl TimeoutConfigBuilder { /// The read timeout is the limit on the amount of time it takes to read the first byte of a response /// from the time the request is initiated. pub fn read_timeout(mut self, read_timeout: Duration) -> Self { - self.read_timeout = Some(read_timeout); + self.read_timeout = read_timeout.into(); self } /// Sets the read timeout. /// + /// If `None` is passed, this will explicitly disable the read timeout. To disable all timeouts use [`TimeoutConfig::disabled`]. + /// /// The read timeout is the limit on the amount of time it takes to read the first byte of a response /// from the time the request is initiated. pub fn set_read_timeout(&mut self, read_timeout: Option) -> &mut Self { - self.read_timeout = read_timeout; + self.read_timeout = CanDisable::none_implies_disabled(read_timeout); + self + } + + /// Disables the read timeout + pub fn disable_read_timeout(mut self) -> Self { + self.read_timeout = CanDisable::Disabled; self } @@ -68,12 +132,14 @@ impl TimeoutConfigBuilder { /// If you want to set a timeout on individual retry attempts, then see [`Self::operation_attempt_timeout`] /// or [`Self::set_operation_attempt_timeout`]. pub fn operation_timeout(mut self, operation_timeout: Duration) -> Self { - self.operation_timeout = Some(operation_timeout); + self.operation_timeout = operation_timeout.into(); self } /// Sets the operation timeout. /// + /// If `None` is passed, this will explicitly disable the read timeout. To disable all timeouts use [`TimeoutConfig::disabled`]. + /// /// An operation represents the full request/response lifecycle of a call to a service. /// The operation timeout is a limit on the total amount of time it takes for an operation to be /// fully serviced, including the time for all retries that may have been attempted for it. @@ -81,7 +147,13 @@ impl TimeoutConfigBuilder { /// If you want to set a timeout on individual retry attempts, then see [`Self::operation_attempt_timeout`] /// or [`Self::set_operation_attempt_timeout`]. pub fn set_operation_timeout(&mut self, operation_timeout: Option) -> &mut Self { - self.operation_timeout = operation_timeout; + self.operation_timeout = CanDisable::none_implies_disabled(operation_timeout); + self + } + + /// Disables the operation timeout + pub fn disable_operation_timeout(mut self) -> Self { + self.operation_timeout = CanDisable::Disabled; self } @@ -94,12 +166,14 @@ impl TimeoutConfigBuilder { /// If you want to set a timeout on the total time for an entire request including all of its retries, /// then see [`Self::operation_timeout`] /// or [`Self::set_operation_timeout`]. pub fn operation_attempt_timeout(mut self, operation_attempt_timeout: Duration) -> Self { - self.operation_attempt_timeout = Some(operation_attempt_timeout); + self.operation_attempt_timeout = operation_attempt_timeout.into(); self } /// Sets the operation attempt timeout. /// + /// If `None` is passed, this will explicitly disable the operation timeout. To disable all timeouts use [`TimeoutConfig::disabled`]. + /// /// An operation represents the full request/response lifecycle of a call to a service. /// When retries are enabled, then this setting makes it possible to set a timeout for individual /// retry attempts (including the initial attempt) for an operation. @@ -110,7 +184,14 @@ impl TimeoutConfigBuilder { &mut self, operation_attempt_timeout: Option, ) -> &mut Self { - self.operation_attempt_timeout = operation_attempt_timeout; + self.operation_attempt_timeout = + CanDisable::none_implies_disabled(operation_attempt_timeout); + self + } + + /// Disables the operation_attempt timeout + pub fn disable_operation_attempt_timeout(mut self) -> Self { + self.operation_attempt_timeout = CanDisable::Disabled; self } @@ -139,12 +220,18 @@ impl TimeoutConfigBuilder { /// ``` pub fn take_unset_from(self, other: Self) -> Self { Self { - connect_timeout: self.connect_timeout.or(other.connect_timeout), - read_timeout: self.read_timeout.or(other.read_timeout), - operation_timeout: self.operation_timeout.or(other.operation_timeout), + connect_timeout: self + .connect_timeout + .merge_from_lower_priority(other.connect_timeout), + read_timeout: self + .read_timeout + .merge_from_lower_priority(other.read_timeout), + operation_timeout: self + .operation_timeout + .merge_from_lower_priority(other.operation_timeout), operation_attempt_timeout: self .operation_attempt_timeout - .or(other.operation_attempt_timeout), + .merge_from_lower_priority(other.operation_attempt_timeout), } } @@ -203,16 +290,53 @@ impl From for TimeoutConfigBuilder { #[non_exhaustive] #[derive(Clone, PartialEq, Debug)] pub struct TimeoutConfig { - connect_timeout: Option, - read_timeout: Option, - operation_timeout: Option, - operation_attempt_timeout: Option, + connect_timeout: CanDisable, + read_timeout: CanDisable, + operation_timeout: CanDisable, + operation_attempt_timeout: CanDisable, } impl Storable for TimeoutConfig { type Storer = StoreReplace; } +/// Merger which merges timeout config settings when loading. +/// +/// If no timeouts are set, `TimeoutConfig::disabled()` will be returned. +/// +/// This API is not meant to be used externally. +#[derive(Debug)] +pub struct MergeTimeoutConfig; + +impl Storable for MergeTimeoutConfig { + type Storer = MergeTimeoutConfig; +} +impl Store for MergeTimeoutConfig { + type ReturnedType<'a> = TimeoutConfig; + type StoredType = as Store>::StoredType; + + fn merge_iter(iter: ItemIter<'_, Self>) -> Self::ReturnedType<'_> { + let mut result: Option = None; + // The item iterator iterates "backwards" over the config bags, starting at the highest + // priority layers and works backwards + for tc in iter { + match (result.as_mut(), tc) { + (Some(result), Value::Set(tc)) => { + // This maintains backwards compatible behavior where setting an EMPTY timeout config is equivalent to `TimeoutConfig::disabled()` + if result.has_timeouts() { + result.take_defaults_from(tc); + } + } + (None, Value::Set(tc)) => { + result = Some(tc.clone()); + } + (_, Value::ExplicitlyUnset(_)) => result = Some(TimeoutConfig::disabled()), + } + } + result.unwrap_or(TimeoutConfig::disabled()) + } +} + impl TimeoutConfig { /// Returns a builder to create a `TimeoutConfig`. pub fn builder() -> TimeoutConfigBuilder { @@ -229,13 +353,30 @@ impl TimeoutConfig { TimeoutConfigBuilder::from(self) } + /// Fill any unfilled values in `self` from `other`. + pub fn take_defaults_from(&mut self, other: &TimeoutConfig) -> &mut Self { + self.connect_timeout = self + .connect_timeout + .merge_from_lower_priority(other.connect_timeout); + self.read_timeout = self + .read_timeout + .merge_from_lower_priority(other.read_timeout); + self.operation_timeout = self + .operation_timeout + .merge_from_lower_priority(other.operation_timeout); + self.operation_attempt_timeout = self + .operation_attempt_timeout + .merge_from_lower_priority(other.operation_attempt_timeout); + self + } + /// Returns a timeout config with all timeouts disabled. pub fn disabled() -> TimeoutConfig { TimeoutConfig { - connect_timeout: None, - read_timeout: None, - operation_timeout: None, - operation_attempt_timeout: None, + connect_timeout: CanDisable::Disabled, + read_timeout: CanDisable::Disabled, + operation_timeout: CanDisable::Disabled, + operation_attempt_timeout: CanDisable::Disabled, } } @@ -243,7 +384,7 @@ impl TimeoutConfig { /// /// The connect timeout is a limit on the amount of time it takes to initiate a socket connection. pub fn connect_timeout(&self) -> Option { - self.connect_timeout + self.connect_timeout.value() } /// Returns this config's read timeout. @@ -251,7 +392,7 @@ impl TimeoutConfig { /// The read timeout is the limit on the amount of time it takes to read the first byte of a response /// from the time the request is initiated. pub fn read_timeout(&self) -> Option { - self.read_timeout + self.read_timeout.value() } /// Returns this config's operation timeout. @@ -260,7 +401,7 @@ impl TimeoutConfig { /// The operation timeout is a limit on the total amount of time it takes for an operation to be /// fully serviced, including the time for all retries that may have been attempted for it. pub fn operation_timeout(&self) -> Option { - self.operation_timeout + self.operation_timeout.value() } /// Returns this config's operation attempt timeout. @@ -269,12 +410,13 @@ impl TimeoutConfig { /// When retries are enabled, then this setting makes it possible to set a timeout for individual /// retry attempts (including the initial attempt) for an operation. pub fn operation_attempt_timeout(&self) -> Option { - self.operation_attempt_timeout + self.operation_attempt_timeout.value() } /// Returns true if any of the possible timeouts are set. pub fn has_timeouts(&self) -> bool { self.connect_timeout.is_some() + || self.read_timeout.is_some() || self.operation_timeout.is_some() || self.operation_attempt_timeout.is_some() } @@ -316,8 +458,8 @@ impl OperationTimeoutConfig { impl From<&TimeoutConfig> for OperationTimeoutConfig { fn from(cfg: &TimeoutConfig) -> Self { OperationTimeoutConfig { - operation_timeout: cfg.operation_timeout, - operation_attempt_timeout: cfg.operation_attempt_timeout, + operation_timeout: cfg.operation_timeout.value(), + operation_attempt_timeout: cfg.operation_attempt_timeout.value(), } } } @@ -327,3 +469,59 @@ impl From for OperationTimeoutConfig { OperationTimeoutConfig::from(&cfg) } } + +#[cfg(test)] +mod test { + use crate::config_bag::{CloneableLayer, ConfigBag}; + use crate::timeout::{MergeTimeoutConfig, TimeoutConfig}; + use std::time::Duration; + + #[test] + fn timeout_configs_merged_in_config_bag() { + let mut read_timeout = CloneableLayer::new("timeout"); + read_timeout.store_put( + TimeoutConfig::builder() + .read_timeout(Duration::from_secs(3)) + .connect_timeout(Duration::from_secs(1)) + .build(), + ); + let mut operation_timeout = CloneableLayer::new("timeout"); + operation_timeout.store_put( + TimeoutConfig::builder() + .operation_timeout(Duration::from_secs(5)) + .connect_timeout(Duration::from_secs(10)) + .build(), + ); + let cfg = ConfigBag::of_layers(vec![read_timeout.into(), operation_timeout.into()]); + let loaded = cfg.load::(); + // set by base layer + assert_eq!(loaded.read_timeout(), Some(Duration::from_secs(3))); + + // set by higher layer + assert_eq!(loaded.operation_timeout(), Some(Duration::from_secs(5))); + + // overridden by higher layer + assert_eq!(loaded.connect_timeout(), Some(Duration::from_secs(10))); + let mut next = cfg.add_layer("disabled"); + next.interceptor_state() + .store_put(TimeoutConfig::disabled()); + + assert_eq!(next.load::().read_timeout(), None); + + // builder().build() acts equivalently to disabled + next.interceptor_state() + .store_put(TimeoutConfig::builder().build()); + assert_eq!(next.load::().read_timeout(), None); + + // But if instead, you set a field of the timeout config, it will merge as expected. + next.interceptor_state().store_put( + TimeoutConfig::builder() + .operation_attempt_timeout(Duration::from_secs(1)) + .build(), + ); + assert_eq!( + next.load::().read_timeout(), + Some(Duration::from_secs(3)) + ); + } +}