Skip to content

Commit

Permalink
Add jitter to LazyCredentialsCache (smithy-lang#2335)
Browse files Browse the repository at this point in the history
* Add jitter to `LazyCredentialsCache`

This commit adds jitter to `LazyCredentialsCache`. A jitter provides a
mechanism for randomizing the buffer time for credentials. This allows
credentials with the same expiry to expire at slightly different times,
thereby preventing thundering herds.

* Update CHANGELOG.next.toml

---------

Co-authored-by: Yuki Saito <awsaito@amazon.com>
  • Loading branch information
ysaito1001 and ysaito1001 authored Feb 10, 2023
1 parent cdc710d commit 0a11d51
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 8 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.next.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,9 @@ message = "Upgrade Rust MSRV to 1.63.0"
references = ["smithy-rs#2222"]
meta = { "breaking" = true, "tada" = true, "bug" = false, "target" = "all" }
author = "Nugine"

[[aws-sdk-rust]]
message = "Adds jitter to `LazyCredentialsCache`. This allows credentials with the same expiry to expire at slightly different times, thereby preventing thundering herds."
references = ["smithy-rs#2335"]
meta = { "breaking" = false, "tada" = false, "bug" = false }
author = "ysaito1001"
1 change: 1 addition & 0 deletions aws/rust-runtime/aws-credential-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ test-util = []
[dependencies]
aws-smithy-async = { path = "../../../rust-runtime/aws-smithy-async" }
aws-smithy-types = { path = "../../../rust-runtime/aws-smithy-types" }
fastrand = "1.4.0"
tokio = { version = "1.8.4", features = ["sync"] }
tracing = "0.1"
zeroize = "1"
Expand Down
97 changes: 89 additions & 8 deletions aws/rust-runtime/aws-credential-types/src/cache/lazy_caching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::time_source::TimeSource;
const DEFAULT_LOAD_TIMEOUT: Duration = Duration::from_secs(5);
const DEFAULT_CREDENTIAL_EXPIRATION: Duration = Duration::from_secs(15 * 60);
const DEFAULT_BUFFER_TIME: Duration = Duration::from_secs(10);
const DEFAULT_BUFFER_TIME_JITTER_FRACTION: fn() -> f64 = fastrand::f64;

#[derive(Debug)]
pub(crate) struct LazyCredentialsCache {
Expand All @@ -28,6 +29,8 @@ pub(crate) struct LazyCredentialsCache {
cache: ExpiringCache<Credentials, CredentialsError>,
provider: SharedCredentialsProvider,
load_timeout: Duration,
buffer_time: Duration,
buffer_time_jitter_fraction: fn() -> f64,
default_credential_expiration: Duration,
}

Expand All @@ -37,15 +40,18 @@ impl LazyCredentialsCache {
sleeper: Arc<dyn AsyncSleep>,
provider: SharedCredentialsProvider,
load_timeout: Duration,
default_credential_expiration: Duration,
buffer_time: Duration,
buffer_time_jitter_fraction: fn() -> f64,
default_credential_expiration: Duration,
) -> Self {
Self {
time,
sleeper,
cache: ExpiringCache::new(buffer_time),
provider,
load_timeout,
buffer_time,
buffer_time_jitter_fraction,
default_credential_expiration,
}
}
Expand Down Expand Up @@ -95,7 +101,12 @@ impl ProvideCachedCredentials for LazyCredentialsCache {
let expiry = credentials
.expiry()
.unwrap_or(now + default_credential_expiration);
Ok((credentials, expiry))

let jitter = self
.buffer_time
.mul_f64((self.buffer_time_jitter_fraction)());

Ok((credentials, expiry + jitter))
}
// Only instrument the the actual load future so that no span
// is opened if the cache decides not to execute it.
Expand Down Expand Up @@ -125,8 +136,8 @@ mod builder {

use super::TimeSource;
use super::{
LazyCredentialsCache, DEFAULT_BUFFER_TIME, DEFAULT_CREDENTIAL_EXPIRATION,
DEFAULT_LOAD_TIMEOUT,
LazyCredentialsCache, DEFAULT_BUFFER_TIME, DEFAULT_BUFFER_TIME_JITTER_FRACTION,
DEFAULT_CREDENTIAL_EXPIRATION, DEFAULT_LOAD_TIMEOUT,
};

/// Builder for constructing a `LazyCredentialsCache`.
Expand All @@ -147,6 +158,7 @@ mod builder {
time_source: Option<TimeSource>,
load_timeout: Option<Duration>,
buffer_time: Option<Duration>,
buffer_time_jitter_fraction: Option<fn() -> f64>,
default_credential_expiration: Option<Duration>,
}

Expand Down Expand Up @@ -228,6 +240,38 @@ mod builder {
self
}

/// A random percentage by which buffer time is jittered for randomization.
///
/// For example, if credentials are expiring in 15 minutes, the buffer time is 10 seconds,
/// and buffer time jitter fraction is 0.2, then buffer time is adjusted to 8 seconds.
/// Therefore, any requests made after 14 minutes and 52 seconds will load new credentials.
///
/// Defaults to a randomly generated value between 0.0 and 1.0. This setter is for testing only.
#[cfg(feature = "test-util")]
pub fn buffer_time_jitter_fraction(
mut self,
buffer_time_jitter_fraction: fn() -> f64,
) -> Self {
self.set_buffer_time_jitter_fraction(Some(buffer_time_jitter_fraction));
self
}

/// A random percentage by which buffer time is jittered for randomization.
///
/// For example, if credentials are expiring in 15 minutes, the buffer time is 10 seconds,
/// and buffer time jitter fraction is 0.2, then buffer time is adjusted to 8 seconds.
/// Therefore, any requests made after 14 minutes and 52 seconds will load new credentials.
///
/// Defaults to a randomly generated value between 0.0 and 1.0. This setter is for testing only.
#[cfg(feature = "test-util")]
pub fn set_buffer_time_jitter_fraction(
&mut self,
buffer_time_jitter_fraction: Option<fn() -> f64>,
) -> &mut Self {
self.buffer_time_jitter_fraction = buffer_time_jitter_fraction;
self
}

/// Default expiration time to set on credentials if they don't have an expiration time.
///
/// This is only used if the given [`ProvideCredentials`](crate::provider::ProvideCredentials) returns
Expand Down Expand Up @@ -283,8 +327,10 @@ mod builder {
}),
provider,
self.load_timeout.unwrap_or(DEFAULT_LOAD_TIMEOUT),
default_credential_expiration,
self.buffer_time.unwrap_or(DEFAULT_BUFFER_TIME),
self.buffer_time_jitter_fraction
.unwrap_or(DEFAULT_BUFFER_TIME_JITTER_FRACTION),
default_credential_expiration,
)
}
}
Expand All @@ -310,8 +356,11 @@ mod tests {
DEFAULT_LOAD_TIMEOUT,
};

