diff --git a/server/src/http/background_send_metrics.rs b/server/src/http/background_send_metrics.rs index 54c3d48f..dd6001d0 100644 --- a/server/src/http/background_send_metrics.rs +++ b/server/src/http/background_send_metrics.rs @@ -67,6 +67,37 @@ fn decide_where_to_post( } } +pub async fn send_metrics_one_shot( + metrics_cache: Arc, + feature_refresher: Arc, +) { + let envs = metrics_cache.get_metrics_by_environment(); + for (env, batch) in envs.iter() { + let (use_new_endpoint, token) = + decide_where_to_post(env, feature_refresher.tokens_to_refresh.clone()); + let batches = metrics_cache.get_appropriately_sized_env_batches(batch); + trace!("Posting {} batches for {env}", batches.len()); + for batch in batches { + if !batch.applications.is_empty() || !batch.metrics.is_empty() { + let result = if use_new_endpoint { + feature_refresher + .unleash_client + .send_bulk_metrics_to_client_endpoint(batch.clone(), &token) + .await + } else { + feature_refresher + .unleash_client + .send_batch_metrics(batch.clone()) + .await + }; + if let Err(edge_error) = result { + warn!("Shut down metrics flush failed with {edge_error:?}") + } + } + } + } +} + pub async fn send_metrics_task( metrics_cache: Arc, feature_refresher: Arc, diff --git a/server/src/main.rs b/server/src/main.rs index 39b443a8..5a8d7310 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -12,8 +12,11 @@ use unleash_types::client_metrics::ConnectVia; use utoipa::OpenApi; use utoipa_swagger_ui::SwaggerUi; +use tracing::info; use unleash_edge::builder::build_caches_and_refreshers; use unleash_edge::cli::{CliArgs, EdgeMode}; +use unleash_edge::http::background_send_metrics::send_metrics_one_shot; +use unleash_edge::http::feature_refresher::FeatureRefresher; use unleash_edge::metrics::client_metrics::MetricsCache; use unleash_edge::offline::offline_hotload; use unleash_edge::persistence::{persist_data, EdgePersistence}; @@ -148,7 +151,7 @@ async fn main() -> Result<(), anyhow::Error> { tokio::select! { _ = server.run() => { tracing::info!("Actix is shutting down. Persisting data"); - clean_shutdown(persistence.clone(), lazy_feature_cache.clone(), lazy_token_cache.clone()).await; + clean_shutdown(persistence.clone(), lazy_feature_cache.clone(), lazy_token_cache.clone(), metrics_cache_clone.clone(), feature_refresher.clone()).await; tracing::info!("Actix was shutdown properly"); }, _ = refresher.start_refresh_features_background_task() => { @@ -184,7 +187,7 @@ async fn main() -> Result<(), anyhow::Error> { _ => tokio::select! { _ = server.run() => { tracing::info!("Actix is shutting down. Persisting data"); - clean_shutdown(persistence, lazy_feature_cache.clone(), lazy_token_cache.clone()).await; + clean_shutdown(persistence, lazy_feature_cache.clone(), lazy_token_cache.clone(), metrics_cache_clone.clone(), feature_refresher.clone()).await; tracing::info!("Actix was shutdown properly"); } @@ -199,6 +202,8 @@ async fn clean_shutdown( persistence: Option>, feature_cache: Arc>, token_cache: Arc>, + metrics_cache: Arc, + feature_refresher: Option>, ) { let tokens: Vec = token_cache .iter() @@ -225,4 +230,8 @@ async fn clean_shutdown( .for_each(|failed_save| tracing::error!("Failed backing up: {failed_save:?}")); } } + if let Some(feature_refresher) = feature_refresher { + info!("Connected to an upstream, flushing last set of metrics"); + send_metrics_one_shot(metrics_cache, feature_refresher).await; + } }