From 7aeaa3e94243a552b9ceadad539e15ba3c6b4527 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nuno=20G=C3=B3is?= Date: Thu, 9 Feb 2023 15:58:10 +0000 Subject: [PATCH 1/9] task: add token validator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Christopher Kolstad Co-authored-by: Gastón Fournier Co-authored-by: Simon Hornby --- server/src/auth/mod.rs | 1 + server/src/auth/token_validator.rs | 207 ++++++++++++++++++++ server/src/data_sources/memory_provider.rs | 18 +- server/src/data_sources/offline_provider.rs | 11 +- server/src/data_sources/redis_provider.rs | 17 -- server/src/frontend_api.rs | 9 +- server/src/lib.rs | 1 + server/src/middleware/validate_token.rs | 71 +++---- server/src/types.rs | 8 +- 9 files changed, 253 insertions(+), 90 deletions(-) create mode 100644 server/src/auth/mod.rs create mode 100644 server/src/auth/token_validator.rs diff --git a/server/src/auth/mod.rs b/server/src/auth/mod.rs new file mode 100644 index 00000000..40cd9758 --- /dev/null +++ b/server/src/auth/mod.rs @@ -0,0 +1 @@ +pub mod token_validator; diff --git a/server/src/auth/token_validator.rs b/server/src/auth/token_validator.rs new file mode 100644 index 00000000..e656ecf3 --- /dev/null +++ b/server/src/auth/token_validator.rs @@ -0,0 +1,207 @@ +use crate::error::EdgeError; +use crate::http::unleash_client::UnleashClient; +use crate::types::{EdgeResult, EdgeSink, EdgeSource, EdgeToken, ValidateTokensRequest}; +use std::sync::Arc; +use tokio::sync::RwLock; +use unleash_types::Merge; +pub struct TokenValidator { + unleash_client: UnleashClient, + edge_source: Arc>, + edge_sink: Option>>, +} + +impl TokenValidator { + async fn get_unknown_and_known_tokens( + &mut self, + tokens: Vec, + ) -> EdgeResult<(Vec, Vec)> { + let tokens_with_valid_format: Vec = tokens + .into_iter() + .filter_map(|t| EdgeToken::try_from(t).ok()) + .collect(); + + let source_known_tokens = self.edge_source.read().await.get_known_tokens().await?; + + if tokens_with_valid_format.is_empty() { + Err(EdgeError::TokenParseError) + } else { + Ok(( + tokens_with_valid_format + .clone() + .iter() + .filter(|t| !source_known_tokens.iter().any(|e| e.token == t.token)) + .cloned() + .collect(), + tokens_with_valid_format + .iter() + .filter(|t| source_known_tokens.iter().any(|e| e.token == t.token)) + .cloned() + .collect(), + )) + } + } + + pub async fn register_token(&mut self, token: String) -> EdgeResult { + let (unknown_tokens, known_tokens) = self + .get_unknown_and_known_tokens(vec![token.clone()]) + .await?; + if unknown_tokens.is_empty() { + Ok(known_tokens.get(0).unwrap().clone()) + } else { + Ok(self + .register_tokens(vec![token]) + .await? + .first() + .expect("Couldn't validate token") + .clone()) + } + } + + pub async fn register_tokens(&mut self, tokens: Vec) -> EdgeResult> { + let (unknown_tokens, known_tokens) = self.get_unknown_and_known_tokens(tokens).await?; + if unknown_tokens.is_empty() { + Ok(known_tokens) + } else { + let token_strings_to_validate: Vec = + unknown_tokens.iter().map(|t| t.token.clone()).collect(); + + let validation_result = self + .unleash_client + .validate_tokens(ValidateTokensRequest { + tokens: token_strings_to_validate, + }) + .await?; + + let tokens_to_sink: Vec = unknown_tokens + .into_iter() + .map(|maybe_valid| { + if let Some(validated_token) = validation_result + .iter() + .find(|v| maybe_valid.token == v.token) + { + EdgeToken { + status: crate::types::TokenValidationStatus::Validated, + ..validated_token.clone() + } + } else { + EdgeToken { + status: crate::types::TokenValidationStatus::Invalid, + ..maybe_valid + } + } + }) + .collect(); + println!("Going to sink {} tokens", tokens_to_sink.len()); + let mut sink_to_write = self.edge_sink.write().await; + let _ = sink_to_write.sink_tokens(tokens_to_sink.clone()).await; + Ok(tokens_to_sink.merge(known_tokens)) + } + } +} + +#[cfg(test)] +mod tests { + use crate::data_sources::memory_provider::MemoryProvider; + use crate::types::{EdgeToken, TokenType, TokenValidationStatus}; + use actix_http::HttpService; + use actix_http_test::{test_server, TestServer}; + use actix_service::map_config; + use actix_web::{dev::AppConfig, web, App, HttpResponse}; + use serde::{Deserialize, Serialize}; + use std::sync::Arc; + use tokio::sync::{mpsc, RwLock}; + + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct EdgeTokens { + pub tokens: Vec, + } + + async fn return_validated_tokens() -> HttpResponse { + HttpResponse::Ok().json(EdgeTokens { + tokens: valid_tokens(), + }) + } + + fn valid_tokens() -> Vec { + vec![EdgeToken { + token: "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f".into(), + projects: vec!["*".into()], + environment: Some("development".into()), + token_type: Some(TokenType::Client), + status: TokenValidationStatus::Validated, + }] + } + + async fn test_validation_server() -> TestServer { + test_server(move || { + HttpService::new(map_config( + App::new().service( + web::resource("/edge/validate").route(web::post().to(return_validated_tokens)), + ), + |_| AppConfig::default(), + )) + .tcp() + }) + .await + } + + #[tokio::test] + pub async fn can_validate_tokens() { + use crate::types::TokenSource; + let (sender, _) = mpsc::channel::(32); + let test_provider = Arc::new(RwLock::new(MemoryProvider::new(sender))); + let srv = test_validation_server().await; + let unleash_client = + crate::http::unleash_client::UnleashClient::new(srv.url("/").as_str(), None) + .expect("Couldn't build client"); + + let mut validation_holder = super::TokenValidator { + unleash_client, + edge_source: test_provider.clone(), + edge_sink: test_provider.clone(), + }; + let tokens_to_validate = vec![ + "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f".into(), + "*:production.abcdef1234567890".into(), + ]; + validation_holder + .register_tokens(tokens_to_validate) + .await + .expect("Couldn't register tokens"); + let known_tokens = test_provider + .read() + .await + .get_known_tokens() + .await + .expect("Couldn't get tokens"); + assert_eq!(known_tokens.len(), 2); + assert!(known_tokens.iter().any(|t| t.token + == "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f" + && t.status == TokenValidationStatus::Validated)); + assert!(known_tokens + .iter() + .any(|t| t.token == "*:production.abcdef1234567890" + && t.status == TokenValidationStatus::Invalid)); + } + + #[tokio::test] + pub async fn tokens_with_wrong_format_is_not_included() { + let (sender, _) = mpsc::channel::(32); + let test_provider = Arc::new(RwLock::new(MemoryProvider::new(sender))); + let srv = test_validation_server().await; + let unleash_client = + crate::http::unleash_client::UnleashClient::new(srv.url("/").as_str(), None) + .expect("Couldn't build client"); + let mut validation_holder = super::TokenValidator { + unleash_client, + edge_source: test_provider.clone(), + edge_sink: test_provider.clone(), + }; + let invalid_tokens = vec!["jamesbond".into(), "invalidtoken".into()]; + let validated_tokens = validation_holder + .register_tokens(invalid_tokens) + .await + .expect("Couldn't register tokens"); + assert!(validated_tokens.is_empty()); + } +} diff --git a/server/src/data_sources/memory_provider.rs b/server/src/data_sources/memory_provider.rs index e059e2a0..de52757d 100644 --- a/server/src/data_sources/memory_provider.rs +++ b/server/src/data_sources/memory_provider.rs @@ -91,18 +91,6 @@ impl TokenSource for MemoryProvider { .collect()) } - async fn get_token_validation_status(&self, secret: &str) -> EdgeResult { - if let Some(token) = self.token_store.get(secret) { - Ok(token.clone().status) - } else { - let _ = self - .sender - .send(EdgeToken::try_from(secret.to_string())?) - .await; - Ok(TokenValidationStatus::Unknown) - } - } - async fn token_details(&self, secret: String) -> EdgeResult> { Ok(self.token_store.get(&secret).cloned()) } @@ -172,9 +160,11 @@ mod test { assert_eq!( provider - .get_token_validation_status("some_secret") + .token_details("some_secret".into()) .await - .unwrap(), + .expect("Could not retrieve token details") + .unwrap() + .status, TokenValidationStatus::Validated ) } diff --git a/server/src/data_sources/offline_provider.rs b/server/src/data_sources/offline_provider.rs index f58072ea..d3a9e1eb 100644 --- a/server/src/data_sources/offline_provider.rs +++ b/server/src/data_sources/offline_provider.rs @@ -37,14 +37,6 @@ impl TokenSource for OfflineProvider { .collect()) } - async fn get_token_validation_status(&self, secret: &str) -> EdgeResult { - Ok(if self.valid_tokens.contains_key(secret) { - TokenValidationStatus::Validated - } else { - TokenValidationStatus::Invalid - }) - } - async fn token_details(&self, secret: String) -> EdgeResult> { Ok(self.valid_tokens.get(&secret).cloned()) } @@ -83,8 +75,7 @@ impl OfflineProvider { features, valid_tokens: valid_tokens .into_iter() - .map(EdgeToken::try_from) - .filter_map(|t| t.ok()) + .filter_map(|t| EdgeToken::try_from(t).ok()) .map(|t| (t.token.clone(), t)) .collect(), } diff --git a/server/src/data_sources/redis_provider.rs b/server/src/data_sources/redis_provider.rs index d614a7c2..62b24a33 100644 --- a/server/src/data_sources/redis_provider.rs +++ b/server/src/data_sources/redis_provider.rs @@ -134,23 +134,6 @@ impl TokenSource for RedisProvider { .collect()) } - async fn get_token_validation_status(&self, secret: &str) -> EdgeResult { - if let Some(t) = self - .get_known_tokens() - .await? - .iter() - .find(|t| t.token == secret) - { - Ok(t.clone().status) - } else { - let _ = self - .sender - .send(EdgeToken::try_from(secret.to_string())?) - .await; - Ok(TokenValidationStatus::Unknown) - } - } - async fn filter_valid_tokens(&self, _secrets: Vec) -> EdgeResult> { todo!() } diff --git a/server/src/frontend_api.rs b/server/src/frontend_api.rs index 07f8d181..c77d0fb6 100644 --- a/server/src/frontend_api.rs +++ b/server/src/frontend_api.rs @@ -126,7 +126,7 @@ mod tests { use crate::data_sources::builder::DataProviderPair; use crate::types::{ EdgeResult, EdgeSink, EdgeSource, EdgeToken, FeatureSink, FeaturesSource, TokenSink, - TokenSource, TokenValidationStatus, + TokenSource, }; use actix_web::{ http::header::ContentType, @@ -180,13 +180,6 @@ mod tests { todo!() } - async fn get_token_validation_status( - &self, - _secret: &str, - ) -> EdgeResult { - Ok(TokenValidationStatus::Validated) - } - async fn token_details(&self, _secret: String) -> EdgeResult> { todo!() } diff --git a/server/src/lib.rs b/server/src/lib.rs index 593b00ec..518ebeb6 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -3,6 +3,7 @@ pub mod http; pub mod metrics; pub mod prom_metrics; +pub mod auth; pub mod cli; pub mod client_api; pub mod edge_api; diff --git a/server/src/middleware/validate_token.rs b/server/src/middleware/validate_token.rs index caec1b36..d1d148e6 100644 --- a/server/src/middleware/validate_token.rs +++ b/server/src/middleware/validate_token.rs @@ -1,4 +1,5 @@ -use crate::types::{EdgeSource, EdgeToken, TokenType, TokenValidationStatus}; +use crate::auth::token_validator::TokenValidator; +use crate::types::{EdgeToken, TokenType, TokenValidationStatus}; use actix_web::{ body::MessageBody, dev::{ServiceRequest, ServiceResponse}, @@ -8,54 +9,44 @@ use actix_web::{ use tokio::sync::RwLock; use tracing::instrument; -#[instrument(skip(srv, req, provider))] +#[instrument(skip(srv, req, validator))] pub async fn validate_token( token: EdgeToken, - provider: Data>, + validator: Data>, req: ServiceRequest, srv: crate::middleware::as_async_middleware::Next, ) -> Result, actix_web::Error> { - let res = if let Some(known_token) = provider - .read() - .await - .token_details(token.token.clone()) - .await? - { - match known_token.status { - TokenValidationStatus::Validated => match known_token.token_type { - Some(TokenType::Frontend) => { - if req.path().contains("/api/frontend") || req.path().contains("/api/proxy") { - srv.call(req).await?.map_into_left_body() - } else { - req.into_response(HttpResponse::Forbidden().finish()) - .map_into_right_body() - } + let mut validator_lock = validator.write().await; + + let known_token = validator_lock.register_token(token.token.clone()).await?; + let res = match known_token.status { + TokenValidationStatus::Validated => match known_token.token_type { + Some(TokenType::Frontend) => { + if req.path().contains("/api/frontend") || req.path().contains("/api/proxy") { + srv.call(req).await?.map_into_left_body() + } else { + req.into_response(HttpResponse::Forbidden().finish()) + .map_into_right_body() } - Some(TokenType::Client) => { - if req.path().contains("/api/client") { - srv.call(req).await?.map_into_left_body() - } else { - req.into_response(HttpResponse::Forbidden().finish()) - .map_into_right_body() - } + } + Some(TokenType::Client) => { + if req.path().contains("/api/client") { + srv.call(req).await?.map_into_left_body() + } else { + req.into_response(HttpResponse::Forbidden().finish()) + .map_into_right_body() } - _ => req - .into_response(HttpResponse::Forbidden().finish()) - .map_into_right_body(), - }, - TokenValidationStatus::Unknown => req - .into_response(HttpResponse::Unauthorized().finish()) - .map_into_right_body(), - TokenValidationStatus::Invalid => req + } + _ => req .into_response(HttpResponse::Forbidden().finish()) .map_into_right_body(), - } - } else { - let lock = provider.read().await; - let _ = lock.get_token_validation_status(token.token.as_str()).await; - drop(lock); - req.into_response(HttpResponse::Unauthorized()) - .map_into_right_body() + }, + TokenValidationStatus::Unknown => req + .into_response(HttpResponse::Unauthorized().finish()) + .map_into_right_body(), + TokenValidationStatus::Invalid => req + .into_response(HttpResponse::Forbidden().finish()) + .map_into_right_body(), }; Ok(res) } diff --git a/server/src/types.rs b/server/src/types.rs index 5a280699..813d4a16 100644 --- a/server/src/types.rs +++ b/server/src/types.rs @@ -208,7 +208,6 @@ pub trait FeaturesSource { pub trait TokenSource { async fn get_known_tokens(&self) -> EdgeResult>; async fn get_valid_tokens(&self) -> EdgeResult>; - async fn get_token_validation_status(&self, secret: &str) -> EdgeResult; async fn token_details(&self, secret: String) -> EdgeResult>; async fn filter_valid_tokens(&self, tokens: Vec) -> EdgeResult>; } @@ -242,6 +241,13 @@ pub trait TokenSink { async fn sink_tokens(&mut self, tokens: Vec) -> EdgeResult<()>; } +#[async_trait] +pub trait TokenValidator { + /// Will validate upstream, and add tokens with status from upstream to token cache. + /// Will block until verified with upstream + async fn register_tokens(&mut self, tokens: Vec) -> EdgeResult>; +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct BuildInfo { pub package_version: String, From 286f7f10e20ee9b2bef472859b5bdfcf4d25f846 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nuno=20G=C3=B3is?= Date: Fri, 10 Feb 2023 10:08:38 +0000 Subject: [PATCH 2/9] fix: token validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Christopher Kolstad Co-authored-by: Simon Hornby Co-authored-by: Gastón Fournier --- server/src/auth/token_validator.rs | 47 +++++-------- server/src/client_api.rs | 1 - server/src/data_sources/builder.rs | 10 ++- server/src/main.rs | 10 ++- server/src/middleware/validate_token.rs | 93 +++++++++++++++---------- 5 files changed, 89 insertions(+), 72 deletions(-) diff --git a/server/src/auth/token_validator.rs b/server/src/auth/token_validator.rs index e656ecf3..c0f31a98 100644 --- a/server/src/auth/token_validator.rs +++ b/server/src/auth/token_validator.rs @@ -4,10 +4,11 @@ use crate::types::{EdgeResult, EdgeSink, EdgeSource, EdgeToken, ValidateTokensRe use std::sync::Arc; use tokio::sync::RwLock; use unleash_types::Merge; +#[derive(Clone)] pub struct TokenValidator { - unleash_client: UnleashClient, - edge_source: Arc>, - edge_sink: Option>>, + pub unleash_client: Arc, + pub edge_source: Arc>, + pub edge_sink: Arc>, } impl TokenValidator { @@ -25,36 +26,20 @@ impl TokenValidator { if tokens_with_valid_format.is_empty() { Err(EdgeError::TokenParseError) } else { - Ok(( - tokens_with_valid_format - .clone() - .iter() - .filter(|t| !source_known_tokens.iter().any(|e| e.token == t.token)) - .cloned() - .collect(), - tokens_with_valid_format - .iter() - .filter(|t| source_known_tokens.iter().any(|e| e.token == t.token)) - .cloned() - .collect(), - )) + Ok(tokens_with_valid_format + .clone() + .into_iter() + .partition(|t| !source_known_tokens.iter().any(|e| e.token == t.token))) } } pub async fn register_token(&mut self, token: String) -> EdgeResult { - let (unknown_tokens, known_tokens) = self - .get_unknown_and_known_tokens(vec![token.clone()]) - .await?; - if unknown_tokens.is_empty() { - Ok(known_tokens.get(0).unwrap().clone()) - } else { - Ok(self - .register_tokens(vec![token]) - .await? - .first() - .expect("Couldn't validate token") - .clone()) - } + Ok(self + .register_tokens(vec![token]) + .await? + .first() + .expect("Couldn't validate token") + .clone()) } pub async fn register_tokens(&mut self, tokens: Vec) -> EdgeResult> { @@ -156,7 +141,7 @@ mod tests { .expect("Couldn't build client"); let mut validation_holder = super::TokenValidator { - unleash_client, + unleash_client: Arc::new(unleash_client), edge_source: test_provider.clone(), edge_sink: test_provider.clone(), }; @@ -193,7 +178,7 @@ mod tests { crate::http::unleash_client::UnleashClient::new(srv.url("/").as_str(), None) .expect("Couldn't build client"); let mut validation_holder = super::TokenValidator { - unleash_client, + unleash_client: Arc::new(unleash_client), edge_source: test_provider.clone(), edge_sink: test_provider.clone(), }; diff --git a/server/src/client_api.rs b/server/src/client_api.rs index 3ff4e429..916c8613 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -14,7 +14,6 @@ async fn features( edge_token: EdgeToken, features_source: web::Data>, ) -> EdgeJsonResult { - info!("Getting data for {edge_token:?}"); features_source .read() .await diff --git a/server/src/data_sources/builder.rs b/server/src/data_sources/builder.rs index e42bacff..40b15dc0 100644 --- a/server/src/data_sources/builder.rs +++ b/server/src/data_sources/builder.rs @@ -7,6 +7,7 @@ use tokio::sync::mpsc::Sender; use tokio::sync::RwLock; use crate::{ + auth::token_validator::TokenValidator, cli::{CliArgs, EdgeArg, EdgeMode, OfflineArgs}, http::unleash_client::UnleashClient, types::{EdgeResult, EdgeSink, EdgeSource, EdgeToken}, @@ -30,6 +31,7 @@ pub struct SinkInfo { pub validated_receive: mpsc::Receiver, pub unvalidated_receive: mpsc::Receiver, pub unleash_client: UnleashClient, + pub token_validator: Arc, pub metrics_interval_seconds: u64, } @@ -72,6 +74,11 @@ pub fn build_source_and_sink(args: CliArgs) -> EdgeResult { EdgeArg::Redis(redis_url) => build_redis(redis_url, unvalidated_sender), EdgeArg::InMemory => build_memory(unvalidated_sender), }?; + let token_validator = TokenValidator { + unleash_client: Arc::new(unleash_client.clone()), + edge_source: source.clone(), + edge_sink: sink.clone(), + }; Ok(RepositoryInfo { source, @@ -80,7 +87,8 @@ pub fn build_source_and_sink(args: CliArgs) -> EdgeResult { validated_send: validated_sender, validated_receive: validated_receiver, unvalidated_receive: unvalidated_receiver, - unleash_client, + unleash_client: unleash_client, + token_validator: Arc::new(token_validator), metrics_interval_seconds: edge_args.metrics_interval_seconds, }), }) diff --git a/server/src/main.rs b/server/src/main.rs index e9fd8797..1d8aae90 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -33,6 +33,7 @@ async fn main() -> Result<(), anyhow::Error> { let source = repo_info.source; let source_clone = source.clone(); let sink_info = repo_info.sink_info; + let validator = sink_info.as_ref().map(|sink| sink.token_validator.clone()); let metrics_cache = Arc::new(RwLock::new(MetricsCache::default())); let metrics_cache_clone = metrics_cache.clone(); @@ -44,10 +45,13 @@ async fn main() -> Result<(), anyhow::Error> { .send_wildcard() .allow_any_header() .allow_any_method(); - App::new() + let mut app = App::new() .app_data(edge_source) - .app_data(web::Data::from(metrics_cache.clone())) - .wrap(Etag::default()) + .app_data(web::Data::from(metrics_cache.clone())); + if validator.is_some() { + app = app.app_data(web::Data::from(validator.clone().unwrap())) + } + app.wrap(Etag::default()) .wrap(cors_middleware) .wrap(RequestTracing::new()) .wrap(request_metrics.clone()) diff --git a/server/src/middleware/validate_token.rs b/server/src/middleware/validate_token.rs index d1d148e6..82a4a872 100644 --- a/server/src/middleware/validate_token.rs +++ b/server/src/middleware/validate_token.rs @@ -1,5 +1,5 @@ use crate::auth::token_validator::TokenValidator; -use crate::types::{EdgeToken, TokenType, TokenValidationStatus}; +use crate::types::{EdgeSource, EdgeToken, TokenType, TokenValidationStatus}; use actix_web::{ body::MessageBody, dev::{ServiceRequest, ServiceResponse}, @@ -7,46 +7,67 @@ use actix_web::{ HttpResponse, }; use tokio::sync::RwLock; -use tracing::instrument; -#[instrument(skip(srv, req, validator))] pub async fn validate_token( token: EdgeToken, - validator: Data>, req: ServiceRequest, srv: crate::middleware::as_async_middleware::Next, ) -> Result, actix_web::Error> { - let mut validator_lock = validator.write().await; + let maybe_validator = req.app_data::>>(); + let source = req + .app_data::>>() + .unwrap() + .clone() + .into_inner(); - let known_token = validator_lock.register_token(token.token.clone()).await?; - let res = match known_token.status { - TokenValidationStatus::Validated => match known_token.token_type { - Some(TokenType::Frontend) => { - if req.path().contains("/api/frontend") || req.path().contains("/api/proxy") { - srv.call(req).await?.map_into_left_body() - } else { - req.into_response(HttpResponse::Forbidden().finish()) - .map_into_right_body() - } - } - Some(TokenType::Client) => { - if req.path().contains("/api/client") { - srv.call(req).await?.map_into_left_body() - } else { - req.into_response(HttpResponse::Forbidden().finish()) - .map_into_right_body() - } - } - _ => req - .into_response(HttpResponse::Forbidden().finish()) - .map_into_right_body(), - }, - TokenValidationStatus::Unknown => req - .into_response(HttpResponse::Unauthorized().finish()) - .map_into_right_body(), - TokenValidationStatus::Invalid => req - .into_response(HttpResponse::Forbidden().finish()) - .map_into_right_body(), - }; - Ok(res) + match maybe_validator { + Some(validator) => { + let known_token = validator + .write() + .await + .register_token(token.token.clone()) + .await?; + let res = match known_token.status { + TokenValidationStatus::Validated => match known_token.token_type { + Some(TokenType::Frontend) => { + if req.path().contains("/api/frontend") || req.path().contains("/api/proxy") + { + srv.call(req).await?.map_into_left_body() + } else { + req.into_response(HttpResponse::Forbidden().finish()) + .map_into_right_body() + } + } + Some(TokenType::Client) => { + if req.path().contains("/api/client") { + srv.call(req).await?.map_into_left_body() + } else { + req.into_response(HttpResponse::Forbidden().finish()) + .map_into_right_body() + } + } + _ => req + .into_response(HttpResponse::Forbidden().finish()) + .map_into_right_body(), + }, + TokenValidationStatus::Unknown => req + .into_response(HttpResponse::Unauthorized().finish()) + .map_into_right_body(), + TokenValidationStatus::Invalid => req + .into_response(HttpResponse::Forbidden().finish()) + .map_into_right_body(), + }; + Ok(res) + } + None => { + let res = match source.read().await.token_details(token.token).await? { + Some(_) => srv.call(req).await?.map_into_left_body(), + None => req + .into_response(HttpResponse::Forbidden().finish()) + .map_into_right_body(), + }; + + Ok(res) + } + } } From d364786d88a2b41df0cf22248f7f13b6b1bdb948 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nuno=20G=C3=B3is?= Date: Fri, 10 Feb 2023 10:25:28 +0000 Subject: [PATCH 3/9] test: make token_validator tests great again --- server/src/auth/token_validator.rs | 8 ++------ server/src/data_sources/offline_provider.rs | 2 +- server/src/types.rs | 10 +++++++++- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/server/src/auth/token_validator.rs b/server/src/auth/token_validator.rs index c0f31a98..e557456e 100644 --- a/server/src/auth/token_validator.rs +++ b/server/src/auth/token_validator.rs @@ -27,7 +27,6 @@ impl TokenValidator { Err(EdgeError::TokenParseError) } else { Ok(tokens_with_valid_format - .clone() .into_iter() .partition(|t| !source_known_tokens.iter().any(|e| e.token == t.token))) } @@ -183,10 +182,7 @@ mod tests { edge_sink: test_provider.clone(), }; let invalid_tokens = vec!["jamesbond".into(), "invalidtoken".into()]; - let validated_tokens = validation_holder - .register_tokens(invalid_tokens) - .await - .expect("Couldn't register tokens"); - assert!(validated_tokens.is_empty()); + let validated_tokens = validation_holder.register_tokens(invalid_tokens).await; + assert!(validated_tokens.is_err()); } } diff --git a/server/src/data_sources/offline_provider.rs b/server/src/data_sources/offline_provider.rs index d3a9e1eb..1ac3bff7 100644 --- a/server/src/data_sources/offline_provider.rs +++ b/server/src/data_sources/offline_provider.rs @@ -75,7 +75,7 @@ impl OfflineProvider { features, valid_tokens: valid_tokens .into_iter() - .filter_map(|t| EdgeToken::try_from(t).ok()) + .map(|t| EdgeToken::offline_token(t.as_str())) .map(|t| (t.token.clone(), t)) .collect(), } diff --git a/server/src/types.rs b/server/src/types.rs index 813d4a16..7ca67d80 100644 --- a/server/src/types.rs +++ b/server/src/types.rs @@ -185,11 +185,19 @@ impl FromStr for EdgeToken { Err(EdgeError::TokenParseError) } } else { - Ok(EdgeToken::no_project_or_environment(s)) + Err(EdgeError::TokenParseError) } } } +impl EdgeToken { + pub fn offline_token(s: &str) -> Self { + EdgeToken::try_from(s.to_string()) + .ok() + .unwrap_or_else(|| EdgeToken::no_project_or_environment(s)) + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct TokenStrings { pub tokens: Vec, From 6773deab8e1957d276a7ef2104a38b18f076c7f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nuno=20G=C3=B3is?= Date: Fri, 10 Feb 2023 10:36:46 +0000 Subject: [PATCH 4/9] test: fix tests --- server/src/data_sources/memory_provider.rs | 25 +++++++++++++--------- server/src/types.rs | 10 +++++++-- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/server/src/data_sources/memory_provider.rs b/server/src/data_sources/memory_provider.rs index de52757d..604d02b2 100644 --- a/server/src/data_sources/memory_provider.rs +++ b/server/src/data_sources/memory_provider.rs @@ -131,14 +131,16 @@ mod test { let mut provider = MemoryProvider::new(send); let _ = provider .sink_tokens(vec![EdgeToken { - token: "some_secret".into(), + token: "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f" + .into(), ..EdgeToken::default() }]) .await; let _ = provider .sink_tokens(vec![EdgeToken { - token: "some_secret".into(), + token: "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f" + .into(), ..EdgeToken::default() }]) .await; @@ -152,7 +154,8 @@ mod test { let mut provider = MemoryProvider::new(send); let _ = provider .sink_tokens(vec![EdgeToken { - token: "some_secret".into(), + token: "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f" + .into(), status: TokenValidationStatus::Validated, ..EdgeToken::default() }]) @@ -160,7 +163,9 @@ mod test { assert_eq!( provider - .token_details("some_secret".into()) + .token_details( + "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f".into() + ) .await .expect("Could not retrieve token details") .unwrap() @@ -177,7 +182,7 @@ mod test { let token = EdgeToken { environment: Some("development".into()), projects: vec!["default".into()], - token: "some-secret".into(), + token: "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1ft".into(), ..EdgeToken::default() }; @@ -202,11 +207,11 @@ mod test { async fn memory_provider_can_yield_list_of_validated_tokens() { let james_bond = EdgeToken { status: TokenValidationStatus::Validated, - ..EdgeToken::from_str("jamesbond").unwrap() + ..EdgeToken::from_str("*:development.jamesbond").unwrap() }; let frank_drebin = EdgeToken { status: TokenValidationStatus::Validated, - ..EdgeToken::from_str("frankdrebin").unwrap() + ..EdgeToken::from_str("*:development.frankdrebin").unwrap() }; let (send, _) = mpsc::channel::(32); @@ -216,9 +221,9 @@ mod test { .await; let valid_tokens = provider .filter_valid_tokens(vec![ - "jamesbond".into(), - "anotherinvalidone".into(), - "frankdrebin".into(), + "*:development.jamesbond".into(), + "*:development.anotherinvalidone".into(), + "*:development.frankdrebin".into(), ]) .await .unwrap(); diff --git a/server/src/types.rs b/server/src/types.rs index 7ca67d80..27f08617 100644 --- a/server/src/types.rs +++ b/server/src/types.rs @@ -328,9 +328,7 @@ mod tests { } } - #[test_case("943ca9171e2c884c545c5d82417a655fb77cec970cc3b78a8ff87f4406b495d0"; "old java client token")] #[test_case("demo-app:production.614a75cf68bef8703aa1bd8304938a81ec871f86ea40c975468eabd6"; "demo token with project and environment")] - #[test_case("secret-123"; "old example proxy token")] #[test_case("*:default.5fa5ac2580c7094abf0d87c68b1eeb54bdc485014aef40f9fcb0673b"; "demo token with access to all projects and default environment")] fn edge_token_from_string(token: &str) { let parsed_token = EdgeToken::from_str(token); @@ -345,6 +343,14 @@ mod tests { } } + #[test_case("943ca9171e2c884c545c5d82417a655fb77cec970cc3b78a8ff87f4406b495d0"; "old java client token")] + #[test_case("secret-123"; "old example proxy token")] + fn offline_token_from_string(token: &str) { + let offline_token = EdgeToken::offline_token(token); + assert_eq!(offline_token.environment, None); + assert!(offline_token.projects.is_empty()); + } + #[test_case( "demo-app:production", "demo-app:production" From d2523eb6da725bf66d2e9d0a641c65ac3cd74e31 Mon Sep 17 00:00:00 2001 From: Christopher Kolstad Date: Fri, 10 Feb 2023 12:17:17 +0100 Subject: [PATCH 5/9] Store validation info and instantiate tokenvalidator behind an RwLock to match expected format for app_data call --- server/src/auth/token_validator.rs | 25 +++++++++------- server/src/client_api.rs | 1 - server/src/data_sources/builder.rs | 14 ++++----- server/src/data_sources/memory_provider.rs | 35 ++++++++-------------- server/src/data_sources/redis_provider.rs | 6 ++-- server/src/middleware/validate_token.rs | 1 - server/tests/redis_test.rs | 20 ++++--------- 7 files changed, 41 insertions(+), 61 deletions(-) diff --git a/server/src/auth/token_validator.rs b/server/src/auth/token_validator.rs index e557456e..635d6b90 100644 --- a/server/src/auth/token_validator.rs +++ b/server/src/auth/token_validator.rs @@ -21,14 +21,20 @@ impl TokenValidator { .filter_map(|t| EdgeToken::try_from(t).ok()) .collect(); - let source_known_tokens = self.edge_source.read().await.get_known_tokens().await?; - if tokens_with_valid_format.is_empty() { Err(EdgeError::TokenParseError) } else { - Ok(tokens_with_valid_format - .into_iter() - .partition(|t| !source_known_tokens.iter().any(|e| e.token == t.token))) + let mut tokens = vec![]; + for token in tokens_with_valid_format { + let known_data = self + .edge_source + .read() + .await + .token_details(token.token.clone()) + .await?; + tokens.push(known_data.unwrap_or(token)); + } + Ok(tokens.into_iter().partition(|t| t.token_type.is_none())) } } @@ -75,7 +81,6 @@ impl TokenValidator { } }) .collect(); - println!("Going to sink {} tokens", tokens_to_sink.len()); let mut sink_to_write = self.edge_sink.write().await; let _ = sink_to_write.sink_tokens(tokens_to_sink.clone()).await; Ok(tokens_to_sink.merge(known_tokens)) @@ -93,7 +98,7 @@ mod tests { use actix_web::{dev::AppConfig, web, App, HttpResponse}; use serde::{Deserialize, Serialize}; use std::sync::Arc; - use tokio::sync::{mpsc, RwLock}; + use tokio::sync::RwLock; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct EdgeTokens { @@ -132,8 +137,7 @@ mod tests { #[tokio::test] pub async fn can_validate_tokens() { use crate::types::TokenSource; - let (sender, _) = mpsc::channel::(32); - let test_provider = Arc::new(RwLock::new(MemoryProvider::new(sender))); + let test_provider = Arc::new(RwLock::new(MemoryProvider::default())); let srv = test_validation_server().await; let unleash_client = crate::http::unleash_client::UnleashClient::new(srv.url("/").as_str(), None) @@ -170,8 +174,7 @@ mod tests { #[tokio::test] pub async fn tokens_with_wrong_format_is_not_included() { - let (sender, _) = mpsc::channel::(32); - let test_provider = Arc::new(RwLock::new(MemoryProvider::new(sender))); + let test_provider = Arc::new(RwLock::new(MemoryProvider::default())); let srv = test_validation_server().await; let unleash_client = crate::http::unleash_client::UnleashClient::new(srv.url("/").as_str(), None) diff --git a/server/src/client_api.rs b/server/src/client_api.rs index 916c8613..9117f9e7 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -3,7 +3,6 @@ use crate::types::{EdgeJsonResult, EdgeResult, EdgeSource, EdgeToken}; use actix_web::web::{self, Json}; use actix_web::{get, post, HttpRequest, HttpResponse}; use tokio::sync::RwLock; -use tracing::info; use unleash_types::client_features::ClientFeatures; use unleash_types::client_metrics::{ from_bucket_app_name_and_env, ClientApplication, ClientMetrics, diff --git a/server/src/data_sources/builder.rs b/server/src/data_sources/builder.rs index 40b15dc0..3f16d854 100644 --- a/server/src/data_sources/builder.rs +++ b/server/src/data_sources/builder.rs @@ -31,7 +31,7 @@ pub struct SinkInfo { pub validated_receive: mpsc::Receiver, pub unvalidated_receive: mpsc::Receiver, pub unleash_client: UnleashClient, - pub token_validator: Arc, + pub token_validator: Arc>, pub metrics_interval_seconds: u64, } @@ -44,13 +44,13 @@ fn build_offline(offline_args: OfflineArgs) -> EdgeResult) -> EdgeResult { - let data_source = Arc::new(RwLock::new(MemoryProvider::new(sender))); +fn build_memory(_sender: Sender) -> EdgeResult { + let data_source = Arc::new(RwLock::new(MemoryProvider::new())); Ok((data_source.clone(), data_source)) } -fn build_redis(redis_url: String, sender: Sender) -> EdgeResult { - let data_source = Arc::new(RwLock::new(RedisProvider::new(&redis_url, sender)?)); +fn build_redis(redis_url: String, _sender: Sender) -> EdgeResult { + let data_source = Arc::new(RwLock::new(RedisProvider::new(&redis_url)?)); Ok((data_source.clone(), data_source)) } @@ -87,8 +87,8 @@ pub fn build_source_and_sink(args: CliArgs) -> EdgeResult { validated_send: validated_sender, validated_receive: validated_receiver, unvalidated_receive: unvalidated_receiver, - unleash_client: unleash_client, - token_validator: Arc::new(token_validator), + unleash_client, + token_validator: Arc::new(RwLock::new(token_validator)), metrics_interval_seconds: edge_args.metrics_interval_seconds, }), }) diff --git a/server/src/data_sources/memory_provider.rs b/server/src/data_sources/memory_provider.rs index 604d02b2..4777ecb0 100644 --- a/server/src/data_sources/memory_provider.rs +++ b/server/src/data_sources/memory_provider.rs @@ -7,7 +7,6 @@ use crate::types::{ }; use async_trait::async_trait; use dashmap::DashMap; -use tokio::sync::mpsc::Sender; use unleash_types::client_features::ClientFeatures; use unleash_types::Merge; @@ -17,19 +16,21 @@ use super::ProjectFilter; pub struct MemoryProvider { data_store: DashMap, token_store: HashMap, - sender: Sender, } fn key(key: &EdgeToken) -> String { key.environment.clone().unwrap() } - +impl Default for MemoryProvider { + fn default() -> Self { + Self::new() + } +} impl MemoryProvider { - pub fn new(sender: Sender) -> Self { + pub fn new() -> Self { Self { data_store: DashMap::new(), token_store: HashMap::new(), - sender, } } @@ -118,17 +119,15 @@ impl FeatureSink for MemoryProvider { } #[cfg(test)] -mod test { +mod tests { use std::str::FromStr; - use tokio::sync::mpsc; use unleash_types::client_features::ClientFeature; use super::*; #[tokio::test] async fn memory_provider_correctly_deduplicates_tokens() { - let (send, _) = mpsc::channel::(32); - let mut provider = MemoryProvider::new(send); + let mut provider = MemoryProvider::default(); let _ = provider .sink_tokens(vec![EdgeToken { token: "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f" @@ -150,8 +149,7 @@ mod test { #[tokio::test] async fn memory_provider_correctly_determines_token_to_be_valid() { - let (send, _) = mpsc::channel::(32); - let mut provider = MemoryProvider::new(send); + let mut provider = MemoryProvider::new(); let _ = provider .sink_tokens(vec![EdgeToken { token: "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f" @@ -176,9 +174,7 @@ mod test { #[tokio::test] async fn memory_provider_yields_correct_response_for_token() { - let (send, _) = mpsc::channel::(32); - - let mut provider = MemoryProvider::new(send); + let mut provider = MemoryProvider::new(); let token = EdgeToken { environment: Some("development".into()), projects: vec!["default".into()], @@ -214,8 +210,7 @@ mod test { ..EdgeToken::from_str("*:development.frankdrebin").unwrap() }; - let (send, _) = mpsc::channel::(32); - let mut provider = MemoryProvider::new(send); + let mut provider = MemoryProvider::new(); let _ = provider .sink_tokens(vec![james_bond.clone(), frank_drebin.clone()]) .await; @@ -234,9 +229,7 @@ mod test { #[tokio::test] async fn memory_provider_filters_out_features_by_token() { - let (send, _) = mpsc::channel::(32); - - let mut provider = MemoryProvider::new(send); + let mut provider = MemoryProvider::new(); let token = EdgeToken { environment: Some("development".into()), projects: vec!["default".into()], @@ -273,9 +266,7 @@ mod test { #[tokio::test] async fn memory_provider_respects_all_projects_in_token() { - let (send, _) = mpsc::channel::(32); - - let mut provider = MemoryProvider::new(send); + let mut provider = MemoryProvider::new(); let token = EdgeToken { environment: Some("development".into()), projects: vec!["*".into()], diff --git a/server/src/data_sources/redis_provider.rs b/server/src/data_sources/redis_provider.rs index 62b24a33..790017c6 100644 --- a/server/src/data_sources/redis_provider.rs +++ b/server/src/data_sources/redis_provider.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use redis::AsyncCommands; use redis::{Client, Commands, RedisError}; -use tokio::sync::{mpsc::Sender, RwLock}; +use tokio::sync::RwLock; use unleash_types::client_features::{ClientFeature, ClientFeatures}; use unleash_types::Merge; @@ -19,7 +19,6 @@ use crate::{ pub struct RedisProvider { redis_client: RwLock, - sender: Sender, } impl From for EdgeError { @@ -29,10 +28,9 @@ impl From for EdgeError { } impl RedisProvider { - pub fn new(url: &str, sender: Sender) -> Result { + pub fn new(url: &str) -> Result { let client = redis::Client::open(url)?; Ok(Self { - sender, redis_client: RwLock::new(client), }) } diff --git a/server/src/middleware/validate_token.rs b/server/src/middleware/validate_token.rs index 82a4a872..006c4f2e 100644 --- a/server/src/middleware/validate_token.rs +++ b/server/src/middleware/validate_token.rs @@ -19,7 +19,6 @@ pub async fn validate_token( .unwrap() .clone() .into_inner(); - match maybe_validator { Some(validator) => { let known_token = validator diff --git a/server/tests/redis_test.rs b/server/tests/redis_test.rs index 8c0659a0..ff5139d4 100644 --- a/server/tests/redis_test.rs +++ b/server/tests/redis_test.rs @@ -2,7 +2,6 @@ use std::str::FromStr; use redis::{Client, Commands}; use testcontainers::{clients::Cli, images::redis::Redis, Container}; -use tokio::sync::mpsc; use unleash_edge::{ data_sources::redis_provider::{RedisProvider, FEATURE_PREFIX}, @@ -33,9 +32,7 @@ async fn redis_sink_returns_stores_data_correctly() { let docker = Cli::default(); let (mut client, url, _node) = setup_redis(&docker); - let (send, _) = mpsc::channel::(32); - - let mut sink: Box = Box::new(RedisProvider::new(&url, send).unwrap()); + let mut sink: Box = Box::new(RedisProvider::new(&url).unwrap()); let token = EdgeToken { status: TokenValidationStatus::Validated, @@ -67,9 +64,7 @@ async fn redis_sink_returns_merges_features_by_environment() { let docker = Cli::default(); let (mut client, url, _node) = setup_redis(&docker); - let (send, _) = mpsc::channel::(32); - - let mut sink: Box = Box::new(RedisProvider::new(&url, send).unwrap()); + let mut sink: Box = Box::new(RedisProvider::new(&url).unwrap()); let token = EdgeToken { environment: Some("some-env-2".to_string()), @@ -125,9 +120,7 @@ async fn redis_sink_returns_splits_out_data_with_different_environments() { let docker = Cli::default(); let (mut client, url, _node) = setup_redis(&docker); - let (send, _) = mpsc::channel::(32); - - let mut sink: Box = Box::new(RedisProvider::new(&url, send).unwrap()); + let mut sink: Box = Box::new(RedisProvider::new(&url).unwrap()); let dev_token = EdgeToken { status: TokenValidationStatus::Validated, @@ -193,11 +186,8 @@ async fn redis_source_filters_by_projects() { let docker = Cli::default(); let (_client, url, _node) = setup_redis(&docker); - let (send, _) = mpsc::channel::(32); - let (other_send, _) = mpsc::channel::(32); - - let source: Box = Box::new(RedisProvider::new(&url, send).unwrap()); - let mut sink: Box = Box::new(RedisProvider::new(&url, other_send).unwrap()); + let source: Box = Box::new(RedisProvider::new(&url).unwrap()); + let mut sink: Box = Box::new(RedisProvider::new(&url).unwrap()); let features = ClientFeatures { features: vec![ From 7e00cbb3cf4a9b7e4a322e76934aeed6fe3d13af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nuno=20G=C3=B3is?= Date: Fri, 10 Feb 2023 11:56:53 +0000 Subject: [PATCH 6/9] task: implement validate endpoint --- server/src/edge_api.rs | 41 ++++++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/server/src/edge_api.rs b/server/src/edge_api.rs index e8393ebd..ca612c3c 100644 --- a/server/src/edge_api.rs +++ b/server/src/edge_api.rs @@ -1,11 +1,14 @@ use actix_web::{ post, - web::{self, Json}, - HttpResponse, + web::{self, Data, Json}, + HttpRequest, HttpResponse, }; use tokio::sync::RwLock; -use crate::types::{EdgeJsonResult, EdgeSource, TokenStrings, ValidatedTokens}; +use crate::{ + auth::token_validator::TokenValidator, + types::{EdgeJsonResult, EdgeSource, TokenStrings, TokenValidationStatus, ValidatedTokens}, +}; use crate::{ metrics::client_metrics::MetricsCache, types::{BatchMetricsRequestBody, EdgeResult}, @@ -14,16 +17,32 @@ use crate::{ #[post("/validate")] async fn validate( token_provider: web::Data>, + req: HttpRequest, tokens: Json, ) -> EdgeJsonResult { - let valid_tokens = token_provider - .read() - .await - .filter_valid_tokens(tokens.into_inner().tokens) - .await?; - Ok(Json(ValidatedTokens { - tokens: valid_tokens, - })) + let maybe_validator = req.app_data::>>(); + match maybe_validator { + Some(validator) => { + let known_tokens = validator + .write() + .await + .register_tokens(tokens.into_inner().tokens) + .await?; + Ok(Json(ValidatedTokens { + tokens: known_tokens + .into_iter() + .filter(|t| t.status == TokenValidationStatus::Validated) + .collect(), + })) + } + None => Ok(Json(ValidatedTokens { + tokens: token_provider + .read() + .await + .filter_valid_tokens(tokens.into_inner().tokens) + .await?, + })), + } } #[post("/metrics")] From c8b48d8201f405ca74b8f470da29a397ceac6382 Mon Sep 17 00:00:00 2001 From: Christopher Kolstad Date: Fri, 10 Feb 2023 16:16:55 +0100 Subject: [PATCH 7/9] feat: now fetches data for validated tokens --- server/src/cli.rs | 4 +- server/src/data_sources/builder.rs | 8 +- server/src/data_sources/memory_provider.rs | 163 +++++++++++----- server/src/data_sources/offline_provider.rs | 6 +- server/src/data_sources/redis_provider.rs | 11 +- server/src/frontend_api.rs | 203 ++++++++++---------- server/src/http/background_refresh.rs | 71 +++---- server/src/main.rs | 7 +- server/src/types.rs | 38 ++++ server/tests/redis_test.rs | 27 ++- 10 files changed, 332 insertions(+), 206 deletions(-) diff --git a/server/src/cli.rs b/server/src/cli.rs index 7039edae..cdb85d31 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -35,8 +35,10 @@ pub struct EdgeArgs { pub unleash_url: String, #[clap(short, long, env)] pub redis_url: Option, - #[clap(short, long, env, default_value_t = 10)] + #[clap(short, long, env, default_value_t = 60)] pub metrics_interval_seconds: u64, + #[clap(short, long, env, default_value_t = 10)] + pub features_refresh_interval_seconds: i64, } #[derive(Args, Debug, Clone)] diff --git a/server/src/data_sources/builder.rs b/server/src/data_sources/builder.rs index 3f16d854..b9238182 100644 --- a/server/src/data_sources/builder.rs +++ b/server/src/data_sources/builder.rs @@ -44,8 +44,10 @@ fn build_offline(offline_args: OfflineArgs) -> EdgeResult) -> EdgeResult { - let data_source = Arc::new(RwLock::new(MemoryProvider::new())); +fn build_memory(features_refresh_interval_seconds: i64) -> EdgeResult { + let data_source = Arc::new(RwLock::new(MemoryProvider::new( + features_refresh_interval_seconds, + ))); Ok((data_source.clone(), data_source)) } @@ -72,7 +74,7 @@ pub fn build_source_and_sink(args: CliArgs) -> EdgeResult { let (validated_sender, validated_receiver) = mpsc::channel::(32); let (source, sink) = match arg { EdgeArg::Redis(redis_url) => build_redis(redis_url, unvalidated_sender), - EdgeArg::InMemory => build_memory(unvalidated_sender), + EdgeArg::InMemory => build_memory(edge_args.features_refresh_interval_seconds), }?; let token_validator = TokenValidator { unleash_client: Arc::new(unleash_client.clone()), diff --git a/server/src/data_sources/memory_provider.rs b/server/src/data_sources/memory_provider.rs index 4777ecb0..ecaf1f79 100644 --- a/server/src/data_sources/memory_provider.rs +++ b/server/src/data_sources/memory_provider.rs @@ -1,12 +1,15 @@ use std::collections::HashMap; -use crate::types::TokenValidationStatus; use crate::types::{ EdgeResult, EdgeSink, EdgeSource, EdgeToken, FeatureSink, FeaturesSource, TokenSink, TokenSource, }; +use crate::types::{FeatureRefresh, TokenValidationStatus}; +use actix_web::http::header::EntityTag; use async_trait::async_trait; +use chrono::{Duration, Utc}; use dashmap::DashMap; +use tracing::info; use unleash_types::client_features::ClientFeatures; use unleash_types::Merge; @@ -14,8 +17,10 @@ use super::ProjectFilter; #[derive(Debug, Clone)] pub struct MemoryProvider { + features_refresh_interval: Duration, data_store: DashMap, token_store: HashMap, + tokens_to_refresh: HashMap, } fn key(key: &EdgeToken) -> String { @@ -23,23 +28,58 @@ fn key(key: &EdgeToken) -> String { } impl Default for MemoryProvider { fn default() -> Self { - Self::new() + Self::new(10) } } impl MemoryProvider { - pub fn new() -> Self { + pub fn new(features_refresh_interval_seconds: i64) -> Self { Self { + features_refresh_interval: Duration::seconds(features_refresh_interval_seconds), data_store: DashMap::new(), token_store: HashMap::new(), + tokens_to_refresh: HashMap::new(), } } - fn sink_features(&mut self, token: &EdgeToken, features: ClientFeatures) { + fn update_last_check(&mut self, token: &EdgeToken) { + self.tokens_to_refresh + .entry(token.token.clone()) + .and_modify(|f| f.last_check = Some(Utc::now())); + } + fn sink_tokens(&mut self, tokens: Vec) { + for token in tokens { + self.token_store.insert(token.token.clone(), token.clone()); + if token.token_type == Some(crate::types::TokenType::Client) { + self.tokens_to_refresh + .insert(token.token.clone(), FeatureRefresh::new(token)); + } + } + } + + fn sink_features( + &mut self, + token: &EdgeToken, + features: ClientFeatures, + etag: Option, + ) { + info!("Sinking features"); + self.tokens_to_refresh + .entry(token.token.clone()) + .and_modify(|feature_refresh| { + feature_refresh.etag = etag.clone(); + feature_refresh.last_refreshed = Some(Utc::now()); + feature_refresh.last_check = Some(Utc::now()); + }) + .or_insert(FeatureRefresh { + token: token.clone(), + etag, + last_refreshed: Some(Utc::now()), + last_check: Some(Utc::now()), + }); self.data_store .entry(key(token)) - .and_modify(|client_features| { - let new_features = client_features.clone().merge(features.clone()); - *client_features = new_features; + .and_modify(|data| { + data.clone().merge(features.clone()); }) .or_insert(features); } @@ -48,12 +88,19 @@ impl MemoryProvider { impl EdgeSource for MemoryProvider {} impl EdgeSink for MemoryProvider {} +pub fn empty_client_features() -> ClientFeatures { + ClientFeatures { + version: 2, + features: vec![], + segments: None, + query: None, + } +} + #[async_trait] impl TokenSink for MemoryProvider { async fn sink_tokens(&mut self, tokens: Vec) -> EdgeResult<()> { - for token in &tokens { - self.token_store.insert(token.token.clone(), token.clone()); - } + self.sink_tokens(tokens); Ok(()) } } @@ -68,12 +115,7 @@ impl FeaturesSource for MemoryProvider { features: client_features.features.filter_by_projects(token), ..client_features }) - .unwrap_or_else(|| ClientFeatures { - version: 2, - features: vec![], - segments: None, - query: None, - })) + .unwrap_or_else(empty_client_features)) } } @@ -104,6 +146,29 @@ impl TokenSource for MemoryProvider { .cloned() .collect()) } + + async fn get_tokens_due_for_refresh(&self) -> EdgeResult> { + info!("Calling tokens due for refresh"); + let refreshes = self + .tokens_to_refresh + .iter() + .filter(|(_k, value)| match value.last_check { + Some(last) => { + info!( + "Last checked {last:?}. Last update: {:?}", + value.last_refreshed + ); + Utc::now() - last > self.features_refresh_interval + } + None => { + info!("No last check date, definitely need to update this"); + true + } + }) + .map(|(_k, refresh)| refresh.clone()) + .collect(); + Ok(refreshes) + } } #[async_trait] @@ -112,8 +177,13 @@ impl FeatureSink for MemoryProvider { &mut self, token: &EdgeToken, features: ClientFeatures, + etag: Option, ) -> EdgeResult<()> { - self.sink_features(token, features); + self.sink_features(token, features, etag); + Ok(()) + } + async fn update_last_check(&mut self, token: &EdgeToken) -> EdgeResult<()> { + self.update_last_check(token); Ok(()) } } @@ -123,41 +193,34 @@ mod tests { use std::str::FromStr; use unleash_types::client_features::ClientFeature; + use crate::types::into_entity_tag; + use super::*; #[tokio::test] async fn memory_provider_correctly_deduplicates_tokens() { let mut provider = MemoryProvider::default(); - let _ = provider - .sink_tokens(vec![EdgeToken { - token: "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f" - .into(), - ..EdgeToken::default() - }]) - .await; - - let _ = provider - .sink_tokens(vec![EdgeToken { - token: "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f" - .into(), - ..EdgeToken::default() - }]) - .await; + provider.sink_tokens(vec![EdgeToken { + token: "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f".into(), + ..EdgeToken::default() + }]); + + provider.sink_tokens(vec![EdgeToken { + token: "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f".into(), + ..EdgeToken::default() + }]); assert!(provider.get_known_tokens().await.unwrap().len() == 1); } #[tokio::test] async fn memory_provider_correctly_determines_token_to_be_valid() { - let mut provider = MemoryProvider::new(); - let _ = provider - .sink_tokens(vec![EdgeToken { - token: "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f" - .into(), - status: TokenValidationStatus::Validated, - ..EdgeToken::default() - }]) - .await; + let mut provider = MemoryProvider::default(); + provider.sink_tokens(vec![EdgeToken { + token: "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f".into(), + status: TokenValidationStatus::Validated, + ..EdgeToken::default() + }]); assert_eq!( provider @@ -174,7 +237,7 @@ mod tests { #[tokio::test] async fn memory_provider_yields_correct_response_for_token() { - let mut provider = MemoryProvider::new(); + let mut provider = MemoryProvider::default(); let token = EdgeToken { environment: Some("development".into()), projects: vec!["default".into()], @@ -193,7 +256,7 @@ mod tests { query: None, }; - provider.sink_features(&token, features); + provider.sink_features(&token, features.clone(), into_entity_tag(features)); let found_feature = provider.get_client_features(&token).await.unwrap().features[0].clone(); assert!(found_feature.name == *"James Bond"); @@ -210,10 +273,8 @@ mod tests { ..EdgeToken::from_str("*:development.frankdrebin").unwrap() }; - let mut provider = MemoryProvider::new(); - let _ = provider - .sink_tokens(vec![james_bond.clone(), frank_drebin.clone()]) - .await; + let mut provider = MemoryProvider::default(); + provider.sink_tokens(vec![james_bond.clone(), frank_drebin.clone()]); let valid_tokens = provider .filter_valid_tokens(vec![ "*:development.jamesbond".into(), @@ -229,7 +290,7 @@ mod tests { #[tokio::test] async fn memory_provider_filters_out_features_by_token() { - let mut provider = MemoryProvider::new(); + let mut provider = MemoryProvider::default(); let token = EdgeToken { environment: Some("development".into()), projects: vec!["default".into()], @@ -255,7 +316,7 @@ mod tests { query: None, }; - provider.sink_features(&token, features); + provider.sink_features(&token, features.clone(), into_entity_tag(features)); let all_features = provider.get_client_features(&token).await.unwrap().features; let found_feature = all_features[0].clone(); @@ -266,7 +327,7 @@ mod tests { #[tokio::test] async fn memory_provider_respects_all_projects_in_token() { - let mut provider = MemoryProvider::new(); + let mut provider = MemoryProvider::default(); let token = EdgeToken { environment: Some("development".into()), projects: vec!["*".into()], @@ -292,7 +353,7 @@ mod tests { query: None, }; - provider.sink_features(&token, features); + provider.sink_features(&token, features.clone(), into_entity_tag(features)); let all_features = provider.get_client_features(&token).await.unwrap().features; let first_feature = all_features diff --git a/server/src/data_sources/offline_provider.rs b/server/src/data_sources/offline_provider.rs index 1ac3bff7..8b14de02 100644 --- a/server/src/data_sources/offline_provider.rs +++ b/server/src/data_sources/offline_provider.rs @@ -1,6 +1,7 @@ use crate::error::EdgeError; use crate::types::{ - EdgeResult, EdgeSource, EdgeToken, FeaturesSource, TokenSource, TokenValidationStatus, + EdgeResult, EdgeSource, EdgeToken, FeatureRefresh, FeaturesSource, TokenSource, + TokenValidationStatus, }; use async_trait::async_trait; use std::collections::HashMap; @@ -49,6 +50,9 @@ impl TokenSource for OfflineProvider { .map(|(_k, t)| t) .collect()) } + async fn get_tokens_due_for_refresh(&self) -> EdgeResult> { + Ok(vec![]) + } } impl EdgeSource for OfflineProvider {} diff --git a/server/src/data_sources/redis_provider.rs b/server/src/data_sources/redis_provider.rs index 790017c6..22de82e2 100644 --- a/server/src/data_sources/redis_provider.rs +++ b/server/src/data_sources/redis_provider.rs @@ -1,3 +1,4 @@ +use actix_web::http::header::EntityTag; use async_trait::async_trait; use redis::AsyncCommands; use redis::{Client, Commands, RedisError}; @@ -8,7 +9,7 @@ use unleash_types::Merge; pub const FEATURE_PREFIX: &str = "unleash-feature-namespace:"; pub const TOKENS_KEY: &str = "unleash-token-namespace:"; -use crate::types::TokenValidationStatus; +use crate::types::{FeatureRefresh, TokenValidationStatus}; use crate::{ error::EdgeError, types::{ @@ -52,6 +53,7 @@ impl FeatureSink for RedisProvider { &mut self, token: &EdgeToken, features: ClientFeatures, + _etag: Option, ) -> EdgeResult<()> { let mut lock = self.redis_client.write().await; let mut con = lock.get_async_connection().await?; @@ -69,6 +71,10 @@ impl FeatureSink for RedisProvider { let _: () = lock.set(key, serialized_features)?; Ok(()) } + + async fn update_last_check(&mut self, _token: &EdgeToken) -> EdgeResult<()> { + todo!() + } } #[async_trait] impl TokenSink for RedisProvider { @@ -140,4 +146,7 @@ impl TokenSource for RedisProvider { let tokens = self.get_known_tokens().await?; Ok(tokens.into_iter().find(|t| t.token == secret)) } + async fn get_tokens_due_for_refresh(&self) -> EdgeResult> { + todo!() + } } diff --git a/server/src/frontend_api.rs b/server/src/frontend_api.rs index c77d0fb6..ae895ce4 100644 --- a/server/src/frontend_api.rs +++ b/server/src/frontend_api.rs @@ -123,10 +123,10 @@ pub fn configure_frontend_api(cfg: &mut web::ServiceConfig) { mod tests { use std::sync::Arc; - use crate::data_sources::builder::DataProviderPair; + use crate::data_sources::memory_provider::MemoryProvider; use crate::types::{ - EdgeResult, EdgeSink, EdgeSource, EdgeToken, FeatureSink, FeaturesSource, TokenSink, - TokenSource, + into_entity_tag, EdgeSink, EdgeSource, EdgeToken, FeatureSink, TokenSink, TokenType, + TokenValidationStatus, }; use actix_web::{ http::header::ContentType, @@ -134,7 +134,6 @@ mod tests { web::{self, Data}, App, }; - use async_trait::async_trait; use serde_json::json; use tokio::sync::RwLock; use unleash_types::{ @@ -142,75 +141,6 @@ mod tests { frontend::{EvaluatedToggle, EvaluatedVariant, FrontendResult}, }; - #[derive(Clone, Default)] - struct MockEdgeProvider { - features: Option, - } - - impl MockEdgeProvider { - fn with(self, features: ClientFeatures) -> DataProviderPair { - let provider = Arc::new(RwLock::new(MockEdgeProvider { - features: Some(features), - })); - let source: Arc> = provider.clone(); - let sink: Arc> = provider; - - (source, sink) - } - } - - #[async_trait] - impl FeaturesSource for MockEdgeProvider { - async fn get_client_features(&self, _token: &EdgeToken) -> EdgeResult { - Ok(self - .features - .as_ref() - .expect("You need to populate the mock data for your test") - .clone()) - } - } - - #[async_trait] - impl TokenSource for MockEdgeProvider { - async fn get_known_tokens(&self) -> EdgeResult> { - todo!() - } - - async fn get_valid_tokens(&self) -> EdgeResult> { - todo!() - } - - async fn token_details(&self, _secret: String) -> EdgeResult> { - todo!() - } - - async fn filter_valid_tokens(&self, _tokens: Vec) -> EdgeResult> { - todo!() - } - } - - impl EdgeSource for MockEdgeProvider {} - - #[async_trait] - impl TokenSink for MockEdgeProvider { - async fn sink_tokens(&mut self, _tokens: Vec) -> EdgeResult<()> { - todo!() - } - } - - impl EdgeSink for MockEdgeProvider {} - - #[async_trait] - impl FeatureSink for MockEdgeProvider { - async fn sink_features( - &mut self, - _token: &EdgeToken, - _features: ClientFeatures, - ) -> EdgeResult<()> { - todo!() - } - } - fn client_features_with_constraint_requiring_user_id_of_seven() -> ClientFeatures { ClientFeatures { version: 1, @@ -263,13 +193,38 @@ mod tests { #[actix_web::test] async fn calling_post_requests_resolves_context_values_correctly() { - let (source, sink) = MockEdgeProvider::default() - .with(client_features_with_constraint_requiring_user_id_of_seven()); + let shareable_provider = Arc::new(RwLock::new(MemoryProvider::default())); + let edge_source: Arc> = shareable_provider.clone(); + let edge_sink: Arc> = shareable_provider.clone(); + let token = EdgeToken::try_from( + "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7".to_string(), + ) + .expect("Valid token"); + let validated_token = EdgeToken { + token_type: Some(TokenType::Client), + status: TokenValidationStatus::Validated, + ..token + }; + let _ = shareable_provider + .write() + .await + .sink_tokens(vec![validated_token.clone()]) + .await; + let features = client_features_with_constraint_requiring_user_id_of_seven(); + let _ = shareable_provider + .write() + .await + .sink_features( + &validated_token, + features.clone(), + into_entity_tag(features), + ) + .await; let app = test::init_service( App::new() - .app_data(Data::from(source)) - .app_data(Data::from(sink)) + .app_data(Data::from(edge_source)) + .app_data(Data::from(edge_sink)) .service(web::scope("/api").service(super::post_frontend_features)), ) .await; @@ -285,34 +240,55 @@ mod tests { "userId": "7" })) .to_request(); + let second_req = test::TestRequest::post() + .uri("/api/proxy/all") + .insert_header(ContentType::json()) + .insert_header(( + "Authorization", + "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7", + )) + .set_json(json!({ + "userId": "7" + })) + .to_request(); - let result = test::call_and_read_body(&app, req).await; - - let expected = FrontendResult { - toggles: vec![EvaluatedToggle { - name: "test".into(), - enabled: true, - variant: EvaluatedVariant { - name: "disabled".into(), - enabled: false, - payload: None, - }, - impression_data: false, - }], - }; - - assert_eq!(result, serde_json::to_vec(&expected).unwrap()); + let _result: FrontendResult = test::call_and_read_body_json(&app, req).await; + let result: FrontendResult = test::call_and_read_body_json(&app, second_req).await; + assert_eq!(result.toggles.len(), 1); + assert!(result.toggles.get(0).unwrap().enabled) } #[actix_web::test] async fn calling_get_requests_resolves_context_values_correctly() { - let (source, sink) = MockEdgeProvider::default() - .with(client_features_with_constraint_requiring_user_id_of_seven()); - + let provider = Arc::new(RwLock::new(MemoryProvider::default())); + let token = EdgeToken::try_from( + "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7".to_string(), + ) + .expect("Valid token"); + let validated_token = EdgeToken { + token_type: Some(TokenType::Client), + status: TokenValidationStatus::Validated, + ..token + }; + let _ = provider + .write() + .await + .sink_tokens(vec![validated_token.clone()]) + .await; + let features = client_features_with_constraint_requiring_user_id_of_seven(); + let _ = provider + .write() + .await + .sink_features( + &validated_token, + features.clone(), + into_entity_tag(features), + ) + .await; let app = test::init_service( App::new() - .app_data(Data::from(source)) - .app_data(Data::from(sink)) + .app_data(Data::from(provider.clone())) + .app_data(Data::from(provider)) .service(web::scope("/api").service(super::get_frontend_features)), ) .await; @@ -346,13 +322,36 @@ mod tests { #[actix_web::test] async fn calling_get_requests_resolves_context_values_correctly_with_enabled_filter() { - let (source, sink) = MockEdgeProvider::default() - .with(client_features_with_constraint_one_enabled_toggle_and_one_disabled_toggle()); + let provider = Arc::new(RwLock::new(MemoryProvider::default())); + let token = EdgeToken::try_from( + "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7".to_string(), + ) + .expect("Valid token"); + let validated_token = EdgeToken { + token_type: Some(TokenType::Client), + status: TokenValidationStatus::Validated, + ..token + }; + let _ = provider + .write() + .await + .sink_tokens(vec![validated_token.clone()]) + .await; + let features = client_features_with_constraint_one_enabled_toggle_and_one_disabled_toggle(); + let _ = provider + .write() + .await + .sink_features( + &validated_token, + features.clone(), + into_entity_tag(features), + ) + .await; let app = test::init_service( App::new() - .app_data(Data::from(source)) - .app_data(Data::from(sink)) + .app_data(Data::from(provider.clone())) + .app_data(Data::from(provider)) .service(web::scope("/api").service(super::get_enabled_frontend_features)), ) .await; diff --git a/server/src/http/background_refresh.rs b/server/src/http/background_refresh.rs index 838e6276..fdf4863c 100644 --- a/server/src/http/background_refresh.rs +++ b/server/src/http/background_refresh.rs @@ -1,8 +1,8 @@ -use std::{collections::HashSet, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use crate::types::{ - ClientFeaturesRequest, ClientFeaturesResponse, EdgeSink, EdgeToken, TokenType, - TokenValidationStatus, ValidateTokensRequest, + ClientFeaturesRequest, ClientFeaturesResponse, EdgeSink, EdgeSource, EdgeToken, + ValidateTokensRequest, }; use tokio::sync::{mpsc::Receiver, mpsc::Sender, RwLock}; use tracing::{info, warn}; @@ -49,48 +49,49 @@ pub async fn poll_for_token_status( } pub async fn refresh_features( - mut channel: Receiver, + source: Arc>, sink: Arc>, unleash_client: UnleashClient, ) { - let mut tokens = HashSet::new(); loop { tokio::select! { - token = channel.recv() => { // Got a new token - if let Some(token) = token { - if token.token_type == Some(TokenType::Client) && token.status == TokenValidationStatus::Validated { - tokens.insert(token); - } - } else { - break; - } - } - , - _ = tokio::time::sleep(Duration::from_secs(10)) => { // Iterating over known tokens - let mut write_lock = sink.write().await; - info!("Updating features for known tokens. Know of {} tokens", tokens.len()); - for token in tokens.iter() { - let features_result = unleash_client.get_client_features(ClientFeaturesRequest { - api_key: token.token.clone(), - etag: None, - }).await; - match features_result { - Ok(feature_response) => match feature_response { - ClientFeaturesResponse::NoUpdate(_) => info!("No update needed"), - ClientFeaturesResponse::Updated(features, _) => { - info!("Got updated client features. Writing to sink {features:?}"); - let sink_result = write_lock.sink_features(token, features).await; - if let Err(err) = sink_result { - warn!("Failed to sink features in updater {err:?}"); + _ = tokio::time::sleep(Duration::from_secs(5)) => { + let read_lock = source.read().await; + let to_refresh = read_lock.get_tokens_due_for_refresh().await; + drop(read_lock); + if let Ok(refreshes) = to_refresh { + info!("Had {} tokens to refresh", refreshes.len()); + for refresh in refreshes { + info!("{refresh:?}"); + let features_result = unleash_client.get_client_features(ClientFeaturesRequest { + api_key: refresh.token.token.clone(), + etag: refresh.etag, + }).await; + + match features_result { + Ok(feature_response) => match feature_response { + ClientFeaturesResponse::NoUpdate(_) => { + info!("No update needed, will update last check time"); + let mut write_lock = sink.write().await; + let _ = write_lock.update_last_check(&refresh.token).await; + } + ClientFeaturesResponse::Updated(features, etag) => { + info!("Got updated client features. Writing to sink {features:?}"); + let mut write_lock = sink.write().await; + let sink_result = write_lock.sink_features(&refresh.token, features, etag).await; + drop(write_lock); + if let Err(err) = sink_result { + warn!("Failed to sink features in updater {err:?}"); + } } + }, + Err(e) => { + warn!("Couldn't refresh features: {e:?}"); } - }, - Err(e) => { - warn!("Couldn't refresh features: {e:?}"); } } } - }, + } } } } diff --git a/server/src/main.rs b/server/src/main.rs index 1d8aae90..54d7030a 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -14,7 +14,7 @@ use unleash_edge::client_api; use unleash_edge::data_sources::builder::build_source_and_sink; use unleash_edge::edge_api; use unleash_edge::frontend_api; -use unleash_edge::http::background_refresh::{poll_for_token_status, refresh_features}; +use unleash_edge::http::background_refresh::refresh_features; use unleash_edge::http::background_send_metrics::send_metrics_task; use unleash_edge::internal_backstage; use unleash_edge::metrics::client_metrics::MetricsCache; @@ -88,10 +88,7 @@ async fn main() -> Result<(), anyhow::Error> { _ = server.run() => { tracing::info!("Actix was shutdown properly"); }, - _ = poll_for_token_status(sink_info.unvalidated_receive, sink_info.validated_send.clone(), sink_info.sink.clone(), sink_info.unleash_client.clone()) => { - tracing::info!("Token validator task is shutting down") - }, - _ = refresh_features(sink_info.validated_receive, sink_info.sink, sink_info.unleash_client.clone()) => { + _ = refresh_features(source_clone.clone(), sink_info.sink, sink_info.unleash_client.clone()) => { tracing::info!("Refresh task is shutting down"); }, _ = send_metrics_task(metrics_cache_clone, source_clone, sink_info.unleash_client, sink_info.metrics_interval_seconds) => { diff --git a/server/src/types.rs b/server/src/types.rs index 27f08617..f03cb233 100644 --- a/server/src/types.rs +++ b/server/src/types.rs @@ -1,3 +1,4 @@ +use std::fmt; use std::{ future::{ready, Ready}, hash::{Hash, Hasher}, @@ -218,6 +219,37 @@ pub trait TokenSource { async fn get_valid_tokens(&self) -> EdgeResult>; async fn token_details(&self, secret: String) -> EdgeResult>; async fn filter_valid_tokens(&self, tokens: Vec) -> EdgeResult>; + async fn get_tokens_due_for_refresh(&self) -> EdgeResult>; +} + +#[derive(Clone)] +pub struct FeatureRefresh { + pub token: EdgeToken, + pub etag: Option, + pub last_refreshed: Option>, + pub last_check: Option>, +} + +impl FeatureRefresh { + pub fn new(token: EdgeToken) -> Self { + Self { + token, + etag: None, + last_refreshed: None, + last_check: None, + } + } +} + +impl fmt::Debug for FeatureRefresh { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FeatureRefresh") + .field("token", &"***") + .field("etag", &self.etag) + .field("last_refreshed", &self.last_refreshed) + .field("last_check", &self.last_check) + .finish() + } } pub trait EdgeSource: FeaturesSource + TokenSource + Send + Sync {} @@ -229,7 +261,13 @@ pub trait FeatureSink { &mut self, token: &EdgeToken, features: ClientFeatures, + etag: Option, ) -> EdgeResult<()>; + async fn update_last_check(&mut self, token: &EdgeToken) -> EdgeResult<()>; +} + +pub fn into_entity_tag(client_features: ClientFeatures) -> Option { + client_features.xx3_hash().ok().map(EntityTag::new_weak) } #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/server/tests/redis_test.rs b/server/tests/redis_test.rs index ff5139d4..a7da3264 100644 --- a/server/tests/redis_test.rs +++ b/server/tests/redis_test.rs @@ -1,11 +1,12 @@ use std::str::FromStr; +use actix_web::http::header::EntityTag; use redis::{Client, Commands}; use testcontainers::{clients::Cli, images::redis::Redis, Container}; use unleash_edge::{ data_sources::redis_provider::{RedisProvider, FEATURE_PREFIX}, - types::{EdgeSink, EdgeSource, EdgeToken, TokenValidationStatus}, + types::{into_entity_tag, EdgeSink, EdgeSource, EdgeToken, TokenValidationStatus}, }; use unleash_types::client_features::{ClientFeature, ClientFeatures}; @@ -53,7 +54,9 @@ async fn redis_sink_returns_stores_data_correctly() { let key = build_features_key(&token); - sink.sink_features(&token, features.clone()).await.unwrap(); + sink.sink_features(&token, features.clone(), into_entity_tag(features.clone())) + .await + .unwrap(); let stored_features: String = client.get::<&str, String>(key.as_str()).unwrap(); let stored_features: ClientFeatures = serde_json::from_str(&stored_features).unwrap(); assert_eq!(stored_features, features.clone()); @@ -85,7 +88,9 @@ async fn redis_sink_returns_merges_features_by_environment() { version: 2, }; - sink.sink_features(&token, features1.clone()).await.unwrap(); + sink.sink_features(&token, features1.clone(), into_entity_tag(features1)) + .await + .unwrap(); let features2 = ClientFeatures { features: vec![ClientFeature { @@ -97,7 +102,9 @@ async fn redis_sink_returns_merges_features_by_environment() { version: 2, }; - sink.sink_features(&token, features2.clone()).await.unwrap(); + sink.sink_features(&token, features2.clone(), into_entity_tag(features2)) + .await + .unwrap(); let first_expected_toggle = ClientFeature { name: "some-other-test".to_string(), @@ -148,7 +155,7 @@ async fn redis_sink_returns_splits_out_data_with_different_environments() { version: 2, }; - sink.sink_features(&dev_token, features1.clone()) + sink.sink_features(&dev_token, features1.clone(), into_entity_tag(features1)) .await .unwrap(); @@ -162,7 +169,7 @@ async fn redis_sink_returns_splits_out_data_with_different_environments() { version: 2, }; - sink.sink_features(&prod_token, features2.clone()) + sink.sink_features(&prod_token, features2.clone(), into_entity_tag(features2)) .await .unwrap(); @@ -225,7 +232,13 @@ async fn redis_source_filters_by_projects() { version: 2, }; - sink.sink_features(&token, features.clone()).await.unwrap(); + sink.sink_features( + &token, + features.clone(), + Some(EntityTag::new_weak(features.xx3_hash().unwrap())), + ) + .await + .unwrap(); let stored_features = source.get_client_features(&token).await.unwrap(); assert_eq!(stored_features, expected); From 725b536a0d693c27318447d8f56f9fbfe1cd35f1 Mon Sep 17 00:00:00 2001 From: Christopher Kolstad Date: Fri, 10 Feb 2023 16:22:51 +0100 Subject: [PATCH 8/9] fix: failing frontend tests --- server/src/frontend_api.rs | 113 +++++++++---------------------------- 1 file changed, 27 insertions(+), 86 deletions(-) diff --git a/server/src/frontend_api.rs b/server/src/frontend_api.rs index ae895ce4..fb3f683a 100644 --- a/server/src/frontend_api.rs +++ b/server/src/frontend_api.rs @@ -123,11 +123,8 @@ pub fn configure_frontend_api(cfg: &mut web::ServiceConfig) { mod tests { use std::sync::Arc; - use crate::data_sources::memory_provider::MemoryProvider; - use crate::types::{ - into_entity_tag, EdgeSink, EdgeSource, EdgeToken, FeatureSink, TokenSink, TokenType, - TokenValidationStatus, - }; + use crate::data_sources::offline_provider::OfflineProvider; + use crate::types::EdgeSource; use actix_web::{ http::header::ContentType, test, @@ -193,38 +190,18 @@ mod tests { #[actix_web::test] async fn calling_post_requests_resolves_context_values_correctly() { - let shareable_provider = Arc::new(RwLock::new(MemoryProvider::default())); + let shareable_provider = Arc::new(RwLock::new(OfflineProvider::new( + client_features_with_constraint_requiring_user_id_of_seven(), + vec![ + "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7" + .to_string(), + ], + ))); let edge_source: Arc> = shareable_provider.clone(); - let edge_sink: Arc> = shareable_provider.clone(); - let token = EdgeToken::try_from( - "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7".to_string(), - ) - .expect("Valid token"); - let validated_token = EdgeToken { - token_type: Some(TokenType::Client), - status: TokenValidationStatus::Validated, - ..token - }; - let _ = shareable_provider - .write() - .await - .sink_tokens(vec![validated_token.clone()]) - .await; - let features = client_features_with_constraint_requiring_user_id_of_seven(); - let _ = shareable_provider - .write() - .await - .sink_features( - &validated_token, - features.clone(), - into_entity_tag(features), - ) - .await; let app = test::init_service( App::new() .app_data(Data::from(edge_source)) - .app_data(Data::from(edge_sink)) .service(web::scope("/api").service(super::post_frontend_features)), ) .await; @@ -260,35 +237,17 @@ mod tests { #[actix_web::test] async fn calling_get_requests_resolves_context_values_correctly() { - let provider = Arc::new(RwLock::new(MemoryProvider::default())); - let token = EdgeToken::try_from( - "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7".to_string(), - ) - .expect("Valid token"); - let validated_token = EdgeToken { - token_type: Some(TokenType::Client), - status: TokenValidationStatus::Validated, - ..token - }; - let _ = provider - .write() - .await - .sink_tokens(vec![validated_token.clone()]) - .await; - let features = client_features_with_constraint_requiring_user_id_of_seven(); - let _ = provider - .write() - .await - .sink_features( - &validated_token, - features.clone(), - into_entity_tag(features), - ) - .await; + let shareable_provider = Arc::new(RwLock::new(OfflineProvider::new( + client_features_with_constraint_requiring_user_id_of_seven(), + vec![ + "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7" + .to_string(), + ], + ))); + let edge_source: Arc> = shareable_provider.clone(); let app = test::init_service( App::new() - .app_data(Data::from(provider.clone())) - .app_data(Data::from(provider)) + .app_data(Data::from(edge_source.clone())) .service(web::scope("/api").service(super::get_frontend_features)), ) .await; @@ -322,36 +281,18 @@ mod tests { #[actix_web::test] async fn calling_get_requests_resolves_context_values_correctly_with_enabled_filter() { - let provider = Arc::new(RwLock::new(MemoryProvider::default())); - let token = EdgeToken::try_from( - "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7".to_string(), - ) - .expect("Valid token"); - let validated_token = EdgeToken { - token_type: Some(TokenType::Client), - status: TokenValidationStatus::Validated, - ..token - }; - let _ = provider - .write() - .await - .sink_tokens(vec![validated_token.clone()]) - .await; - let features = client_features_with_constraint_one_enabled_toggle_and_one_disabled_toggle(); - let _ = provider - .write() - .await - .sink_features( - &validated_token, - features.clone(), - into_entity_tag(features), - ) - .await; + let shareable_provider = Arc::new(RwLock::new(OfflineProvider::new( + client_features_with_constraint_one_enabled_toggle_and_one_disabled_toggle(), + vec![ + "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7" + .to_string(), + ], + ))); + let edge_source: Arc> = shareable_provider.clone(); let app = test::init_service( App::new() - .app_data(Data::from(provider.clone())) - .app_data(Data::from(provider)) + .app_data(Data::from(edge_source.clone())) .service(web::scope("/api").service(super::get_enabled_frontend_features)), ) .await; From f1de2bcfa319ca712dd50664aead613b1a5912cb Mon Sep 17 00:00:00 2001 From: Christopher Kolstad Date: Fri, 10 Feb 2023 19:02:04 +0100 Subject: [PATCH 9/9] task: prewarm keys/features cache with client-keys as args to edge mode --- server/src/cli.rs | 8 ++++++++ server/src/data_sources/builder.rs | 8 +++++--- server/src/main.rs | 2 +- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/server/src/cli.rs b/server/src/cli.rs index cdb85d31..f05eab31 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -31,14 +31,22 @@ impl From for EdgeArg { .args(["redis_url"]), ))] pub struct EdgeArgs { + /// Where is your upstream URL. Remember, this is the URL to your instance, without any trailing /api suffix #[clap(short, long, env)] pub unleash_url: String, + #[clap(short, long, env)] pub redis_url: Option, + /// How often should we post metrics upstream? #[clap(short, long, env, default_value_t = 60)] pub metrics_interval_seconds: u64, + /// How long between each refresh for a token #[clap(short, long, env, default_value_t = 10)] pub features_refresh_interval_seconds: i64, + + /// Get data for these client keys at startup. Hot starts your feature cache + #[clap(short, long, env)] + pub client_keys: Vec, } #[derive(Args, Debug, Clone)] diff --git a/server/src/data_sources/builder.rs b/server/src/data_sources/builder.rs index b9238182..2857177e 100644 --- a/server/src/data_sources/builder.rs +++ b/server/src/data_sources/builder.rs @@ -56,7 +56,7 @@ fn build_redis(redis_url: String, _sender: Sender) -> EdgeResult EdgeResult { +pub async fn build_source_and_sink(args: CliArgs) -> EdgeResult { match args.mode { EdgeMode::Offline(offline_args) => { let source = build_offline(offline_args)?; @@ -76,12 +76,14 @@ pub fn build_source_and_sink(args: CliArgs) -> EdgeResult { EdgeArg::Redis(redis_url) => build_redis(redis_url, unvalidated_sender), EdgeArg::InMemory => build_memory(edge_args.features_refresh_interval_seconds), }?; - let token_validator = TokenValidator { + let mut token_validator = TokenValidator { unleash_client: Arc::new(unleash_client.clone()), edge_source: source.clone(), edge_sink: sink.clone(), }; - + if !edge_args.client_keys.is_empty() { + let _ = token_validator.register_tokens(edge_args.client_keys).await; + } Ok(RepositoryInfo { source, sink_info: Some(SinkInfo { diff --git a/server/src/main.rs b/server/src/main.rs index 54d7030a..a47e917c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -29,7 +29,7 @@ async fn main() -> Result<(), anyhow::Error> { let args = CliArgs::parse(); let http_args = args.clone().http; let (metrics_handler, request_metrics) = prom_metrics::instantiate(None); - let repo_info = build_source_and_sink(args).unwrap(); + let repo_info = build_source_and_sink(args).await.unwrap(); let source = repo_info.source; let source_clone = source.clone(); let sink_info = repo_info.sink_info;