Skip to content

Commit

Permalink
🎨 Session cleanup as queueable job (stumpapp#201)
Browse files Browse the repository at this point in the history
In an effort to reduce the liklihood of multiple DB writers at a given time, I made the session cleanup task a job. This way, it has to be queued before it can do its thing
  • Loading branch information
aaronleopold authored and JMicheli committed Dec 4, 2023
1 parent 29ac0a1 commit d192efd
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 39 deletions.
41 changes: 41 additions & 0 deletions apps/server/src/config/session/cleanup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use prisma_client_rust::chrono::Utc;
use stump_core::{
job::{Job, JobError, JobTrait, WorkerCtx},
prisma::session,
};

pub const SESSION_CLEANUP_JOB_NAME: &str = "session_cleanup";

pub struct SessionCleanupJob;

impl SessionCleanupJob {
pub fn new() -> Box<Job<SessionCleanupJob>> {
Job::new(Self)
}
}

#[async_trait::async_trait]
impl JobTrait for SessionCleanupJob {
fn name(&self) -> &'static str {
SESSION_CLEANUP_JOB_NAME
}

fn description(&self) -> Option<Box<&str>> {
None
}

async fn run(&mut self, ctx: WorkerCtx) -> Result<u64, JobError> {
tracing::trace!("Deleting expired sessions");

let client = ctx.core_ctx.db.clone();
let affected_rows = client
.session()
.delete_many(vec![session::expires_at::lt(Utc::now().into())])
.exec()
.await?;

tracing::trace!(affected_rows = ?affected_rows, "Deleted expired sessions");

Ok(1)
}
}
4 changes: 3 additions & 1 deletion apps/server/src/config/session/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
mod cleanup;
mod store;
mod utils;

