Skip to content

Commit

Permalink
feat: s3 persister (#542)
Browse files Browse the repository at this point in the history
Co-authored-by: Christopher Kolstad <chriswk@getunleash.io>
  • Loading branch information
sighphyre and chriswk authored Sep 30, 2024
1 parent 19048e3 commit 1522e29
Show file tree
Hide file tree
Showing 9 changed files with 1,172 additions and 82 deletions.
942 changes: 870 additions & 72 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ actix-web = { version = "4.9.0", features = ["rustls-0_23", "compress-zstd"] }
ahash = "0.8.11"
anyhow = "1.0.89"
async-trait = "0.1.82"
aws-config = { version = "1.5.6", features = ["behavior-version-latest"] }
aws-sdk-s3 = { version = "1.51.0", features = ["behavior-version-latest"] }
chrono = { version = "0.4.38", features = ["serde"] }
cidr = "0.2.3"
clap = { version = "4.5.16", features = ["derive", "env"] }
Expand Down Expand Up @@ -98,8 +100,11 @@ env_logger = "0.11.5"
maplit = "1.0.2"
rand = "0.8.5"
test-case = "3.3.1"
testcontainers = "0.22.0"
testcontainers-modules = { version = "0.10.0", features = ["redis"] }
testcontainers = "0.23.1"
testcontainers-modules = { version = "0.11.1", features = [
"redis",
"localstack",
] }
tracing-test = "0.2.5"

[build-dependencies]
Expand Down
13 changes: 13 additions & 0 deletions server/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::http::unleash_client::new_reqwest_client;
use crate::offline::offline_hotload::{load_bootstrap, load_offline_engine_cache};
use crate::persistence::file::FilePersister;
use crate::persistence::redis::RedisPersister;
use crate::persistence::s3::S3Persister;
use crate::persistence::EdgePersistence;
use crate::{
auth::token_validator::TokenValidator,
Expand Down Expand Up @@ -199,6 +200,17 @@ async fn get_data_source(args: &EdgeArgs) -> Option<Arc<dyn EdgePersistence>> {
return Some(Arc::new(redis_persister));
}

if let Some(s3_args) = args.s3.clone() {
let s3_persister = S3Persister::new_from_env(
&s3_args
.s3_bucket_name
.clone()
.expect("Clap is confused, there's no bucket name"),
)
.await;
return Some(Arc::new(s3_persister));
}

if let Some(backup_folder) = args.backup_folder.clone() {
debug!("Configuring file persistence {backup_folder:?}");
let backup_client = FilePersister::new(&backup_folder);
Expand Down Expand Up @@ -335,6 +347,7 @@ mod tests {
dynamic: false,
tokens: vec![],
redis: None,
s3: None,
client_identity: Default::default(),
skip_ssl_verification: false,
upstream_request_timeout: Default::default(),
Expand Down
16 changes: 14 additions & 2 deletions server/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ impl Display for RedisScheme {
}
}
}

#[derive(Args, Debug, Clone)]
pub struct S3Args {
/// Bucket name to use for storing feature and token data
#[clap(long, env)]
pub s3_bucket_name: Option<String>,
}

#[derive(Copy, Debug, Clone, Eq, PartialEq, PartialOrd, Ord, ValueEnum)]
pub enum RedisMode {
Single,
Expand Down Expand Up @@ -131,7 +139,7 @@ pub struct ClientIdentity {
#[derive(Args, Debug, Clone)]
#[command(group(
ArgGroup::new("data-provider")
.args(["redis_url", "backup_folder"]),
.args(["redis_url", "backup_folder", "s3_bucket_name"]),
))]
pub struct EdgeArgs {
/// Where is your upstream URL. Remember, this is the URL to your instance, without any trailing /api suffix
Expand Down Expand Up @@ -180,10 +188,14 @@ pub struct EdgeArgs {
#[clap(long, env, default_value_t = 5)]
pub upstream_socket_timeout: i64,

/// A URL pointing to a running Redis instance. Edge will use this instance to persist feature and token data and read this back after restart. Mutually exclusive with the --backup-folder option
/// A URL pointing to a running Redis instance. Edge will use this instance to persist feature and token data and read this back after restart. Mutually exclusive with the --backup-folder and --s3-bucket options
#[clap(flatten)]
pub redis: Option<RedisArgs>,

/// Configuration for S3 storage. Edge will use this instance to persist feature and token data and read this back after restart. Mutually exclusive with the --redis-url and --backup-folder options
#[clap(flatten)]
pub s3: Option<S3Args>,

/// Token header to use for both edge authorization and communication with the upstream server.
#[clap(long, env, global = true, default_value = "Authorization")]
pub token_header: TokenHeader,
Expand Down
1 change: 1 addition & 0 deletions server/src/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::types::{EdgeResult, EdgeToken, TokenValidationStatus};

pub mod file;
pub mod redis;
pub mod s3;

#[async_trait]
pub trait EdgePersistence: Send + Sync {
Expand Down
1 change: 0 additions & 1 deletion server/src/persistence/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use super::EdgePersistence;

pub const FEATURES_KEY: &str = "unleash-features";
pub const TOKENS_KEY: &str = "unleash-tokens";
pub const REFRESH_TARGETS_KEY: &str = "unleash-refresh-targets";

impl From<RedisError> for EdgeError {
fn from(err: RedisError) -> Self {
Expand Down
177 changes: 177 additions & 0 deletions server/src/persistence/s3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
use std::collections::HashMap;

use async_trait::async_trait;
use unleash_types::client_features::ClientFeatures;

use super::EdgePersistence;
use crate::{
error::EdgeError,
types::{EdgeResult, EdgeToken},
};
use aws_sdk_s3::{
self as s3,
error::SdkError,
operation::{get_object::GetObjectError, put_object::PutObjectError},
primitives::{ByteStream, SdkBody},
};

pub const FEATURES_KEY: &str = "/unleash-features.json";
pub const TOKENS_KEY: &str = "/unleash-tokens.json";

pub struct S3Persister {
client: s3::Client,
bucket: String,
}

impl S3Persister {
pub fn new_with_config(bucket_name: &str, config: s3::config::Config) -> Self {
let client = s3::Client::from_conf(config);
Self {
client,
bucket: bucket_name.to_string(),
}
}
pub async fn new_from_env(bucket_name: &str) -> Self {
let shared_config = aws_config::load_from_env().await;
let client = s3::Client::new(&shared_config);
Self {
client,
bucket: bucket_name.to_string(),
}
}
}

impl From<SdkError<GetObjectError>> for EdgeError {
fn from(err: SdkError<GetObjectError>) -> Self {
EdgeError::PersistenceError(format!("failed to get object {}", err))
}
}

impl From<SdkError<PutObjectError>> for EdgeError {
fn from(err: SdkError<PutObjectError>) -> Self {
EdgeError::PersistenceError(format!("failed to put object {}", err))
}
}

impl S3Persister {
async fn create_bucket_if_not_exists(&self) -> EdgeResult<()> {
match self
.client
.create_bucket()
.bucket(&self.bucket)
.send()
.await
{
Ok(_) => Ok(()),
Err(err) => {
if err.to_string().contains("BucketAlreadyOwnedByYou")
|| err.to_string().contains("BucketAlreadyExists")
{
Ok(())
} else {
Err(EdgeError::PersistenceError(format!(
"Failed to create bucket: {}",
err
)))
}
}
}
}
}

#[async_trait]
impl EdgePersistence for S3Persister {
async fn load_tokens(&self) -> EdgeResult<Vec<EdgeToken>> {
let response = self
.client
.get_object()
.bucket(self.bucket.clone())
.key(TOKENS_KEY)
.response_content_type("application/json")
.send()
.await?;
let data = response.body.collect().await.expect("Failed data");
serde_json::from_slice(&data.to_vec())
.map_err(|_| EdgeError::PersistenceError("Failed to deserialize tokens".to_string()))
}

async fn save_tokens(&self, tokens: Vec<EdgeToken>) -> EdgeResult<()> {
self.create_bucket_if_not_exists().await?;
let body_data = serde_json::to_vec(&tokens)
.map_err(|_| EdgeError::PersistenceError("Failed to serialize tokens".to_string()))
.map(SdkBody::from)?;
let byte_stream = aws_sdk_s3::primitives::ByteStream::new(body_data);
self.client
.put_object()
.bucket(self.bucket.clone())
.key(TOKENS_KEY)
.body(byte_stream)
.send()
.await
.map(|_| ())
.map_err(|err| {
dbg!(err);
EdgeError::PersistenceError("Failed to save tokens".to_string())
})
}

async fn load_features(&self) -> EdgeResult<HashMap<String, ClientFeatures>> {
let query = self
.client
.get_object()
.bucket(self.bucket.clone())
.key(FEATURES_KEY)
.response_content_type("application/json")
.send()
.await
.map_err(|err| {
if err.to_string().contains("NoSuchKey") {
return EdgeError::PersistenceError("No features found".to_string());
}
dbg!(err);
EdgeError::PersistenceError("Failed to load features".to_string())
});
match query {
Ok(response) => {
let data = response.body.collect().await.expect("Failed data");
let deser: Vec<(String, ClientFeatures)> = serde_json::from_slice(&data.to_vec())
.map_err(|_| {
EdgeError::PersistenceError("Failed to deserialize features".to_string())
})?;
Ok(deser
.iter()
.cloned()
.collect::<HashMap<String, ClientFeatures>>())
}
Err(e) => {
eprintln!("Err Arg, failed to read features");
dbg!(e);
Ok(HashMap::new())
}
}
}

async fn save_features(&self, features: Vec<(String, ClientFeatures)>) -> EdgeResult<()> {
self.create_bucket_if_not_exists().await?;
let body_data = serde_json::to_vec(&features)
.map_err(|_| EdgeError::PersistenceError("Failed to serialize features".to_string()))?;
let byte_stream = ByteStream::new(SdkBody::from(body_data));
match self
.client
.put_object()
.bucket(self.bucket.clone())
.key(FEATURES_KEY)
.body(byte_stream)
.send()
.await
{
Ok(_) => Ok(()),
Err(s3_err) => {
dbg!(s3_err);
Err(EdgeError::PersistenceError(
"Failed to save features".to_string(),
))
}
}
}
}
12 changes: 7 additions & 5 deletions server/tests/redis_test.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
use std::{str::FromStr, time::Duration};

use redis::Client;
use testcontainers::runners::AsyncRunner;
use testcontainers::ContainerAsync;
use testcontainers_modules::redis::Redis;
use testcontainers_modules::redis::RedisStack;
use unleash_types::client_features::{ClientFeature, ClientFeatures};

use testcontainers::{runners::AsyncRunner, ContainerAsync};
use unleash_edge::{
persistence::{redis::RedisPersister, EdgePersistence},
types::{EdgeToken, TokenType},
};

const TEST_TIMEOUT: Duration = std::time::Duration::from_millis(1000);

async fn setup_redis() -> (Client, String, ContainerAsync<Redis>) {
let node = Redis.start().await.expect("Failed to start redis");
async fn setup_redis() -> (Client, String, ContainerAsync<RedisStack>) {
let node = RedisStack::default()
.start()
.await
.expect("Failed to start redis");
let host_port = node
.get_host_port_ipv4(6379)
.await
Expand Down
Loading

0 comments on commit 1522e29

Please sign in to comment.