feat: Use Version-headers to decide where to post metrics #389

Merged · 4 commits · Jan 15, 2024
Changes from 2 commits
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions server/Cargo.toml
@@ -59,6 +59,7 @@ reqwest = { version = "0.11.23", default-features = false, features = [
 ] }
 rustls = "0.21.8"
 rustls-pemfile = "1.0.4"
+semver = "1.0.21"
 serde = { version = "1.0.195", features = ["derive"] }
 serde_json = "1.0.111"
 serde_qs = { version = "0.12.0", features = ["actix4", "tracing"] }
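The only new dependency is semver, which matches the PR title: before Edge can switch endpoints it has to parse and compare the version its upstream advertises. A minimal sketch of such a version gate, assuming a hypothetical helper name and taking the 5.9.0 / 17.0.0 thresholds from the warning logged further down in this diff:

use semver::Version;

// Hypothetical helper, not part of this diff: returns true when the
// upstream advertises a version new enough to expose the client bulk
// metrics endpoint. The thresholds mirror the warning logged in
// background_send_metrics.rs (Unleash >= 5.9.0 or Edge >= 17.0.0).
fn supports_client_bulk_endpoint(
    unleash_version: Option<&str>,
    edge_version: Option<&str>,
) -> bool {
    let at_least = |raw: Option<&str>, min: Version| -> bool {
        raw.and_then(|v| Version::parse(v).ok())
            .map_or(false, |v| v >= min)
    };
    at_least(unleash_version, Version::new(5, 9, 0))
        || at_least(edge_version, Version::new(17, 0, 0))
}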
25 changes: 16 additions & 9 deletions server/src/client_api.rs
@@ -576,13 +576,23 @@ mod tests {
         upstream_engine_cache,
     )
     .await;
-    let client = UnleashClient::new(srv.url("/").as_str(), None).unwrap();
-    let status = client
-        .send_bulk_metrics_to_client_endpoint(MetricsBatch::default(), None)
+    let req = reqwest::Client::new();
+    let status = req
+        .post(srv.url("/api/client/metrics/bulk").as_str())
+        .body(
+            serde_json::to_string(&crate::types::BatchMetricsRequestBody {
+                applications: vec![],
+                metrics: vec![],
+            })
+            .unwrap(),
+        )
+        .send()
         .await;
-    assert_eq!(status.expect_err("").status_code(), StatusCode::FORBIDDEN);
+    assert!(status.is_ok());
+    assert_eq!(status.unwrap().status(), StatusCode::FORBIDDEN);
     let client = UnleashClient::new(srv.url("/").as_str(), None).unwrap();
     let successful = client
-        .send_bulk_metrics_to_client_endpoint(MetricsBatch::default(), Some(token.clone()))
+        .send_bulk_metrics_to_client_endpoint(MetricsBatch::default(), &token.token)
         .await;
     assert!(successful.is_ok());
 }
@@ -604,10 +614,7 @@ mod tests {
         .await;
     let client = UnleashClient::new(srv.url("/").as_str(), None).unwrap();
     let status = client
-        .send_bulk_metrics_to_client_endpoint(
-            MetricsBatch::default(),
-            Some(frontend_token.clone()),
-        )
+        .send_bulk_metrics_to_client_endpoint(MetricsBatch::default(), &frontend_token.token)
         .await;
     assert_eq!(status.expect_err("").status_code(), StatusCode::FORBIDDEN);
 }
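The first test above drives the raw request without an Authorization header and expects 403. For contrast, a hedged sketch of the authorized variant — essentially what send_bulk_metrics_to_client_endpoint does for the test's happy path; the Content-Type header and the loose success assertion are assumptions, since the diff never asserts an exact success status for the raw request:

// Sketch only: same endpoint and empty BatchMetricsRequestBody as above,
// but with the client token supplied in the Authorization header.
let authorized = reqwest::Client::new()
    .post(srv.url("/api/client/metrics/bulk").as_str())
    .header("Authorization", token.token.clone())
    .header("Content-Type", "application/json")
    .body(
        serde_json::to_string(&crate::types::BatchMetricsRequestBody {
            applications: vec![],
            metrics: vec![],
        })
        .unwrap(),
    )
    .send()
    .await
    .unwrap();
assert!(authorized.status().is_success());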
205 changes: 145 additions & 60 deletions server/src/http/background_send_metrics.rs
@@ -2,13 +2,15 @@ use actix_web::http::StatusCode;
 use std::cmp::max;
 use tracing::{error, info, trace, warn};

-use super::unleash_client::UnleashClient;
+use super::feature_refresher::FeatureRefresher;

+use crate::types::TokenRefresh;
 use crate::{
     error::EdgeError,
     metrics::client_metrics::{size_of_batch, MetricsCache},
 };
 use chrono::Duration;
+use dashmap::DashMap;
 use lazy_static::lazy_static;
 use prometheus::{register_int_gauge, register_int_gauge_vec, IntGauge, IntGaugeVec, Opts};
 use rand::Rng;
@@ -25,76 +27,135 @@ lazy_static! {
         .unwrap();
     pub static ref METRICS_UNEXPECTED_ERRORS: IntGauge =
         register_int_gauge!(Opts::new("metrics_send_error", "Failures to send metrics")).unwrap();
+    pub static ref METRICS_UPSTREAM_OUTDATED: IntGaugeVec = register_int_gauge_vec!(
+        Opts::new(
+            "metrics_upstream_outdated",
+            "Number of times we have tried to send metrics to an outdated endpoint"
+        ),
+        &["environment"]
+    )
+    .unwrap();
+    pub static ref METRICS_UPSTREAM_CLIENT_BULK: IntGaugeVec = register_int_gauge_vec!(
+        Opts::new(
+            "metrics_upstream_client_bulk",
+            "Number of times we have tried to send metrics to the client bulk endpoint"
+        ),
+        &["environment"]
+    )
+    .unwrap();
 }

+fn decide_where_to_post(
+    environment: &String,
+    known_tokens: Arc<DashMap<String, TokenRefresh>>,
+) -> (bool, String) {
+    if let Some(token_refresh) = known_tokens
+        .iter()
+        .find(|t| t.token.environment == Some(environment.to_string()))
+    {
+        if token_refresh.use_client_bulk_endpoint {
+            info!("Sending metrics to client bulk endpoint");
+            METRICS_UPSTREAM_CLIENT_BULK
+                .with_label_values(&[environment])
+                .inc();
+            (true, token_refresh.token.token.clone())
+        } else {
+            warn!("Your upstream is outdated. Please upgrade to at least Unleash version 5.9.0 or Edge Version 17.0.0");
+            METRICS_UPSTREAM_OUTDATED
+                .with_label_values(&[environment])
+                .inc();
+            (false, "".into())
+        }
+    } else {
+        (false, "".into())
+    }
+}

 pub async fn send_metrics_task(
     metrics_cache: Arc<MetricsCache>,
-    unleash_client: Arc<UnleashClient>,
+    feature_refresher: Arc<FeatureRefresher>,
     send_interval: i64,
 ) {
     let mut failures = 0;
     let mut interval = Duration::seconds(send_interval);
     loop {
-        let batches = metrics_cache.get_appropriately_sized_batches();
-        trace!("Posting {} batches", batches.len());
-        for batch in batches {
-            if !batch.applications.is_empty() || !batch.metrics.is_empty() {
-                if let Err(edge_error) = unleash_client.send_batch_metrics(batch.clone()).await {
-                    match edge_error {
-                        EdgeError::EdgeMetricsRequestError(status_code, message) => {
-                            METRICS_UPSTREAM_HTTP_ERRORS
-                                .with_label_values(&[status_code.as_str()])
-                                .inc();
-                            match status_code {
-                                StatusCode::PAYLOAD_TOO_LARGE => error!(
-                                    "Metrics were too large. They were {}",
-                                    size_of_batch(&batch)
-                                ),
-                                StatusCode::BAD_REQUEST => {
-                                    error!("Unleash said [{message:?}]. Dropping this metric bucket to avoid consuming too much memory");
-                                }
-                                StatusCode::NOT_FOUND => {
-                                    failures = 10;
-                                    interval = new_interval(interval, failures, 5);
-                                    error!("Upstream said we are trying to post to an endpoint that doesn't exist. backing off to {} seconds", interval.num_seconds());
-                                }
-                                StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => {
-                                    failures = 10;
-                                    interval = new_interval(interval, failures, 5);
-                                    error!("Upstream said we were not allowed to post metrics, backing off to {} seconds", interval.num_seconds());
-                                }
-                                StatusCode::TOO_MANY_REQUESTS => {
-                                    failures = max(10, failures + 1);
-                                    interval = new_interval(interval, failures, 5);
-                                    info!(
-                                        "Upstream said it was too busy, backing off to {} seconds",
-                                        interval.num_seconds()
-                                    );
-                                    metrics_cache.reinsert_batch(batch);
-                                }
-                                StatusCode::INTERNAL_SERVER_ERROR
-                                | StatusCode::BAD_GATEWAY
-                                | StatusCode::SERVICE_UNAVAILABLE
-                                | StatusCode::GATEWAY_TIMEOUT => {
-                                    failures = max(10, failures + 1);
-                                    interval = new_interval(interval, failures, 5);
-                                    info!("Upstream said it is struggling. It returned Http status {}. Backing off to {} seconds", status_code, interval.num_seconds());
-                                    metrics_cache.reinsert_batch(batch);
-                                }
-                                _ => {
-                                    warn!("Failed to send metrics. Status code was {status_code}. Will reinsert metrics for next attempt");
-                                    metrics_cache.reinsert_batch(batch);
-                                }
-                            }
-                        }
-                        _ => {
-                            warn!("Failed to send metrics: {edge_error:?}");
-                            METRICS_UNEXPECTED_ERRORS.inc();
-                        }
-                    }
-                } else {
-                    failures = max(0, failures - 1);
-                    interval = new_interval(interval, failures, 5);
-                }
-            }
-        }
+        trace!("Looping metrics");
+        let envs = metrics_cache.get_metrics_by_environment();
+        for (env, batch) in envs.iter() {
+            let (use_new_endpoint, token) =
+                decide_where_to_post(env, feature_refresher.tokens_to_refresh.clone());
+            let batches = metrics_cache.get_appropriately_sized_env_batches(batch);
+            trace!("Posting {} batches for {env}", batches.len());
+            for batch in batches {
+                if !batch.applications.is_empty() || !batch.metrics.is_empty() {
+                    let result = if use_new_endpoint {
+                        feature_refresher
+                            .unleash_client
+                            .send_bulk_metrics_to_client_endpoint(batch.clone(), &token)
+                            .await
+                    } else {
+                        feature_refresher
+                            .unleash_client
+                            .send_batch_metrics(batch.clone())
+                            .await
+                    };
+                    if let Err(edge_error) = result {
+                        match edge_error {
+                            EdgeError::EdgeMetricsRequestError(status_code, message) => {
+                                METRICS_UPSTREAM_HTTP_ERRORS
+                                    .with_label_values(&[status_code.as_str()])
+                                    .inc();
+                                match status_code {
+                                    StatusCode::PAYLOAD_TOO_LARGE => error!(
+                                        "Metrics were too large. They were {}",
+                                        size_of_batch(&batch)
+                                    ),
+                                    StatusCode::BAD_REQUEST => {
+                                        error!("Unleash said [{message:?}]. Dropping this metric bucket to avoid consuming too much memory");
+                                    }
+                                    StatusCode::NOT_FOUND => {
+                                        failures = 10;
+                                        interval = new_interval(interval, failures, 5);
+                                        error!("Upstream said we are trying to post to an endpoint that doesn't exist. backing off to {} seconds", interval.num_seconds());
+                                    }
+                                    StatusCode::FORBIDDEN | StatusCode::UNAUTHORIZED => {
+                                        failures = 10;
+                                        interval = new_interval(interval, failures, 5);
+                                        error!("Upstream said we were not allowed to post metrics, backing off to {} seconds", interval.num_seconds());
+                                    }
+                                    StatusCode::TOO_MANY_REQUESTS => {
+                                        failures = max(10, failures + 1);
+                                        interval = new_interval(interval, failures, 5);
+                                        info!(
+                                            "Upstream said it was too busy, backing off to {} seconds",
+                                            interval.num_seconds()
+                                        );
+                                        metrics_cache.reinsert_batch(batch);
+                                    }
+                                    StatusCode::INTERNAL_SERVER_ERROR
+                                    | StatusCode::BAD_GATEWAY
+                                    | StatusCode::SERVICE_UNAVAILABLE
+                                    | StatusCode::GATEWAY_TIMEOUT => {
+                                        failures = max(10, failures + 1);
+                                        interval = new_interval(interval, failures, 5);
+                                        info!("Upstream said it is struggling. It returned Http status {}. Backing off to {} seconds", status_code, interval.num_seconds());
+                                        metrics_cache.reinsert_batch(batch);
+                                    }
+                                    _ => {
+                                        warn!("Failed to send metrics. Status code was {status_code}. Will reinsert metrics for next attempt");
+                                        metrics_cache.reinsert_batch(batch);
+                                    }
+                                }
+                            }
+                            _ => {
+                                warn!("Failed to send metrics: {edge_error:?}");
+                                METRICS_UNEXPECTED_ERRORS.inc();
+                            }
+                        }
+                    } else {
+                        failures = max(0, failures - 1);
+                        interval = new_interval(interval, failures, 5);
+                    }
+                }
+            }
+        }
     }
 }
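Both branches above share the same backoff bookkeeping: hard failures (401/403/404) pin failures to 10, soft failures (429/5xx) increment it, successes decay it, and every adjustment goes through new_interval. The body of new_interval sits outside the rendered hunks; a hedged reconstruction, consistent with the random_jitter_seconds signature shown below and with the overflow test (300s base, 10 failures, 5s max jitter stays under 3305s):

use chrono::Duration;
use rand::Rng;

// Hedged reconstruction; the real helpers live further down this file and
// are not part of this diff. The growth curve is an assumption chosen to
// satisfy the test below: new_interval(300s, 10, 5) = 300 * (10 + 1)
// seconds plus jitter below 5 seconds, which stays under 3305 seconds.
fn new_interval(send_interval: Duration, failures: i64, max_jitter_seconds: u8) -> Duration {
    let backoff_seconds = send_interval.num_seconds().saturating_mul(failures + 1);
    Duration::seconds(backoff_seconds) + random_jitter_seconds(max_jitter_seconds)
}

fn random_jitter_seconds(max_jitter_seconds: u8) -> Duration {
    // Jitter keeps a fleet of Edge instances from retrying in lockstep.
    Duration::seconds(rand::thread_rng().gen_range(0..max_jitter_seconds as i64))
}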
@@ -120,11 +181,35 @@ fn random_jitter_seconds(max_jitter_seconds: u8) -> Duration {
 #[cfg(test)]
 mod tests {
     use crate::http::background_send_metrics::new_interval;
+    use crate::types::{EdgeToken, TokenRefresh};
     use chrono::Duration;
+    use dashmap::DashMap;
+    use std::sync::Arc;

-    #[test]
-    pub fn new_interval_does_not_overflow() {
+    #[tokio::test]
+    pub async fn new_interval_does_not_overflow() {
         let metrics = new_interval(Duration::seconds(300), 10, 5);
         assert!(metrics.num_seconds() < 3305);
     }

+    #[tokio::test]
+    pub async fn decides_correctly_whether_to_post_to_client_bulk_or_edge_bulk() {
+        let refreshing_tokens = Arc::new(DashMap::default());
+        let old_token = EdgeToken::validated_client_token("*:development.somesecret");
+        let mut old_endpoint_refresh = TokenRefresh::new(old_token.clone(), None);
+        old_endpoint_refresh.use_client_bulk_endpoint = false;
+        let new_token = EdgeToken::validated_client_token("*:production.someothersecret");
+        let mut new_endpoint_refresh = TokenRefresh::new(new_token.clone(), None);
+        new_endpoint_refresh.use_client_bulk_endpoint = true;
+        refreshing_tokens.insert(old_token.token.clone(), old_endpoint_refresh);
+        refreshing_tokens.insert(new_token.token.clone(), new_endpoint_refresh);
+        let (use_new_endpoint, token) =
+            super::decide_where_to_post(&"development".to_string(), refreshing_tokens.clone());
+        assert!(!use_new_endpoint);
+        assert_eq!(&token, "");
+        let (use_new_endpoint, other_token) =
+            super::decide_where_to_post(&"production".to_string(), refreshing_tokens.clone());
+        assert!(use_new_endpoint);
+        assert_eq!(other_token, new_token.token);
+    }
 }
15 changes: 12 additions & 3 deletions server/src/http/feature_refresher.rs
@@ -269,7 +269,7 @@ impl FeatureRefresher {
     /// Registers a token for refresh, the token will be discarded if it can be subsumed by another previously registered token
     pub async fn register_token_for_refresh(&self, token: EdgeToken, etag: Option<EntityTag>) {
         if !self.tokens_to_refresh.contains_key(&token.token) {
-            let _ = self
+            let use_new_bulk_endpoint_for_metrics = self
                 .unleash_client
                 .register_as_client(
                     token.token.clone(),
@@ -278,10 +278,15 @@
                     self.refresh_interval.num_seconds(),
                 ),
             )
-            .await;
+            .await
+            .unwrap_or_default();
             let mut registered_tokens: Vec<TokenRefresh> =
                 self.tokens_to_refresh.iter().map(|t| t.clone()).collect();
-            registered_tokens.push(TokenRefresh::new(token.clone(), etag));
+            registered_tokens.push(TokenRefresh::new_with_client_bulk_endpoint(
+                token.clone(),
+                etag,
+                use_new_bulk_endpoint_for_metrics,
+            ));
             let minimum = simplify(&registered_tokens);
             let mut keys = HashSet::new();
             for refreshes in minimum {
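TokenRefresh::new_with_client_bulk_endpoint and the use_client_bulk_endpoint field live in server/src/types.rs, outside this diff. A sketch of what the constructor presumably adds over the existing TokenRefresh::new, assuming it does nothing beyond recording the extra flag:

use actix_web::http::header::EntityTag;

impl TokenRefresh {
    // Sketch: identical to TokenRefresh::new, plus a record of whether the
    // upstream that validated this token supports the client bulk endpoint.
    pub fn new_with_client_bulk_endpoint(
        token: EdgeToken,
        etag: Option<EntityTag>,
        use_client_bulk_endpoint: bool,
    ) -> Self {
        Self {
            use_client_bulk_endpoint,
            ..TokenRefresh::new(token, etag)
        }
    }
}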
@@ -792,6 +797,7 @@ mod tests {
             last_refreshed: None,
             last_check: None,
             failure_count: 0,
+            use_client_bulk_endpoint: false,
         };
         let etag_and_last_refreshed_token =
             EdgeToken::try_from("projectb:development.etag_and_last_refreshed_token".to_string())
@@ -803,6 +809,7 @@
             last_refreshed: Some(Utc::now()),
             last_check: Some(Utc::now()),
             failure_count: 0,
+            use_client_bulk_endpoint: false,
         };
         let etag_but_old_token =
             EdgeToken::try_from("projectb:development.etag_but_old_token".to_string()).unwrap();
@@ -814,6 +821,7 @@
             next_refresh: None,
             last_refreshed: Some(ten_seconds_ago),
             last_check: Some(ten_seconds_ago),
+            use_client_bulk_endpoint: false,
             failure_count: 0,
         };
         feature_refresher.tokens_to_refresh.insert(
@@ -1196,6 +1204,7 @@ mod tests {
             last_refreshed: None,
             last_check: None,
             failure_count: 0,
+            use_client_bulk_endpoint: false,
         };

         current_tokens.insert(wildcard_token.token, token_refresh);