Skip to content

Commit

Permalink
task: token validator (#58)
Browse files Browse the repository at this point in the history
* task: add token validator

Co-authored-by: Christopher Kolstad <christopher.kolstad@gmail.com>
Co-authored-by: Gastón Fournier <gaston@getunleash.ai>
Co-authored-by: Simon Hornby <sighphyre@users.noreply.github.com>

* fix: token validation

Co-authored-by: Christopher Kolstad <christopher.kolstad@gmail.com>
Co-authored-by: Simon Hornby <sighphyre@users.noreply.github.com>
Co-authored-by: Gastón Fournier <gaston@getunleash.ai>

* test: make token_validator tests great again

* test: fix tests

* Store validation info and instantiate tokenvalidator behind an RwLock to match expected format for app_data call

* task: implement validate endpoint

* feat: now fetches data for validated tokens

* fix: failing frontend tests

* task: prewarm keys/features cache with client-keys as args to edge mode

---------

Co-authored-by: Christopher Kolstad <christopher.kolstad@gmail.com>
Co-authored-by: Gastón Fournier <gaston@getunleash.ai>
Co-authored-by: Simon Hornby <sighphyre@users.noreply.github.com>
Co-authored-by: Christopher Kolstad <chriswk@getunleash.ai>
  • Loading branch information
5 people authored Feb 11, 2023
1 parent 869294b commit 749b3ad
Show file tree
Hide file tree
Showing 16 changed files with 630 additions and 359 deletions.
1 change: 1 addition & 0 deletions server/src/auth/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod token_validator;
191 changes: 191 additions & 0 deletions server/src/auth/token_validator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
use crate::error::EdgeError;
use crate::http::unleash_client::UnleashClient;
use crate::types::{EdgeResult, EdgeSink, EdgeSource, EdgeToken, ValidateTokensRequest};
use std::sync::Arc;
use tokio::sync::RwLock;
use unleash_types::Merge;
#[derive(Clone)]
pub struct TokenValidator {
pub unleash_client: Arc<UnleashClient>,
pub edge_source: Arc<RwLock<dyn EdgeSource>>,
pub edge_sink: Arc<RwLock<dyn EdgeSink>>,
}

impl TokenValidator {
async fn get_unknown_and_known_tokens(
&mut self,
tokens: Vec<String>,
) -> EdgeResult<(Vec<EdgeToken>, Vec<EdgeToken>)> {
let tokens_with_valid_format: Vec<EdgeToken> = tokens
.into_iter()
.filter_map(|t| EdgeToken::try_from(t).ok())
.collect();

if tokens_with_valid_format.is_empty() {
Err(EdgeError::TokenParseError)
} else {
let mut tokens = vec![];
for token in tokens_with_valid_format {
let known_data = self
.edge_source
.read()
.await
.token_details(token.token.clone())
.await?;
tokens.push(known_data.unwrap_or(token));
}
Ok(tokens.into_iter().partition(|t| t.token_type.is_none()))
}
}

pub async fn register_token(&mut self, token: String) -> EdgeResult<EdgeToken> {
Ok(self
.register_tokens(vec![token])
.await?
.first()
.expect("Couldn't validate token")
.clone())
}

pub async fn register_tokens(&mut self, tokens: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
let (unknown_tokens, known_tokens) = self.get_unknown_and_known_tokens(tokens).await?;
if unknown_tokens.is_empty() {
Ok(known_tokens)
} else {
let token_strings_to_validate: Vec<String> =
unknown_tokens.iter().map(|t| t.token.clone()).collect();

let validation_result = self
.unleash_client
.validate_tokens(ValidateTokensRequest {
tokens: token_strings_to_validate,
})
.await?;

let tokens_to_sink: Vec<EdgeToken> = unknown_tokens
.into_iter()
.map(|maybe_valid| {
if let Some(validated_token) = validation_result
.iter()
.find(|v| maybe_valid.token == v.token)
{
EdgeToken {
status: crate::types::TokenValidationStatus::Validated,
..validated_token.clone()
}
} else {
EdgeToken {
status: crate::types::TokenValidationStatus::Invalid,
..maybe_valid
}
}
})
.collect();
let mut sink_to_write = self.edge_sink.write().await;
let _ = sink_to_write.sink_tokens(tokens_to_sink.clone()).await;
Ok(tokens_to_sink.merge(known_tokens))
}
}
}

#[cfg(test)]
mod tests {
use crate::data_sources::memory_provider::MemoryProvider;
use crate::types::{EdgeToken, TokenType, TokenValidationStatus};
use actix_http::HttpService;
use actix_http_test::{test_server, TestServer};
use actix_service::map_config;
use actix_web::{dev::AppConfig, web, App, HttpResponse};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::RwLock;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EdgeTokens {
pub tokens: Vec<EdgeToken>,
}

async fn return_validated_tokens() -> HttpResponse {
HttpResponse::Ok().json(EdgeTokens {
tokens: valid_tokens(),
})
}

fn valid_tokens() -> Vec<EdgeToken> {
vec![EdgeToken {
token: "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f".into(),
projects: vec!["*".into()],
environment: Some("development".into()),
token_type: Some(TokenType::Client),
status: TokenValidationStatus::Validated,
}]
}

async fn test_validation_server() -> TestServer {
test_server(move || {
HttpService::new(map_config(
App::new().service(
web::resource("/edge/validate").route(web::post().to(return_validated_tokens)),
),
|_| AppConfig::default(),
))
.tcp()
})
.await
}

#[tokio::test]
pub async fn can_validate_tokens() {
use crate::types::TokenSource;
let test_provider = Arc::new(RwLock::new(MemoryProvider::default()));
let srv = test_validation_server().await;
let unleash_client =
crate::http::unleash_client::UnleashClient::new(srv.url("/").as_str(), None)
.expect("Couldn't build client");

let mut validation_holder = super::TokenValidator {
unleash_client: Arc::new(unleash_client),
edge_source: test_provider.clone(),
edge_sink: test_provider.clone(),
};
let tokens_to_validate = vec![
"*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f".into(),
"*:production.abcdef1234567890".into(),
];
validation_holder
.register_tokens(tokens_to_validate)
.await
.expect("Couldn't register tokens");
let known_tokens = test_provider
.read()
.await
.get_known_tokens()
.await
.expect("Couldn't get tokens");
assert_eq!(known_tokens.len(), 2);
assert!(known_tokens.iter().any(|t| t.token
== "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f"
&& t.status == TokenValidationStatus::Validated));
assert!(known_tokens
.iter()
.any(|t| t.token == "*:production.abcdef1234567890"
&& t.status == TokenValidationStatus::Invalid));
}

#[tokio::test]
pub async fn tokens_with_wrong_format_is_not_included() {
let test_provider = Arc::new(RwLock::new(MemoryProvider::default()));
let srv = test_validation_server().await;
let unleash_client =
crate::http::unleash_client::UnleashClient::new(srv.url("/").as_str(), None)
.expect("Couldn't build client");
let mut validation_holder = super::TokenValidator {
unleash_client: Arc::new(unleash_client),
edge_source: test_provider.clone(),
edge_sink: test_provider.clone(),
};
let invalid_tokens = vec!["jamesbond".into(), "invalidtoken".into()];
let validated_tokens = validation_holder.register_tokens(invalid_tokens).await;
assert!(validated_tokens.is_err());
}
}
12 changes: 11 additions & 1 deletion server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,22 @@ impl From<EdgeArgs> for EdgeArg {
.args(["redis_url"]),
))]
pub struct EdgeArgs {
/// Where is your upstream URL. Remember, this is the URL to your instance, without any trailing /api suffix
#[clap(short, long, env)]
pub unleash_url: String,

#[clap(short, long, env)]
pub redis_url: Option<String>,
#[clap(short, long, env, default_value_t = 10)]
/// How often should we post metrics upstream?
#[clap(short, long, env, default_value_t = 60)]
pub metrics_interval_seconds: u64,
/// How long between each refresh for a token
#[clap(short, long, env, default_value_t = 10)]
pub features_refresh_interval_seconds: i64,

/// Get data for these client keys at startup. Hot starts your feature cache
#[clap(short, long, env)]
pub client_keys: Vec<String>,
}

