Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task: client metrics start task #51

Merged
merged 1 commit into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub struct EdgeArgs {
pub unleash_url: String,
#[clap(short, long, env)]
pub redis_url: Option<String>,
#[clap(short, long, env, default_value_t = 10)]
pub metrics_interval_seconds: u64,
}

#[derive(Args, Debug, Clone)]
Expand Down
2 changes: 2 additions & 0 deletions server/src/data_sources/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct SinkInfo {
pub validated_receive: mpsc::Receiver<EdgeToken>,
pub unvalidated_receive: mpsc::Receiver<EdgeToken>,
pub unleash_client: UnleashClient,
pub metrics_interval_seconds: u64,
}

fn build_offline(offline_args: OfflineArgs) -> EdgeResult<DataProviderPair> {
Expand Down Expand Up @@ -80,6 +81,7 @@ pub fn build_source_and_sink(args: CliArgs) -> EdgeResult<RepositoryInfo> {
validated_receive: validated_receiver,
unvalidated_receive: unvalidated_receiver,
unleash_client,
metrics_interval_seconds: edge_args.metrics_interval_seconds,
}),
})
}
Expand Down
13 changes: 11 additions & 2 deletions server/src/data_sources/memory_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ impl TokenSource for MemoryProvider {
Ok(self.token_store.values().into_iter().cloned().collect())
}

async fn get_valid_tokens(&self) -> EdgeResult<Vec<EdgeToken>> {
Ok(self
.token_store
.values()
.filter(|t| t.status == TokenValidationStatus::Validated)
.cloned()
.collect())
}