pub use store::{PrismaSessionStore, SessionError, SessionResult};
pub use cleanup::SessionCleanupJob;
pub use store::{PrismaSessionStore, SessionError};
pub use utils::{
get_session_expiry_cleanup_interval, get_session_layer, get_session_ttl,
handle_session_service_error, SESSION_USER_KEY,
Expand Down
37 changes: 15 additions & 22 deletions apps/server/src/config/session/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ use prisma_client_rust::chrono::{DateTime, Duration, FixedOffset, Utc};
use stump_core::{
db::entity::User,
prisma::{session, user, PrismaClient},
Ctx,
};
use time::OffsetDateTime;
use tokio::time::MissedTickBehavior;
use tower_sessions::{session::SessionId, Session, SessionRecord, SessionStore};

use super::{get_session_ttl, SESSION_USER_KEY};
use super::{get_session_ttl, SessionCleanupJob, SESSION_USER_KEY};

#[derive(Debug, thiserror::Error)]
pub enum SessionError {
Expand All @@ -22,8 +24,6 @@ pub enum SessionError {
SerdeError(#[from] serde_json::Error),
}

pub type SessionResult<T> = Result<T, SessionError>;

#[derive(Clone)]
pub struct PrismaSessionStore {
client: Arc<PrismaClient>,
Expand All @@ -34,28 +34,21 @@ impl PrismaSessionStore {
Self { client }
}

async fn delete_expired(&self) -> SessionResult<()> {
tracing::trace!("Deleting expired sessions");

let affected_rows = self
.client
.session()
.delete_many(vec![session::expires_at::lt(Utc::now().into())])
.exec()
.await?;

tracing::trace!(affected_rows = ?affected_rows, "Deleted expired sessions");

Ok(())
}

pub async fn continuously_delete_expired(self, period: tokio::time::Duration) {
pub async fn continuously_delete_expired(
self,
period: tokio::time::Duration,
ctx: Arc<Ctx>,
) {
let mut interval = tokio::time::interval(period);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
if let Err(error) = self.delete_expired().await {
tracing::error!(error = ?error, "Failed to delete expired sessions");
interval.tick().await; // The first tick completes immediately
if let Err(error) = ctx.dispatch_job(SessionCleanupJob::new()) {
tracing::error!(error = ?error, "Failed to dispatch session cleanup job");
} else {
tracing::trace!("Dispatched session cleanup job");
}
interval.tick().await;
tracing::trace!("Waiting for next session cleanup interval...");
}
}
}
Expand Down
36 changes: 23 additions & 13 deletions apps/server/src/config/session/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use axum::{
};
use hyper::StatusCode;
use std::{env, sync::Arc};
use stump_core::prisma::PrismaClient;
use stump_core::Ctx;
use time::Duration;

use tower_sessions::{cookie::SameSite, SessionManagerLayer};
Expand All @@ -16,40 +16,50 @@ pub const SESSION_USER_KEY: &str = "user";
pub const SESSION_NAME: &str = "stump_session";
pub const SESSION_PATH: &str = "/";

pub const DEFAULT_SESSION_TTL: i64 = 3600 * 24 * 3; // 3 days
pub const DEFAULT_SESSION_EXPIRY_CLEANUP_INTERVAL: u64 = 60 * 60 * 24; // 24 hours

pub fn get_session_ttl() -> i64 {
env::var("SESSION_TTL")
.map(|s| {
s.parse::<i64>().unwrap_or_else(|e| {
tracing::error!(error = ?e, "Failed to parse provided SESSION_TTL");
3600 * 24 * 3
s.parse::<i64>().unwrap_or_else(|error| {
tracing::error!(?error, "Failed to parse provided SESSION_TTL");
DEFAULT_SESSION_TTL
})
})
.unwrap_or(3600 * 24 * 3)
.unwrap_or(DEFAULT_SESSION_TTL)
}

pub fn get_session_expiry_cleanup_interval() -> u64 {
env::var("SESSION_EXPIRY_CLEANUP_INTERVAL")
.map(|s| {
s.parse::<u64>().unwrap_or_else(|e| {
tracing::error!(error = ?e, "Failed to parse provided SESSION_EXPIRY_CLEANUP_INTERVAL");
60
s.parse::<u64>().unwrap_or_else(|error| {
tracing::error!(
?error,
"Failed to parse provided SESSION_EXPIRY_CLEANUP_INTERVAL"
);
DEFAULT_SESSION_EXPIRY_CLEANUP_INTERVAL
})
})
.unwrap_or(60)
.unwrap_or(DEFAULT_SESSION_EXPIRY_CLEANUP_INTERVAL)
}

pub fn get_session_layer(
client: Arc<PrismaClient>,
) -> SessionManagerLayer<PrismaSessionStore> {
pub fn get_session_layer(ctx: Arc<Ctx>) -> SessionManagerLayer<PrismaSessionStore> {
let client = ctx.db.clone();
let store = PrismaSessionStore::new(client);

let cleanup_interval = get_session_expiry_cleanup_interval();
if cleanup_interval > 0 {
tracing::trace!(
cleanup_interval = cleanup_interval,
"Spawning session expiry cleanup task"
);
tokio::task::spawn(store.clone().continuously_delete_expired(
tokio::time::Duration::from_secs(cleanup_interval),
ctx,
));
} else {
tracing::debug!("SESSION_EXPIRY_CLEANUP_INTERVAL is set to 0, session expiry cleanup is disabled.");
tracing::debug!("SESSION_EXPIRY_CLEANUP_INTERVAL is set to 0. Session expiry cleanup is disabled");
}
let session_ttl = get_session_ttl();

Expand Down
2 changes: 1 addition & 1 deletion apps/server/src/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub(crate) async fn run_http_server(port: u16) -> ServerResult<()> {

let session_service = ServiceBuilder::new()
.layer(HandleErrorLayer::new(handle_session_service_error))
.layer(session::get_session_layer(app_state.db.clone()));
.layer(session::get_session_layer(app_state.clone()));

let app = Router::new()
.merge(routers::mount(app_state.clone()))
Expand Down
14 changes: 12 additions & 2 deletions core/src/job/job_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,18 @@ impl JobManager {
if workers.get(&job_id).is_some() {
tracing::trace!(job_id, "Sending shutdown signal to worker");
self.shutdown_tx
.send(JobManagerShutdownSignal::Worker(job_id.clone()))?;

.send(JobManagerShutdownSignal::Worker(job_id.clone()))
.map_or_else(
|error| {
tracing::error!(
?error,
"Failed to send shutdown signal to worker!"
);
},
|_| {
tracing::trace!(job_id, "Shutdown signal sent to worker");
},
);
workers.remove(&job_id);
drop(workers);
return Ok(());
Expand Down

0 comments on commit d192efd

Please sign in to comment.