diff --git a/src/lib.rs b/src/lib.rs index 3a80bae..a49a3f3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -62,6 +62,7 @@ pub fn opts() -> EnqueueOpts { queue: "default".into(), retry: RetryOpts::Yes, unique_for: None, + retry_queue: None, } } @@ -69,6 +70,7 @@ pub struct EnqueueOpts { queue: String, retry: RetryOpts, unique_for: Option, + retry_queue: Option, } impl EnqueueOpts { @@ -99,6 +101,14 @@ impl EnqueueOpts { } } + #[must_use] + pub fn retry_queue(self, retry_queue: String) -> Self { + Self { + retry_queue: Some(retry_queue), + ..self + } + } + pub fn create_job(&self, class: String, args: impl serde::Serialize) -> Result { let args = serde_json::to_value(args)?; @@ -120,11 +130,13 @@ impl EnqueueOpts { // Make default eventually... error_message: None, + error_class: None, failed_at: None, retry_count: None, retried_at: None, // Meta for enqueueing + retry_queue: self.retry_queue.clone(), unique_for: self.unique_for, }) } @@ -191,6 +203,7 @@ pub struct WorkerOpts + ?Sized> { args: PhantomData, worker: PhantomData, unique_for: Option, + retry_queue: Option, } impl WorkerOpts @@ -205,6 +218,7 @@ where args: PhantomData, worker: PhantomData, unique_for: None, + retry_queue: None, } } @@ -219,6 +233,14 @@ where } } + #[must_use] + pub fn retry_queue>(self, retry_queue: S) -> Self { + Self { + retry_queue: Some(retry_queue.into()), + ..self + } + } + #[must_use] pub fn queue>(self, queue: S) -> Self { Self { @@ -268,6 +290,7 @@ impl> From<&WorkerOpts> for EnqueueOpts { retry: opts.retry.clone(), queue: opts.queue.clone(), unique_for: opts.unique_for, + retry_queue: opts.retry_queue.clone(), } } } @@ -511,8 +534,10 @@ pub struct Job { pub enqueued_at: Option, pub failed_at: Option, pub error_message: Option, + pub error_class: Option, pub retry_count: Option, pub retried_at: Option, + pub retry_queue: Option, #[serde(skip)] pub unique_for: Option, diff --git a/src/middleware.rs b/src/middleware.rs index c4bf702..ec3a52d 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -200,9 +200,15 @@ impl ServerMiddleware for RetryMiddleware { "class" = &job.class, "jid" = &job.jid, "queue" = &job.queue, + "retry_queue" = &job.retry_queue, "err" = &job.error_message }, "Scheduling job for retry in the future"); + // We will now make sure we use the new retry_queue option if set. + if let Some(ref retry_queue) = job.retry_queue { + job.queue = retry_queue.into(); + } + UnitOfWork::from_job(job).reenqueue(&redis).await?; } @@ -233,8 +239,10 @@ mod test { enqueued_at: None, failed_at: None, error_message: None, + error_class: None, retry_count: None, retried_at: None, + retry_queue: None, unique_for: None, } } diff --git a/src/periodic.rs b/src/periodic.rs index 79b7b08..4ea006f 100644 --- a/src/periodic.rs +++ b/src/periodic.rs @@ -200,9 +200,11 @@ impl PeriodicJob { // Make default eventually... error_message: None, + error_class: None, failed_at: None, retry_count: None, retried_at: None, + retry_queue: None, // Meta data not used in periodic jobs right now... unique_for: None, diff --git a/tests/server_middleware_test.rs b/tests/server_middleware_test.rs index bef7f54..1d6b7d4 100644 --- a/tests/server_middleware_test.rs +++ b/tests/server_middleware_test.rs @@ -289,4 +289,45 @@ mod test { assert_eq!(n_jobs_retried, 0, "no jobs in the retry queue"); } + + #[tokio::test] + #[serial] + async fn can_retry_job_into_different_retry_queue() { + let worker = AlwaysFailWorker; + let queue = "failure_zone_max_on_job".to_string(); + let retry_queue = "the_retry_queue".to_string(); + let (mut p, redis) = new_base_processor(queue.clone()).await; + let (mut retry_p, _retry_redis) = new_base_processor(retry_queue.clone()).await; + p.register(worker.clone()); + + let mut job = AlwaysFailWorker::opts() + .queue(queue) + .retry(5) + .retry_queue(&retry_queue) + .perform_async(&redis, ()) + .await + .expect("enqueues"); + + assert_eq!(p.process_one_tick_once().await.unwrap(), WorkFetcher::Done); + let sets = vec!["retry".to_string()]; + let sched = Scheduled::new(redis.clone()); + let future_date = chrono::Utc::now() + chrono::Duration::days(30); + + // We should have one job that needs retrying. + let n_jobs_retried = sched.enqueue_jobs(future_date, &sets).await; + assert!(n_jobs_retried.is_ok()); + let n_jobs_retried = n_jobs_retried.unwrap(); + assert_eq!(n_jobs_retried, 1, "we have one job to retry in the queue"); + + // Let's grab that job. + let job = retry_p.fetch().await; + assert!(job.is_ok()); + let job = job.unwrap(); + assert!(job.is_some()); + let job = job.unwrap(); + + assert_eq!(job.job.class, "AlwaysFailWorker"); + assert_eq!(job.job.retry_queue, Some(retry_queue)); + assert_eq!(job.job.retry_count, Some(1)); + } }