Skip to content

Commit

Permalink
fix: Move /api/client/register to a post request. (#81)
Browse files Browse the repository at this point in the history
Earlier we didn't accept metrics from downstream clients because we made
a wrong assumption about Request Method type. This PR fixes this and
starts accepting client metrics and posting them upstream.

Closes: #80 

Co-authored-by: Nuno Góis <github@nunogois.com>
  • Loading branch information
Christopher Kolstad and nunogois authored Feb 28, 2023
1 parent 77b9b0c commit 98666cf
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 11 deletions.
8 changes: 8 additions & 0 deletions server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ pub struct CliArgs {

#[command(subcommand)]
pub mode: EdgeMode,

/// Instance id. Used for metrics reporting.
#[clap(long, env, default_value_t = ulid::Ulid::new().to_string())]
pub instance_id: String,

/// App name. Used for metrics reporting.
#[clap(short, long, env, default_value = "unleash-edge")]
pub app_name: String,
}

#[derive(Args, Debug, Clone)]
Expand Down
27 changes: 19 additions & 8 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use crate::metrics::client_metrics::{ApplicationKey, MetricsCache};
use crate::types::{EdgeJsonResult, EdgeResult, EdgeSource, EdgeToken};
use actix_web::web::{self, Json};
use actix_web::{get, post, HttpRequest, HttpResponse};
use tracing::debug;
use unleash_types::client_features::ClientFeatures;
use unleash_types::client_metrics::{
from_bucket_app_name_and_env, ClientApplication, ClientMetrics,
from_bucket_app_name_and_env, ClientApplication, ClientMetrics, ConnectVia,
};

#[utoipa::path(
Expand Down Expand Up @@ -43,14 +44,19 @@ pub async fn features(
#[post("/client/register")]
pub async fn register(
edge_token: EdgeToken,
connect_via: web::Data<ConnectVia>,
_req: HttpRequest,
client_application: web::Json<ClientApplication>,
metrics_cache: web::Data<MetricsCache>,
) -> EdgeResult<HttpResponse> {
let client_application = client_application.into_inner();
let updated_with_connection_info = client_application.connect_via(
connect_via.app_name.as_str(),
connect_via.instance_id.as_str(),
);
let to_write = ClientApplication {
environment: edge_token.environment,
..client_application
..updated_with_connection_info
};
metrics_cache.applications.insert(
ApplicationKey {
Expand All @@ -76,10 +82,10 @@ pub async fn register(
("Authorization" = [])
)
)]
#[get("/client/metrics")]
#[post("/client/metrics")]
pub async fn metrics(
edge_token: EdgeToken,
metrics: web::Json<ClientMetrics>,
metrics: Json<ClientMetrics>,
metrics_cache: web::Data<MetricsCache>,
) -> EdgeResult<HttpResponse> {
let metrics = metrics.into_inner();
Expand All @@ -88,13 +94,13 @@ pub async fn metrics(
metrics.app_name,
edge_token.environment.unwrap(),
);

debug!("Received metrics: {metrics:?}");
metrics_cache.sink_metrics(&metrics);
Ok(HttpResponse::Accepted().finish())
}

pub fn configure_client_api(cfg: &mut web::ServiceConfig) {
cfg.service(features).service(register);
cfg.service(features).service(register).service(metrics);
}

#[cfg(test)]
Expand All @@ -115,10 +121,11 @@ mod tests {
};
use chrono::{DateTime, Utc};
use serde_json::json;
use ulid::Ulid;
use unleash_types::client_metrics::ClientMetricsEnv;

async fn make_test_request() -> Request {
test::TestRequest::get()
test::TestRequest::post()
.uri("/api/client/metrics")
.insert_header(ContentType::json())
.insert_header((
Expand Down Expand Up @@ -148,6 +155,10 @@ mod tests {

let app = test::init_service(
App::new()
.app_data(Data::new(ConnectVia {
app_name: "test".into(),
instance_id: Ulid::new().to_string(),
}))
.app_data(Data::from(metrics_cache.clone()))
.service(web::scope("/api").service(super::metrics)),
)
Expand All @@ -156,7 +167,7 @@ mod tests {
let req = make_test_request().await;
let _result = test::call_and_read_body(&app, req).await;

let cache = metrics_cache;
let cache = metrics_cache.clone();

let found_metric = cache
.metrics
Expand Down
3 changes: 2 additions & 1 deletion server/src/http/background_send_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::types::{
BatchMetricsRequest, BatchMetricsRequestBody, EdgeResult, EdgeSource, EdgeToken,
};
use std::sync::Arc;
use tracing::warn;
use tracing::{debug, warn};

pub async fn send_metrics_task(
metrics_cache: Arc<MetricsCache>,
Expand All @@ -22,6 +22,7 @@ pub async fn send_metrics_task(

match api_key {
Ok(api_key) => {
debug!("Going to post {metrics:?} for {api_key:?}");
let request = BatchMetricsRequest {
api_key: api_key.token.clone(),
body: BatchMetricsRequestBody {
Expand Down
7 changes: 6 additions & 1 deletion server/src/http/unleash_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::types::{
ValidateTokensRequest,
};
use reqwest::{header, Client};
use tracing::debug;

use crate::urls::UnleashUrls;
use crate::{error::EdgeError, types::ClientFeaturesRequest};
Expand Down Expand Up @@ -111,17 +112,21 @@ impl UnleashClient {
}

pub async fn send_batch_metrics(&self, request: BatchMetricsRequest) -> EdgeResult<()> {
debug!(
"Posting metrics to {}",
self.urls.edge_metrics_url.to_string()
);
let result = self
.backing_client
.post(self.urls.edge_metrics_url.to_string())
.header(reqwest::header::AUTHORIZATION, request.api_key)
.json(&request.body)
.send()
.await
.map_err(|_| EdgeError::EdgeMetricsError)?;
if result.status().is_success() {
Ok(())
} else {
debug!("{}", result.status());
Err(EdgeError::EdgeMetricsError)
}
}
Expand Down
6 changes: 6 additions & 0 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use actix_web::{web, App, HttpServer};
use actix_web_opentelemetry::RequestTracing;
use clap::Parser;
use cli::CliArgs;
use unleash_types::client_metrics::ConnectVia;

use unleash_edge::client_api;
use unleash_edge::data_sources::builder::build_source_and_sink;
Expand All @@ -31,6 +32,10 @@ async fn main() -> Result<(), anyhow::Error> {
let mode_arg = args.clone().mode;
let http_args = args.clone().http;
let (metrics_handler, request_metrics) = prom_metrics::instantiate(None);
let connect_via = ConnectVia {
app_name: args.clone().app_name,
instance_id: args.clone().instance_id,
};
let repo_info = build_source_and_sink(args).await.unwrap();
let source = repo_info.source;
let source_clone = source.clone();
Expand All @@ -52,6 +57,7 @@ async fn main() -> Result<(), anyhow::Error> {
let mut app = App::new()
.app_data(edge_source)
.app_data(web::Data::new(mode_arg.clone()))
.app_data(web::Data::new(connect_via.clone()))
.app_data(web::Data::from(metrics_cache.clone()));
if validator.is_some() {
app = app.app_data(web::Data::from(validator.clone().unwrap()))
Expand Down
39 changes: 38 additions & 1 deletion server/src/metrics/client_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ impl PartialEq for MetricsKey {
}
}

#[derive(Default, Debug)]
pub struct MetricsBatch {
pub applications: Vec<ClientApplication>,
pub metrics: Vec<ClientMetricsEnv>,
Expand Down Expand Up @@ -106,7 +107,7 @@ mod test {
use std::collections::HashMap;

use chrono::{DateTime, Utc};
use unleash_types::client_metrics::ClientMetricsEnv;
use unleash_types::client_metrics::{ClientMetricsEnv, ConnectVia};

#[test]
fn cache_aggregates_data_correctly() {
Expand Down Expand Up @@ -275,4 +276,40 @@ mod test {
cache.reset_metrics();
assert!(cache.metrics.is_empty());
}

#[test]
fn adding_another_connection_link_works() {
let client_application = ClientApplication {
app_name: "tests_help".into(),
connect_via: None,
environment: Some("development".into()),
instance_id: Some("test".into()),
interval: 60,
sdk_version: None,
started: Default::default(),
strategies: vec![],
};
let connected_via_test_instance = client_application.connect_via("test", "instance");
let connected_via_edge_as_well = connected_via_test_instance.connect_via("edge", "edgeid");
assert_eq!(
connected_via_test_instance.connect_via.unwrap(),
vec![ConnectVia {
app_name: "test".into(),
instance_id: "instance".into()
}]
);
assert_eq!(
connected_via_edge_as_well.connect_via.unwrap(),
vec![
ConnectVia {
app_name: "test".into(),
instance_id: "instance".into()
},
ConnectVia {
app_name: "edge".into(),
instance_id: "edgeid".into()
}
]
)
}
}

0 comments on commit 98666cf

Please sign in to comment.