async fn get_token_validation_status(&self, secret: &str) -> EdgeResult<TokenValidationStatus> {
if let Some(token) = self.token_store.get(secret) {
Ok(token.clone().status)
Expand All @@ -83,7 +92,7 @@ impl TokenSource for MemoryProvider {
Ok(self.token_store.get(&secret).cloned())
}

async fn get_valid_tokens(&self, secrets: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
async fn filter_valid_tokens(&self, secrets: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
Ok(secrets
.iter()
.filter_map(|s| self.token_store.get(s))
Expand Down Expand Up @@ -202,7 +211,7 @@ mod test {
.sink_tokens(vec![james_bond.clone(), frank_drebin.clone()])
.await;
let valid_tokens = provider
.get_valid_tokens(vec![
.filter_valid_tokens(vec![
"jamesbond".into(),
"anotherinvalidone".into(),
"frankdrebin".into(),
Expand Down
11 changes: 10 additions & 1 deletion server/src/data_sources/offline_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ impl TokenSource for OfflineProvider {
Ok(self.valid_tokens.values().cloned().collect())
}

async fn get_valid_tokens(&self) -> EdgeResult<Vec<EdgeToken>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idle thoughts but it looks like we might end up writing this same method over and over. Might be worth implementing this as a trait on Vec<EdgeToken>. Not something for this PR

Ok(self
.valid_tokens
.values()
.filter(|t| t.status == TokenValidationStatus::Validated)
.cloned()
.collect())
}

async fn get_token_validation_status(&self, secret: &str) -> EdgeResult<TokenValidationStatus> {
Ok(if self.valid_tokens.contains_key(secret) {
TokenValidationStatus::Validated
Expand All @@ -41,7 +50,7 @@ impl TokenSource for OfflineProvider {
async fn token_details(&self, secret: String) -> EdgeResult<Option<EdgeToken>> {
Ok(self.valid_tokens.get(&secret).cloned())
}
async fn get_valid_tokens(&self, secrets: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
async fn filter_valid_tokens(&self, secrets: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
Ok(self
.valid_tokens
.clone()
Expand Down
10 changes: 9 additions & 1 deletion server/src/data_sources/redis_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ impl TokenSource for RedisProvider {
.collect())
}

async fn get_valid_tokens(&self) -> EdgeResult<Vec<EdgeToken>> {
let tokens = self.get_known_tokens().await?;
Ok(tokens
.into_iter()
.filter(|t| t.status == TokenValidationStatus::Validated)
.collect())
}

async fn get_token_validation_status(&self, secret: &str) -> EdgeResult<TokenValidationStatus> {
if let Some(t) = self
.get_known_tokens()
Expand All @@ -112,7 +120,7 @@ impl TokenSource for RedisProvider {
}
}

async fn get_valid_tokens(&self, _secrets: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
async fn filter_valid_tokens(&self, _secrets: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
todo!()
}

Expand Down
21 changes: 10 additions & 11 deletions server/src/edge_api.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
use actix_web::{
get, post,
post,
web::{self, Json},
HttpResponse,
};
use tokio::sync::RwLock;

use crate::{metrics::client_metrics::MetricsCache, types::EdgeResult};
use crate::{
metrics::client_metrics::MetricsCache,
types::{BatchMetricsRequestBody, EdgeResult},
};
use crate::{
metrics::client_metrics::MetricsKey,
types::{
BatchMetricsRequest, EdgeJsonResult, EdgeSource, EdgeToken, TokenStrings, ValidatedTokens,
},
types::{EdgeJsonResult, EdgeSource, TokenStrings, ValidatedTokens},
};

#[get("/validate")]
#[post("/validate")]
async fn validate(
_client_token: EdgeToken,
token_provider: web::Data<RwLock<dyn EdgeSource>>,
tokens: Json<TokenStrings>,
) -> EdgeJsonResult<ValidatedTokens> {
let valid_tokens = token_provider
.read()
.await
.get_valid_tokens(tokens.into_inner().tokens)
.filter_valid_tokens(tokens.into_inner().tokens)
.await?;
Ok(Json(ValidatedTokens {
tokens: valid_tokens,
Expand All @@ -31,8 +31,7 @@ async fn validate(

#[post("/metrics")]
async fn metrics(
_client_token: EdgeToken,
batch_metrics_request: web::Json<BatchMetricsRequest>,
batch_metrics_request: web::Json<BatchMetricsRequestBody>,
metrics_cache: web::Data<RwLock<MetricsCache>>,
) -> EdgeResult<HttpResponse> {
{
Expand Down Expand Up @@ -65,5 +64,5 @@ async fn metrics(
}

pub fn configure_edge_api(cfg: &mut web::ServiceConfig) {
cfg.service(validate);
cfg.service(validate).service(metrics);
}
6 changes: 5 additions & 1 deletion server/src/frontend_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ mod tests {
todo!()
}

async fn get_valid_tokens(&self) -> EdgeResult<Vec<crate::types::EdgeToken>> {
todo!()
}

async fn get_token_validation_status(
&self,
_secret: &str,
Expand All @@ -187,7 +191,7 @@ mod tests {
todo!()
}

async fn get_valid_tokens(&self, _tokens: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
async fn filter_valid_tokens(&self, _tokens: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
todo!()
}
}
Expand Down
46 changes: 37 additions & 9 deletions server/src/http/background_send_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,62 @@
use super::unleash_client::UnleashClient;
use std::time::Duration;

use crate::error::EdgeError;
use crate::metrics::client_metrics::MetricsCache;
use crate::types::BatchMetricsRequest;
use crate::types::{
BatchMetricsRequest, BatchMetricsRequestBody, EdgeResult, EdgeSource, EdgeToken,
};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::warn;

pub async fn send_metrics_task(
metrics_cache: Arc<RwLock<MetricsCache>>,
source: Arc<RwLock<dyn EdgeSource>>,
unleash_client: UnleashClient,
send_interval: u64,
) {
loop {
{
let mut metrics_lock = metrics_cache.write().await;
let metrics = metrics_lock.get_unsent_metrics();
let api_key = get_first_token(source.clone()).await;

let request = BatchMetricsRequest {
applications: metrics.applications,
metrics: metrics.metrics,
};
match api_key {
Ok(api_key) => {
let request = BatchMetricsRequest {
api_key: api_key.token.clone(),
body: BatchMetricsRequestBody {
applications: metrics.applications,
metrics: metrics.metrics,
},
};

if let Err(error) = unleash_client.send_batch_metrics(request).await {
warn!("Failed to send metrics: {error:?}");
} else {
metrics_lock.reset_metrics();
if let Err(error) = unleash_client.send_batch_metrics(request).await {
warn!("Failed to send metrics: {error:?}");
} else {
metrics_lock.reset_metrics();
}
}
Err(e) => {
warn!("Error sending metrics: {e:?}")
}
}
}

tokio::time::sleep(Duration::from_secs(send_interval)).await;
}
}

async fn get_first_token(source: Arc<RwLock<dyn EdgeSource>>) -> EdgeResult<EdgeToken> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also idle thoughts, but if this feels like it wants to be an option not a result. Doesn't need to be done in this PR

let source_lock = source.read().await;
let api_key = source_lock
.get_valid_tokens()
.await?
.get(0)
.map(|x| x.clone());
match api_key {
Some(api_key) => Ok(api_key),
None => Err(EdgeError::DataSourceError("No tokens found".into())),
}
}
3 changes: 2 additions & 1 deletion server/src/http/unleash_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ impl UnleashClient {
let result = self
.backing_client
.post(self.urls.edge_metrics_url.to_string())
.json(&request)
.header(reqwest::header::AUTHORIZATION, request.api_key)
.json(&request.body)
.send()
.await
.map_err(|_| EdgeError::EdgeMetricsError)?;
Expand Down
8 changes: 7 additions & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use unleash_edge::data_sources::builder::build_source_and_sink;
use unleash_edge::edge_api;
use unleash_edge::frontend_api;
use unleash_edge::http::background_refresh::{poll_for_token_status, refresh_features};
use unleash_edge::http::background_send_metrics::send_metrics_task;
use unleash_edge::internal_backstage;
use unleash_edge::metrics::client_metrics::MetricsCache;
use unleash_edge::prom_metrics;
Expand All @@ -30,9 +31,11 @@ async fn main() -> Result<(), anyhow::Error> {
let (metrics_handler, request_metrics) = prom_metrics::instantiate(None);
let repo_info = build_source_and_sink(args).unwrap();
let source = repo_info.source;
let source_clone = source.clone();
let sink_info = repo_info.sink_info;

let metrics_cache = Arc::new(RwLock::new(MetricsCache::default()));
let metrics_cache_clone = metrics_cache.clone();

let server = HttpServer::new(move || {
let edge_source = web::Data::from(source.clone());
Expand Down Expand Up @@ -84,8 +87,11 @@ async fn main() -> Result<(), anyhow::Error> {
_ = poll_for_token_status(sink_info.unvalidated_receive, sink_info.validated_send.clone(), sink_info.sink.clone(), sink_info.unleash_client.clone()) => {
tracing::info!("Token validator task is shutting down")
},
_ = refresh_features(sink_info.validated_receive, sink_info.sink, sink_info.unleash_client) => {
_ = refresh_features(sink_info.validated_receive, sink_info.sink, sink_info.unleash_client.clone()) => {
tracing::info!("Refresh task is shutting down");
},
_ = send_metrics_task(metrics_cache_clone, source_clone, sink_info.unleash_client, sink_info.metrics_interval_seconds) => {
tracing::info!("Metrics task is shutting down");
}
}
} else {
Expand Down
9 changes: 8 additions & 1 deletion server/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,10 @@ pub trait FeaturesSource {
#[async_trait]
pub trait TokenSource {
async fn get_known_tokens(&self) -> EdgeResult<Vec<EdgeToken>>;
async fn get_valid_tokens(&self) -> EdgeResult<Vec<EdgeToken>>;
async fn get_token_validation_status(&self, secret: &str) -> EdgeResult<TokenValidationStatus>;
async fn token_details(&self, secret: String) -> EdgeResult<Option<EdgeToken>>;
async fn get_valid_tokens(&self, tokens: Vec<String>) -> EdgeResult<Vec<EdgeToken>>;
async fn filter_valid_tokens(&self, tokens: Vec<String>) -> EdgeResult<Vec<EdgeToken>>;
}

pub trait EdgeSource: FeaturesSource + TokenSource + Send + Sync {}
Expand All @@ -227,6 +228,12 @@ pub trait FeatureSink {

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BatchMetricsRequest {
pub api_key: String,
pub body: BatchMetricsRequestBody,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BatchMetricsRequestBody {
pub applications: Vec<ClientApplication>,
pub metrics: Vec<ClientMetricsEnv>,
}
Expand Down