From 749b3ad08de04644d0182d891e4f097dc0c438f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nuno=20G=C3=B3is?= Date: Sat, 11 Feb 2023 11:08:07 +0000 Subject: [PATCH] task: token validator (#58) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * task: add token validator Co-authored-by: Christopher Kolstad Co-authored-by: Gastón Fournier Co-authored-by: Simon Hornby * fix: token validation Co-authored-by: Christopher Kolstad Co-authored-by: Simon Hornby Co-authored-by: Gastón Fournier * test: make token_validator tests great again * test: fix tests * Store validation info and instantiate tokenvalidator behind an RwLock to match expected format for app_data call * task: implement validate endpoint * feat: now fetches data for validated tokens * fix: failing frontend tests * task: prewarm keys/features cache with client-keys as args to edge mode --------- Co-authored-by: Christopher Kolstad Co-authored-by: Gastón Fournier Co-authored-by: Simon Hornby Co-authored-by: Christopher Kolstad --- server/src/auth/mod.rs | 1 + server/src/auth/token_validator.rs | 191 ++++++++++++++++++ server/src/cli.rs | 12 +- server/src/client_api.rs | 2 - server/src/data_sources/builder.rs | 26 ++- server/src/data_sources/memory_provider.rs | 213 ++++++++++++-------- server/src/data_sources/offline_provider.rs | 17 +- server/src/data_sources/redis_provider.rs | 34 ++-- server/src/edge_api.rs | 41 +++- server/src/frontend_api.rs | 155 ++++---------- server/src/http/background_refresh.rs | 71 +++---- server/src/lib.rs | 1 + server/src/main.rs | 19 +- server/src/middleware/validate_token.rs | 93 +++++---- server/src/types.rs | 66 +++++- server/tests/redis_test.rs | 47 +++-- 16 files changed, 630 insertions(+), 359 deletions(-) create mode 100644 server/src/auth/mod.rs create mode 100644 server/src/auth/token_validator.rs diff --git a/server/src/auth/mod.rs b/server/src/auth/mod.rs new file mode 100644 index 00000000..40cd9758 --- /dev/null +++ b/server/src/auth/mod.rs @@ -0,0 +1 @@ +pub mod token_validator; diff --git a/server/src/auth/token_validator.rs b/server/src/auth/token_validator.rs new file mode 100644 index 00000000..635d6b90 --- /dev/null +++ b/server/src/auth/token_validator.rs @@ -0,0 +1,191 @@ +use crate::error::EdgeError; +use crate::http::unleash_client::UnleashClient; +use crate::types::{EdgeResult, EdgeSink, EdgeSource, EdgeToken, ValidateTokensRequest}; +use std::sync::Arc; +use tokio::sync::RwLock; +use unleash_types::Merge; +#[derive(Clone)] +pub struct TokenValidator { + pub unleash_client: Arc, + pub edge_source: Arc>, + pub edge_sink: Arc>, +} + +impl TokenValidator { + async fn get_unknown_and_known_tokens( + &mut self, + tokens: Vec, + ) -> EdgeResult<(Vec, Vec)> { + let tokens_with_valid_format: Vec = tokens + .into_iter() + .filter_map(|t| EdgeToken::try_from(t).ok()) + .collect(); + + if tokens_with_valid_format.is_empty() { + Err(EdgeError::TokenParseError) + } else { + let mut tokens = vec![]; + for token in tokens_with_valid_format { + let known_data = self + .edge_source + .read() + .await + .token_details(token.token.clone()) + .await?; + tokens.push(known_data.unwrap_or(token)); + } + Ok(tokens.into_iter().partition(|t| t.token_type.is_none())) + } + } + + pub async fn register_token(&mut self, token: String) -> EdgeResult { + Ok(self + .register_tokens(vec![token]) + .await? + .first() + .expect("Couldn't validate token") + .clone()) + } + + pub async fn register_tokens(&mut self, tokens: Vec) -> EdgeResult> { + let (unknown_tokens, known_tokens) = self.get_unknown_and_known_tokens(tokens).await?; + if unknown_tokens.is_empty() { + Ok(known_tokens) + } else { + let token_strings_to_validate: Vec = + unknown_tokens.iter().map(|t| t.token.clone()).collect(); + + let validation_result = self + .unleash_client + .validate_tokens(ValidateTokensRequest { + tokens: token_strings_to_validate, + }) + .await?; + + let tokens_to_sink: Vec = unknown_tokens + .into_iter() + .map(|maybe_valid| { + if let Some(validated_token) = validation_result + .iter() + .find(|v| maybe_valid.token == v.token) + { + EdgeToken { + status: crate::types::TokenValidationStatus::Validated, + ..validated_token.clone() + } + } else { + EdgeToken { + status: crate::types::TokenValidationStatus::Invalid, + ..maybe_valid + } + } + }) + .collect(); + let mut sink_to_write = self.edge_sink.write().await; + let _ = sink_to_write.sink_tokens(tokens_to_sink.clone()).await; + Ok(tokens_to_sink.merge(known_tokens)) + } + } +} + +#[cfg(test)] +mod tests { + use crate::data_sources::memory_provider::MemoryProvider; + use crate::types::{EdgeToken, TokenType, TokenValidationStatus}; + use actix_http::HttpService; + use actix_http_test::{test_server, TestServer}; + use actix_service::map_config; + use actix_web::{dev::AppConfig, web, App, HttpResponse}; + use serde::{Deserialize, Serialize}; + use std::sync::Arc; + use tokio::sync::RwLock; + + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct EdgeTokens { + pub tokens: Vec, + } + + async fn return_validated_tokens() -> HttpResponse { + HttpResponse::Ok().json(EdgeTokens { + tokens: valid_tokens(), + }) + } + + fn valid_tokens() -> Vec { + vec![EdgeToken { + token: "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f".into(), + projects: vec!["*".into()], + environment: Some("development".into()), + token_type: Some(TokenType::Client), + status: TokenValidationStatus::Validated, + }] + } + + async fn test_validation_server() -> TestServer { + test_server(move || { + HttpService::new(map_config( + App::new().service( + web::resource("/edge/validate").route(web::post().to(return_validated_tokens)), + ), + |_| AppConfig::default(), + )) + .tcp() + }) + .await + } + + #[tokio::test] + pub async fn can_validate_tokens() { + use crate::types::TokenSource; + let test_provider = Arc::new(RwLock::new(MemoryProvider::default())); + let srv = test_validation_server().await; + let unleash_client = + crate::http::unleash_client::UnleashClient::new(srv.url("/").as_str(), None) + .expect("Couldn't build client"); + + let mut validation_holder = super::TokenValidator { + unleash_client: Arc::new(unleash_client), + edge_source: test_provider.clone(), + edge_sink: test_provider.clone(), + }; + let tokens_to_validate = vec![ + "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f".into(), + "*:production.abcdef1234567890".into(), + ]; + validation_holder + .register_tokens(tokens_to_validate) + .await + .expect("Couldn't register tokens"); + let known_tokens = test_provider + .read() + .await + .get_known_tokens() + .await + .expect("Couldn't get tokens"); + assert_eq!(known_tokens.len(), 2); + assert!(known_tokens.iter().any(|t| t.token + == "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f" + && t.status == TokenValidationStatus::Validated)); + assert!(known_tokens + .iter() + .any(|t| t.token == "*:production.abcdef1234567890" + && t.status == TokenValidationStatus::Invalid)); + } + + #[tokio::test] + pub async fn tokens_with_wrong_format_is_not_included() { + let test_provider = Arc::new(RwLock::new(MemoryProvider::default())); + let srv = test_validation_server().await; + let unleash_client = + crate::http::unleash_client::UnleashClient::new(srv.url("/").as_str(), None) + .expect("Couldn't build client"); + let mut validation_holder = super::TokenValidator { + unleash_client: Arc::new(unleash_client), + edge_source: test_provider.clone(), + edge_sink: test_provider.clone(), + }; + let invalid_tokens = vec!["jamesbond".into(), "invalidtoken".into()]; + let validated_tokens = validation_holder.register_tokens(invalid_tokens).await; + assert!(validated_tokens.is_err()); + } +} diff --git a/server/src/cli.rs b/server/src/cli.rs index 7039edae..f05eab31 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -31,12 +31,22 @@ impl From for EdgeArg { .args(["redis_url"]), ))] pub struct EdgeArgs { + /// Where is your upstream URL. Remember, this is the URL to your instance, without any trailing /api suffix #[clap(short, long, env)] pub unleash_url: String, + #[clap(short, long, env)] pub redis_url: Option, - #[clap(short, long, env, default_value_t = 10)] + /// How often should we post metrics upstream? + #[clap(short, long, env, default_value_t = 60)] pub metrics_interval_seconds: u64, + /// How long between each refresh for a token + #[clap(short, long, env, default_value_t = 10)] + pub features_refresh_interval_seconds: i64, + + /// Get data for these client keys at startup. Hot starts your feature cache + #[clap(short, long, env)] + pub client_keys: Vec, } #[derive(Args, Debug, Clone)] diff --git a/server/src/client_api.rs b/server/src/client_api.rs index 3ff4e429..9117f9e7 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -3,7 +3,6 @@ use crate::types::{EdgeJsonResult, EdgeResult, EdgeSource, EdgeToken}; use actix_web::web::{self, Json}; use actix_web::{get, post, HttpRequest, HttpResponse}; use tokio::sync::RwLock; -use tracing::info; use unleash_types::client_features::ClientFeatures; use unleash_types::client_metrics::{ from_bucket_app_name_and_env, ClientApplication, ClientMetrics, @@ -14,7 +13,6 @@ async fn features( edge_token: EdgeToken, features_source: web::Data>, ) -> EdgeJsonResult { - info!("Getting data for {edge_token:?}"); features_source .read() .await diff --git a/server/src/data_sources/builder.rs b/server/src/data_sources/builder.rs index e42bacff..2857177e 100644 --- a/server/src/data_sources/builder.rs +++ b/server/src/data_sources/builder.rs @@ -7,6 +7,7 @@ use tokio::sync::mpsc::Sender; use tokio::sync::RwLock; use crate::{ + auth::token_validator::TokenValidator, cli::{CliArgs, EdgeArg, EdgeMode, OfflineArgs}, http::unleash_client::UnleashClient, types::{EdgeResult, EdgeSink, EdgeSource, EdgeToken}, @@ -30,6 +31,7 @@ pub struct SinkInfo { pub validated_receive: mpsc::Receiver, pub unvalidated_receive: mpsc::Receiver, pub unleash_client: UnleashClient, + pub token_validator: Arc>, pub metrics_interval_seconds: u64, } @@ -42,17 +44,19 @@ fn build_offline(offline_args: OfflineArgs) -> EdgeResult) -> EdgeResult { - let data_source = Arc::new(RwLock::new(MemoryProvider::new(sender))); +fn build_memory(features_refresh_interval_seconds: i64) -> EdgeResult { + let data_source = Arc::new(RwLock::new(MemoryProvider::new( + features_refresh_interval_seconds, + ))); Ok((data_source.clone(), data_source)) } -fn build_redis(redis_url: String, sender: Sender) -> EdgeResult { - let data_source = Arc::new(RwLock::new(RedisProvider::new(&redis_url, sender)?)); +fn build_redis(redis_url: String, _sender: Sender) -> EdgeResult { + let data_source = Arc::new(RwLock::new(RedisProvider::new(&redis_url)?)); Ok((data_source.clone(), data_source)) } -pub fn build_source_and_sink(args: CliArgs) -> EdgeResult { +pub async fn build_source_and_sink(args: CliArgs) -> EdgeResult { match args.mode { EdgeMode::Offline(offline_args) => { let source = build_offline(offline_args)?; @@ -70,9 +74,16 @@ pub fn build_source_and_sink(args: CliArgs) -> EdgeResult { let (validated_sender, validated_receiver) = mpsc::channel::(32); let (source, sink) = match arg { EdgeArg::Redis(redis_url) => build_redis(redis_url, unvalidated_sender), - EdgeArg::InMemory => build_memory(unvalidated_sender), + EdgeArg::InMemory => build_memory(edge_args.features_refresh_interval_seconds), }?; - + let mut token_validator = TokenValidator { + unleash_client: Arc::new(unleash_client.clone()), + edge_source: source.clone(), + edge_sink: sink.clone(), + }; + if !edge_args.client_keys.is_empty() { + let _ = token_validator.register_tokens(edge_args.client_keys).await; + } Ok(RepositoryInfo { source, sink_info: Some(SinkInfo { @@ -81,6 +92,7 @@ pub fn build_source_and_sink(args: CliArgs) -> EdgeResult { validated_receive: validated_receiver, unvalidated_receive: unvalidated_receiver, unleash_client, + token_validator: Arc::new(RwLock::new(token_validator)), metrics_interval_seconds: edge_args.metrics_interval_seconds, }), }) diff --git a/server/src/data_sources/memory_provider.rs b/server/src/data_sources/memory_provider.rs index e059e2a0..ecaf1f79 100644 --- a/server/src/data_sources/memory_provider.rs +++ b/server/src/data_sources/memory_provider.rs @@ -1,13 +1,15 @@ use std::collections::HashMap; -use crate::types::TokenValidationStatus; use crate::types::{ EdgeResult, EdgeSink, EdgeSource, EdgeToken, FeatureSink, FeaturesSource, TokenSink, TokenSource, }; +use crate::types::{FeatureRefresh, TokenValidationStatus}; +use actix_web::http::header::EntityTag; use async_trait::async_trait; +use chrono::{Duration, Utc}; use dashmap::DashMap; -use tokio::sync::mpsc::Sender; +use tracing::info; use unleash_types::client_features::ClientFeatures; use unleash_types::Merge; @@ -15,30 +17,69 @@ use super::ProjectFilter; #[derive(Debug, Clone)] pub struct MemoryProvider { + features_refresh_interval: Duration, data_store: DashMap, token_store: HashMap, - sender: Sender, + tokens_to_refresh: HashMap, } fn key(key: &EdgeToken) -> String { key.environment.clone().unwrap() } - +impl Default for MemoryProvider { + fn default() -> Self { + Self::new(10) + } +} impl MemoryProvider { - pub fn new(sender: Sender) -> Self { + pub fn new(features_refresh_interval_seconds: i64) -> Self { Self { + features_refresh_interval: Duration::seconds(features_refresh_interval_seconds), data_store: DashMap::new(), token_store: HashMap::new(), - sender, + tokens_to_refresh: HashMap::new(), } } - fn sink_features(&mut self, token: &EdgeToken, features: ClientFeatures) { + fn update_last_check(&mut self, token: &EdgeToken) { + self.tokens_to_refresh + .entry(token.token.clone()) + .and_modify(|f| f.last_check = Some(Utc::now())); + } + fn sink_tokens(&mut self, tokens: Vec) { + for token in tokens { + self.token_store.insert(token.token.clone(), token.clone()); + if token.token_type == Some(crate::types::TokenType::Client) { + self.tokens_to_refresh + .insert(token.token.clone(), FeatureRefresh::new(token)); + } + } + } + + fn sink_features( + &mut self, + token: &EdgeToken, + features: ClientFeatures, + etag: Option, + ) { + info!("Sinking features"); + self.tokens_to_refresh + .entry(token.token.clone()) + .and_modify(|feature_refresh| { + feature_refresh.etag = etag.clone(); + feature_refresh.last_refreshed = Some(Utc::now()); + feature_refresh.last_check = Some(Utc::now()); + }) + .or_insert(FeatureRefresh { + token: token.clone(), + etag, + last_refreshed: Some(Utc::now()), + last_check: Some(Utc::now()), + }); self.data_store .entry(key(token)) - .and_modify(|client_features| { - let new_features = client_features.clone().merge(features.clone()); - *client_features = new_features; + .and_modify(|data| { + data.clone().merge(features.clone()); }) .or_insert(features); } @@ -47,12 +88,19 @@ impl MemoryProvider { impl EdgeSource for MemoryProvider {} impl EdgeSink for MemoryProvider {} +pub fn empty_client_features() -> ClientFeatures { + ClientFeatures { + version: 2, + features: vec![], + segments: None, + query: None, + } +} + #[async_trait] impl TokenSink for MemoryProvider { async fn sink_tokens(&mut self, tokens: Vec) -> EdgeResult<()> { - for token in &tokens { - self.token_store.insert(token.token.clone(), token.clone()); - } + self.sink_tokens(tokens); Ok(()) } } @@ -67,12 +115,7 @@ impl FeaturesSource for MemoryProvider { features: client_features.features.filter_by_projects(token), ..client_features }) - .unwrap_or_else(|| ClientFeatures { - version: 2, - features: vec![], - segments: None, - query: None, - })) + .unwrap_or_else(empty_client_features)) } } @@ -91,18 +134,6 @@ impl TokenSource for MemoryProvider { .collect()) } - async fn get_token_validation_status(&self, secret: &str) -> EdgeResult { - if let Some(token) = self.token_store.get(secret) { - Ok(token.clone().status) - } else { - let _ = self - .sender - .send(EdgeToken::try_from(secret.to_string())?) - .await; - Ok(TokenValidationStatus::Unknown) - } - } - async fn token_details(&self, secret: String) -> EdgeResult> { Ok(self.token_store.get(&secret).cloned()) } @@ -115,6 +146,29 @@ impl TokenSource for MemoryProvider { .cloned() .collect()) } + + async fn get_tokens_due_for_refresh(&self) -> EdgeResult> { + info!("Calling tokens due for refresh"); + let refreshes = self + .tokens_to_refresh + .iter() + .filter(|(_k, value)| match value.last_check { + Some(last) => { + info!( + "Last checked {last:?}. Last update: {:?}", + value.last_refreshed + ); + Utc::now() - last > self.features_refresh_interval + } + None => { + info!("No last check date, definitely need to update this"); + true + } + }) + .map(|(_k, refresh)| refresh.clone()) + .collect(); + Ok(refreshes) + } } #[async_trait] @@ -123,71 +177,71 @@ impl FeatureSink for MemoryProvider { &mut self, token: &EdgeToken, features: ClientFeatures, + etag: Option, ) -> EdgeResult<()> { - self.sink_features(token, features); + self.sink_features(token, features, etag); + Ok(()) + } + async fn update_last_check(&mut self, token: &EdgeToken) -> EdgeResult<()> { + self.update_last_check(token); Ok(()) } } #[cfg(test)] -mod test { +mod tests { use std::str::FromStr; - use tokio::sync::mpsc; use unleash_types::client_features::ClientFeature; + use crate::types::into_entity_tag; + use super::*; #[tokio::test] async fn memory_provider_correctly_deduplicates_tokens() { - let (send, _) = mpsc::channel::(32); - let mut provider = MemoryProvider::new(send); - let _ = provider - .sink_tokens(vec![EdgeToken { - token: "some_secret".into(), - ..EdgeToken::default() - }]) - .await; - - let _ = provider - .sink_tokens(vec![EdgeToken { - token: "some_secret".into(), - ..EdgeToken::default() - }]) - .await; + let mut provider = MemoryProvider::default(); + provider.sink_tokens(vec![EdgeToken { + token: "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f".into(), + ..EdgeToken::default() + }]); + + provider.sink_tokens(vec![EdgeToken { + token: "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f".into(), + ..EdgeToken::default() + }]); assert!(provider.get_known_tokens().await.unwrap().len() == 1); } #[tokio::test] async fn memory_provider_correctly_determines_token_to_be_valid() { - let (send, _) = mpsc::channel::(32); - let mut provider = MemoryProvider::new(send); - let _ = provider - .sink_tokens(vec![EdgeToken { - token: "some_secret".into(), - status: TokenValidationStatus::Validated, - ..EdgeToken::default() - }]) - .await; + let mut provider = MemoryProvider::default(); + provider.sink_tokens(vec![EdgeToken { + token: "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f".into(), + status: TokenValidationStatus::Validated, + ..EdgeToken::default() + }]); assert_eq!( provider - .get_token_validation_status("some_secret") + .token_details( + "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f".into() + ) .await - .unwrap(), + .expect("Could not retrieve token details") + .unwrap() + .status, TokenValidationStatus::Validated ) } #[tokio::test] async fn memory_provider_yields_correct_response_for_token() { - let (send, _) = mpsc::channel::(32); - - let mut provider = MemoryProvider::new(send); + let mut provider = MemoryProvider::default(); let token = EdgeToken { environment: Some("development".into()), projects: vec!["default".into()], - token: "some-secret".into(), + token: "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1ft".into(), ..EdgeToken::default() }; @@ -202,7 +256,7 @@ mod test { query: None, }; - provider.sink_features(&token, features); + provider.sink_features(&token, features.clone(), into_entity_tag(features)); let found_feature = provider.get_client_features(&token).await.unwrap().features[0].clone(); assert!(found_feature.name == *"James Bond"); @@ -212,23 +266,20 @@ mod test { async fn memory_provider_can_yield_list_of_validated_tokens() { let james_bond = EdgeToken { status: TokenValidationStatus::Validated, - ..EdgeToken::from_str("jamesbond").unwrap() + ..EdgeToken::from_str("*:development.jamesbond").unwrap() }; let frank_drebin = EdgeToken { status: TokenValidationStatus::Validated, - ..EdgeToken::from_str("frankdrebin").unwrap() + ..EdgeToken::from_str("*:development.frankdrebin").unwrap() }; - let (send, _) = mpsc::channel::(32); - let mut provider = MemoryProvider::new(send); - let _ = provider - .sink_tokens(vec![james_bond.clone(), frank_drebin.clone()]) - .await; + let mut provider = MemoryProvider::default(); + provider.sink_tokens(vec![james_bond.clone(), frank_drebin.clone()]); let valid_tokens = provider .filter_valid_tokens(vec![ - "jamesbond".into(), - "anotherinvalidone".into(), - "frankdrebin".into(), + "*:development.jamesbond".into(), + "*:development.anotherinvalidone".into(), + "*:development.frankdrebin".into(), ]) .await .unwrap(); @@ -239,9 +290,7 @@ mod test { #[tokio::test] async fn memory_provider_filters_out_features_by_token() { - let (send, _) = mpsc::channel::(32); - - let mut provider = MemoryProvider::new(send); + let mut provider = MemoryProvider::default(); let token = EdgeToken { environment: Some("development".into()), projects: vec!["default".into()], @@ -267,7 +316,7 @@ mod test { query: None, }; - provider.sink_features(&token, features); + provider.sink_features(&token, features.clone(), into_entity_tag(features)); let all_features = provider.get_client_features(&token).await.unwrap().features; let found_feature = all_features[0].clone(); @@ -278,9 +327,7 @@ mod test { #[tokio::test] async fn memory_provider_respects_all_projects_in_token() { - let (send, _) = mpsc::channel::(32); - - let mut provider = MemoryProvider::new(send); + let mut provider = MemoryProvider::default(); let token = EdgeToken { environment: Some("development".into()), projects: vec!["*".into()], @@ -306,7 +353,7 @@ mod test { query: None, }; - provider.sink_features(&token, features); + provider.sink_features(&token, features.clone(), into_entity_tag(features)); let all_features = provider.get_client_features(&token).await.unwrap().features; let first_feature = all_features diff --git a/server/src/data_sources/offline_provider.rs b/server/src/data_sources/offline_provider.rs index f58072ea..8b14de02 100644 --- a/server/src/data_sources/offline_provider.rs +++ b/server/src/data_sources/offline_provider.rs @@ -1,6 +1,7 @@ use crate::error::EdgeError; use crate::types::{ - EdgeResult, EdgeSource, EdgeToken, FeaturesSource, TokenSource, TokenValidationStatus, + EdgeResult, EdgeSource, EdgeToken, FeatureRefresh, FeaturesSource, TokenSource, + TokenValidationStatus, }; use async_trait::async_trait; use std::collections::HashMap; @@ -37,14 +38,6 @@ impl TokenSource for OfflineProvider { .collect()) } - async fn get_token_validation_status(&self, secret: &str) -> EdgeResult { - Ok(if self.valid_tokens.contains_key(secret) { - TokenValidationStatus::Validated - } else { - TokenValidationStatus::Invalid - }) - } - async fn token_details(&self, secret: String) -> EdgeResult> { Ok(self.valid_tokens.get(&secret).cloned()) } @@ -57,6 +50,9 @@ impl TokenSource for OfflineProvider { .map(|(_k, t)| t) .collect()) } + async fn get_tokens_due_for_refresh(&self) -> EdgeResult> { + Ok(vec![]) + } } impl EdgeSource for OfflineProvider {} @@ -83,8 +79,7 @@ impl OfflineProvider { features, valid_tokens: valid_tokens .into_iter() - .map(EdgeToken::try_from) - .filter_map(|t| t.ok()) + .map(|t| EdgeToken::offline_token(t.as_str())) .map(|t| (t.token.clone(), t)) .collect(), } diff --git a/server/src/data_sources/redis_provider.rs b/server/src/data_sources/redis_provider.rs index d614a7c2..22de82e2 100644 --- a/server/src/data_sources/redis_provider.rs +++ b/server/src/data_sources/redis_provider.rs @@ -1,14 +1,15 @@ +use actix_web::http::header::EntityTag; use async_trait::async_trait; use redis::AsyncCommands; use redis::{Client, Commands, RedisError}; -use tokio::sync::{mpsc::Sender, RwLock}; +use tokio::sync::RwLock; use unleash_types::client_features::{ClientFeature, ClientFeatures}; use unleash_types::Merge; pub const FEATURE_PREFIX: &str = "unleash-feature-namespace:"; pub const TOKENS_KEY: &str = "unleash-token-namespace:"; -use crate::types::TokenValidationStatus; +use crate::types::{FeatureRefresh, TokenValidationStatus}; use crate::{ error::EdgeError, types::{ @@ -19,7 +20,6 @@ use crate::{ pub struct RedisProvider { redis_client: RwLock, - sender: Sender, } impl From for EdgeError { @@ -29,10 +29,9 @@ impl From for EdgeError { } impl RedisProvider { - pub fn new(url: &str, sender: Sender) -> Result { + pub fn new(url: &str) -> Result { let client = redis::Client::open(url)?; Ok(Self { - sender, redis_client: RwLock::new(client), }) } @@ -54,6 +53,7 @@ impl FeatureSink for RedisProvider { &mut self, token: &EdgeToken, features: ClientFeatures, + _etag: Option, ) -> EdgeResult<()> { let mut lock = self.redis_client.write().await; let mut con = lock.get_async_connection().await?; @@ -71,6 +71,10 @@ impl FeatureSink for RedisProvider { let _: () = lock.set(key, serialized_features)?; Ok(()) } + + async fn update_last_check(&mut self, _token: &EdgeToken) -> EdgeResult<()> { + todo!() + } } #[async_trait] impl TokenSink for RedisProvider { @@ -134,23 +138,6 @@ impl TokenSource for RedisProvider { .collect()) } - async fn get_token_validation_status(&self, secret: &str) -> EdgeResult { - if let Some(t) = self - .get_known_tokens() - .await? - .iter() - .find(|t| t.token == secret) - { - Ok(t.clone().status) - } else { - let _ = self - .sender - .send(EdgeToken::try_from(secret.to_string())?) - .await; - Ok(TokenValidationStatus::Unknown) - } - } - async fn filter_valid_tokens(&self, _secrets: Vec) -> EdgeResult> { todo!() } @@ -159,4 +146,7 @@ impl TokenSource for RedisProvider { let tokens = self.get_known_tokens().await?; Ok(tokens.into_iter().find(|t| t.token == secret)) } + async fn get_tokens_due_for_refresh(&self) -> EdgeResult> { + todo!() + } } diff --git a/server/src/edge_api.rs b/server/src/edge_api.rs index e8393ebd..ca612c3c 100644 --- a/server/src/edge_api.rs +++ b/server/src/edge_api.rs @@ -1,11 +1,14 @@ use actix_web::{ post, - web::{self, Json}, - HttpResponse, + web::{self, Data, Json}, + HttpRequest, HttpResponse, }; use tokio::sync::RwLock; -use crate::types::{EdgeJsonResult, EdgeSource, TokenStrings, ValidatedTokens}; +use crate::{ + auth::token_validator::TokenValidator, + types::{EdgeJsonResult, EdgeSource, TokenStrings, TokenValidationStatus, ValidatedTokens}, +}; use crate::{ metrics::client_metrics::MetricsCache, types::{BatchMetricsRequestBody, EdgeResult}, @@ -14,16 +17,32 @@ use crate::{ #[post("/validate")] async fn validate( token_provider: web::Data>, + req: HttpRequest, tokens: Json, ) -> EdgeJsonResult { - let valid_tokens = token_provider - .read() - .await - .filter_valid_tokens(tokens.into_inner().tokens) - .await?; - Ok(Json(ValidatedTokens { - tokens: valid_tokens, - })) + let maybe_validator = req.app_data::>>(); + match maybe_validator { + Some(validator) => { + let known_tokens = validator + .write() + .await + .register_tokens(tokens.into_inner().tokens) + .await?; + Ok(Json(ValidatedTokens { + tokens: known_tokens + .into_iter() + .filter(|t| t.status == TokenValidationStatus::Validated) + .collect(), + })) + } + None => Ok(Json(ValidatedTokens { + tokens: token_provider + .read() + .await + .filter_valid_tokens(tokens.into_inner().tokens) + .await?, + })), + } } #[post("/metrics")] diff --git a/server/src/frontend_api.rs b/server/src/frontend_api.rs index 07f8d181..fb3f683a 100644 --- a/server/src/frontend_api.rs +++ b/server/src/frontend_api.rs @@ -123,18 +123,14 @@ pub fn configure_frontend_api(cfg: &mut web::ServiceConfig) { mod tests { use std::sync::Arc; - use crate::data_sources::builder::DataProviderPair; - use crate::types::{ - EdgeResult, EdgeSink, EdgeSource, EdgeToken, FeatureSink, FeaturesSource, TokenSink, - TokenSource, TokenValidationStatus, - }; + use crate::data_sources::offline_provider::OfflineProvider; + use crate::types::EdgeSource; use actix_web::{ http::header::ContentType, test, web::{self, Data}, App, }; - use async_trait::async_trait; use serde_json::json; use tokio::sync::RwLock; use unleash_types::{ @@ -142,82 +138,6 @@ mod tests { frontend::{EvaluatedToggle, EvaluatedVariant, FrontendResult}, }; - #[derive(Clone, Default)] - struct MockEdgeProvider { - features: Option, - } - - impl MockEdgeProvider { - fn with(self, features: ClientFeatures) -> DataProviderPair { - let provider = Arc::new(RwLock::new(MockEdgeProvider { - features: Some(features), - })); - let source: Arc> = provider.clone(); - let sink: Arc> = provider; - - (source, sink) - } - } - - #[async_trait] - impl FeaturesSource for MockEdgeProvider { - async fn get_client_features(&self, _token: &EdgeToken) -> EdgeResult { - Ok(self - .features - .as_ref() - .expect("You need to populate the mock data for your test") - .clone()) - } - } - - #[async_trait] - impl TokenSource for MockEdgeProvider { - async fn get_known_tokens(&self) -> EdgeResult> { - todo!() - } - - async fn get_valid_tokens(&self) -> EdgeResult> { - todo!() - } - - async fn get_token_validation_status( - &self, - _secret: &str, - ) -> EdgeResult { - Ok(TokenValidationStatus::Validated) - } - - async fn token_details(&self, _secret: String) -> EdgeResult> { - todo!() - } - - async fn filter_valid_tokens(&self, _tokens: Vec) -> EdgeResult> { - todo!() - } - } - - impl EdgeSource for MockEdgeProvider {} - - #[async_trait] - impl TokenSink for MockEdgeProvider { - async fn sink_tokens(&mut self, _tokens: Vec) -> EdgeResult<()> { - todo!() - } - } - - impl EdgeSink for MockEdgeProvider {} - - #[async_trait] - impl FeatureSink for MockEdgeProvider { - async fn sink_features( - &mut self, - _token: &EdgeToken, - _features: ClientFeatures, - ) -> EdgeResult<()> { - todo!() - } - } - fn client_features_with_constraint_requiring_user_id_of_seven() -> ClientFeatures { ClientFeatures { version: 1, @@ -270,13 +190,18 @@ mod tests { #[actix_web::test] async fn calling_post_requests_resolves_context_values_correctly() { - let (source, sink) = MockEdgeProvider::default() - .with(client_features_with_constraint_requiring_user_id_of_seven()); + let shareable_provider = Arc::new(RwLock::new(OfflineProvider::new( + client_features_with_constraint_requiring_user_id_of_seven(), + vec![ + "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7" + .to_string(), + ], + ))); + let edge_source: Arc> = shareable_provider.clone(); let app = test::init_service( App::new() - .app_data(Data::from(source)) - .app_data(Data::from(sink)) + .app_data(Data::from(edge_source)) .service(web::scope("/api").service(super::post_frontend_features)), ) .await; @@ -292,34 +217,37 @@ mod tests { "userId": "7" })) .to_request(); + let second_req = test::TestRequest::post() + .uri("/api/proxy/all") + .insert_header(ContentType::json()) + .insert_header(( + "Authorization", + "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7", + )) + .set_json(json!({ + "userId": "7" + })) + .to_request(); - let result = test::call_and_read_body(&app, req).await; - - let expected = FrontendResult { - toggles: vec![EvaluatedToggle { - name: "test".into(), - enabled: true, - variant: EvaluatedVariant { - name: "disabled".into(), - enabled: false, - payload: None, - }, - impression_data: false, - }], - }; - - assert_eq!(result, serde_json::to_vec(&expected).unwrap()); + let _result: FrontendResult = test::call_and_read_body_json(&app, req).await; + let result: FrontendResult = test::call_and_read_body_json(&app, second_req).await; + assert_eq!(result.toggles.len(), 1); + assert!(result.toggles.get(0).unwrap().enabled) } #[actix_web::test] async fn calling_get_requests_resolves_context_values_correctly() { - let (source, sink) = MockEdgeProvider::default() - .with(client_features_with_constraint_requiring_user_id_of_seven()); - + let shareable_provider = Arc::new(RwLock::new(OfflineProvider::new( + client_features_with_constraint_requiring_user_id_of_seven(), + vec![ + "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7" + .to_string(), + ], + ))); + let edge_source: Arc> = shareable_provider.clone(); let app = test::init_service( App::new() - .app_data(Data::from(source)) - .app_data(Data::from(sink)) + .app_data(Data::from(edge_source.clone())) .service(web::scope("/api").service(super::get_frontend_features)), ) .await; @@ -353,13 +281,18 @@ mod tests { #[actix_web::test] async fn calling_get_requests_resolves_context_values_correctly_with_enabled_filter() { - let (source, sink) = MockEdgeProvider::default() - .with(client_features_with_constraint_one_enabled_toggle_and_one_disabled_toggle()); + let shareable_provider = Arc::new(RwLock::new(OfflineProvider::new( + client_features_with_constraint_one_enabled_toggle_and_one_disabled_toggle(), + vec![ + "*:development.03fa5f506428fe80ed5640c351c7232e38940814d2923b08f5c05fa7" + .to_string(), + ], + ))); + let edge_source: Arc> = shareable_provider.clone(); let app = test::init_service( App::new() - .app_data(Data::from(source)) - .app_data(Data::from(sink)) + .app_data(Data::from(edge_source.clone())) .service(web::scope("/api").service(super::get_enabled_frontend_features)), ) .await; diff --git a/server/src/http/background_refresh.rs b/server/src/http/background_refresh.rs index 838e6276..fdf4863c 100644 --- a/server/src/http/background_refresh.rs +++ b/server/src/http/background_refresh.rs @@ -1,8 +1,8 @@ -use std::{collections::HashSet, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use crate::types::{ - ClientFeaturesRequest, ClientFeaturesResponse, EdgeSink, EdgeToken, TokenType, - TokenValidationStatus, ValidateTokensRequest, + ClientFeaturesRequest, ClientFeaturesResponse, EdgeSink, EdgeSource, EdgeToken, + ValidateTokensRequest, }; use tokio::sync::{mpsc::Receiver, mpsc::Sender, RwLock}; use tracing::{info, warn}; @@ -49,48 +49,49 @@ pub async fn poll_for_token_status( } pub async fn refresh_features( - mut channel: Receiver, + source: Arc>, sink: Arc>, unleash_client: UnleashClient, ) { - let mut tokens = HashSet::new(); loop { tokio::select! { - token = channel.recv() => { // Got a new token - if let Some(token) = token { - if token.token_type == Some(TokenType::Client) && token.status == TokenValidationStatus::Validated { - tokens.insert(token); - } - } else { - break; - } - } - , - _ = tokio::time::sleep(Duration::from_secs(10)) => { // Iterating over known tokens - let mut write_lock = sink.write().await; - info!("Updating features for known tokens. Know of {} tokens", tokens.len()); - for token in tokens.iter() { - let features_result = unleash_client.get_client_features(ClientFeaturesRequest { - api_key: token.token.clone(), - etag: None, - }).await; - match features_result { - Ok(feature_response) => match feature_response { - ClientFeaturesResponse::NoUpdate(_) => info!("No update needed"), - ClientFeaturesResponse::Updated(features, _) => { - info!("Got updated client features. Writing to sink {features:?}"); - let sink_result = write_lock.sink_features(token, features).await; - if let Err(err) = sink_result { - warn!("Failed to sink features in updater {err:?}"); + _ = tokio::time::sleep(Duration::from_secs(5)) => { + let read_lock = source.read().await; + let to_refresh = read_lock.get_tokens_due_for_refresh().await; + drop(read_lock); + if let Ok(refreshes) = to_refresh { + info!("Had {} tokens to refresh", refreshes.len()); + for refresh in refreshes { + info!("{refresh:?}"); + let features_result = unleash_client.get_client_features(ClientFeaturesRequest { + api_key: refresh.token.token.clone(), + etag: refresh.etag, + }).await; + + match features_result { + Ok(feature_response) => match feature_response { + ClientFeaturesResponse::NoUpdate(_) => { + info!("No update needed, will update last check time"); + let mut write_lock = sink.write().await; + let _ = write_lock.update_last_check(&refresh.token).await; + } + ClientFeaturesResponse::Updated(features, etag) => { + info!("Got updated client features. Writing to sink {features:?}"); + let mut write_lock = sink.write().await; + let sink_result = write_lock.sink_features(&refresh.token, features, etag).await; + drop(write_lock); + if let Err(err) = sink_result { + warn!("Failed to sink features in updater {err:?}"); + } } + }, + Err(e) => { + warn!("Couldn't refresh features: {e:?}"); } - }, - Err(e) => { - warn!("Couldn't refresh features: {e:?}"); } } } - }, + } } } } diff --git a/server/src/lib.rs b/server/src/lib.rs index 593b00ec..518ebeb6 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -3,6 +3,7 @@ pub mod http; pub mod metrics; pub mod prom_metrics; +pub mod auth; pub mod cli; pub mod client_api; pub mod edge_api; diff --git a/server/src/main.rs b/server/src/main.rs index e9fd8797..a47e917c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -14,7 +14,7 @@ use unleash_edge::client_api; use unleash_edge::data_sources::builder::build_source_and_sink; use unleash_edge::edge_api; use unleash_edge::frontend_api; -use unleash_edge::http::background_refresh::{poll_for_token_status, refresh_features}; +use unleash_edge::http::background_refresh::refresh_features; use unleash_edge::http::background_send_metrics::send_metrics_task; use unleash_edge::internal_backstage; use unleash_edge::metrics::client_metrics::MetricsCache; @@ -29,10 +29,11 @@ async fn main() -> Result<(), anyhow::Error> { let args = CliArgs::parse(); let http_args = args.clone().http; let (metrics_handler, request_metrics) = prom_metrics::instantiate(None); - let repo_info = build_source_and_sink(args).unwrap(); + let repo_info = build_source_and_sink(args).await.unwrap(); let source = repo_info.source; let source_clone = source.clone(); let sink_info = repo_info.sink_info; + let validator = sink_info.as_ref().map(|sink| sink.token_validator.clone()); let metrics_cache = Arc::new(RwLock::new(MetricsCache::default())); let metrics_cache_clone = metrics_cache.clone(); @@ -44,10 +45,13 @@ async fn main() -> Result<(), anyhow::Error> { .send_wildcard() .allow_any_header() .allow_any_method(); - App::new() + let mut app = App::new() .app_data(edge_source) - .app_data(web::Data::from(metrics_cache.clone())) - .wrap(Etag::default()) + .app_data(web::Data::from(metrics_cache.clone())); + if validator.is_some() { + app = app.app_data(web::Data::from(validator.clone().unwrap())) + } + app.wrap(Etag::default()) .wrap(cors_middleware) .wrap(RequestTracing::new()) .wrap(request_metrics.clone()) @@ -84,10 +88,7 @@ async fn main() -> Result<(), anyhow::Error> { _ = server.run() => { tracing::info!("Actix was shutdown properly"); }, - _ = poll_for_token_status(sink_info.unvalidated_receive, sink_info.validated_send.clone(), sink_info.sink.clone(), sink_info.unleash_client.clone()) => { - tracing::info!("Token validator task is shutting down") - }, - _ = refresh_features(sink_info.validated_receive, sink_info.sink, sink_info.unleash_client.clone()) => { + _ = refresh_features(source_clone.clone(), sink_info.sink, sink_info.unleash_client.clone()) => { tracing::info!("Refresh task is shutting down"); }, _ = send_metrics_task(metrics_cache_clone, source_clone, sink_info.unleash_client, sink_info.metrics_interval_seconds) => { diff --git a/server/src/middleware/validate_token.rs b/server/src/middleware/validate_token.rs index caec1b36..006c4f2e 100644 --- a/server/src/middleware/validate_token.rs +++ b/server/src/middleware/validate_token.rs @@ -1,3 +1,4 @@ +use crate::auth::token_validator::TokenValidator; use crate::types::{EdgeSource, EdgeToken, TokenType, TokenValidationStatus}; use actix_web::{ body::MessageBody, @@ -6,56 +7,66 @@ use actix_web::{ HttpResponse, }; use tokio::sync::RwLock; -use tracing::instrument; -#[instrument(skip(srv, req, provider))] pub async fn validate_token( token: EdgeToken, - provider: Data>, req: ServiceRequest, srv: crate::middleware::as_async_middleware::Next, ) -> Result, actix_web::Error> { - let res = if let Some(known_token) = provider - .read() - .await - .token_details(token.token.clone()) - .await? - { - match known_token.status { - TokenValidationStatus::Validated => match known_token.token_type { - Some(TokenType::Frontend) => { - if req.path().contains("/api/frontend") || req.path().contains("/api/proxy") { - srv.call(req).await?.map_into_left_body() - } else { - req.into_response(HttpResponse::Forbidden().finish()) - .map_into_right_body() + let maybe_validator = req.app_data::>>(); + let source = req + .app_data::>>() + .unwrap() + .clone() + .into_inner(); + match maybe_validator { + Some(validator) => { + let known_token = validator + .write() + .await + .register_token(token.token.clone()) + .await?; + let res = match known_token.status { + TokenValidationStatus::Validated => match known_token.token_type { + Some(TokenType::Frontend) => { + if req.path().contains("/api/frontend") || req.path().contains("/api/proxy") + { + srv.call(req).await?.map_into_left_body() + } else { + req.into_response(HttpResponse::Forbidden().finish()) + .map_into_right_body() + } } - } - Some(TokenType::Client) => { - if req.path().contains("/api/client") { - srv.call(req).await?.map_into_left_body() - } else { - req.into_response(HttpResponse::Forbidden().finish()) - .map_into_right_body() + Some(TokenType::Client) => { + if req.path().contains("/api/client") { + srv.call(req).await?.map_into_left_body() + } else { + req.into_response(HttpResponse::Forbidden().finish()) + .map_into_right_body() + } } - } - _ => req + _ => req + .into_response(HttpResponse::Forbidden().finish()) + .map_into_right_body(), + }, + TokenValidationStatus::Unknown => req + .into_response(HttpResponse::Unauthorized().finish()) + .map_into_right_body(), + TokenValidationStatus::Invalid => req .into_response(HttpResponse::Forbidden().finish()) .map_into_right_body(), - }, - TokenValidationStatus::Unknown => req - .into_response(HttpResponse::Unauthorized().finish()) - .map_into_right_body(), - TokenValidationStatus::Invalid => req - .into_response(HttpResponse::Forbidden().finish()) - .map_into_right_body(), + }; + Ok(res) + } + None => { + let res = match source.read().await.token_details(token.token).await? { + Some(_) => srv.call(req).await?.map_into_left_body(), + None => req + .into_response(HttpResponse::Forbidden().finish()) + .map_into_right_body(), + }; + + Ok(res) } - } else { - let lock = provider.read().await; - let _ = lock.get_token_validation_status(token.token.as_str()).await; - drop(lock); - req.into_response(HttpResponse::Unauthorized()) - .map_into_right_body() - }; - Ok(res) + } } diff --git a/server/src/types.rs b/server/src/types.rs index 5a280699..f03cb233 100644 --- a/server/src/types.rs +++ b/server/src/types.rs @@ -1,3 +1,4 @@ +use std::fmt; use std::{ future::{ready, Ready}, hash::{Hash, Hasher}, @@ -185,11 +186,19 @@ impl FromStr for EdgeToken { Err(EdgeError::TokenParseError) } } else { - Ok(EdgeToken::no_project_or_environment(s)) + Err(EdgeError::TokenParseError) } } } +impl EdgeToken { + pub fn offline_token(s: &str) -> Self { + EdgeToken::try_from(s.to_string()) + .ok() + .unwrap_or_else(|| EdgeToken::no_project_or_environment(s)) + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct TokenStrings { pub tokens: Vec, @@ -208,9 +217,39 @@ pub trait FeaturesSource { pub trait TokenSource { async fn get_known_tokens(&self) -> EdgeResult>; async fn get_valid_tokens(&self) -> EdgeResult>; - async fn get_token_validation_status(&self, secret: &str) -> EdgeResult; async fn token_details(&self, secret: String) -> EdgeResult>; async fn filter_valid_tokens(&self, tokens: Vec) -> EdgeResult>; + async fn get_tokens_due_for_refresh(&self) -> EdgeResult>; +} + +#[derive(Clone)] +pub struct FeatureRefresh { + pub token: EdgeToken, + pub etag: Option, + pub last_refreshed: Option>, + pub last_check: Option>, +} + +impl FeatureRefresh { + pub fn new(token: EdgeToken) -> Self { + Self { + token, + etag: None, + last_refreshed: None, + last_check: None, + } + } +} + +impl fmt::Debug for FeatureRefresh { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FeatureRefresh") + .field("token", &"***") + .field("etag", &self.etag) + .field("last_refreshed", &self.last_refreshed) + .field("last_check", &self.last_check) + .finish() + } } pub trait EdgeSource: FeaturesSource + TokenSource + Send + Sync {} @@ -222,7 +261,13 @@ pub trait FeatureSink { &mut self, token: &EdgeToken, features: ClientFeatures, + etag: Option, ) -> EdgeResult<()>; + async fn update_last_check(&mut self, token: &EdgeToken) -> EdgeResult<()>; +} + +pub fn into_entity_tag(client_features: ClientFeatures) -> Option { + client_features.xx3_hash().ok().map(EntityTag::new_weak) } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -242,6 +287,13 @@ pub trait TokenSink { async fn sink_tokens(&mut self, tokens: Vec) -> EdgeResult<()>; } +#[async_trait] +pub trait TokenValidator { + /// Will validate upstream, and add tokens with status from upstream to token cache. + /// Will block until verified with upstream + async fn register_tokens(&mut self, tokens: Vec) -> EdgeResult>; +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct BuildInfo { pub package_version: String, @@ -314,9 +366,7 @@ mod tests { } } - #[test_case("943ca9171e2c884c545c5d82417a655fb77cec970cc3b78a8ff87f4406b495d0"; "old java client token")] #[test_case("demo-app:production.614a75cf68bef8703aa1bd8304938a81ec871f86ea40c975468eabd6"; "demo token with project and environment")] - #[test_case("secret-123"; "old example proxy token")] #[test_case("*:default.5fa5ac2580c7094abf0d87c68b1eeb54bdc485014aef40f9fcb0673b"; "demo token with access to all projects and default environment")] fn edge_token_from_string(token: &str) { let parsed_token = EdgeToken::from_str(token); @@ -331,6 +381,14 @@ mod tests { } } + #[test_case("943ca9171e2c884c545c5d82417a655fb77cec970cc3b78a8ff87f4406b495d0"; "old java client token")] + #[test_case("secret-123"; "old example proxy token")] + fn offline_token_from_string(token: &str) { + let offline_token = EdgeToken::offline_token(token); + assert_eq!(offline_token.environment, None); + assert!(offline_token.projects.is_empty()); + } + #[test_case( "demo-app:production", "demo-app:production" diff --git a/server/tests/redis_test.rs b/server/tests/redis_test.rs index 8c0659a0..a7da3264 100644 --- a/server/tests/redis_test.rs +++ b/server/tests/redis_test.rs @@ -1,12 +1,12 @@ use std::str::FromStr; +use actix_web::http::header::EntityTag; use redis::{Client, Commands}; use testcontainers::{clients::Cli, images::redis::Redis, Container}; -use tokio::sync::mpsc; use unleash_edge::{ data_sources::redis_provider::{RedisProvider, FEATURE_PREFIX}, - types::{EdgeSink, EdgeSource, EdgeToken, TokenValidationStatus}, + types::{into_entity_tag, EdgeSink, EdgeSource, EdgeToken, TokenValidationStatus}, }; use unleash_types::client_features::{ClientFeature, ClientFeatures}; @@ -33,9 +33,7 @@ async fn redis_sink_returns_stores_data_correctly() { let docker = Cli::default(); let (mut client, url, _node) = setup_redis(&docker); - let (send, _) = mpsc::channel::(32); - - let mut sink: Box = Box::new(RedisProvider::new(&url, send).unwrap()); + let mut sink: Box = Box::new(RedisProvider::new(&url).unwrap()); let token = EdgeToken { status: TokenValidationStatus::Validated, @@ -56,7 +54,9 @@ async fn redis_sink_returns_stores_data_correctly() { let key = build_features_key(&token); - sink.sink_features(&token, features.clone()).await.unwrap(); + sink.sink_features(&token, features.clone(), into_entity_tag(features.clone())) + .await + .unwrap(); let stored_features: String = client.get::<&str, String>(key.as_str()).unwrap(); let stored_features: ClientFeatures = serde_json::from_str(&stored_features).unwrap(); assert_eq!(stored_features, features.clone()); @@ -67,9 +67,7 @@ async fn redis_sink_returns_merges_features_by_environment() { let docker = Cli::default(); let (mut client, url, _node) = setup_redis(&docker); - let (send, _) = mpsc::channel::(32); - - let mut sink: Box = Box::new(RedisProvider::new(&url, send).unwrap()); + let mut sink: Box = Box::new(RedisProvider::new(&url).unwrap()); let token = EdgeToken { environment: Some("some-env-2".to_string()), @@ -90,7 +88,9 @@ async fn redis_sink_returns_merges_features_by_environment() { version: 2, }; - sink.sink_features(&token, features1.clone()).await.unwrap(); + sink.sink_features(&token, features1.clone(), into_entity_tag(features1)) + .await + .unwrap(); let features2 = ClientFeatures { features: vec![ClientFeature { @@ -102,7 +102,9 @@ async fn redis_sink_returns_merges_features_by_environment() { version: 2, }; - sink.sink_features(&token, features2.clone()).await.unwrap(); + sink.sink_features(&token, features2.clone(), into_entity_tag(features2)) + .await + .unwrap(); let first_expected_toggle = ClientFeature { name: "some-other-test".to_string(), @@ -125,9 +127,7 @@ async fn redis_sink_returns_splits_out_data_with_different_environments() { let docker = Cli::default(); let (mut client, url, _node) = setup_redis(&docker); - let (send, _) = mpsc::channel::(32); - - let mut sink: Box = Box::new(RedisProvider::new(&url, send).unwrap()); + let mut sink: Box = Box::new(RedisProvider::new(&url).unwrap()); let dev_token = EdgeToken { status: TokenValidationStatus::Validated, @@ -155,7 +155,7 @@ async fn redis_sink_returns_splits_out_data_with_different_environments() { version: 2, }; - sink.sink_features(&dev_token, features1.clone()) + sink.sink_features(&dev_token, features1.clone(), into_entity_tag(features1)) .await .unwrap(); @@ -169,7 +169,7 @@ async fn redis_sink_returns_splits_out_data_with_different_environments() { version: 2, }; - sink.sink_features(&prod_token, features2.clone()) + sink.sink_features(&prod_token, features2.clone(), into_entity_tag(features2)) .await .unwrap(); @@ -193,11 +193,8 @@ async fn redis_source_filters_by_projects() { let docker = Cli::default(); let (_client, url, _node) = setup_redis(&docker); - let (send, _) = mpsc::channel::(32); - let (other_send, _) = mpsc::channel::(32); - - let source: Box = Box::new(RedisProvider::new(&url, send).unwrap()); - let mut sink: Box = Box::new(RedisProvider::new(&url, other_send).unwrap()); + let source: Box = Box::new(RedisProvider::new(&url).unwrap()); + let mut sink: Box = Box::new(RedisProvider::new(&url).unwrap()); let features = ClientFeatures { features: vec![ @@ -235,7 +232,13 @@ async fn redis_source_filters_by_projects() { version: 2, }; - sink.sink_features(&token, features.clone()).await.unwrap(); + sink.sink_features( + &token, + features.clone(), + Some(EntityTag::new_weak(features.xx3_hash().unwrap())), + ) + .await + .unwrap(); let stored_features = source.get_client_features(&token).await.unwrap(); assert_eq!(stored_features, expected);