Skip to content

Commit

Permalink
chore: move server startup and traits to async
Browse files Browse the repository at this point in the history
  • Loading branch information
sighphyre committed Feb 2, 2023
1 parent f224459 commit 0e0d498
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 28 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ actix-web = { version = "4.3.0", features = ["rustls"] }
actix-web-opentelemetry = { version = "0.13.0", features = ["metrics", "metrics-prometheus"] }

anyhow = "1.0.68"
async-trait = "0.1.64"
awc = { version = "3.1.0", features = ["rustls"] }
chrono = { version = "0.4.23", features = ["serde"] }
clap = { version = "4.1.4", features = ["derive", "env"] }
dashmap = "5.4.0"
dotenv = { version = "0.15.0", features = ["clap"] }
futures = "0.3.26"
opentelemetry = { version = "0.18.0", features = ["trace", "rt-tokio", "metrics"] }
opentelemetry-prometheus = "0.11.0"
prometheus = { version = "0.13.3", features = ["process"] }
Expand Down
54 changes: 35 additions & 19 deletions server/src/data_sources/memory_provider.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::collections::HashMap;

use async_trait::async_trait;
use dashmap::DashMap;
use unleash_types::client_features::ClientFeatures;

use crate::{
error::EdgeError,
types::{EdgeProvider, EdgeResult, EdgeToken, FeaturesProvider, TokenProvider},
types::{EdgeProvider, EdgeResult, EdgeSink, EdgeToken, FeaturesProvider, TokenProvider},
};

#[derive(Debug, Clone, Default)]
Expand All @@ -12,6 +15,20 @@ pub struct MemoryProvider {
token_store: Vec<EdgeToken>,
}

impl MemoryProvider {
fn sink_features(&mut self, token: &EdgeToken, features: ClientFeatures) {
self.data_store.insert(token.secret.clone(), features);
}

fn sink_tokens(&mut self, tokens: Vec<EdgeToken>) {
let joined_tokens = tokens.iter().chain(self.token_store.iter());
let deduplicated: HashMap<String, EdgeToken> = joined_tokens
.map(|x| (x.secret.clone(), x.clone()))
.collect();
self.token_store = deduplicated.into_values().collect();
}
}

impl EdgeProvider for MemoryProvider {}

impl FeaturesProvider for MemoryProvider {
Expand All @@ -38,30 +55,29 @@ impl TokenProvider for MemoryProvider {
}
}

#[async_trait]
impl EdgeSink for MemoryProvider {
async fn sink_features(
&mut self,
token: &EdgeToken,
features: ClientFeatures,
) -> EdgeResult<()> {
self.sink_features(token, features);
Ok(())
}

async fn sink_tokens(&mut self, tokens: Vec<EdgeToken>) -> EdgeResult<()> {
self.sink_tokens(tokens);
Ok(())
}
}

#[cfg(test)]
mod test {
use std::collections::HashMap;

use unleash_types::client_features::ClientFeature;

use crate::types::EdgeSink;

use super::*;

impl EdgeSink for MemoryProvider {
fn sink_features(&mut self, token: &EdgeToken, features: ClientFeatures) {
self.data_store.insert(token.secret.clone(), features);
}

fn sink_tokens(&mut self, tokens: Vec<EdgeToken>) {
let joined_tokens = tokens.iter().chain(self.token_store.iter());
let deduplicated: HashMap<String, EdgeToken> = joined_tokens
.map(|x| (x.secret.clone(), x.clone()))
.collect();
self.token_store = deduplicated.into_values().collect();
}
}

#[test]
fn memory_provider_correctly_deduplicates_tokens() {
let mut provider = MemoryProvider::default();
Expand Down
1 change: 1 addition & 0 deletions server/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod unleash_client;
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ impl UnleashClient {
#[cfg(test)]
mod tests {
use crate::{
fetch::unleash_client::UnleashClient,
types::{ClientFeaturesRequest, ClientFeaturesResponse},
unleash_client::UnleashClient,
};
use actix_http::HttpService;
use actix_http_test::{test_server, TestServer};
Expand Down
1 change: 0 additions & 1 deletion server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ pub mod error;
pub mod frontend_api;
pub mod metrics;
pub mod types;
pub mod unleash_client;
pub mod urls;

pub mod tokens;
Expand Down
13 changes: 8 additions & 5 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,12 @@ async fn main() -> Result<(), anyhow::Error> {
} else {
server.bind(http_args.http_server_tuple())
};
server?
.shutdown_timeout(5)
.run()
.await
.map_err(anyhow::Error::new)
let server = server?.shutdown_timeout(5);

tokio::select! {
_ = server.run() => {
}
}

Ok(())
}
10 changes: 8 additions & 2 deletions server/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use actix_web::{
web::{Data, Json},
FromRequest, HttpRequest,
};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use shadow_rs::shadow;
Expand Down Expand Up @@ -191,9 +192,14 @@ pub trait TokenProvider {

pub trait EdgeProvider: FeaturesProvider + TokenProvider + Send + Sync {}

#[async_trait]
pub trait EdgeSink {
fn sink_features(&mut self, token: &EdgeToken, features: ClientFeatures);
fn sink_tokens(&mut self, token: Vec<EdgeToken>);
async fn sink_features(
&mut self,
token: &EdgeToken,
features: ClientFeatures,
) -> EdgeResult<()>;
async fn sink_tokens(&mut self, token: Vec<EdgeToken>) -> EdgeResult<()>;
}

#[derive(Debug, Serialize, Deserialize, Clone)]
Expand Down

0 comments on commit 0e0d498

Please sign in to comment.