const BUFFER_TIME_NO_JITTER: fn() -> f64 = || 0_f64;

fn test_provider(
time: TimeSource,
buffer_time_jitter_fraction: fn() -> f64,
load_list: Vec<crate::provider::Result>,
) -> LazyCredentialsCache {
let load_list = Arc::new(Mutex::new(load_list));
Expand All @@ -327,8 +376,9 @@ mod tests {
}
})),
DEFAULT_LOAD_TIMEOUT,
DEFAULT_CREDENTIAL_EXPIRATION,
DEFAULT_BUFFER_TIME,
buffer_time_jitter_fraction,
DEFAULT_CREDENTIAL_EXPIRATION,
)
}

Expand Down Expand Up @@ -361,8 +411,9 @@ mod tests {
Arc::new(TokioSleep::new()),
provider,
DEFAULT_LOAD_TIMEOUT,
DEFAULT_CREDENTIAL_EXPIRATION,
DEFAULT_BUFFER_TIME,
BUFFER_TIME_NO_JITTER,
DEFAULT_CREDENTIAL_EXPIRATION,
);
assert_eq!(
epoch_secs(1000),
Expand All @@ -381,6 +432,7 @@ mod tests {
let mut time = TestingTimeSource::new(epoch_secs(100));
let credentials_cache = test_provider(
TimeSource::testing(&time),
BUFFER_TIME_NO_JITTER,
vec![
Ok(credentials(1000)),
Ok(credentials(2000)),
Expand All @@ -404,6 +456,7 @@ mod tests {
let mut time = TestingTimeSource::new(epoch_secs(100));
let credentials_cache = test_provider(
TimeSource::testing(&time),
BUFFER_TIME_NO_JITTER,
vec![
Ok(credentials(1000)),
Err(CredentialsError::not_loaded("failed")),
Expand All @@ -430,6 +483,7 @@ mod tests {
let time = TestingTimeSource::new(epoch_secs(0));
let credentials_cache = Arc::new(test_provider(
TimeSource::testing(&time),
BUFFER_TIME_NO_JITTER,
vec![
Ok(credentials(500)),
Ok(credentials(1500)),
Expand Down Expand Up @@ -480,13 +534,40 @@ mod tests {
Ok(credentials(1000))
})),
Duration::from_millis(5),
DEFAULT_CREDENTIAL_EXPIRATION,
DEFAULT_BUFFER_TIME,
BUFFER_TIME_NO_JITTER,
DEFAULT_CREDENTIAL_EXPIRATION,
);

assert!(matches!(
credentials_cache.provide_cached_credentials().await,
Err(CredentialsError::ProviderTimedOut { .. })
));
}

#[tokio::test]
async fn buffer_time_jitter() {
let mut time = TestingTimeSource::new(epoch_secs(100));
let buffer_time_jitter_fraction = || 0.5_f64;
let credentials_cache = test_provider(
TimeSource::testing(&time),
buffer_time_jitter_fraction,
vec![Ok(credentials(1000)), Ok(credentials(2000))],
);

expect_creds(1000, &credentials_cache).await;
let buffer_time_with_jitter =
(DEFAULT_BUFFER_TIME.as_secs_f64() * buffer_time_jitter_fraction()) as u64;
assert_eq!(buffer_time_with_jitter, 5);
// Advance time to the point where the first credentials are about to expire (but haven't).
let almost_expired_secs = 1000 - buffer_time_with_jitter - 1;
time.set_time(epoch_secs(almost_expired_secs));
// We should still use the first credentials.
expect_creds(1000, &credentials_cache).await;
// Now let the first credentials expire.
let expired_secs = almost_expired_secs + 1;
time.set_time(epoch_secs(expired_secs));
// Now that the first credentials have been expired, the second credentials will be retrieved.
expect_creds(2000, &credentials_cache).await;
}
}

0 comments on commit 0a11d51

Please sign in to comment.