Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task: token validator #58

Merged
merged 9 commits into from
Feb 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/src/auth/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod token_validator;
191 changes: 191 additions & 0 deletions server/src/auth/token_validator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
use crate::error::EdgeError;
use crate::http::unleash_client::UnleashClient;
use crate::types::{EdgeResult, EdgeSink, EdgeSource, EdgeToken, ValidateTokensRequest};
use std::sync::Arc;
use tokio::sync::RwLock;
use unleash_types::Merge;
#[derive(Clone)]
pub struct TokenValidator {
pub unleash_client: Arc<UnleashClient>,
pub edge_source: Arc<RwLock<dyn EdgeSource>>,
pub edge_sink: Arc<RwLock<dyn EdgeSink>>,
}

impl TokenValidator {
async fn get_unknown_and_known_tokens(
&mut self,
tokens: Vec<String>,
) -> EdgeResult<(Vec<EdgeToken>, Vec<EdgeToken>)> {
let tokens_with_valid_format: Vec<EdgeToken> = tokens
.into_iter()
.filter_map(|t| EdgeToken::try_from(t).ok())
.collect();

if tokens_with_valid_format.is_empty() {
Err(EdgeError::TokenParseError)
} else {
let mut tokens = vec![];
for token in tokens_with_valid_format {
let known_data = self
.edge_source
.read()
.await
.token_details(token.token.clone())
.await?;
tokens.push(known_data.unwrap_or(token));
}
Ok(tokens.into_iter().partition(|t| t.token_type.is_none()))
}
}

pub async fn register_token(&mut self, token: String) -> EdgeResult<EdgeToken> {
Ok(self
.register_tokens(vec![token])
.await?
.first()
.expect("Couldn't validate token")
.clone())
}

pub async fn register_tokens(&mut self, tokens: Vec<String>) -> EdgeResult<Vec<EdgeToken>> {
let (unknown_tokens, known_tokens) = self.get_unknown_and_known_tokens(tokens).await?;
if unknown_tokens.is_empty() {
Ok(known_tokens)
} else {
let token_strings_to_validate: Vec<String> =
unknown_tokens.iter().map(|t| t.token.clone()).collect();

let validation_result = self
.unleash_client
.validate_tokens(ValidateTokensRequest {
tokens: token_strings_to_validate,
})
.await?;

let tokens_to_sink: Vec<EdgeToken> = unknown_tokens
.into_iter()
.map(|maybe_valid| {
if let Some(validated_token) = validation_result
.iter()
.find(|v| maybe_valid.token == v.token)
{
EdgeToken {
status: crate::types::TokenValidationStatus::Validated,
..validated_token.clone()
}
} else {
EdgeToken {
status: crate::types::TokenValidationStatus::Invalid,
..maybe_valid
}
}
})
.collect();
let mut sink_to_write = self.edge_sink.write().await;
let _ = sink_to_write.sink_tokens(tokens_to_sink.clone()).await;
Ok(tokens_to_sink.merge(known_tokens))
}
}
}

#[cfg(test)]
mod tests {
use crate::data_sources::memory_provider::MemoryProvider;
use crate::types::{EdgeToken, TokenType, TokenValidationStatus};
use actix_http::HttpService;
use actix_http_test::{test_server, TestServer};
use actix_service::map_config;
use actix_web::{dev::AppConfig, web, App, HttpResponse};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::RwLock;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EdgeTokens {
pub tokens: Vec<EdgeToken>,
}

async fn return_validated_tokens() -> HttpResponse {
HttpResponse::Ok().json(EdgeTokens {
tokens: valid_tokens(),
})
}

fn valid_tokens() -> Vec<EdgeToken> {
vec![EdgeToken {
token: "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f".into(),
projects: vec!["*".into()],
environment: Some("development".into()),
token_type: Some(TokenType::Client),
status: TokenValidationStatus::Validated,
}]
}

async fn test_validation_server() -> TestServer {
test_server(move || {
HttpService::new(map_config(
App::new().service(
web::resource("/edge/validate").route(web::post().to(return_validated_tokens)),
),
|_| AppConfig::default(),
))
.tcp()
})
.await
}

#[tokio::test]
pub async fn can_validate_tokens() {
use crate::types::TokenSource;
let test_provider = Arc::new(RwLock::new(MemoryProvider::default()));
let srv = test_validation_server().await;
let unleash_client =
crate::http::unleash_client::UnleashClient::new(srv.url("/").as_str(), None)
.expect("Couldn't build client");

let mut validation_holder = super::TokenValidator {
unleash_client: Arc::new(unleash_client),
edge_source: test_provider.clone(),
edge_sink: test_provider.clone(),
};
let tokens_to_validate = vec![
"*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f".into(),
"*:production.abcdef1234567890".into(),
];
validation_holder
.register_tokens(tokens_to_validate)
.await
.expect("Couldn't register tokens");
let known_tokens = test_provider
.read()
.await
.get_known_tokens()
.await
.expect("Couldn't get tokens");
assert_eq!(known_tokens.len(), 2);
assert!(known_tokens.iter().any(|t| t.token
== "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f"
&& t.status == TokenValidationStatus::Validated));
assert!(known_tokens
.iter()
.any(|t| t.token == "*:production.abcdef1234567890"
&& t.status == TokenValidationStatus::Invalid));
}

#[tokio::test]
pub async fn tokens_with_wrong_format_is_not_included() {
let test_provider = Arc::new(RwLock::new(MemoryProvider::default()));
let srv = test_validation_server().await;
let unleash_client =
crate::http::unleash_client::UnleashClient::new(srv.url("/").as_str(), None)
.expect("Couldn't build client");
let mut validation_holder = super::TokenValidator {
unleash_client: Arc::new(unleash_client),
edge_source: test_provider.clone(),
edge_sink: test_provider.clone(),
};
let invalid_tokens = vec!["jamesbond".into(), "invalidtoken".into()];
let validated_tokens = validation_holder.register_tokens(invalid_tokens).await;
assert!(validated_tokens.is_err());
}
}
12 changes: 11 additions & 1 deletion server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,22 @@ impl From<EdgeArgs> for EdgeArg {
.args(["redis_url"]),
))]
pub struct EdgeArgs {
/// Where is your upstream URL. Remember, this is the URL to your instance, without any trailing /api suffix
#[clap(short, long, env)]
pub unleash_url: String,

#[clap(short, long, env)]
pub redis_url: Option<String>,
#[clap(short, long, env, default_value_t = 10)]
/// How often should we post metrics upstream?
#[clap(short, long, env, default_value_t = 60)]
pub metrics_interval_seconds: u64,
/// How long between each refresh for a token
#[clap(short, long, env, default_value_t = 10)]
pub features_refresh_interval_seconds: i64,

/// Get data for these client keys at startup. Hot starts your feature cache
#[clap(short, long, env)]
pub client_keys: Vec<String>,
}

