diff --git a/apps/server/src/config/session/cleanup.rs b/apps/server/src/config/session/cleanup.rs new file mode 100644 index 000000000..46b85f4c8 --- /dev/null +++ b/apps/server/src/config/session/cleanup.rs @@ -0,0 +1,41 @@ +use prisma_client_rust::chrono::Utc; +use stump_core::{ + job::{Job, JobError, JobTrait, WorkerCtx}, + prisma::session, +}; + +pub const SESSION_CLEANUP_JOB_NAME: &str = "session_cleanup"; + +pub struct SessionCleanupJob; + +impl SessionCleanupJob { + pub fn new() -> Box> { + Job::new(Self) + } +} + +#[async_trait::async_trait] +impl JobTrait for SessionCleanupJob { + fn name(&self) -> &'static str { + SESSION_CLEANUP_JOB_NAME + } + + fn description(&self) -> Option> { + None + } + + async fn run(&mut self, ctx: WorkerCtx) -> Result { + tracing::trace!("Deleting expired sessions"); + + let client = ctx.core_ctx.db.clone(); + let affected_rows = client + .session() + .delete_many(vec![session::expires_at::lt(Utc::now().into())]) + .exec() + .await?; + + tracing::trace!(affected_rows = ?affected_rows, "Deleted expired sessions"); + + Ok(1) + } +} diff --git a/apps/server/src/config/session/mod.rs b/apps/server/src/config/session/mod.rs index 7a3dcf296..db8f4eb88 100644 --- a/apps/server/src/config/session/mod.rs +++ b/apps/server/src/config/session/mod.rs @@ -1,7 +1,9 @@ +mod cleanup; mod store; mod utils; -pub use store::{PrismaSessionStore, SessionError, SessionResult}; +pub use cleanup::SessionCleanupJob; +pub use store::{PrismaSessionStore, SessionError}; pub use utils::{ get_session_expiry_cleanup_interval, get_session_layer, get_session_ttl, handle_session_service_error, SESSION_USER_KEY, diff --git a/apps/server/src/config/session/store.rs b/apps/server/src/config/session/store.rs index f3f429020..bd5738dd4 100644 --- a/apps/server/src/config/session/store.rs +++ b/apps/server/src/config/session/store.rs @@ -4,11 +4,13 @@ use prisma_client_rust::chrono::{DateTime, Duration, FixedOffset, Utc}; use stump_core::{ db::entity::User, prisma::{session, user, PrismaClient}, + Ctx, }; use time::OffsetDateTime; +use tokio::time::MissedTickBehavior; use tower_sessions::{session::SessionId, Session, SessionRecord, SessionStore}; -use super::{get_session_ttl, SESSION_USER_KEY}; +use super::{get_session_ttl, SessionCleanupJob, SESSION_USER_KEY}; #[derive(Debug, thiserror::Error)] pub enum SessionError { @@ -22,8 +24,6 @@ pub enum SessionError { SerdeError(#[from] serde_json::Error), } -pub type SessionResult = Result; - #[derive(Clone)] pub struct PrismaSessionStore { client: Arc, @@ -34,28 +34,21 @@ impl PrismaSessionStore { Self { client } } - async fn delete_expired(&self) -> SessionResult<()> { - tracing::trace!("Deleting expired sessions"); - - let affected_rows = self - .client - .session() - .delete_many(vec![session::expires_at::lt(Utc::now().into())]) - .exec() - .await?; - - tracing::trace!(affected_rows = ?affected_rows, "Deleted expired sessions"); - - Ok(()) - } - - pub async fn continuously_delete_expired(self, period: tokio::time::Duration) { + pub async fn continuously_delete_expired( + self, + period: tokio::time::Duration, + ctx: Arc, + ) { let mut interval = tokio::time::interval(period); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); loop { - if let Err(error) = self.delete_expired().await { - tracing::error!(error = ?error, "Failed to delete expired sessions"); + interval.tick().await; // The first tick completes immediately + if let Err(error) = ctx.dispatch_job(SessionCleanupJob::new()) { + tracing::error!(error = ?error, "Failed to dispatch session cleanup job"); + } else { + tracing::trace!("Dispatched session cleanup job"); } - interval.tick().await; + tracing::trace!("Waiting for next session cleanup interval..."); } } } diff --git a/apps/server/src/config/session/utils.rs b/apps/server/src/config/session/utils.rs index e6b29db4d..a43dbe892 100644 --- a/apps/server/src/config/session/utils.rs +++ b/apps/server/src/config/session/utils.rs @@ -5,7 +5,7 @@ use axum::{ }; use hyper::StatusCode; use std::{env, sync::Arc}; -use stump_core::prisma::PrismaClient; +use stump_core::Ctx; use time::Duration; use tower_sessions::{cookie::SameSite, SessionManagerLayer}; @@ -16,40 +16,50 @@ pub const SESSION_USER_KEY: &str = "user"; pub const SESSION_NAME: &str = "stump_session"; pub const SESSION_PATH: &str = "/"; +pub const DEFAULT_SESSION_TTL: i64 = 3600 * 24 * 3; // 3 days +pub const DEFAULT_SESSION_EXPIRY_CLEANUP_INTERVAL: u64 = 60 * 60 * 24; // 24 hours + pub fn get_session_ttl() -> i64 { env::var("SESSION_TTL") .map(|s| { - s.parse::().unwrap_or_else(|e| { - tracing::error!(error = ?e, "Failed to parse provided SESSION_TTL"); - 3600 * 24 * 3 + s.parse::().unwrap_or_else(|error| { + tracing::error!(?error, "Failed to parse provided SESSION_TTL"); + DEFAULT_SESSION_TTL }) }) - .unwrap_or(3600 * 24 * 3) + .unwrap_or(DEFAULT_SESSION_TTL) } pub fn get_session_expiry_cleanup_interval() -> u64 { env::var("SESSION_EXPIRY_CLEANUP_INTERVAL") .map(|s| { - s.parse::().unwrap_or_else(|e| { - tracing::error!(error = ?e, "Failed to parse provided SESSION_EXPIRY_CLEANUP_INTERVAL"); - 60 + s.parse::().unwrap_or_else(|error| { + tracing::error!( + ?error, + "Failed to parse provided SESSION_EXPIRY_CLEANUP_INTERVAL" + ); + DEFAULT_SESSION_EXPIRY_CLEANUP_INTERVAL }) }) - .unwrap_or(60) + .unwrap_or(DEFAULT_SESSION_EXPIRY_CLEANUP_INTERVAL) } -pub fn get_session_layer( - client: Arc, -) -> SessionManagerLayer { +pub fn get_session_layer(ctx: Arc) -> SessionManagerLayer { + let client = ctx.db.clone(); let store = PrismaSessionStore::new(client); let cleanup_interval = get_session_expiry_cleanup_interval(); if cleanup_interval > 0 { + tracing::trace!( + cleanup_interval = cleanup_interval, + "Spawning session expiry cleanup task" + ); tokio::task::spawn(store.clone().continuously_delete_expired( tokio::time::Duration::from_secs(cleanup_interval), + ctx, )); } else { - tracing::debug!("SESSION_EXPIRY_CLEANUP_INTERVAL is set to 0, session expiry cleanup is disabled."); + tracing::debug!("SESSION_EXPIRY_CLEANUP_INTERVAL is set to 0. Session expiry cleanup is disabled"); } let session_ttl = get_session_ttl(); diff --git a/apps/server/src/http_server.rs b/apps/server/src/http_server.rs index 54a12b6fd..67b99b2a0 100644 --- a/apps/server/src/http_server.rs +++ b/apps/server/src/http_server.rs @@ -48,7 +48,7 @@ pub(crate) async fn run_http_server(port: u16) -> ServerResult<()> { let session_service = ServiceBuilder::new() .layer(HandleErrorLayer::new(handle_session_service_error)) - .layer(session::get_session_layer(app_state.db.clone())); + .layer(session::get_session_layer(app_state.clone())); let app = Router::new() .merge(routers::mount(app_state.clone())) diff --git a/core/src/job/job_manager.rs b/core/src/job/job_manager.rs index 16b7b03ef..920cccfd5 100644 --- a/core/src/job/job_manager.rs +++ b/core/src/job/job_manager.rs @@ -90,8 +90,18 @@ impl JobManager { if workers.get(&job_id).is_some() { tracing::trace!(job_id, "Sending shutdown signal to worker"); self.shutdown_tx - .send(JobManagerShutdownSignal::Worker(job_id.clone()))?; - + .send(JobManagerShutdownSignal::Worker(job_id.clone())) + .map_or_else( + |error| { + tracing::error!( + ?error, + "Failed to send shutdown signal to worker!" + ); + }, + |_| { + tracing::trace!(job_id, "Shutdown signal sent to worker"); + }, + ); workers.remove(&job_id); drop(workers); return Ok(());