From 6855c7d78fe2e3b2b30289ff1523290a6706254d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Nuno=20G=C3=B3is?=
Date: Wed, 8 Feb 2023 18:12:37 +0000
Subject: [PATCH] task: add send metrics task
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Simon Hornby
Co-authored-by: Gastón Fournier
---
 server/src/cli.rs                           |  2 +
 server/src/data_sources/builder.rs          |  2 +
 server/src/data_sources/memory_provider.rs  | 13 +++++-
 server/src/data_sources/offline_provider.rs | 11 ++++-
 server/src/data_sources/redis_provider.rs   | 10 ++++-
 server/src/edge_api.rs                      | 21 +++++-----
 server/src/frontend_api.rs                  |  6 ++-
 server/src/http/background_send_metrics.rs  | 46 +++++++++++++++++----
 server/src/http/unleash_client.rs           |  3 +-
 server/src/main.rs                          |  8 +++-
 server/src/types.rs                         |  9 +++-
 11 files changed, 103 insertions(+), 28 deletions(-)

diff --git a/server/src/cli.rs b/server/src/cli.rs
index 2e03569b..7039edae 100644
--- a/server/src/cli.rs
+++ b/server/src/cli.rs
@@ -35,6 +35,8 @@ pub struct EdgeArgs {
     pub unleash_url: String,
     #[clap(short, long, env)]
     pub redis_url: Option<String>,
+    #[clap(short, long, env, default_value_t = 10)]
+    pub metrics_interval_seconds: u64,
 }
 
 #[derive(Args, Debug, Clone)]
diff --git a/server/src/data_sources/builder.rs b/server/src/data_sources/builder.rs
index 760e5e91..862fc42c 100644
--- a/server/src/data_sources/builder.rs
+++ b/server/src/data_sources/builder.rs
@@ -30,6 +30,7 @@ pub struct SinkInfo {
     pub validated_receive: mpsc::Receiver<EdgeToken>,
     pub unvalidated_receive: mpsc::Receiver<EdgeToken>,
     pub unleash_client: UnleashClient,
+    pub metrics_interval_seconds: u64,
 }
 
 fn build_offline(offline_args: OfflineArgs) -> EdgeResult {
@@ -80,6 +81,7 @@ pub fn build_source_and_sink(args: CliArgs) -> EdgeResult {
             validated_receive: validated_receiver,
             unvalidated_receive: unvalidated_receiver,
             unleash_client,
+            metrics_interval_seconds: edge_args.metrics_interval_seconds,
         }),
     })
 }
diff --git a/server/src/data_sources/memory_provider.rs b/server/src/data_sources/memory_provider.rs
index 2f28597d..d5ae6228 100644
--- a/server/src/data_sources/memory_provider.rs
+++ b/server/src/data_sources/memory_provider.rs
@@ -67,6 +67,15 @@ impl TokenSource for MemoryProvider {
         Ok(self.token_store.values().into_iter().cloned().collect())
     }
 
