Skip to content

Commit

Permalink
chore: redesign source/sink architecture (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
sighphyre authored Feb 7, 2023
1 parent ba72e09 commit cdfa7c2
Show file tree
Hide file tree
Showing 13 changed files with 290 additions and 229 deletions.
2 changes: 2 additions & 0 deletions server/src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ use crate::types::{EdgeJsonResult, EdgeSource, EdgeToken};
use actix_web::get;
use actix_web::web::{self, Json};
use tokio::sync::RwLock;
use tracing::info;
use unleash_types::client_features::ClientFeatures;

#[get("/client/features")]
async fn features(
edge_token: EdgeToken,
features_source: web::Data<RwLock<dyn EdgeSource>>,
) -> EdgeJsonResult<ClientFeatures> {
info!("Getting data for {edge_token:?}");
features_source
.read()
.await
Expand Down
64 changes: 52 additions & 12 deletions server/src/data_sources/builder.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
use std::sync::Arc;

use reqwest::Url;
use tokio::sync::mpsc;

use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;

use crate::{
cli::{CliArgs, EdgeArg, EdgeMode, OfflineArgs},
types::{EdgeResult, EdgeSink, EdgeSource},
http::unleash_client::UnleashClient,
types::{EdgeResult, EdgeSink, EdgeSource, EdgeToken},
};

use super::{
Expand All @@ -14,6 +19,19 @@ use super::{

pub type DataProviderPair = (Arc<RwLock<dyn EdgeSource>>, Arc<RwLock<dyn EdgeSink>>);

pub struct RepositoryInfo {
pub source: Arc<RwLock<dyn EdgeSource>>,
pub sink_info: Option<SinkInfo>,
}

pub struct SinkInfo {
pub sink: Arc<RwLock<dyn EdgeSink>>,
pub validated_send: mpsc::Sender<EdgeToken>,
pub validated_receive: mpsc::Receiver<EdgeToken>,
pub unvalidated_receive: mpsc::Receiver<EdgeToken>,
pub unleash_client: UnleashClient,
}

fn build_offline(offline_args: OfflineArgs) -> EdgeResult<DataProviderPair> {
let provider = OfflineProvider::instantiate_provider(
offline_args.bootstrap_file,
Expand All @@ -23,25 +41,47 @@ fn build_offline(offline_args: OfflineArgs) -> EdgeResult<DataProviderPair> {
Ok((provider.clone(), provider))
}

fn build_memory() -> EdgeResult<DataProviderPair> {
let data_source = Arc::new(RwLock::new(MemoryProvider::default()));
fn build_memory(sender: Sender<EdgeToken>) -> EdgeResult<DataProviderPair> {
let data_source = Arc::new(RwLock::new(MemoryProvider::new(sender)));
Ok((data_source.clone(), data_source))
}

fn build_redis(redis_url: String) -> EdgeResult<DataProviderPair> {
let data_source = Arc::new(RwLock::new(RedisProvider::new(&redis_url)?));
fn build_redis(redis_url: String, sender: Sender<EdgeToken>) -> EdgeResult<DataProviderPair> {
let data_source = Arc::new(RwLock::new(RedisProvider::new(&redis_url, sender)?));
Ok((data_source.clone(), data_source))
}

pub fn build_source_and_sink(args: CliArgs) -> EdgeResult<DataProviderPair> {
pub fn build_source_and_sink(args: CliArgs) -> EdgeResult<RepositoryInfo> {
match args.mode {
EdgeMode::Offline(offline_args) => build_offline(offline_args),
EdgeMode::Offline(offline_args) => {
let (source, _) = build_offline(offline_args)?;
Ok(RepositoryInfo {
source,
sink_info: None,
})
}
EdgeMode::Edge(edge_args) => {
let arg: EdgeArg = edge_args.into();
match arg {
EdgeArg::Redis(redis_url) => build_redis(redis_url),
EdgeArg::InMemory => build_memory(),
}
let arg: EdgeArg = edge_args.clone().into();
let unleash_client = UnleashClient::from_url(
Url::parse(edge_args.unleash_url.as_str()).expect("Cannot parse Unleash URL"),
);
let (unvalidated_sender, unvalidated_receiver) = mpsc::channel::<EdgeToken>(32);
let (validated_sender, validated_receiver) = mpsc::channel::<EdgeToken>(32);
let (source, sink) = match arg {
EdgeArg::Redis(redis_url) => build_redis(redis_url, unvalidated_sender),
EdgeArg::InMemory => build_memory(unvalidated_sender),
}?;

Ok(RepositoryInfo {
source,
sink_info: Some(SinkInfo {
sink,
validated_send: validated_sender,
validated_receive: validated_receiver,
unvalidated_receive: unvalidated_receiver,
unleash_client,
}),
})
}
}
}
87 changes: 41 additions & 46 deletions server/src/data_sources/memory_provider.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,37 @@
use std::{collections::HashMap, sync::Arc};
use std::collections::HashMap;

use crate::types::ClientFeaturesResponse;
use crate::types::TokenValidationStatus;
use crate::{
error::EdgeError,
types::{
EdgeProvider, EdgeResult, EdgeSink, EdgeSource, EdgeToken, FeatureSink, FeaturesSource,
TokenSink, TokenSource, ValidateTokensRequest,
},
};
use crate::{
http::unleash_client::UnleashClient,
types::{ClientFeaturesRequest, ClientFeaturesResponse},
use crate::types::{
EdgeResult, EdgeSink, EdgeSource, EdgeToken, FeatureSink, FeaturesSource, TokenSink,
TokenSource,
};
use async_trait::async_trait;
use dashmap::DashMap;
use tokio::sync::mpsc::Sender;
use unleash_types::client_features::ClientFeatures;

#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone)]
pub struct MemoryProvider {
data_store: DashMap<String, ClientFeatures>,
token_store: HashMap<String, EdgeToken>,
unleash_client: UnleashClient,
sender: Sender<EdgeToken>,
}

impl MemoryProvider {
pub fn new(sender: Sender<EdgeToken>) -> Self {
Self {
data_store: DashMap::new(),
token_store: HashMap::new(),
sender,
}
}

fn sink_features(&mut self, token: &EdgeToken, features: ClientFeatures) {
self.data_store.insert(token.token.clone(), features);
}
}
impl EdgeProvider for MemoryProvider {}

impl EdgeSource for MemoryProvider {}
impl EdgeSink for MemoryProvider {}

Expand All @@ -41,24 +43,21 @@ impl TokenSink for MemoryProvider {
}
Ok(())
}

async fn validate(&mut self, tokens: Vec<EdgeToken>) -> EdgeResult<Vec<EdgeToken>> {
let validation_request = ValidateTokensRequest {
tokens: tokens.into_iter().map(|t| t.token).collect(),
};
self.unleash_client
.validate_tokens(validation_request)
.await
}
}

#[async_trait]
impl FeaturesSource for MemoryProvider {
async fn get_client_features(&self, token: &EdgeToken) -> EdgeResult<ClientFeatures> {
self.data_store
Ok(self
.data_store
.get(&token.token)
.map(|v| v.value().clone())
.ok_or_else(|| EdgeError::DataSourceError("Token not found".to_string()))
.unwrap_or_else(|| ClientFeatures {
version: 2,
features: vec![],
segments: None,
query: None,
}))
}
}

Expand All @@ -68,15 +67,14 @@ impl TokenSource for MemoryProvider {
Ok(self.token_store.values().into_iter().cloned().collect())
}

async fn get_token_validation_status(
&self,
secret: &str,
sender: Arc<Sender<EdgeToken>>,
) -> EdgeResult<TokenValidationStatus> {
async fn get_token_validation_status(&self, secret: &str) -> EdgeResult<TokenValidationStatus> {
if let Some(token) = self.token_store.get(secret) {
Ok(token.clone().status)
} else {
let _ = sender.send(EdgeToken::try_from(secret.to_string())?).await;
let _ = self
.sender
.send(EdgeToken::try_from(secret.to_string())?)
.await;
Ok(TokenValidationStatus::Unknown)
}
}
Expand Down Expand Up @@ -106,28 +104,23 @@ impl FeatureSink for MemoryProvider {
Ok(())
}

async fn fetch_features(&mut self, token: &EdgeToken) -> EdgeResult<ClientFeaturesResponse> {
self.unleash_client
.get_client_features(ClientFeaturesRequest {
api_key: token.token.clone(),
etag: None,
})
.await
async fn fetch_features(&mut self, _token: &EdgeToken) -> EdgeResult<ClientFeaturesResponse> {
todo!()
}
}

#[cfg(test)]
mod test {
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::mpsc;
use unleash_types::client_features::ClientFeature;

use super::*;

#[tokio::test]
async fn memory_provider_correctly_deduplicates_tokens() {
let mut provider = MemoryProvider::default();
let (send, _) = mpsc::channel::<EdgeToken>(32);
let mut provider = MemoryProvider::new(send);
let _ = provider
.sink_tokens(vec![EdgeToken {
token: "some_secret".into(),
Expand All @@ -147,7 +140,8 @@ mod test {

#[tokio::test]
async fn memory_provider_correctly_determines_token_to_be_valid() {
let mut provider = MemoryProvider::default();
let (send, _) = mpsc::channel::<EdgeToken>(32);
let mut provider = MemoryProvider::new(send);
let _ = provider
.sink_tokens(vec![EdgeToken {
token: "some_secret".into(),
Expand All @@ -156,11 +150,9 @@ mod test {
}])
.await;

let (send, _) = mpsc::channel::<EdgeToken>(32);

assert_eq!(
provider
.get_token_validation_status("some_secret", Arc::new(send))
.get_token_validation_status("some_secret")
.await
.unwrap(),
TokenValidationStatus::Validated
Expand All @@ -169,7 +161,9 @@ mod test {

#[tokio::test]
async fn memory_provider_yields_correct_response_for_token() {
let mut provider = MemoryProvider::default();
let (send, _) = mpsc::channel::<EdgeToken>(32);

let mut provider = MemoryProvider::new(send);
let token = EdgeToken {
token: "some-secret".into(),
..EdgeToken::default()
Expand Down Expand Up @@ -202,7 +196,8 @@ mod test {
..EdgeToken::from_str("frankdrebin").unwrap()
};

let mut provider = MemoryProvider::default();
let (send, _) = mpsc::channel::<EdgeToken>(32);
let mut provider = MemoryProvider::new(send);
let _ = provider
.sink_tokens(vec![james_bond.clone(), frank_drebin.clone()])
.await;
Expand Down
16 changes: 3 additions & 13 deletions server/src/data_sources/offline_provider.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::error::EdgeError;
use crate::types::{
ClientFeaturesResponse, EdgeProvider, EdgeResult, EdgeSink, EdgeSource, EdgeToken, FeatureSink,
ClientFeaturesResponse, EdgeResult, EdgeSink, EdgeSource, EdgeToken, FeatureSink,
FeaturesSource, TokenSink, TokenSource, TokenValidationStatus,
};
use actix_web::http::header::EntityTag;
Expand All @@ -9,8 +9,6 @@ use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use unleash_types::client_features::ClientFeatures;

#[derive(Debug, Clone)]
Expand All @@ -32,11 +30,7 @@ impl TokenSource for OfflineProvider {
Ok(self.valid_tokens.values().cloned().collect())
}

async fn get_token_validation_status(
&self,
secret: &str,
_: Arc<Sender<EdgeToken>>,
) -> EdgeResult<TokenValidationStatus> {
async fn get_token_validation_status(&self, secret: &str) -> EdgeResult<TokenValidationStatus> {
Ok(if self.valid_tokens.contains_key(secret) {
TokenValidationStatus::Validated
} else {
Expand All @@ -58,7 +52,6 @@ impl TokenSource for OfflineProvider {
}
}

impl EdgeProvider for OfflineProvider {}
impl EdgeSource for OfflineProvider {}
impl EdgeSink for OfflineProvider {}

Expand All @@ -77,15 +70,12 @@ impl FeatureSink for OfflineProvider {
)))
}
}

#[async_trait]
impl TokenSink for OfflineProvider {
async fn sink_tokens(&mut self, _token: Vec<EdgeToken>) -> EdgeResult<()> {
todo!()
}

async fn validate(&mut self, _token: Vec<EdgeToken>) -> EdgeResult<Vec<EdgeToken>> {
todo!()
}
}

impl OfflineProvider {
Expand Down
Loading

0 comments on commit cdfa7c2

Please sign in to comment.