Skip to content

Commit

Permalink
Feature: Stalled stream protection (#3202)
Browse files Browse the repository at this point in the history
[See the upgrade guide for this feature to learn
more.](awslabs/aws-sdk-rust#956)

The breaking change is to the `MinimumThroughputBody::new` method.

## Motivation and Context
<!--- Why is this change required? What problem does it solve? -->
<!--- If it fixes an open issue, please link to the issue here -->
#1562

## Description
<!--- Describe your changes in detail -->
awslabs/aws-sdk-rust#956

## Testing
<!--- Please describe in detail how you tested your changes -->
<!--- Include details of your testing environment, and the tests you ran
to -->
<!--- see how your change affects other areas of the code, etc. -->
I added several tests

## Checklist
<!--- If a checkbox below is not applicable, then please DELETE it
rather than leaving it unchecked -->
- [x] I have updated `CHANGELOG.next.toml` if I made changes to the
smithy-rs codegen or runtime crates
- [x] I have updated `CHANGELOG.next.toml` if I made changes to the AWS
SDK, generated SDK code, or SDK runtime crates

----

_By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice._

---------

Co-authored-by: John DiSanti <jdisanti@amazon.com>
Co-authored-by: Russell Cohen <rcoh@amazon.com>
  • Loading branch information
3 people authored Nov 17, 2023
1 parent ad520b0 commit 768237a
Show file tree
Hide file tree
Showing 32 changed files with 1,219 additions and 63 deletions.
38 changes: 38 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,44 @@
# meta = { "breaking" = false, "tada" = false, "bug" = false, "target" = "client | server | all"}
# author = "rcoh"

[[aws-sdk-rust]]
message = """
Add configurable stalled-stream protection for downloads.
When making HTTP calls,
it's possible for a connection to 'stall out' and emit no more data due to server-side issues.
In the event this happens, it's desirable for the stream to error out as quickly as possible.
While timeouts can protect you from this issue, they aren't adaptive to the amount of data
being sent and so must be configured specifically for each use case. When enabled, stalled-stream
protection will ensure that bad streams error out quickly, regardless of the amount of data being
downloaded.
Protection is enabled by default for all clients but can be configured or disabled.
See [this discussion](https://github.com/awslabs/aws-sdk-rust/discussions/956) for more details.
"""
references = ["smithy-rs#3202"]
meta = { "breaking" = true, "tada" = true, "bug" = false }
author = "Velfi"

[[smithy-rs]]
message = """
Add configurable stalled-stream protection for downloads.
When making HTTP calls,
it's possible for a connection to 'stall out' and emit no more data due to server-side issues.
In the event this happens, it's desirable for the stream to error out as quickly as possible.
While timeouts can protect you from this issue, they aren't adaptive to the amount of data
being sent and so must be configured specifically for each use case. When enabled, stalled-stream
protection will ensure that bad streams error out quickly, regardless of the amount of data being
downloaded.
Protection is enabled by default for all clients but can be configured or disabled.
See [this discussion](https://github.com/awslabs/aws-sdk-rust/discussions/956) for more details.
"""
references = ["smithy-rs#3202"]
meta = { "breaking" = true, "tada" = true, "bug" = false, "target" = "client" }
author = "Velfi"

[[aws-sdk-rust]]
message = "Make certain types for EMR Serverless optional. Previously, they defaulted to 0, but this created invalid requests."
references = ["smithy-rs#3217"]
Expand Down
3 changes: 2 additions & 1 deletion aws/rust-runtime/aws-config/external-types.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ allowed_external_types = [
"aws_smithy_runtime_api::box_error::BoxError",
"aws_smithy_runtime::client::identity::cache::IdentityCache",
"aws_smithy_runtime::client::identity::cache::lazy::LazyCacheBuilder",
"aws_smithy_runtime_api::client::behavior_version::BehaviorVersion",
"aws_smithy_runtime_api::client::dns::ResolveDns",
"aws_smithy_runtime_api::client::dns::SharedDnsResolver",
"aws_smithy_runtime_api::client::http::HttpClient",
"aws_smithy_runtime_api::client::http::SharedHttpClient",
"aws_smithy_runtime_api::client::identity::ResolveCachedIdentity",
"aws_smithy_runtime_api::client::identity::ResolveIdentity",
"aws_smithy_runtime_api::client::behavior_version::BehaviorVersion",
"aws_smithy_runtime_api::client::orchestrator::HttpResponse",
"aws_smithy_runtime_api::client::result::SdkError",
"aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig",
"aws_smithy_types::body::SdkBody",
"aws_smithy_types::retry",
"aws_smithy_types::retry::*",
Expand Down
37 changes: 37 additions & 0 deletions aws/rust-runtime/aws-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ pub mod retry;
mod sensitive_command;
#[cfg(feature = "sso")]
pub mod sso;
pub mod stalled_stream_protection;
pub(crate) mod standard_property;
pub mod sts;
pub mod timeout;
Expand Down Expand Up @@ -216,6 +217,7 @@ mod loader {
use aws_smithy_runtime_api::client::behavior_version::BehaviorVersion;
use aws_smithy_runtime_api::client::http::HttpClient;
use aws_smithy_runtime_api::client::identity::{ResolveCachedIdentity, SharedIdentityCache};
use aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig;
use aws_smithy_runtime_api::shared::IntoShared;
use aws_smithy_types::retry::RetryConfig;
use aws_smithy_types::timeout::TimeoutConfig;
Expand Down Expand Up @@ -259,6 +261,7 @@ mod loader {
use_fips: Option<bool>,
use_dual_stack: Option<bool>,
time_source: Option<SharedTimeSource>,
stalled_stream_protection_config: Option<StalledStreamProtectionConfig>,
env: Option<Env>,
fs: Option<Fs>,
behavior_version: Option<BehaviorVersion>,
Expand Down Expand Up @@ -611,6 +614,39 @@ mod loader {
self
}

/// Override the [`StalledStreamProtectionConfig`] used to build [`SdkConfig`](aws_types::SdkConfig).
///
/// This configures stalled stream protection. When enabled, download streams
/// that stop (stream no data) for longer than a configured grace period will return an error.
///
/// By default, streams that transmit less than one byte per-second for five seconds will
/// be cancelled.
///
/// _Note_: When an override is provided, the default implementation is replaced.
///
/// # Examples
/// ```no_run
/// # async fn create_config() {
/// use aws_config::stalled_stream_protection::StalledStreamProtectionConfig;
/// use std::time::Duration;
/// let config = aws_config::from_env()
/// .stalled_stream_protection(
/// StalledStreamProtectionConfig::enabled()
/// .grace_period(Duration::from_secs(1))
/// .build()
/// )
/// .load()
/// .await;
/// # }
/// ```
pub fn stalled_stream_protection(
mut self,
stalled_stream_protection_config: StalledStreamProtectionConfig,
) -> Self {
self.stalled_stream_protection_config = Some(stalled_stream_protection_config);
self
}

/// Set configuration for all sub-loaders (credentials, region etc.)
///
/// Update the `ProviderConfig` used for all nested loaders. This can be used to override
Expand Down Expand Up @@ -757,6 +793,7 @@ mod loader {
builder.set_endpoint_url(self.endpoint_url);
builder.set_use_fips(use_fips);
builder.set_use_dual_stack(use_dual_stack);
builder.set_stalled_stream_protection(self.stalled_stream_protection_config);
builder.build()
}
}
Expand Down
9 changes: 9 additions & 0 deletions aws/rust-runtime/aws-config/src/stalled_stream_protection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

//! Stalled stream protection configuration
// Re-export from aws-smithy-types
pub use aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig;
3 changes: 2 additions & 1 deletion aws/rust-runtime/aws-types/external-types.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ allowed_external_types = [
"aws_smithy_async::rt::sleep::SharedAsyncSleep",
"aws_smithy_async::time::SharedTimeSource",
"aws_smithy_async::time::TimeSource",
"aws_smithy_runtime_api::client::behavior_version::BehaviorVersion",
"aws_smithy_runtime_api::client::http::HttpClient",
"aws_smithy_runtime_api::client::http::SharedHttpClient",
"aws_smithy_runtime_api::client::identity::ResolveCachedIdentity",
"aws_smithy_runtime_api::client::identity::SharedIdentityCache",
"aws_smithy_runtime_api::client::behavior_version::BehaviorVersion",
"aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig",
"aws_smithy_runtime_api::http::headers::Headers",
"aws_smithy_types::config_bag::storable::Storable",
"aws_smithy_types::config_bag::storable::StoreReplace",
Expand Down
81 changes: 81 additions & 0 deletions aws/rust-runtime/aws-types/src/sdk_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use aws_smithy_runtime_api::client::behavior_version::BehaviorVersion;
use aws_smithy_runtime_api::client::http::HttpClient;
pub use aws_smithy_runtime_api::client::http::SharedHttpClient;
use aws_smithy_runtime_api::client::identity::{ResolveCachedIdentity, SharedIdentityCache};
pub use aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig;
use aws_smithy_runtime_api::shared::IntoShared;
pub use aws_smithy_types::retry::RetryConfig;
pub use aws_smithy_types::timeout::TimeoutConfig;
Expand Down Expand Up @@ -60,6 +61,7 @@ pub struct SdkConfig {
sleep_impl: Option<SharedAsyncSleep>,
time_source: Option<SharedTimeSource>,
timeout_config: Option<TimeoutConfig>,
stalled_stream_protection_config: Option<StalledStreamProtectionConfig>,
http_client: Option<SharedHttpClient>,
use_fips: Option<bool>,
use_dual_stack: Option<bool>,
Expand All @@ -82,6 +84,7 @@ pub struct Builder {
sleep_impl: Option<SharedAsyncSleep>,
time_source: Option<SharedTimeSource>,
timeout_config: Option<TimeoutConfig>,
stalled_stream_protection_config: Option<StalledStreamProtectionConfig>,
http_client: Option<SharedHttpClient>,
use_fips: Option<bool>,
use_dual_stack: Option<bool>,
Expand Down Expand Up @@ -567,10 +570,82 @@ impl Builder {
use_dual_stack: self.use_dual_stack,
time_source: self.time_source,
behavior_version: self.behavior_version,
stalled_stream_protection_config: self.stalled_stream_protection_config,
}
}
}

impl Builder {
/// Set the [`StalledStreamProtectionConfig`] to configure protection for stalled streams.
///
/// This configures stalled stream protection. When enabled, download streams
/// that stall (stream no data) for longer than a configured grace period will return an error.
///
/// _Note:_ Stalled stream protection requires both a sleep implementation and a time source
/// in order to work. When enabling stalled stream protection, make sure to set
/// - A sleep impl with [Self::sleep_impl] or [Self::set_sleep_impl].
/// - A time source with [Self::time_source] or [Self::set_time_source].
///
/// # Examples
/// ```rust
/// use std::time::Duration;
/// use aws_types::SdkConfig;
/// pub use aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig;
///
/// let stalled_stream_protection_config = StalledStreamProtectionConfig::enabled()
/// .grace_period(Duration::from_secs(1))
/// .build();
/// let config = SdkConfig::builder()
/// .stalled_stream_protection(stalled_stream_protection_config)
/// .build();
/// ```
pub fn stalled_stream_protection(
mut self,
stalled_stream_protection_config: StalledStreamProtectionConfig,
) -> Self {
self.set_stalled_stream_protection(Some(stalled_stream_protection_config));
self
}

/// Set the [`StalledStreamProtectionConfig`] to configure protection for stalled streams.
///
/// This configures stalled stream protection. When enabled, download streams
/// that stall (stream no data) for longer than a configured grace period will return an error.
///
/// By default, streams that transmit less than one byte per-second for five seconds will
/// be cancelled.
///
/// _Note:_ Stalled stream protection requires both a sleep implementation and a time source
/// in order to work. When enabling stalled stream protection, make sure to set
/// - A sleep impl with [Self::sleep_impl] or [Self::set_sleep_impl].
/// - A time source with [Self::time_source] or [Self::set_time_source].
///
/// # Examples
/// ```rust
/// use std::time::Duration;
/// use aws_types::sdk_config::{SdkConfig, Builder};
/// pub use aws_smithy_runtime_api::client::stalled_stream_protection::StalledStreamProtectionConfig;
///
/// fn set_stalled_stream_protection(builder: &mut Builder) {
/// let stalled_stream_protection_config = StalledStreamProtectionConfig::enabled()
/// .grace_period(Duration::from_secs(1))
/// .build();
/// builder.set_stalled_stream_protection(Some(stalled_stream_protection_config));
/// }
///
/// let mut builder = SdkConfig::builder();
/// set_stalled_stream_protection(&mut builder);
/// let config = builder.build();
/// ```
pub fn set_stalled_stream_protection(
&mut self,
stalled_stream_protection_config: Option<StalledStreamProtectionConfig>,
) -> &mut Self {
self.stalled_stream_protection_config = stalled_stream_protection_config;
self
}
}

impl SdkConfig {
/// Configured region
pub fn region(&self) -> Option<&Region> {
Expand Down Expand Up @@ -633,6 +708,11 @@ impl SdkConfig {
self.use_dual_stack
}

/// Configured stalled stream protection
pub fn stalled_stream_protection(&self) -> Option<StalledStreamProtectionConfig> {
self.stalled_stream_protection_config.clone()
}

/// Behavior major version configured for this client
pub fn behavior_version(&self) -> Option<BehaviorVersion> {
self.behavior_version.clone()
Expand Down Expand Up @@ -668,6 +748,7 @@ impl SdkConfig {
use_fips: self.use_fips,
use_dual_stack: self.use_dual_stack,
behavior_version: self.behavior_version,
stalled_stream_protection_config: self.stalled_stream_protection_config,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ class GenericSmithySdkConfigSettings : ClientCodegenDecorator {
${section.serviceConfigBuilder}.set_http_client(${section.sdkConfig}.http_client());
${section.serviceConfigBuilder}.set_time_source(${section.sdkConfig}.time_source());
${section.serviceConfigBuilder}.set_behavior_version(${section.sdkConfig}.behavior_version());
// setting `None` here removes the default
if let Some(config) = ${section.sdkConfig}.stalled_stream_protection() {
${section.serviceConfigBuilder}.set_stalled_stream_protection(Some(config));
}
if let Some(cache) = ${section.sdkConfig}.identity_cache() {
${section.serviceConfigBuilder}.set_identity_cache(cache);
Expand Down
3 changes: 3 additions & 0 deletions aws/sdk/integration-tests/dynamodb/tests/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ async fn expect_uri(
let conf = customize(
aws_sdk_dynamodb::config::Builder::from(&conf)
.credentials_provider(Credentials::for_tests())
.stalled_stream_protection(
aws_sdk_dynamodb::config::StalledStreamProtectionConfig::disabled(),
)
.http_client(http_client),
)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
* SPDX-License-Identifier: Apache-2.0
*/

use aws_sdk_dynamodb::config::{Credentials, Region, SharedAsyncSleep};
use aws_sdk_dynamodb::config::{
Credentials, Region, SharedAsyncSleep, StalledStreamProtectionConfig,
};
use aws_sdk_dynamodb::{config::retry::RetryConfig, error::ProvideErrorMetadata};
use aws_smithy_async::test_util::instant_time_and_sleep;
use aws_smithy_async::time::SharedTimeSource;
Expand Down Expand Up @@ -65,6 +67,7 @@ async fn test_adaptive_retries_with_no_throttling_errors() {

let http_client = StaticReplayClient::new(events);
let config = aws_sdk_dynamodb::Config::builder()
.stalled_stream_protection(StalledStreamProtectionConfig::disabled())
.credentials_provider(Credentials::for_tests())
.region(Region::new("us-east-1"))
.retry_config(
Expand Down Expand Up @@ -120,6 +123,7 @@ async fn test_adaptive_retries_with_throttling_errors() {

let http_client = StaticReplayClient::new(events);
let config = aws_sdk_dynamodb::Config::builder()
.stalled_stream_protection(StalledStreamProtectionConfig::disabled())
.credentials_provider(Credentials::for_tests())
.region(Region::new("us-east-1"))
.retry_config(
Expand Down
3 changes: 2 additions & 1 deletion aws/sdk/integration-tests/dynamodb/tests/shared-config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

use aws_sdk_dynamodb::config::{Credentials, Region};
use aws_sdk_dynamodb::config::{Credentials, Region, StalledStreamProtectionConfig};
use aws_smithy_runtime::client::http::test_util::capture_request;
use http::Uri;

Expand All @@ -12,6 +12,7 @@ use http::Uri;
async fn shared_config_testbed() {
let shared_config = aws_types::SdkConfig::builder()
.region(Region::new("us-east-4"))
.stalled_stream_protection(StalledStreamProtectionConfig::disabled())
.build();
let (http_client, request) = capture_request(None);
let conf = aws_sdk_dynamodb::config::Builder::from(&shared_config)
Expand Down
7 changes: 4 additions & 3 deletions aws/sdk/integration-tests/dynamodb/tests/timeouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@
use std::time::Duration;

use aws_credential_types::provider::SharedCredentialsProvider;
use aws_credential_types::Credentials;
use aws_sdk_dynamodb::config::{Credentials, Region, StalledStreamProtectionConfig};
use aws_sdk_dynamodb::error::SdkError;
use aws_smithy_async::rt::sleep::{AsyncSleep, SharedAsyncSleep, Sleep};
use aws_smithy_runtime::client::http::test_util::NeverClient;
use aws_smithy_types::retry::RetryConfig;
use aws_smithy_types::timeout::TimeoutConfig;
use aws_types::region::Region;
use aws_types::SdkConfig;

#[derive(Debug, Clone)]
Expand All @@ -36,9 +35,10 @@ async fn api_call_timeout_retries() {
.build(),
)
.retry_config(RetryConfig::standard())
.stalled_stream_protection(StalledStreamProtectionConfig::disabled())
.sleep_impl(SharedAsyncSleep::new(InstantSleep))
.build();
let client = aws_sdk_dynamodb::Client::from_conf(aws_sdk_dynamodb::Config::new(&conf));
let client = aws_sdk_dynamodb::Client::new(&conf);
let resp = client
.list_tables()
.send()
Expand Down Expand Up @@ -68,6 +68,7 @@ async fn no_retries_on_operation_timeout() {
.operation_timeout(Duration::new(123, 0))
.build(),
)
.stalled_stream_protection(StalledStreamProtectionConfig::disabled())
.retry_config(RetryConfig::standard())
.sleep_impl(SharedAsyncSleep::new(InstantSleep))
.build();
Expand Down
Loading

0 comments on commit 768237a

Please sign in to comment.