#[derive(Args, Debug, Clone)]
Expand Down
2 changes: 0 additions & 2 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::types::{EdgeJsonResult, EdgeResult, EdgeSource, EdgeToken};
use actix_web::web::{self, Json};
use actix_web::{get, post, HttpRequest, HttpResponse};
use tokio::sync::RwLock;
use tracing::info;
use unleash_types::client_features::ClientFeatures;
use unleash_types::client_metrics::{
from_bucket_app_name_and_env, ClientApplication, ClientMetrics,
Expand All @@ -14,7 +13,6 @@ async fn features(
edge_token: EdgeToken,
features_source: web::Data<RwLock<dyn EdgeSource>>,
) -> EdgeJsonResult<ClientFeatures> {
info!("Getting data for {edge_token:?}");
features_source
.read()
.await
Expand Down
26 changes: 19 additions & 7 deletions server/src/data_sources/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;

use crate::{
auth::token_validator::TokenValidator,
cli::{CliArgs, EdgeArg, EdgeMode, OfflineArgs},
http::unleash_client::UnleashClient,
types::{EdgeResult, EdgeSink, EdgeSource, EdgeToken},
Expand All @@ -30,6 +31,7 @@ pub struct SinkInfo {
pub validated_receive: mpsc::Receiver<EdgeToken>,
pub unvalidated_receive: mpsc::Receiver<EdgeToken>,
pub unleash_client: UnleashClient,
pub token_validator: Arc<RwLock<TokenValidator>>,
pub metrics_interval_seconds: u64,
}

Expand All @@ -42,17 +44,19 @@ fn build_offline(offline_args: OfflineArgs) -> EdgeResult<Arc<RwLock<dyn EdgeSou
Ok(provider)
}

fn build_memory(sender: Sender<EdgeToken>) -> EdgeResult<DataProviderPair> {
let data_source = Arc::new(RwLock::new(MemoryProvider::new(sender)));
fn build_memory(features_refresh_interval_seconds: i64) -> EdgeResult<DataProviderPair> {
let data_source = Arc::new(RwLock::new(MemoryProvider::new(
features_refresh_interval_seconds,
)));
Ok((data_source.clone(), data_source))
}

fn build_redis(redis_url: String, sender: Sender<EdgeToken>) -> EdgeResult<DataProviderPair> {
let data_source = Arc::new(RwLock::new(RedisProvider::new(&redis_url, sender)?));
fn build_redis(redis_url: String, _sender: Sender<EdgeToken>) -> EdgeResult<DataProviderPair> {
let data_source = Arc::new(RwLock::new(RedisProvider::new(&redis_url)?));
Ok((data_source.clone(), data_source))
}

pub fn build_source_and_sink(args: CliArgs) -> EdgeResult<RepositoryInfo> {
pub async fn build_source_and_sink(args: CliArgs) -> EdgeResult<RepositoryInfo> {
match args.mode {
EdgeMode::Offline(offline_args) => {
let source = build_offline(offline_args)?;
Expand All @@ -70,9 +74,16 @@ pub fn build_source_and_sink(args: CliArgs) -> EdgeResult<RepositoryInfo> {
let (validated_sender, validated_receiver) = mpsc::channel::<EdgeToken>(32);
let (source, sink) = match arg {
EdgeArg::Redis(redis_url) => build_redis(redis_url, unvalidated_sender),
EdgeArg::InMemory => build_memory(unvalidated_sender),
EdgeArg::InMemory => build_memory(edge_args.features_refresh_interval_seconds),
}?;

let mut token_validator = TokenValidator {
unleash_client: Arc::new(unleash_client.clone()),
edge_source: source.clone(),
edge_sink: sink.clone(),
};
if !edge_args.client_keys.is_empty() {
let _ = token_validator.register_tokens(edge_args.client_keys).await;
}
Ok(RepositoryInfo {
source,
sink_info: Some(SinkInfo {
Expand All @@ -81,6 +92,7 @@ pub fn build_source_and_sink(args: CliArgs) -> EdgeResult<RepositoryInfo> {
validated_receive: validated_receiver,
unvalidated_receive: unvalidated_receiver,
unleash_client,
token_validator: Arc::new(RwLock::new(token_validator)),
metrics_interval_seconds: edge_args.metrics_interval_seconds,
}),
})
Expand Down
Loading

0 comments on commit 749b3ad

Please sign in to comment.