diff --git a/quartz/scheduler.go b/quartz/scheduler.go index 6f91bb7..02a1756 100644 --- a/quartz/scheduler.go +++ b/quartz/scheduler.go @@ -147,6 +147,9 @@ func (sched *StdScheduler) ScheduleJob( jobDetail *JobDetail, trigger Trigger, ) error { + sched.mtx.Lock() + defer sched.mtx.Unlock() + if jobDetail == nil { return illegalArgumentError("jobDetail is nil") } @@ -175,7 +178,7 @@ func (sched *StdScheduler) ScheduleJob( err = sched.queue.Push(toSchedule) if err == nil { logger.Debugf("Successfully added job %s.", jobDetail.jobKey) - if sched.IsStarted() { + if sched.started { sched.reset() } } @@ -194,9 +197,6 @@ func (sched *StdScheduler) Start(ctx context.Context) { ctx, sched.cancel = context.WithCancel(ctx) go func() { <-ctx.Done(); sched.Stop() }() - // start the feed reader - sched.wg.Add(1) - go sched.startFeedReader(ctx) // start scheduler execution loop sched.wg.Add(1) @@ -230,6 +230,9 @@ func (sched *StdScheduler) IsStarted() bool { // For a job key to be returned, the job must satisfy all of the matchers specified. // Given no matchers, it returns the keys of all scheduled jobs. func (sched *StdScheduler) GetJobKeys(matchers ...Matcher[ScheduledJob]) []*JobKey { + sched.mtx.Lock() + defer sched.mtx.Unlock() + scheduledJobs := sched.queue.ScheduledJobs(matchers) keys := make([]*JobKey, 0, len(scheduledJobs)) for _, scheduled := range scheduledJobs { @@ -240,6 +243,9 @@ func (sched *StdScheduler) GetJobKeys(matchers ...Matcher[ScheduledJob]) []*JobK // GetScheduledJob returns the ScheduledJob with the specified key. func (sched *StdScheduler) GetScheduledJob(jobKey *JobKey) (ScheduledJob, error) { + sched.mtx.Lock() + defer sched.mtx.Unlock() + if jobKey == nil { return nil, illegalArgumentError("jobKey is nil") } @@ -248,13 +254,16 @@ func (sched *StdScheduler) GetScheduledJob(jobKey *JobKey) (ScheduledJob, error) // DeleteJob removes the Job with the specified key if present. func (sched *StdScheduler) DeleteJob(jobKey *JobKey) error { + sched.mtx.Lock() + defer sched.mtx.Unlock() + if jobKey == nil { return illegalArgumentError("jobKey is nil") } _, err := sched.queue.Remove(jobKey) if err == nil { logger.Debugf("Successfully deleted job %s.", jobKey) - if sched.IsStarted() { + if sched.started { sched.reset() } } @@ -264,6 +273,9 @@ func (sched *StdScheduler) DeleteJob(jobKey *JobKey) error { // PauseJob suspends the job with the specified key from being // executed by the scheduler. func (sched *StdScheduler) PauseJob(jobKey *JobKey) error { + sched.mtx.Lock() + defer sched.mtx.Unlock() + if jobKey == nil { return illegalArgumentError("jobKey is nil") } @@ -285,7 +297,7 @@ func (sched *StdScheduler) PauseJob(jobKey *JobKey) error { err = sched.queue.Push(paused) if err == nil { logger.Debugf("Successfully paused job %s.", jobKey) - if sched.IsStarted() { + if sched.started { sched.reset() } } @@ -295,6 +307,9 @@ func (sched *StdScheduler) PauseJob(jobKey *JobKey) error { // ResumeJob restarts the suspended job with the specified key. func (sched *StdScheduler) ResumeJob(jobKey *JobKey) error { + sched.mtx.Lock() + defer sched.mtx.Unlock() + if jobKey == nil { return illegalArgumentError("jobKey is nil") } @@ -320,7 +335,7 @@ func (sched *StdScheduler) ResumeJob(jobKey *JobKey) error { err = sched.queue.Push(resumed) if err == nil { logger.Debugf("Successfully resumed job %s.", jobKey) - if sched.IsStarted() { + if sched.started { sched.reset() } } @@ -330,11 +345,14 @@ func (sched *StdScheduler) ResumeJob(jobKey *JobKey) error { // Clear removes all of the scheduled jobs. func (sched *StdScheduler) Clear() error { + sched.mtx.Lock() + defer sched.mtx.Unlock() + // reset the job queue err := sched.queue.Clear() if err == nil { logger.Debug("Successfully cleared job queue.") - if sched.IsStarted() { + if sched.started { sched.reset() } } @@ -439,17 +457,7 @@ func (sched *StdScheduler) executeAndReschedule(ctx context.Context) { } // fetch a job for processing - scheduled, err := sched.queue.Pop() - if err != nil { - logger.Errorf("Failed to fetch a job from the queue: %s", err) - return - } - - // validate the job - valid, nextRunTimeExtractor := sched.validateJob(scheduled) - - // try rescheduling the job immediately - sched.rescheduleJob(ctx, scheduled, nextRunTimeExtractor) + scheduled, valid := sched.fetchAndReschedule() // execute the job if valid { @@ -514,40 +522,39 @@ func (sched *StdScheduler) validateJob(job ScheduledJob) (bool, func() (int64, e return true, func() (int64, error) { return job.Trigger().NextFireTime(job.NextRunTime()) } } -func (sched *StdScheduler) rescheduleJob(ctx context.Context, job ScheduledJob, - nextRunTimeExtractor func() (int64, error)) { +func (sched *StdScheduler) fetchAndReschedule() (ScheduledJob, bool) { + sched.mtx.Lock() + defer sched.mtx.Unlock() + + // fetch a job for processing + job, err := sched.queue.Pop() + if err != nil { + logger.Errorf("Failed to fetch a job from the queue: %s", err) + return nil, false + } + // validate the job + valid, nextRunTimeExtractor := sched.validateJob(job) + // calculate next run time for the job nextRunTime, err := nextRunTimeExtractor() if err != nil { logger.Infof("Job %s exited the execution loop: %s.", job.JobDetail().jobKey, err) - return + return job, valid } - select { - case <-ctx.Done(): - case sched.feeder <- &scheduledJob{ + // reschedule the job + toSchedule := &scheduledJob{ job: job.JobDetail(), trigger: job.Trigger(), priority: nextRunTime, - }: } -} - -func (sched *StdScheduler) startFeedReader(ctx context.Context) { - defer sched.wg.Done() - for { - select { - case scheduled := <-sched.feeder: - if err := sched.queue.Push(scheduled); err != nil { - logger.Errorf("Failed to reschedule job %s, err: %s", - scheduled.JobDetail().jobKey, err) - } else { - logger.Tracef("Successfully rescheduled job %s", scheduled.JobDetail().jobKey) - sched.reset() - } - case <-ctx.Done(): - logger.Info("Exit the feed reader.") - return - } + if err := sched.queue.Push(toSchedule); err != nil { + logger.Errorf("Failed to reschedule job %s, err: %s", + toSchedule.JobDetail().jobKey, err) + } else { + logger.Tracef("Successfully rescheduled job %s", toSchedule.JobDetail().jobKey) + sched.reset() } + + return job, valid } func (sched *StdScheduler) reset() {