Skip to content

Commit

Permalink
fix: ensure atomicity of the fetch and reschedule operation (#121)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Mar 15, 2024
1 parent 50c609c commit 50a769b
Showing 1 changed file with 51 additions and 44 deletions.
95 changes: 51 additions & 44 deletions quartz/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ func (sched *StdScheduler) ScheduleJob(
jobDetail *JobDetail,
trigger Trigger,
) error {
sched.mtx.Lock()
defer sched.mtx.Unlock()

if jobDetail == nil {
return illegalArgumentError("jobDetail is nil")
}
Expand Down Expand Up @@ -175,7 +178,7 @@ func (sched *StdScheduler) ScheduleJob(
err = sched.queue.Push(toSchedule)
if err == nil {
logger.Debugf("Successfully added job %s.", jobDetail.jobKey)
if sched.IsStarted() {
if sched.started {
sched.reset()
}
}
Expand All @@ -194,9 +197,6 @@ func (sched *StdScheduler) Start(ctx context.Context) {

ctx, sched.cancel = context.WithCancel(ctx)
go func() { <-ctx.Done(); sched.Stop() }()
// start the feed reader
sched.wg.Add(1)
go sched.startFeedReader(ctx)

// start scheduler execution loop
sched.wg.Add(1)
Expand Down Expand Up @@ -230,6 +230,9 @@ func (sched *StdScheduler) IsStarted() bool {
// For a job key to be returned, the job must satisfy all of the matchers specified.
// Given no matchers, it returns the keys of all scheduled jobs.
func (sched *StdScheduler) GetJobKeys(matchers ...Matcher[ScheduledJob]) []*JobKey {
sched.mtx.Lock()
defer sched.mtx.Unlock()

scheduledJobs := sched.queue.ScheduledJobs(matchers)
keys := make([]*JobKey, 0, len(scheduledJobs))
for _, scheduled := range scheduledJobs {
Expand All @@ -240,6 +243,9 @@ func (sched *StdScheduler) GetJobKeys(matchers ...Matcher[ScheduledJob]) []*JobK

// GetScheduledJob returns the ScheduledJob with the specified key.
func (sched *StdScheduler) GetScheduledJob(jobKey *JobKey) (ScheduledJob, error) {
sched.mtx.Lock()
defer sched.mtx.Unlock()

if jobKey == nil {
return nil, illegalArgumentError("jobKey is nil")
}
Expand All @@ -248,13 +254,16 @@ func (sched *StdScheduler) GetScheduledJob(jobKey *JobKey) (ScheduledJob, error)

// DeleteJob removes the Job with the specified key if present.
func (sched *StdScheduler) DeleteJob(jobKey *JobKey) error {
sched.mtx.Lock()
defer sched.mtx.Unlock()

if jobKey == nil {
return illegalArgumentError("jobKey is nil")
}
_, err := sched.queue.Remove(jobKey)
if err == nil {
logger.Debugf("Successfully deleted job %s.", jobKey)
if sched.IsStarted() {
if sched.started {
sched.reset()
}
}
Expand All @@ -264,6 +273,9 @@ func (sched *StdScheduler) DeleteJob(jobKey *JobKey) error {
// PauseJob suspends the job with the specified key from being
// executed by the scheduler.
func (sched *StdScheduler) PauseJob(jobKey *JobKey) error {
sched.mtx.Lock()
defer sched.mtx.Unlock()

if jobKey == nil {
return illegalArgumentError("jobKey is nil")
}
Expand All @@ -285,7 +297,7 @@ func (sched *StdScheduler) PauseJob(jobKey *JobKey) error {
err = sched.queue.Push(paused)
if err == nil {
logger.Debugf("Successfully paused job %s.", jobKey)
if sched.IsStarted() {
if sched.started {
sched.reset()
}
}
Expand All @@ -295,6 +307,9 @@ func (sched *StdScheduler) PauseJob(jobKey *JobKey) error {

// ResumeJob restarts the suspended job with the specified key.
func (sched *StdScheduler) ResumeJob(jobKey *JobKey) error {
sched.mtx.Lock()
defer sched.mtx.Unlock()

if jobKey == nil {
return illegalArgumentError("jobKey is nil")
}
Expand All @@ -320,7 +335,7 @@ func (sched *StdScheduler) ResumeJob(jobKey *JobKey) error {
err = sched.queue.Push(resumed)
if err == nil {
logger.Debugf("Successfully resumed job %s.", jobKey)
if sched.IsStarted() {
if sched.started {
sched.reset()
}
}
Expand All @@ -330,11 +345,14 @@ func (sched *StdScheduler) ResumeJob(jobKey *JobKey) error {

// Clear removes all of the scheduled jobs.
func (sched *StdScheduler) Clear() error {
sched.mtx.Lock()
defer sched.mtx.Unlock()

// reset the job queue
err := sched.queue.Clear()
if err == nil {
logger.Debug("Successfully cleared job queue.")
if sched.IsStarted() {
if sched.started {
sched.reset()
}
}
Expand Down Expand Up @@ -439,17 +457,7 @@ func (sched *StdScheduler) executeAndReschedule(ctx context.Context) {
}

// fetch a job for processing
scheduled, err := sched.queue.Pop()
if err != nil {
logger.Errorf("Failed to fetch a job from the queue: %s", err)
return
}

// validate the job
valid, nextRunTimeExtractor := sched.validateJob(scheduled)

// try rescheduling the job immediately
sched.rescheduleJob(ctx, scheduled, nextRunTimeExtractor)
scheduled, valid := sched.fetchAndReschedule()

// execute the job
if valid {
Expand Down Expand Up @@ -514,40 +522,39 @@ func (sched *StdScheduler) validateJob(job ScheduledJob) (bool, func() (int64, e
return true, func() (int64, error) { return job.Trigger().NextFireTime(job.NextRunTime()) }
}

func (sched *StdScheduler) rescheduleJob(ctx context.Context, job ScheduledJob,
nextRunTimeExtractor func() (int64, error)) {
func (sched *StdScheduler) fetchAndReschedule() (ScheduledJob, bool) {
sched.mtx.Lock()
defer sched.mtx.Unlock()

// fetch a job for processing
job, err := sched.queue.Pop()
if err != nil {
logger.Errorf("Failed to fetch a job from the queue: %s", err)
return nil, false
}
// validate the job
valid, nextRunTimeExtractor := sched.validateJob(job)
// calculate next run time for the job
nextRunTime, err := nextRunTimeExtractor()
if err != nil {
logger.Infof("Job %s exited the execution loop: %s.", job.JobDetail().jobKey, err)
return
return job, valid
}
select {
case <-ctx.Done():
case sched.feeder <- &scheduledJob{
// reschedule the job
toSchedule := &scheduledJob{
job: job.JobDetail(),
trigger: job.Trigger(),
priority: nextRunTime,
}:
}
}

func (sched *StdScheduler) startFeedReader(ctx context.Context) {
defer sched.wg.Done()
for {
select {
case scheduled := <-sched.feeder:
if err := sched.queue.Push(scheduled); err != nil {
logger.Errorf("Failed to reschedule job %s, err: %s",
scheduled.JobDetail().jobKey, err)
} else {
logger.Tracef("Successfully rescheduled job %s", scheduled.JobDetail().jobKey)
sched.reset()
}
case <-ctx.Done():
logger.Info("Exit the feed reader.")
return
}
if err := sched.queue.Push(toSchedule); err != nil {
logger.Errorf("Failed to reschedule job %s, err: %s",
toSchedule.JobDetail().jobKey, err)
} else {
logger.Tracef("Successfully rescheduled job %s", toSchedule.JobDetail().jobKey)
sched.reset()
}

return job, valid
}

func (sched *StdScheduler) reset() {
Expand Down

0 comments on commit 50a769b

Please sign in to comment.