#[derive(Args, Debug, Clone)]
Expand Down
2 changes: 0 additions & 2 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::types::{EdgeJsonResult, EdgeResult, EdgeSource, EdgeToken};
use actix_web::web::{self, Json};
use actix_web::{get, post, HttpRequest, HttpResponse};
use tokio::sync::RwLock;
use tracing::info;
use unleash_types::client_features::ClientFeatures;
use unleash_types::client_metrics::{
from_bucket_app_name_and_env, ClientApplication, ClientMetrics,
Expand All @@ -14,7 +13,6 @@ async fn features(
edge_token: EdgeToken,
features_source: web::Data<RwLock<dyn EdgeSource>>,
) -> EdgeJsonResult<ClientFeatures> {
info!("Getting data for {edge_token:?}");
features_source
.read()
.await
Expand Down
26 changes: 19 additions & 7 deletions server/src/data_sources/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;

use crate::{
auth::token_validator::TokenValidator,
cli::{CliArgs, EdgeArg, EdgeMode, OfflineArgs},
http::unleash_client::UnleashClient,
types::{EdgeResult, EdgeSink, EdgeSource, EdgeToken},
Expand All @@ -30,6 +31,7 @@ pub struct SinkInfo {
pub validated_receive: mpsc::Receiver<EdgeToken>,
pub unvalidated_receive: mpsc::Receiver<EdgeToken>,
pub unleash_client: UnleashClient,
pub token_validator: Arc<RwLock<TokenValidator>>,
pub metrics_interval_seconds: u64,
}

Expand All @@ -42,17 +44,19 @@ fn build_offline(offline_args: OfflineArgs) -> EdgeResult<Arc<RwLock<dyn EdgeSou
Ok(provider)
}

fn build_memory(sender: Sender<EdgeToken>) -> EdgeResult<DataProviderPair> {
let data_source = Arc::new(RwLock::new(MemoryProvider::new(sender)));
fn build_memory(features_refresh_interval_seconds: i64) -> EdgeResult<DataProviderPair> {
let data_source = Arc::new(RwLock::new(MemoryProvider::new(
features_refresh_interval_seconds,
)));
Ok((data_source.clone(), data_source))
}

fn build_redis(redis_url: String, sender: Sender<EdgeToken>) -> EdgeResult<DataProviderPair> {
let data_source = Arc::new(RwLock::new(RedisProvider::new(&redis_url, sender)?));
fn build_redis(redis_url: String, _sender: Sender<EdgeToken>) -> EdgeResult<DataProviderPair> {
let data_source = Arc::new(RwLock::new(RedisProvider::new(&redis_url)?));
Ok((data_source.clone(), data_source))
}

pub fn build_source_and_sink(args: CliArgs) -> EdgeResult<RepositoryInfo> {
pub async fn build_source_and_sink(args: CliArgs) -> EdgeResult<RepositoryInfo> {
match args.mode {
EdgeMode::Offline(offline_args) => {
let source = build_offline(offline_args)?;
Expand All @@ -70,9 +74,16 @@ pub fn build_source_and_sink(args: CliArgs) -> EdgeResult<RepositoryInfo> {
let (validated_sender, validated_receiver) = mpsc::channel::<EdgeToken>(32);
let (source, sink) = match arg {
EdgeArg::Redis(redis_url) => build_redis(redis_url, unvalidated_sender),
EdgeArg::InMemory => build_memory(unvalidated_sender),
EdgeArg::InMemory => build_memory(edge_args.features_refresh_interval_seconds),
}?;

let mut token_validator = TokenValidator {
unleash_client: Arc::new(unleash_client.clone()),
edge_source: source.clone(),
edge_sink: sink.clone(),
};
if !edge_args.client_keys.is_empty() {
let _ = token_validator.register_tokens(edge_args.client_keys).await;
}
Ok(RepositoryInfo {
source,
sink_info: Some(SinkInfo {
Expand All @@ -81,6 +92,7 @@ pub fn build_source_and_sink(args: CliArgs) -> EdgeResult<RepositoryInfo> {
validated_receive: validated_receiver,
unvalidated_receive: unvalidated_receiver,
unleash_client,
token_validator: Arc::new(RwLock::new(token_validator)),
metrics_interval_seconds: edge_args.metrics_interval_seconds,
}),
})
Expand Down
Loading