+    async fn get_valid_tokens(&self) -> EdgeResult<Vec<EdgeToken>> {
+        Ok(self
+            .token_store
+            .values()
+            .filter(|t| t.status == TokenValidationStatus::Validated)
+            .cloned()
+            .collect())
+    }
+
     async fn get_token_validation_status(&self, secret: &str) -> EdgeResult<TokenValidationStatus> {
         if let Some(token) = self.token_store.get(secret) {
             Ok(token.clone().status)
@@ -83,7 +92,7 @@ impl TokenSource for MemoryProvider {
         Ok(self.token_store.get(&secret).cloned())
     }
 
-    async fn get_valid_tokens(&self, secrets: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
+    async fn filter_valid_tokens(&self, secrets: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
         Ok(secrets
             .iter()
             .filter_map(|s| self.token_store.get(s))
@@ -202,7 +211,7 @@ mod test {
             .sink_tokens(vec![james_bond.clone(), frank_drebin.clone()])
             .await;
         let valid_tokens = provider
-            .get_valid_tokens(vec![
+            .filter_valid_tokens(vec![
                 "jamesbond".into(),
                 "anotherinvalidone".into(),
                 "frankdrebin".into(),
diff --git a/server/src/data_sources/offline_provider.rs b/server/src/data_sources/offline_provider.rs
index c47d83ba..c6cb9175 100644
--- a/server/src/data_sources/offline_provider.rs
+++ b/server/src/data_sources/offline_provider.rs
@@ -30,6 +30,15 @@ impl TokenSource for OfflineProvider {
         Ok(self.valid_tokens.values().cloned().collect())
     }
 
+    async fn get_valid_tokens(&self) -> EdgeResult<Vec<EdgeToken>> {
+        Ok(self
+            .valid_tokens
+            .values()
+            .filter(|t| t.status == TokenValidationStatus::Validated)
+            .cloned()
+            .collect())
+    }
+
     async fn get_token_validation_status(&self, secret: &str) -> EdgeResult<TokenValidationStatus> {
         Ok(if self.valid_tokens.contains_key(secret) {
             TokenValidationStatus::Validated
@@ -41,7 +50,7 @@
     async fn token_details(&self, secret: String) -> EdgeResult<Option<EdgeToken>> {
         Ok(self.valid_tokens.get(&secret).cloned())
     }
-    async fn get_valid_tokens(&self, secrets: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
+    async fn filter_valid_tokens(&self, secrets: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
         Ok(self
             .valid_tokens
             .clone()
diff --git a/server/src/data_sources/redis_provider.rs b/server/src/data_sources/redis_provider.rs
index 17d6c537..f5b7b101 100644
--- a/server/src/data_sources/redis_provider.rs
+++ b/server/src/data_sources/redis_provider.rs
@@ -95,6 +95,14 @@ impl TokenSource for RedisProvider {
             .collect())
     }
 
+    async fn get_valid_tokens(&self) -> EdgeResult<Vec<EdgeToken>> {
+        let tokens = self.get_known_tokens().await?;
+        Ok(tokens
+            .into_iter()
+            .filter(|t| t.status == TokenValidationStatus::Validated)
+            .collect())
+    }
+
     async fn get_token_validation_status(&self, secret: &str) -> EdgeResult<TokenValidationStatus> {
         if let Some(t) = self
             .get_known_tokens()
@@ -112,7 +120,7 @@
         }
     }
 
-    async fn get_valid_tokens(&self, _secrets: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
+    async fn filter_valid_tokens(&self, _secrets: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
         todo!()
     }
 
diff --git a/server/src/edge_api.rs b/server/src/edge_api.rs
index 95af84b8..cbc471b3 100644
--- a/server/src/edge_api.rs
+++ b/server/src/edge_api.rs
@@ -1,28 +1,28 @@
 use actix_web::{
-    get, post,
+    post,
     web::{self, Json},
     HttpResponse,
 };
 use tokio::sync::RwLock;
 
-use crate::{metrics::client_metrics::MetricsCache, types::EdgeResult};
+use crate::{
+    metrics::client_metrics::MetricsCache,
+    types::{BatchMetricsRequestBody, EdgeResult},
+};
 use crate::{
     metrics::client_metrics::MetricsKey,
-    types::{
-        BatchMetricsRequest, EdgeJsonResult, EdgeSource, EdgeToken, TokenStrings, ValidatedTokens,
-    },
+    types::{EdgeJsonResult, EdgeSource, TokenStrings, ValidatedTokens},
 };
 
-#[get("/validate")]
+#[post("/validate")]
 async fn validate(
-    _client_token: EdgeToken,
     token_provider: web::Data<RwLock<dyn EdgeSource>>,
     tokens: Json<TokenStrings>,
 ) -> EdgeJsonResult<ValidatedTokens> {
     let valid_tokens = token_provider
         .read()
         .await
-        .get_valid_tokens(tokens.into_inner().tokens)
+        .filter_valid_tokens(tokens.into_inner().tokens)
         .await?;
     Ok(Json(ValidatedTokens {
         tokens: valid_tokens,
@@ -31,8 +31,7 @@
 
 #[post("/metrics")]
 async fn metrics(
-    _client_token: EdgeToken,
-    batch_metrics_request: web::Json<BatchMetricsRequest>,
+    batch_metrics_request: web::Json<BatchMetricsRequestBody>,
     metrics_cache: web::Data<RwLock<MetricsCache>>,
 ) -> EdgeResult<HttpResponse> {
     {
@@ -65,5 +64,5 @@
 }
 
 pub fn configure_edge_api(cfg: &mut web::ServiceConfig) {
-    cfg.service(validate);
+    cfg.service(validate).service(metrics);
 }
diff --git a/server/src/frontend_api.rs b/server/src/frontend_api.rs
index a7e32a48..ac4fa18e 100644
--- a/server/src/frontend_api.rs
+++ b/server/src/frontend_api.rs
@@ -176,6 +176,10 @@ mod tests {
             todo!()
         }
 
+        async fn get_valid_tokens(&self) -> EdgeResult<Vec<EdgeToken>> {
+            todo!()
+        }
+
         async fn get_token_validation_status(
             &self,
             _secret: &str,
@@ -187,7 +191,7 @@ mod tests {
             todo!()
         }
 
-        async fn get_valid_tokens(&self, _tokens: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
+        async fn filter_valid_tokens(&self, _tokens: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
             todo!()
         }
     }
diff --git a/server/src/http/background_send_metrics.rs b/server/src/http/background_send_metrics.rs
index cd06435c..a87e2a5c 100644
--- a/server/src/http/background_send_metrics.rs
+++ b/server/src/http/background_send_metrics.rs
@@ -1,14 +1,18 @@
 use super::unleash_client::UnleashClient;
 use std::time::Duration;
 
+use crate::error::EdgeError;
 use crate::metrics::client_metrics::MetricsCache;
-use crate::types::BatchMetricsRequest;
+use crate::types::{
+    BatchMetricsRequest, BatchMetricsRequestBody, EdgeResult, EdgeSource, EdgeToken,
+};
 use std::sync::Arc;
 use tokio::sync::RwLock;
 use tracing::warn;
 
 pub async fn send_metrics_task(
     metrics_cache: Arc<RwLock<MetricsCache>>,
+    source: Arc<RwLock<dyn EdgeSource>>,
     unleash_client: UnleashClient,
     send_interval: u64,
 ) {
@@ -16,19 +20,43 @@
         {
             let mut metrics_lock = metrics_cache.write().await;
             let metrics = metrics_lock.get_unsent_metrics();
+            let api_key = get_first_token(source.clone()).await;
 
-            let request = BatchMetricsRequest {
-                applications: metrics.applications,
-                metrics: metrics.metrics,
-            };
+            match api_key {
+                Ok(api_key) => {
+                    let request = BatchMetricsRequest {
+                        api_key: api_key.token.clone(),
+                        body: BatchMetricsRequestBody {
+                            applications: metrics.applications,
+                            metrics: metrics.metrics,
+                        },
+                    };
 
-            if let Err(error) = unleash_client.send_batch_metrics(request).await {
-                warn!("Failed to send metrics: {error:?}");
-            } else {
-                metrics_lock.reset_metrics();
+                    if let Err(error) = unleash_client.send_batch_metrics(request).await {
+                        warn!("Failed to send metrics: {error:?}");
+                    } else {
+                        metrics_lock.reset_metrics();
+                    }
+                }
+                Err(e) => {
+                    warn!("Error sending metrics: {e:?}")
+                }
             }
         }
 
         tokio::time::sleep(Duration::from_secs(send_interval)).await;
     }
 }
+
+async fn get_first_token(source: Arc<RwLock<dyn EdgeSource>>) -> EdgeResult<EdgeToken> {
+    let source_lock = source.read().await;
+    let api_key = source_lock
+        .get_valid_tokens()
+        .await?
+        .get(0)
+        .map(|x| x.clone());
+    match api_key {
+        Some(api_key) => Ok(api_key),
+        None => Err(EdgeError::DataSourceError("No tokens found".into())),
+    }
+}
diff --git a/server/src/http/unleash_client.rs b/server/src/http/unleash_client.rs
index c421bba4..12e9fad7 100644
--- a/server/src/http/unleash_client.rs
+++ b/server/src/http/unleash_client.rs
@@ -114,7 +114,8 @@ impl UnleashClient {
         let result = self
             .backing_client
             .post(self.urls.edge_metrics_url.to_string())
-            .json(&request)
+            .header(reqwest::header::AUTHORIZATION, request.api_key)
+            .json(&request.body)
             .send()
             .await
             .map_err(|_| EdgeError::EdgeMetricsError)?;
diff --git a/server/src/main.rs b/server/src/main.rs
index fd776921..e9fd8797 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -15,6 +15,7 @@ use unleash_edge::data_sources::builder::build_source_and_sink;
 use unleash_edge::edge_api;
 use unleash_edge::frontend_api;
 use unleash_edge::http::background_refresh::{poll_for_token_status, refresh_features};
+use unleash_edge::http::background_send_metrics::send_metrics_task;
 use unleash_edge::internal_backstage;
 use unleash_edge::metrics::client_metrics::MetricsCache;
 use unleash_edge::prom_metrics;
@@ -30,9 +31,11 @@ async fn main() -> Result<(), anyhow::Error> {
     let (metrics_handler, request_metrics) = prom_metrics::instantiate(None);
     let repo_info = build_source_and_sink(args).unwrap();
     let source = repo_info.source;
+    let source_clone = source.clone();
     let sink_info = repo_info.sink_info;
 
     let metrics_cache = Arc::new(RwLock::new(MetricsCache::default()));
+    let metrics_cache_clone = metrics_cache.clone();
 
     let server = HttpServer::new(move || {
         let edge_source = web::Data::from(source.clone());
@@ -84,8 +87,11 @@ async fn main() -> Result<(), anyhow::Error> {
             _ = poll_for_token_status(sink_info.unvalidated_receive, sink_info.validated_send.clone(), sink_info.sink.clone(), sink_info.unleash_client.clone()) => {
                 tracing::info!("Token validator task is shutting down")
             },
-            _ = refresh_features(sink_info.validated_receive, sink_info.sink, sink_info.unleash_client) => {
+            _ = refresh_features(sink_info.validated_receive, sink_info.sink, sink_info.unleash_client.clone()) => {
                 tracing::info!("Refresh task is shutting down");
+            },
+            _ = send_metrics_task(metrics_cache_clone, source_clone, sink_info.unleash_client, sink_info.metrics_interval_seconds) => {
+                tracing::info!("Metrics task is shutting down");
             }
         }
     } else {
diff --git a/server/src/types.rs b/server/src/types.rs
index 1cb8f95e..3527f68d 100644
--- a/server/src/types.rs
+++ b/server/src/types.rs
@@ -207,9 +207,10 @@ pub trait FeaturesSource {
 #[async_trait]
 pub trait TokenSource {
     async fn get_known_tokens(&self) -> EdgeResult<Vec<EdgeToken>>;
+    async fn get_valid_tokens(&self) -> EdgeResult<Vec<EdgeToken>>;
     async fn get_token_validation_status(&self, secret: &str) -> EdgeResult<TokenValidationStatus>;
     async fn token_details(&self, secret: String) -> EdgeResult<Option<EdgeToken>>;
-    async fn get_valid_tokens(&self, tokens: Vec<String>) -> EdgeResult<Vec<EdgeToken>>;
+    async fn filter_valid_tokens(&self, tokens: Vec<String>) -> EdgeResult<Vec<EdgeToken>>;
 }
 
 pub trait EdgeSource: FeaturesSource + TokenSource + Send + Sync {}
@@ -227,6 +228,12 @@ pub trait FeatureSink {
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct BatchMetricsRequest {
+    pub api_key: String,
+    pub body: BatchMetricsRequestBody,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct BatchMetricsRequestBody {
     pub applications: Vec<ClientApplication>,
     pub metrics: Vec<ClientMetricsEnv>,
 }