Skip to content

Commit

Permalink
feat: obey http status responses. backoff when 429 or 50x (#339)
Browse files Browse the repository at this point in the history
  • Loading branch information
Christopher Kolstad authored Nov 23, 2023
1 parent 4106baa commit 33a511d
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 25 deletions.
1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ opentelemetry_sdk = { version = "0.21.1", features = [
] }
prometheus = { version = "0.13.3", features = ["process"] }
prometheus-static-metric = "0.5.1"
rand = "0.8.5"
redis = { version = "0.23.3", features = ["tokio-comp", "tokio-rustls-comp"] }
reqwest = { version = "0.11.22", default-features = false, features = [
"rustls",
Expand Down
6 changes: 3 additions & 3 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::error::{EdgeError, FeatureError};
use crate::error::EdgeError;
use crate::filters::{
filter_client_features, name_match_filter, name_prefix_filter, project_filter, FeatureFilterSet,
};
Expand Down Expand Up @@ -94,7 +94,7 @@ async fn resolve_features(
None => features_cache
.get(&cache_key(&validated_token))
.map(|client_features| filter_client_features(&client_features, &filter_set))
.ok_or(EdgeError::ClientFeaturesFetchError(FeatureError::Retriable)),
.ok_or(EdgeError::ClientCacheError),
}?;

Ok(Json(ClientFeatures {
Expand Down Expand Up @@ -140,7 +140,7 @@ pub async fn get_feature(
None => features_cache
.get(&cache_key(&validated_token))
.map(|client_features| filter_client_features(&client_features, &filter_set))
.ok_or(EdgeError::ClientFeaturesFetchError(FeatureError::Retriable)),
.ok_or(EdgeError::ClientCacheError),
}
.map(|client_features| client_features.features.into_iter().next())?
.ok_or(EdgeError::FeatureNotFound(feature_name.into_inner()))
Expand Down
12 changes: 10 additions & 2 deletions server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub const TRUST_PROXY_PARSE_ERROR: &str =
pub enum FeatureError {
AccessDenied,
NotFound,
Retriable,
Retriable(StatusCode),
}

#[derive(Debug, Serialize)]
Expand Down Expand Up @@ -89,6 +89,7 @@ pub enum EdgeError {
AuthorizationDenied,
AuthorizationPending,
ClientBuildError(String),
ClientCacheError,
ClientCertificateError(CertificateError),
ClientFeaturesFetchError(FeatureError),
ClientFeaturesParseError(String),
Expand Down Expand Up @@ -131,7 +132,10 @@ impl Display for EdgeError {
EdgeError::PersistenceError(msg) => write!(f, "{msg}"),
EdgeError::JsonParseError(msg) => write!(f, "{msg}"),
EdgeError::ClientFeaturesFetchError(fe) => match fe {
FeatureError::Retriable => write!(f, "Could not fetch client features. Will retry"),
FeatureError::Retriable(status_code) => write!(
f,
"Could not fetch client features. Will retry {status_code}"
),
FeatureError::AccessDenied => write!(
f,
"Could not fetch client features because api key was not allowed"
Expand Down Expand Up @@ -196,6 +200,9 @@ impl Display for EdgeError {
"Client hydration failed. Somehow we said [{message}] when it did"
)
}
EdgeError::ClientCacheError => {
write!(f, "Fetching client features from cache failed")
}
}
}
}
Expand Down Expand Up @@ -230,6 +237,7 @@ impl ResponseError for EdgeError {
EdgeError::HealthCheckError(_) => StatusCode::INTERNAL_SERVER_ERROR,
EdgeError::ReadyCheckError(_) => StatusCode::INTERNAL_SERVER_ERROR,
EdgeError::ClientHydrationFailed(_) => StatusCode::INTERNAL_SERVER_ERROR,
EdgeError::ClientCacheError => StatusCode::INTERNAL_SERVER_ERROR,
}
}

Expand Down
57 changes: 55 additions & 2 deletions server/src/http/background_send_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use actix_web::http::StatusCode;
use tracing::{error, warn};
use std::cmp::max;
use tracing::{error, info, trace, warn};

use super::unleash_client::UnleashClient;
use std::time::Duration;
Expand All @@ -10,6 +11,7 @@ use crate::{
};
use lazy_static::lazy_static;
use prometheus::{register_int_gauge, register_int_gauge_vec, IntGauge, IntGaugeVec, Opts};
use rand::Rng;
use std::sync::Arc;

lazy_static! {
Expand All @@ -30,8 +32,11 @@ pub async fn send_metrics_task(
unleash_client: Arc<UnleashClient>,
send_interval: u64,
) {
let mut failures = 0;
let mut interval = Duration::from_secs(send_interval);
loop {
let batches = metrics_cache.get_appropriately_sized_batches();
trace!("Posting {} batches", batches.len());
for batch in batches {
if !batch.applications.is_empty() || !batch.metrics.is_empty() {
if let Err(edge_error) = unleash_client.send_batch_metrics(batch.clone()).await {
Expand All @@ -48,6 +53,34 @@ pub async fn send_metrics_task(
StatusCode::BAD_REQUEST => {
error!("Unleash said [{message:?}]. Dropping this metric bucket to avoid consuming too much memory");
}
StatusCode::NOT_FOUND => {
failures = 10;
interval = new_interval(send_interval, failures, 5);
error!("Upstream said we are trying to post to an endpoint that doesn't exist. backing off to {} seconds", interval.as_secs());
}
StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => {
failures = 10;
interval = new_interval(send_interval, failures, 5);
error!("Upstream said we were not allowed to post metrics, backing off to {} seconds", interval.as_secs());
}
StatusCode::TOO_MANY_REQUESTS => {
failures = max(10, failures + 1);
interval = new_interval(send_interval, failures, 5);
info!(
"Upstream said it was too busy, backing off to {} seconds",
interval.as_secs()
);
metrics_cache.reinsert_batch(batch);
}
StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::BAD_GATEWAY
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT => {
failures = max(10, failures + 1);
interval = new_interval(send_interval, failures, 5);
info!("Upstream said it is struggling. It returned Http status {}. Backing off to {} seconds", status_code, interval.as_secs());
metrics_cache.reinsert_batch(batch);
}
_ => {
warn!("Failed to send metrics. Status code was {status_code}. Will reinsert metrics for next attempt");
metrics_cache.reinsert_batch(batch);
Expand All @@ -59,9 +92,29 @@ pub async fn send_metrics_task(
METRICS_UNEXPECTED_ERRORS.inc();
}
}
} else {
failures = max(0, failures - 1);
interval = new_interval(send_interval, failures, 5);
}
}
}
tokio::time::sleep(Duration::from_secs(send_interval)).await;
trace!(
"Done posting traces. Sleeping for {} seconds and then going again",
interval.as_secs()
);
tokio::time::sleep(interval).await;
}
}

/// Computes how long to wait before the next attempt at posting metrics.
///
/// The result is `send_interval` seconds, plus one extra `send_interval` for each
/// counted failure, plus the random jitter returned by `random_jitter_milliseconds`
/// for `max_jitter_seconds`.
///
/// All arithmetic saturates: a very large `failures` count produces a very long
/// interval instead of an overflow panic (debug builds) or a wrapped-around,
/// far-too-short interval (release builds).
fn new_interval(send_interval: u64, failures: u64, max_jitter_seconds: u64) -> Duration {
    let initial = Duration::from_secs(send_interval);
    // Linear backoff: every failure adds one more base interval.
    let added_interval_from_failure = Duration::from_secs(send_interval.saturating_mul(failures));
    let jitter = random_jitter_milliseconds(max_jitter_seconds);
    initial
        .saturating_add(added_interval_from_failure)
        .saturating_add(jitter)
}

/// Picks a random jitter, with millisecond resolution, from the half-open range
/// `[0, max_jitter_seconds)` seconds.
///
/// Returns a zero duration when `max_jitter_seconds` is 0, because sampling from
/// the empty range `0..0` would panic. The seconds-to-milliseconds conversion
/// saturates rather than overflowing for very large inputs.
fn random_jitter_milliseconds(max_jitter_seconds: u64) -> Duration {
    let max_jitter_millis = max_jitter_seconds.saturating_mul(1000);
    if max_jitter_millis == 0 {
        // Nothing to sample from; no jitter requested.
        return Duration::ZERO;
    }
    let mut rng = rand::thread_rng();
    Duration::from_millis(rng.gen_range(0..max_jitter_millis))
}
58 changes: 43 additions & 15 deletions server/src/http/feature_refresher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use std::{sync::Arc, time::Duration};
use actix_web::http::header::EntityTag;
use chrono::Utc;
use dashmap::DashMap;
use tracing::{debug, warn};
use reqwest::StatusCode;
use tracing::{debug, info, warn};
use unleash_types::client_features::Segment;
use unleash_types::client_metrics::ClientApplication;
use unleash_types::{
Expand Down Expand Up @@ -167,8 +168,8 @@ impl FeatureRefresher {
.map(|e| e.value().clone())
.filter(|token| {
token
.last_check
.map(|last| Utc::now() - last > self.refresh_interval)
.next_refresh
.map(|refresh| Utc::now() > refresh)
.unwrap_or(true)
})
.collect()
Expand Down Expand Up @@ -362,9 +363,22 @@ impl FeatureRefresher {
match e {
EdgeError::ClientFeaturesFetchError(fe) => {
match fe {
FeatureError::Retriable => {
warn!("Couldn't refresh features, but will retry next go")
}
FeatureError::Retriable(status_code) => match status_code {
StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::BAD_GATEWAY
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT => {
info!("Upstream is having some problems, increasing my waiting period");
self.backoff(&refresh.token);
}
StatusCode::TOO_MANY_REQUESTS => {
info!("Got told that upstream is receiving too many requests");
self.backoff(&refresh.token);
}
_ => {
warn!("Couldn't refresh features, but will retry next go")
}
},
FeatureError::AccessDenied => {
warn!("Token used to fetch features was Forbidden, will remove from list of refresh tasks");
self.tokens_to_refresh.remove(&refresh.token.token);
Expand All @@ -391,25 +405,31 @@ impl FeatureRefresher {
}
}
}
EdgeError::ClientCacheError => {
warn!("Couldn't refresh features, but will retry next go")
}
_ => warn!("Couldn't refresh features: {e:?}. Will retry next pass"),
}
}
}
}

/// Puts the refresh entry stored for `token` into backoff.
///
/// The entry keyed by `token.token` is replaced with whatever its own `backoff`
/// method returns when given the refresher's configured refresh interval.
// NOTE(review): presumably a no-op when the token is not tracked — confirm
// against the `alter` semantics of the `tokens_to_refresh` map.
pub fn backoff(&self, token: &EdgeToken) {
    let key = &token.token;
    let base_interval = &self.refresh_interval;
    self.tokens_to_refresh
        .alter(key, |_, current| current.backoff(base_interval));
}
pub fn update_last_check(&self, token: &EdgeToken) {
if let Some(mut token) = self.tokens_to_refresh.get_mut(&token.token) {
token.last_check = Some(Utc::now());
}
self.tokens_to_refresh
.alter(&token.token, |_k, old_refresh| {
old_refresh.successful_check(&self.refresh_interval)
});
}

pub fn update_last_refresh(&self, token: &EdgeToken, etag: Option<EntityTag>) {
self.tokens_to_refresh
.entry(token.token.clone())
.and_modify(|token_to_refresh| {
token_to_refresh.last_check = Some(Utc::now());
token_to_refresh.last_refreshed = Some(Utc::now());
token_to_refresh.etag = etag
.alter(&token.token, |_k, old_refresh| {
old_refresh.successful_refresh(&self.refresh_interval, etag)
});
}
}
Expand Down Expand Up @@ -768,17 +788,21 @@ mod tests {
let no_etag_so_is_due_for_refresh = TokenRefresh {
token: no_etag_due_for_refresh_token,
etag: None,
next_refresh: None,
last_refreshed: None,
last_check: None,
failure_count: 0,
};
let etag_and_last_refreshed_token =
EdgeToken::try_from("projectb:development.etag_and_last_refreshed_token".to_string())
.unwrap();
let etag_and_last_refreshed_less_than_duration_ago = TokenRefresh {
token: etag_and_last_refreshed_token,
etag: Some(EntityTag::new_weak("abcde".into())),
next_refresh: Some(Utc::now() + Duration::seconds(10)),
last_refreshed: Some(Utc::now()),
last_check: Some(Utc::now()),
failure_count: 0,
};
let etag_but_old_token =
EdgeToken::try_from("projectb:development.etag_but_old_token".to_string()).unwrap();
Expand All @@ -787,8 +811,10 @@ mod tests {
let etag_but_last_refreshed_ten_seconds_ago = TokenRefresh {
token: etag_but_old_token,
etag: Some(EntityTag::new_weak("abcde".into())),
next_refresh: None,
last_refreshed: Some(ten_seconds_ago),
last_check: Some(ten_seconds_ago),
failure_count: 0,
};
feature_refresher.tokens_to_refresh.insert(
etag_but_last_refreshed_ten_seconds_ago.token.token.clone(),
Expand Down Expand Up @@ -1110,8 +1136,10 @@ mod tests {
let token_refresh = TokenRefresh {
token: wildcard_token.clone(),
etag: None,
next_refresh: None,
last_refreshed: None,
last_check: None,
failure_count: 0,
};

current_tokens.insert(wildcard_token.token, token_refresh);
Expand Down
11 changes: 8 additions & 3 deletions server/src/http/unleash_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,11 @@ impl UnleashClient {
.send()
.await
.map_err(|e| {
warn!("Failed to fetch errors due to [{e:?}] - Will retry");
EdgeError::ClientFeaturesFetchError(FeatureError::Retriable)
warn!("Failed to fetch. Due to [{e:?}] - Will retry");
match e.status() {
Some(s) => EdgeError::ClientFeaturesFetchError(FeatureError::Retriable(s)),
None => EdgeError::ClientFeaturesFetchError(FeatureError::NotFound),
}
})?;
let stop_time = Utc::now();
CLIENT_FEATURE_FETCH
Expand Down Expand Up @@ -407,7 +410,9 @@ impl UnleashClient {
CLIENT_FEATURE_FETCH_FAILURES
.with_label_values(&[response.status().as_str()])
.inc();
Err(EdgeError::ClientFeaturesFetchError(FeatureError::Retriable))
Err(EdgeError::ClientFeaturesFetchError(
FeatureError::Retriable(response.status()),
))
}
}

Expand Down
4 changes: 4 additions & 0 deletions server/src/persistence/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,17 +250,21 @@ mod tests {
status: TokenValidationStatus::Validated,
},
etag: Some(EntityTag::new_weak("1234".into())),
next_refresh: None,
last_refreshed: Some(Utc::now()),
last_check: Some(Utc::now()),
failure_count: 0,
},
TokenRefresh {
token: EdgeToken {
token: "otherthing:otherthing:aljjsdnasd".into(),
..EdgeToken::default()
},
etag: None,
next_refresh: None,
last_refreshed: None,
last_check: None,
failure_count: 0,
},
];

Expand Down
Loading

0 comments on commit 33a511d

Please sign in to comment.