Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task client metrics #53

Merged
merged 6 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions application.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"appName":"myapp",
"instanceId":"isbetterthanyourapp",
sighphyre marked this conversation as resolved.
Show resolved Hide resolved
"sdkVersion":"imaginarySdk:3.1.0",
"environment":"development",
"interval":15,
"started": "2023-02-07T12:16:39.888Z",
"strategies": ["default", "gradualRollout"]
}
4 changes: 2 additions & 2 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ tokio = {version = "1.25.0", features = ["macros", "rt-multi-thread", "tracing"]
tracing = {version = "0.1.37", features = ["log"]}
tracing-subscriber = {version = "0.3.16", features = ["json", "env-filter"]}
ulid = "1.0.0"
unleash-types = {version = "0.7.1", features = ["openapi", "hashes"]}
unleash-yggdrasil = "0.4.2"
unleash-types = {version = "0.8.1", features = ["openapi", "hashes"]}
unleash-yggdrasil = "0.4.4"
[dev-dependencies]
actix-http = "3.3.0"
actix-http-test = "3.1.0"
Expand Down
2 changes: 2 additions & 0 deletions server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub struct EdgeArgs {
pub unleash_url: String,
#[clap(short, long, env)]
pub redis_url: Option<String>,
#[clap(short, long, env, default_value_t = 10)]
pub metrics_interval_seconds: u64,
}

#[derive(Args, Debug, Clone)]
Expand Down
167 changes: 164 additions & 3 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use crate::types::{EdgeJsonResult, EdgeSource, EdgeToken};
use actix_web::get;
use crate::metrics::client_metrics::{ApplicationKey, MetricsCache};
use crate::types::{EdgeJsonResult, EdgeResult, EdgeSource, EdgeToken};
use actix_web::web::{self, Json};
use actix_web::{get, post, HttpRequest, HttpResponse};
use tokio::sync::RwLock;
use tracing::info;
use unleash_types::client_features::ClientFeatures;
use unleash_types::client_metrics::{
from_bucket_app_name_and_env, ClientApplication, ClientMetrics,
};

#[get("/client/features")]
async fn features(
Expand All @@ -19,6 +23,163 @@ async fn features(
.map(Json)
}

#[post("/client/register")]
async fn register(
edge_token: EdgeToken,
_req: HttpRequest,
client_application: web::Json<ClientApplication>,
metrics_cache: web::Data<RwLock<MetricsCache>>,
) -> EdgeResult<HttpResponse> {
let client_application = client_application.into_inner();
let to_write = ClientApplication {
environment: edge_token.environment,
..client_application
};
{
let mut writeable_cache = metrics_cache.write().await;
writeable_cache.applications.insert(
ApplicationKey {
app_name: to_write.app_name.clone(),
instance_id: to_write
.instance_id
.clone()
.unwrap_or_else(|| ulid::Ulid::new().to_string()),
},
to_write,
);
}
Ok(HttpResponse::Accepted().finish())
}

#[get("/client/applications")]
async fn show_applications(
metrics_cache: web::Data<RwLock<MetricsCache>>,
) -> EdgeJsonResult<Vec<ClientApplication>> {
Ok(Json(
metrics_cache
.read()
.await
.applications
.values()
.cloned()
.collect(),
))
}

#[get("/client/metrics")]
async fn metrics(
edge_token: EdgeToken,
metrics: web::Json<ClientMetrics>,
metrics_cache: web::Data<RwLock<MetricsCache>>,
) -> EdgeResult<HttpResponse> {
let metrics = metrics.into_inner();
let metrics = from_bucket_app_name_and_env(
metrics.bucket,
metrics.app_name,
edge_token.environment.unwrap(),
);

{
let mut writeable_cache = metrics_cache.write().await;

writeable_cache.sink_metrics(&metrics);
}
Ok(HttpResponse::Accepted().finish())
}

pub fn configure_client_api(cfg: &mut web::ServiceConfig) {
cfg.service(features);
cfg.service(features)
.service(register)
.service(show_applications);
}

#[cfg(test)]
mod tests {

use std::{collections::HashMap, sync::Arc};

use crate::metrics::client_metrics::MetricsKey;

use super::*;

use actix_http::Request;
use actix_web::{
http::header::ContentType,
test,
web::{self, Data},
App,
};
use chrono::{DateTime, Utc};
use serde_json::json;
use unleash_types::client_metrics::ClientMetricsEnv;

async fn make_test_request() -> Request {
test::TestRequest::get()
.uri("/api/client/metrics")
.insert_header(ContentType::json())
.insert_header((
"Authorization",
"*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7",
))
.set_json(json!({
"appName": "some-app",
"instanceId": "some-instance",
"bucket": {
"start": "1867-11-07T12:00:00Z",
"stop": "1934-11-07T12:00:00Z",
"toggles": {
"some-feature": {
"yes": 1,
"no": 0
}
}
}
}))
.to_request()
}

#[actix_web::test]
async fn metrics_endpoint_correctly_aggregates_data() {
let metrics_cache = Arc::new(RwLock::new(MetricsCache::default()));

let app = test::init_service(
App::new()
.app_data(Data::from(metrics_cache.clone()))
.service(web::scope("/api").service(super::metrics)),
)
.await;

let req = make_test_request().await;
let _result = test::call_and_read_body(&app, req).await;

let cache = metrics_cache.read().await;

let found_metric = cache
.metrics
.get(&MetricsKey {
app_name: "some-app".into(),
feature_name: "some-feature".into(),
timestamp: DateTime::parse_from_rfc3339("1867-11-07T12:00:00Z")
.unwrap()
.with_timezone(&Utc),
})
.unwrap();

let expected = ClientMetricsEnv {
app_name: "some-app".into(),
feature_name: "some-feature".into(),
environment: "development".into(),
timestamp: DateTime::parse_from_rfc3339("1867-11-07T12:00:00Z")
.unwrap()
.with_timezone(&Utc),
yes: 1,
no: 0,
variants: HashMap::new(),
};

assert_eq!(found_metric.yes, expected.yes);
assert_eq!(found_metric.yes, 1);
assert_eq!(found_metric.no, 0);
assert_eq!(found_metric.no, expected.no);
}
}
2 changes: 2 additions & 0 deletions server/src/data_sources/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct SinkInfo {
pub validated_receive: mpsc::Receiver<EdgeToken>,
pub unvalidated_receive: mpsc::Receiver<EdgeToken>,
pub unleash_client: UnleashClient,
pub metrics_interval_seconds: u64,
}

fn build_offline(offline_args: OfflineArgs) -> EdgeResult<DataProviderPair> {
Expand Down Expand Up @@ -80,6 +81,7 @@ pub fn build_source_and_sink(args: CliArgs) -> EdgeResult<RepositoryInfo> {
validated_receive: validated_receiver,
unvalidated_receive: unvalidated_receiver,
unleash_client,
metrics_interval_seconds: edge_args.metrics_interval_seconds,
}),
})
}
Expand Down
13 changes: 11 additions & 2 deletions server/src/data_sources/memory_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ impl TokenSource for MemoryProvider {
Ok(self.token_store.values().into_iter().cloned().collect())
}

