Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for a retry_queue + tests #54

Merged
merged 1 commit into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@ pub fn opts() -> EnqueueOpts {
queue: "default".into(),
retry: RetryOpts::Yes,
unique_for: None,
retry_queue: None,
}
}

pub struct EnqueueOpts {
queue: String,
retry: RetryOpts,
unique_for: Option<std::time::Duration>,
retry_queue: Option<String>,
}

impl EnqueueOpts {
Expand Down Expand Up @@ -99,6 +101,14 @@ impl EnqueueOpts {
}
}

#[must_use]
pub fn retry_queue(self, retry_queue: String) -> Self {
Self {
retry_queue: Some(retry_queue),
..self
}
}

pub fn create_job(&self, class: String, args: impl serde::Serialize) -> Result<Job> {
let args = serde_json::to_value(args)?;

Expand All @@ -120,11 +130,13 @@ impl EnqueueOpts {

// Make default eventually...
error_message: None,
error_class: None,
failed_at: None,
retry_count: None,
retried_at: None,

// Meta for enqueueing
retry_queue: self.retry_queue.clone(),
unique_for: self.unique_for,
})
}
Expand Down Expand Up @@ -191,6 +203,7 @@ pub struct WorkerOpts<Args, W: Worker<Args> + ?Sized> {
args: PhantomData<Args>,
worker: PhantomData<W>,
unique_for: Option<std::time::Duration>,
retry_queue: Option<String>,
}

impl<Args, W> WorkerOpts<Args, W>
Expand All @@ -205,6 +218,7 @@ where
args: PhantomData,
worker: PhantomData,
unique_for: None,
retry_queue: None,
}
}

Expand All @@ -219,6 +233,14 @@ where
}
}

#[must_use]
pub fn retry_queue<S: Into<String>>(self, retry_queue: S) -> Self {
Self {
retry_queue: Some(retry_queue.into()),
..self
}
}

#[must_use]
pub fn queue<S: Into<String>>(self, queue: S) -> Self {
Self {
Expand Down Expand Up @@ -268,6 +290,7 @@ impl<Args, W: Worker<Args>> From<&WorkerOpts<Args, W>> for EnqueueOpts {
retry: opts.retry.clone(),
queue: opts.queue.clone(),
unique_for: opts.unique_for,
retry_queue: opts.retry_queue.clone(),
}
}
}
Expand Down Expand Up @@ -511,8 +534,10 @@ pub struct Job {
pub enqueued_at: Option<f64>,
pub failed_at: Option<f64>,
pub error_message: Option<String>,
pub error_class: Option<String>,
pub retry_count: Option<usize>,
pub retried_at: Option<f64>,
pub retry_queue: Option<String>,

#[serde(skip)]
pub unique_for: Option<std::time::Duration>,
Expand Down
8 changes: 8 additions & 0 deletions src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,15 @@ impl ServerMiddleware for RetryMiddleware {
"class" = &job.class,
"jid" = &job.jid,
"queue" = &job.queue,
"retry_queue" = &job.retry_queue,
"err" = &job.error_message
}, "Scheduling job for retry in the future");

// We will now make sure we use the new retry_queue option if set.
if let Some(ref retry_queue) = job.retry_queue {
job.queue = retry_queue.into();
}

UnitOfWork::from_job(job).reenqueue(&redis).await?;
}

Expand Down Expand Up @@ -233,8 +239,10 @@ mod test {
enqueued_at: None,
failed_at: None,
error_message: None,
error_class: None,
retry_count: None,
retried_at: None,
retry_queue: None,
unique_for: None,
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/periodic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,11 @@ impl PeriodicJob {

// Make default eventually...
error_message: None,
error_class: None,
failed_at: None,
retry_count: None,
retried_at: None,
retry_queue: None,

// Meta data not used in periodic jobs right now...
unique_for: None,
Expand Down
41 changes: 41 additions & 0 deletions tests/server_middleware_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,45 @@ mod test {

assert_eq!(n_jobs_retried, 0, "no jobs in the retry queue");
}

#[tokio::test]
#[serial]
async fn can_retry_job_into_different_retry_queue() {
let worker = AlwaysFailWorker;
let queue = "failure_zone_max_on_job".to_string();
let retry_queue = "the_retry_queue".to_string();
let (mut p, redis) = new_base_processor(queue.clone()).await;
let (mut retry_p, _retry_redis) = new_base_processor(retry_queue.clone()).await;
p.register(worker.clone());

let mut job = AlwaysFailWorker::opts()
.queue(queue)
.retry(5)
.retry_queue(&retry_queue)
.perform_async(&redis, ())
.await
.expect("enqueues");

assert_eq!(p.process_one_tick_once().await.unwrap(), WorkFetcher::Done);
let sets = vec!["retry".to_string()];
let sched = Scheduled::new(redis.clone());
let future_date = chrono::Utc::now() + chrono::Duration::days(30);

// We should have one job that needs retrying.
let n_jobs_retried = sched.enqueue_jobs(future_date, &sets).await;
assert!(n_jobs_retried.is_ok());
let n_jobs_retried = n_jobs_retried.unwrap();
assert_eq!(n_jobs_retried, 1, "we have one job to retry in the queue");

// Let's grab that job.
let job = retry_p.fetch().await;
assert!(job.is_ok());
let job = job.unwrap();
assert!(job.is_some());
let job = job.unwrap();

assert_eq!(job.job.class, "AlwaysFailWorker");
assert_eq!(job.job.retry_queue, Some(retry_queue));
assert_eq!(job.job.retry_count, Some(1));
}
}
Loading