async fn get_valid_tokens(&self) -> EdgeResult<Vec<EdgeToken>> {
Ok(self
.token_store
.values()
.filter(|t| t.status == TokenValidationStatus::Validated)
.cloned()
.collect())
}

async fn get_token_validation_status(&self, secret: &str) -> EdgeResult<TokenValidationStatus> {
if let Some(token) = self.token_store.get(secret) {
Ok(token.clone().status)
Expand All @@ -83,7 +92,7 @@ impl TokenSource for MemoryProvider {
Ok(self.token_store.get(&secret).cloned())
}

async fn get_valid_tokens(&self, secrets: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
async fn filter_valid_tokens(&self, secrets: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
Ok(secrets
.iter()
.filter_map(|s| self.token_store.get(s))
Expand Down Expand Up @@ -202,7 +211,7 @@ mod test {
.sink_tokens(vec![james_bond.clone(), frank_drebin.clone()])
.await;
let valid_tokens = provider
.get_valid_tokens(vec![
.filter_valid_tokens(vec![
"jamesbond".into(),
"anotherinvalidone".into(),
"frankdrebin".into(),
Expand Down
11 changes: 10 additions & 1 deletion server/src/data_sources/offline_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ impl TokenSource for OfflineProvider {
Ok(self.valid_tokens.values().cloned().collect())
}

async fn get_valid_tokens(&self) -> EdgeResult<Vec<EdgeToken>> {
Ok(self
.valid_tokens
.values()
.filter(|t| t.status == TokenValidationStatus::Validated)
.cloned()
.collect())
}

async fn get_token_validation_status(&self, secret: &str) -> EdgeResult<TokenValidationStatus> {
Ok(if self.valid_tokens.contains_key(secret) {
TokenValidationStatus::Validated
Expand All @@ -41,7 +50,7 @@ impl TokenSource for OfflineProvider {
async fn token_details(&self, secret: String) -> EdgeResult<Option<EdgeToken>> {
Ok(self.valid_tokens.get(&secret).cloned())
}
async fn get_valid_tokens(&self, secrets: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
async fn filter_valid_tokens(&self, secrets: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
Ok(self
.valid_tokens
.clone()
Expand Down
10 changes: 9 additions & 1 deletion server/src/data_sources/redis_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ impl TokenSource for RedisProvider {
.collect())
}

async fn get_valid_tokens(&self) -> EdgeResult<Vec<EdgeToken>> {
let tokens = self.get_known_tokens().await?;
Ok(tokens
.into_iter()
.filter(|t| t.status == TokenValidationStatus::Validated)
.collect())
}

async fn get_token_validation_status(&self, secret: &str) -> EdgeResult<TokenValidationStatus> {
if let Some(t) = self
.get_known_tokens()
Expand All @@ -112,7 +120,7 @@ impl TokenSource for RedisProvider {
}
}

async fn get_valid_tokens(&self, _secrets: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
async fn filter_valid_tokens(&self, _secrets: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
todo!()
}

Expand Down
29 changes: 23 additions & 6 deletions server/src/edge_api.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,44 @@
use actix_web::{
get,
post,
web::{self, Json},
HttpResponse,
};
use tokio::sync::RwLock;

use crate::types::{EdgeJsonResult, EdgeSource, EdgeToken, TokenStrings, ValidatedTokens};
use crate::types::{EdgeJsonResult, EdgeSource, TokenStrings, ValidatedTokens};
use crate::{
metrics::client_metrics::MetricsCache,
types::{BatchMetricsRequestBody, EdgeResult},
};

#[get("/validate")]
#[post("/validate")]
async fn validate(
_client_token: EdgeToken,
token_provider: web::Data<RwLock<dyn EdgeSource>>,
tokens: Json<TokenStrings>,
) -> EdgeJsonResult<ValidatedTokens> {
let valid_tokens = token_provider
.read()
.await
.get_valid_tokens(tokens.into_inner().tokens)
.filter_valid_tokens(tokens.into_inner().tokens)
.await?;
Ok(Json(ValidatedTokens {
tokens: valid_tokens,
}))
}

#[post("/metrics")]
async fn metrics(
batch_metrics_request: web::Json<BatchMetricsRequestBody>,
metrics_cache: web::Data<RwLock<MetricsCache>>,
) -> EdgeResult<HttpResponse> {
{
let mut metrics_lock = metrics_cache.write().await;

metrics_lock.sink_metrics(&batch_metrics_request.metrics);
}
Ok(HttpResponse::Accepted().finish())
}

pub fn configure_edge_api(cfg: &mut web::ServiceConfig) {
cfg.service(validate);
cfg.service(validate).service(metrics);
}
Loading