From f1f22d39c5accd2801424cde895050399d76d5bd Mon Sep 17 00:00:00 2001 From: reugn Date: Fri, 5 Jan 2024 16:07:29 +0200 Subject: [PATCH] feat!: introduce JobDetail and JobKey for job scheduling --- README.md | 20 ++-- examples/main.go | 29 +++-- examples/print_job.go | 10 +- examples/readme/main.go | 43 +++++++ quartz/function_job.go | 10 +- quartz/function_job_test.go | 11 +- quartz/job.go | 37 +++--- quartz/job_detail.go | 66 +++++++++++ quartz/job_key.go | 46 ++++++++ quartz/job_test.go | 1 - quartz/queue.go | 81 ++++++++++---- quartz/scheduler.go | 217 ++++++++++++++++++++---------------- quartz/scheduler_test.go | 67 +++++++---- 13 files changed, 439 insertions(+), 199 deletions(-) create mode 100644 examples/readme/main.go create mode 100644 quartz/job_detail.go create mode 100644 quartz/job_key.go diff --git a/README.md b/README.md index 76dd7b2..e3176ca 100644 --- a/README.md +++ b/README.md @@ -26,17 +26,17 @@ type Scheduler interface { IsStarted() bool // ScheduleJob schedules a job using a specified trigger. - ScheduleJob(ctx context.Context, job Job, trigger Trigger) error + ScheduleJob(ctx context.Context, jobDetail *JobDetail, trigger Trigger) error // GetJobKeys returns the keys of all of the scheduled jobs. - GetJobKeys() []int + GetJobKeys() []*JobKey // GetScheduledJob returns the scheduled job with the specified key. - GetScheduledJob(key int) (ScheduledJob, error) + GetScheduledJob(jobKey *JobKey) (ScheduledJob, error) // DeleteJob removes the job with the specified key from the // scheduler's execution queue. - DeleteJob(ctx context.Context, key int) error + DeleteJob(ctx context.Context, jobKey *JobKey) error // Clear removes all of the scheduled jobs. Clear() error @@ -85,9 +85,6 @@ type Job interface { // Description returns the description of the Job. Description() string - - // Key returns the unique key for the Job. - Key() int } ``` @@ -159,9 +156,12 @@ func main() { functionJob := quartz.NewFunctionJob(func(_ context.Context) (int, error) { return 42, nil }) // register jobs to scheduler - sched.ScheduleJob(ctx, shellJob, cronTrigger) - sched.ScheduleJob(ctx, curlJob, quartz.NewSimpleTrigger(time.Second*7)) - sched.ScheduleJob(ctx, functionJob, quartz.NewSimpleTrigger(time.Second*5)) + sched.ScheduleJob(ctx, quartz.NewJobDetail(shellJob, quartz.NewJobKey("shellJob")), + cronTrigger) + sched.ScheduleJob(ctx, quartz.NewJobDetail(curlJob, quartz.NewJobKey("curlJob")), + quartz.NewSimpleTrigger(time.Second*7)) + sched.ScheduleJob(ctx, quartz.NewJobDetail(functionJob, quartz.NewJobKey("functionJob")), + quartz.NewSimpleTrigger(time.Second*5)) // stop scheduler sched.Stop() diff --git a/examples/main.go b/examples/main.go index 8f6b993..d644cf2 100644 --- a/examples/main.go +++ b/examples/main.go @@ -42,18 +42,22 @@ func sampleScheduler(ctx context.Context, wg *sync.WaitGroup) { return } - cronJob := PrintJob{"Cron job"} + cronJob := quartz.NewJobDetail(&PrintJob{"Cron job"}, quartz.NewJobKey("cronJob")) sched.Start(ctx) - _ = sched.ScheduleJob(ctx, &PrintJob{"Ad hoc Job"}, quartz.NewRunOnceTrigger(time.Second*5)) - _ = sched.ScheduleJob(ctx, &PrintJob{"First job"}, quartz.NewSimpleTrigger(time.Second*12)) - _ = sched.ScheduleJob(ctx, &PrintJob{"Second job"}, quartz.NewSimpleTrigger(time.Second*6)) - _ = sched.ScheduleJob(ctx, &PrintJob{"Third job"}, quartz.NewSimpleTrigger(time.Second*3)) - _ = sched.ScheduleJob(ctx, &cronJob, cronTrigger) + runOnceJobDetail := quartz.NewJobDetail(&PrintJob{"Ad hoc Job"}, quartz.NewJobKey("runOnceJob")) + jobDetail1 := quartz.NewJobDetail(&PrintJob{"First job"}, quartz.NewJobKey("job1")) + jobDetail2 := quartz.NewJobDetail(&PrintJob{"Second job"}, quartz.NewJobKey("job2")) + jobDetail3 := quartz.NewJobDetail(&PrintJob{"Third job"}, quartz.NewJobKey("job3")) + _ = sched.ScheduleJob(ctx, runOnceJobDetail, quartz.NewRunOnceTrigger(time.Second*5)) + _ = sched.ScheduleJob(ctx, jobDetail1, quartz.NewSimpleTrigger(time.Second*12)) + _ = sched.ScheduleJob(ctx, jobDetail2, quartz.NewSimpleTrigger(time.Second*6)) + _ = sched.ScheduleJob(ctx, jobDetail3, quartz.NewSimpleTrigger(time.Second*3)) + _ = sched.ScheduleJob(ctx, cronJob, cronTrigger) time.Sleep(time.Second * 10) - scheduledJob, err := sched.GetScheduledJob(cronJob.Key()) + scheduledJob, err := sched.GetScheduledJob(cronJob.JobKey()) if err != nil { fmt.Println(err) return @@ -61,7 +65,7 @@ func sampleScheduler(ctx context.Context, wg *sync.WaitGroup) { fmt.Println(scheduledJob.Trigger().Description()) fmt.Println("Before delete: ", sched.GetJobKeys()) - _ = sched.DeleteJob(ctx, cronJob.Key()) + _ = sched.DeleteJob(ctx, cronJob.JobKey()) fmt.Println("After delete: ", sched.GetJobKeys()) time.Sleep(time.Second * 2) @@ -89,9 +93,12 @@ func sampleJobs(ctx context.Context, wg *sync.WaitGroup) { curlJob := quartz.NewCurlJob(request) functionJob := quartz.NewFunctionJobWithDesc("42", func(_ context.Context) (int, error) { return 42, nil }) - _ = sched.ScheduleJob(ctx, shellJob, cronTrigger) - _ = sched.ScheduleJob(ctx, curlJob, quartz.NewSimpleTrigger(time.Second*7)) - _ = sched.ScheduleJob(ctx, functionJob, quartz.NewSimpleTrigger(time.Second*3)) + shellJobDetail := quartz.NewJobDetail(shellJob, quartz.NewJobKey("shellJob")) + curlJobDetail := quartz.NewJobDetail(curlJob, quartz.NewJobKey("curlJob")) + functionJobDetail := quartz.NewJobDetail(functionJob, quartz.NewJobKey("functionJob")) + _ = sched.ScheduleJob(ctx, shellJobDetail, cronTrigger) + _ = sched.ScheduleJob(ctx, curlJobDetail, quartz.NewSimpleTrigger(time.Second*7)) + _ = sched.ScheduleJob(ctx, functionJobDetail, quartz.NewSimpleTrigger(time.Second*3)) time.Sleep(time.Second * 10) diff --git a/examples/print_job.go b/examples/print_job.go index 654e0f8..29603f3 100644 --- a/examples/print_job.go +++ b/examples/print_job.go @@ -12,17 +12,15 @@ type PrintJob struct { desc string } +var _ quartz.Job = (*PrintJob)(nil) + // Description returns the description of the PrintJob. func (pj *PrintJob) Description() string { return pj.desc } -// Key returns the unique PrintJob key. -func (pj *PrintJob) Key() int { - return quartz.HashCode(pj.Description()) -} - // Execute is called by a Scheduler when the Trigger associated with this job fires. -func (pj *PrintJob) Execute(_ context.Context) { +func (pj *PrintJob) Execute(_ context.Context) error { fmt.Println("Executing " + pj.Description()) + return nil } diff --git a/examples/readme/main.go b/examples/readme/main.go new file mode 100644 index 0000000..42f26e2 --- /dev/null +++ b/examples/readme/main.go @@ -0,0 +1,43 @@ +package main + +import ( + "context" + "net/http" + "time" + + "github.com/reugn/go-quartz/quartz" +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // create scheduler + sched := quartz.NewStdScheduler() + + // async start scheduler + sched.Start(ctx) + + // create jobs + cronTrigger, _ := quartz.NewCronTrigger("1/5 * * * * *") + shellJob := quartz.NewShellJob("ls -la") + + request, _ := http.NewRequest(http.MethodGet, "https://worldtimeapi.org/api/timezone/utc", nil) + curlJob := quartz.NewCurlJob(request) + + functionJob := quartz.NewFunctionJob(func(_ context.Context) (int, error) { return 42, nil }) + + // register jobs to scheduler + sched.ScheduleJob(ctx, quartz.NewJobDetail(shellJob, quartz.NewJobKey("shellJob")), + cronTrigger) + sched.ScheduleJob(ctx, quartz.NewJobDetail(curlJob, quartz.NewJobKey("curlJob")), + quartz.NewSimpleTrigger(time.Second*7)) + sched.ScheduleJob(ctx, quartz.NewJobDetail(functionJob, quartz.NewJobKey("functionJob")), + quartz.NewSimpleTrigger(time.Second*5)) + + // stop scheduler + sched.Stop() + + // wait for all workers to exit + sched.Wait(ctx) +} diff --git a/quartz/function_job.go b/quartz/function_job.go index e5fa06f..aa22e9b 100644 --- a/quartz/function_job.go +++ b/quartz/function_job.go @@ -21,6 +21,8 @@ type FunctionJob[R any] struct { jobStatus JobStatus } +var _ Job = (*FunctionJob[any])(nil) + // NewFunctionJob returns a new FunctionJob without an explicit description. func NewFunctionJob[R any](function Function[R]) *FunctionJob[R] { return &FunctionJob[R]{ @@ -44,14 +46,9 @@ func (f *FunctionJob[R]) Description() string { return f.desc } -// Key returns the unique FunctionJob key. -func (f *FunctionJob[R]) Key() int { - return HashCode(fmt.Sprintf("%s:%p", f.desc, f.function)) -} - // Execute is called by a Scheduler when the Trigger associated with this job fires. // It invokes the held function, setting the results in Result and Error members. -func (f *FunctionJob[R]) Execute(ctx context.Context) { +func (f *FunctionJob[R]) Execute(ctx context.Context) error { result, err := (*f.function)(ctx) f.Lock() if err != nil { @@ -64,6 +61,7 @@ func (f *FunctionJob[R]) Execute(ctx context.Context) { f.err = nil } f.Unlock() + return err } // Result returns the result of the FunctionJob. diff --git a/quartz/function_job_test.go b/quartz/function_job_test.go index 0bd8b04..322fe61 100644 --- a/quartz/function_job_test.go +++ b/quartz/function_job_test.go @@ -27,8 +27,10 @@ func TestFunctionJob(t *testing.T) { sched := quartz.NewStdScheduler() sched.Start(ctx) - sched.ScheduleJob(ctx, funcJob1, quartz.NewRunOnceTrigger(time.Millisecond*300)) - sched.ScheduleJob(ctx, funcJob2, quartz.NewRunOnceTrigger(time.Millisecond*800)) + sched.ScheduleJob(ctx, quartz.NewJobDetail(funcJob1, quartz.NewJobKey("funcJob1")), + quartz.NewRunOnceTrigger(time.Millisecond*300)) + sched.ScheduleJob(ctx, quartz.NewJobDetail(funcJob2, quartz.NewJobKey("funcJob2")), + quartz.NewRunOnceTrigger(time.Millisecond*800)) time.Sleep(time.Second) _ = sched.Clear() sched.Stop() @@ -44,7 +46,7 @@ func TestFunctionJob(t *testing.T) { assertEqual(t, int(atomic.LoadInt32(&n)), 6) } -func TestNewFunctionJobWithDescAndKey(t *testing.T) { +func TestNewFunctionJobWithDesc(t *testing.T) { jobDesc := "test job" funcJob1 := quartz.NewFunctionJobWithDesc(jobDesc, func(_ context.Context) (string, error) { @@ -56,8 +58,7 @@ func TestNewFunctionJobWithDescAndKey(t *testing.T) { }) assertEqual(t, funcJob1.Description(), jobDesc) - assertEqual(t, funcJob1.Key(), funcJob1.Key()) - assertNotEqual(t, funcJob1.Key(), funcJob2.Key()) + assertEqual(t, funcJob2.Description(), jobDesc) } func TestFunctionJobRespectsContext(t *testing.T) { diff --git a/quartz/job.go b/quartz/job.go index d6cce0e..16e28e2 100644 --- a/quartz/job.go +++ b/quartz/job.go @@ -3,6 +3,7 @@ package quartz import ( "bytes" "context" + "errors" "fmt" "io" "net/http" @@ -11,8 +12,6 @@ import ( "strings" "sync" "sync/atomic" - - "github.com/reugn/go-quartz/quartz/logger" ) // Job represents an interface to be implemented by structs which @@ -20,13 +19,10 @@ import ( type Job interface { // Execute is called by a Scheduler when the Trigger associated // with this job fires. - Execute(context.Context) + Execute(context.Context) error // Description returns the description of the Job. Description() string - - // Key returns the unique key for the Job. - Key() int } // JobStatus represents a Job status. @@ -55,6 +51,8 @@ type ShellJob struct { callback func(context.Context, *ShellJob) } +var _ Job = (*ShellJob)(nil) + // NewShellJob returns a new ShellJob for the given command. func NewShellJob(cmd string) *ShellJob { return &ShellJob{ @@ -77,11 +75,6 @@ func (sh *ShellJob) Description() string { return fmt.Sprintf("ShellJob: %s", sh.cmd) } -// Key returns the unique ShellJob key. -func (sh *ShellJob) Key() int { - return HashCode(sh.Description()) -} - var ( shellOnce = sync.Once{} shellPath = "bash" @@ -99,7 +92,7 @@ func getShell() string { } // Execute is called by a Scheduler when the Trigger associated with this job fires. -func (sh *ShellJob) Execute(ctx context.Context) { +func (sh *ShellJob) Execute(ctx context.Context) error { shell := getShell() var stdout, stderr bytes.Buffer @@ -124,6 +117,7 @@ func (sh *ShellJob) Execute(ctx context.Context) { if sh.callback != nil { sh.callback(ctx, sh) } + return nil } // ExitCode returns the exit code of the ShellJob. @@ -167,6 +161,8 @@ type CurlJob struct { callback func(context.Context, *CurlJob) } +var _ Job = (*CurlJob)(nil) + // HTTPHandler sends an HTTP request and returns an HTTP response, // following policy (such as redirects, cookies, auth) as configured // on the implementing HTTP client. @@ -204,11 +200,6 @@ func (cu *CurlJob) Description() string { return fmt.Sprintf("CurlJob:\n%s", cu.description) } -// Key returns the unique CurlJob key. -func (cu *CurlJob) Key() int { - return HashCode(cu.description) -} - // DumpResponse returns the response of the job in its HTTP/1.x wire // representation. // If body is true, DumpResponse also returns the body. @@ -241,7 +232,7 @@ func formatRequest(r *http.Request) string { } // Execute is called by a Scheduler when the Trigger associated with this job fires. -func (cu *CurlJob) Execute(ctx context.Context) { +func (cu *CurlJob) Execute(ctx context.Context) error { cu.Lock() cu.request = cu.request.WithContext(ctx) var err error @@ -257,6 +248,7 @@ func (cu *CurlJob) Execute(ctx context.Context) { if cu.callback != nil { cu.callback(ctx, cu) } + return nil } type isolatedJob struct { @@ -265,15 +257,16 @@ type isolatedJob struct { isRunning *atomic.Value } +var _ Job = (*isolatedJob)(nil) + // Execute is called by a Scheduler when the Trigger associated with this job fires. -func (j *isolatedJob) Execute(ctx context.Context) { +func (j *isolatedJob) Execute(ctx context.Context) error { if wasRunning := j.isRunning.Swap(true); wasRunning != nil && wasRunning.(bool) { - logger.Debugf("Executed job %d is running.", j.Job.Key()) - return + return errors.New("job is running") } defer j.isRunning.Store(false) - j.Job.Execute(ctx) + return j.Job.Execute(ctx) } // NewIsolatedJob wraps a job object and ensures that only one diff --git a/quartz/job_detail.go b/quartz/job_detail.go new file mode 100644 index 0000000..ca0a796 --- /dev/null +++ b/quartz/job_detail.go @@ -0,0 +1,66 @@ +package quartz + +import "time" + +// JobDetailOptions represents additional JobDetail properties. +type JobDetailOptions struct { + // MaxRetries is the maximum number of retries before aborting the + // current job execution. + // Default: 0. + MaxRetries int + + // RetryInterval is the fixed time interval between retry attempts. + // Default: 1 second. + RetryInterval time.Duration + + // Replace specifies whether the job should replace an existing job + // with the same key. + // Default: false. + Replace bool +} + +// NewDefaultJobDetailOptions returns a new instance of JobDetailOptions +// with the default values. +func NewDefaultJobDetailOptions() *JobDetailOptions { + return &JobDetailOptions{ + MaxRetries: 0, + RetryInterval: time.Second, + Replace: false, + } +} + +// JobDetail conveys the detail properties of a given Job instance. +type JobDetail struct { + job Job + jobKey *JobKey + opts *JobDetailOptions +} + +// NewJobDetail creates and returns a new JobDetail. +func NewJobDetail(job Job, jobKey *JobKey) *JobDetail { + return NewJobDetailWithOptions(job, jobKey, NewDefaultJobDetailOptions()) +} + +// NewJobDetailWithOptions creates and returns a new JobDetail configured as specified. +func NewJobDetailWithOptions(job Job, jobKey *JobKey, opts *JobDetailOptions) *JobDetail { + return &JobDetail{ + job: job, + jobKey: jobKey, + opts: opts, + } +} + +// Job returns job. +func (jd *JobDetail) Job() Job { + return jd.job +} + +// JobKey returns jobKey. +func (jd *JobDetail) JobKey() *JobKey { + return jd.jobKey +} + +// Options returns opts. +func (jd *JobDetail) Options() *JobDetailOptions { + return jd.opts +} diff --git a/quartz/job_key.go b/quartz/job_key.go new file mode 100644 index 0000000..4cb0d92 --- /dev/null +++ b/quartz/job_key.go @@ -0,0 +1,46 @@ +package quartz + +import "fmt" + +const ( + DefaultGroup = "default" +) + +// JobKey represents the identifier of a scheduled job. +// Keys are composed of both a name and group, and the name must be unique +// within the group. +// If only a name is specified then the default group name will be used. +type JobKey struct { + name string + group string +} + +// NewJobKey returns a new NewJobKey using the given name. +func NewJobKey(name string) *JobKey { + return &JobKey{ + name: name, + group: DefaultGroup, + } +} + +// NewJobKeyWithGroup returns a new NewJobKey using the given name and group. +func NewJobKeyWithGroup(name, group string) *JobKey { + if group == "" { // use default if empty + group = DefaultGroup + } + return &JobKey{ + name: name, + group: group, + } +} + +// String returns string representation of the JobKey. +func (jobKey *JobKey) String() string { + return fmt.Sprintf("%s::%s", jobKey.group, jobKey.name) +} + +// Equals indicates whether some other JobKey is "equal to" this one. +func (jobKey *JobKey) Equals(that *JobKey) bool { + return jobKey.name == that.name && + jobKey.group == that.group +} diff --git a/quartz/job_test.go b/quartz/job_test.go index b4efd4b..1480108 100644 --- a/quartz/job_test.go +++ b/quartz/job_test.go @@ -285,6 +285,5 @@ func TestCurlJob_WithCallback(t *testing.T) { curlJob := quartz.NewCurlJobWithOptions(request, opts) curlJob.Execute(context.Background()) - assertEqual(t, 466866822, curlJob.Key()) assertEqual(t, quartz.OK, <-resultChan) } diff --git a/quartz/queue.go b/quartz/queue.go index 85d8081..1e81ec5 100644 --- a/quartz/queue.go +++ b/quartz/queue.go @@ -3,12 +3,14 @@ package quartz import ( "container/heap" "errors" + "fmt" + "sync" ) // scheduledJob represents a scheduled job. // It implements the ScheduledJob interface. type scheduledJob struct { - job Job + job *JobDetail trigger Trigger priority int64 // job priority, backed by its next run time. index int // maintained by the heap.Interface methods. @@ -17,7 +19,7 @@ type scheduledJob struct { var _ ScheduledJob = (*scheduledJob)(nil) // Job returns the scheduled job instance. -func (scheduled *scheduledJob) Job() Job { +func (scheduled *scheduledJob) JobDetail() *JobDetail { return scheduled.job } @@ -32,22 +34,26 @@ func (scheduled *scheduledJob) NextRunTime() int64 { } // JobQueue represents the job queue used by the scheduler. -// The default jobQueue implementation uses an in-memory priority queue -// to manage scheduled jobs. -// An alternative implementation can be provided for customization, e.g. -// to support persistent storage. +// The default jobQueue implementation uses an in-memory priority queue that orders +// scheduled jobs by their next execution time, when the job with the closest time +// being removed and returned first. +// An alternative implementation can be provided for customization, e.g. to support +// persistent storage. +// The implementation is required to be thread safe. type JobQueue interface { - // Push inserts a new scheduled job at the end of the queue. + // Push inserts a new scheduled job to the queue. + // This method is also used by the Scheduler to reschedule existing jobs that + // have been dequeued for execution. Push(job ScheduledJob) error - // Pop removes and returns the next scheduled job from the queue. + // Pop removes and returns the next to run scheduled job from the queue. Pop() (ScheduledJob, error) - // Head returns the first scheduled job without removing it. + // Head returns the first scheduled job without removing it from the queue. Head() (ScheduledJob, error) - // Remove removes and returns the scheduled job at index i. - Remove(i int) (ScheduledJob, error) + // Remove removes and returns the scheduled job with the specified key. + Remove(jobKey *JobKey) (ScheduledJob, error) // ScheduledJobs returns the slice of all scheduled jobs in the queue. ScheduledJobs() []ScheduledJob @@ -104,6 +110,7 @@ func (pq *priorityQueue) Pop() interface{} { // jobQueue implements the JobQueue interface by using an in-memory // priority queue as the storage layer. type jobQueue struct { + mtx sync.Mutex delegate priorityQueue } @@ -116,38 +123,68 @@ func newJobQueue() *jobQueue { } } -// Push inserts a new scheduled job at the end of the queue. +// Push inserts a new scheduled job to the queue. +// This method is also used by the Scheduler to reschedule existing jobs that +// have been dequeued for execution. func (jq *jobQueue) Push(job ScheduledJob) error { + jq.mtx.Lock() + defer jq.mtx.Unlock() + scheduledJobs := jq.scheduledJobs() + for i, scheduled := range scheduledJobs { + if scheduled.JobDetail().jobKey.Equals(job.JobDetail().jobKey) { + if job.JobDetail().opts.Replace { + heap.Remove(&jq.delegate, i) + break + } + return fmt.Errorf("job with the key %s already exists", + job.JobDetail().jobKey) + } + } heap.Push(&jq.delegate, job) return nil } // Pop removes and returns the next scheduled job from the queue. func (jq *jobQueue) Pop() (ScheduledJob, error) { - if jq.Size() == 0 { + jq.mtx.Lock() + defer jq.mtx.Unlock() + if len(jq.delegate) == 0 { return nil, errors.New("queue is empty") } return heap.Pop(&jq.delegate).(ScheduledJob), nil } -// Head returns the first scheduled job without removing it. +// Head returns the first scheduled job without removing it from the queue. func (jq *jobQueue) Head() (ScheduledJob, error) { - if jq.Size() == 0 { + jq.mtx.Lock() + defer jq.mtx.Unlock() + if len(jq.delegate) == 0 { return nil, errors.New("queue is empty") } return jq.delegate[0], nil } -// Remove removes and returns the scheduled job at index i. -func (jq *jobQueue) Remove(i int) (ScheduledJob, error) { - if jq.Size() <= i { - return nil, errors.New("index out of range") +// Remove removes and returns the scheduled job with the specified key. +func (jq *jobQueue) Remove(jobKey *JobKey) (ScheduledJob, error) { + jq.mtx.Lock() + defer jq.mtx.Unlock() + scheduledJobs := jq.scheduledJobs() + for i, scheduled := range scheduledJobs { + if scheduled.JobDetail().jobKey.Equals(jobKey) { + return heap.Remove(&jq.delegate, i).(ScheduledJob), nil + } } - return heap.Remove(&jq.delegate, i).(ScheduledJob), nil + return nil, errors.New("no job with the given key found") } // ScheduledJobs returns the slice of all scheduled jobs in the queue. func (jq *jobQueue) ScheduledJobs() []ScheduledJob { + jq.mtx.Lock() + defer jq.mtx.Unlock() + return jq.scheduledJobs() +} + +func (jq *jobQueue) scheduledJobs() []ScheduledJob { scheduledJobs := make([]ScheduledJob, len(jq.delegate)) for i, job := range jq.delegate { scheduledJobs[i] = ScheduledJob(job) @@ -157,11 +194,15 @@ func (jq *jobQueue) ScheduledJobs() []ScheduledJob { // Size returns the size of the job queue. func (jq *jobQueue) Size() int { + jq.mtx.Lock() + defer jq.mtx.Unlock() return len(jq.delegate) } // Clear clears the job queue. func (jq *jobQueue) Clear() error { + jq.mtx.Lock() + defer jq.mtx.Unlock() jq.delegate = priorityQueue{} return nil } diff --git a/quartz/scheduler.go b/quartz/scheduler.go index cc37ce6..575bd25 100644 --- a/quartz/scheduler.go +++ b/quartz/scheduler.go @@ -12,7 +12,7 @@ import ( // ScheduledJob represents a scheduled Job with the Trigger associated // with it and the next run epoch time. type ScheduledJob interface { - Job() Job + JobDetail() *JobDetail Trigger() Trigger NextRunTime() int64 } @@ -30,17 +30,17 @@ type Scheduler interface { IsStarted() bool // ScheduleJob schedules a job using a specified trigger. - ScheduleJob(ctx context.Context, job Job, trigger Trigger) error + ScheduleJob(ctx context.Context, jobDetail *JobDetail, trigger Trigger) error // GetJobKeys returns the keys of all of the scheduled jobs. - GetJobKeys() []int + GetJobKeys() []*JobKey // GetScheduledJob returns the scheduled job with the specified key. - GetScheduledJob(key int) (ScheduledJob, error) + GetScheduledJob(jobKey *JobKey) (ScheduledJob, error) // DeleteJob removes the job with the specified key from the // scheduler's execution queue. - DeleteJob(ctx context.Context, key int) error + DeleteJob(ctx context.Context, jobKey *JobKey) error // Clear removes all of the scheduled jobs. Clear() error @@ -133,24 +133,39 @@ func NewStdSchedulerWithOptions( // ScheduleJob schedules a Job using a specified Trigger. func (sched *StdScheduler) ScheduleJob( ctx context.Context, - job Job, + jobDetail *JobDetail, trigger Trigger, ) error { + if jobDetail == nil { + return errors.New("jobDetail is nil") + } + if jobDetail.jobKey == nil { + return errors.New("jobDetail.jobKey is nil") + } + if jobDetail.jobKey.name == "" { + return errors.New("empty key name is not allowed") + } + if trigger == nil { + return errors.New("trigger is nil") + } nextRunTime, err := trigger.NextFireTime(NowNano()) if err != nil { return err } - select { - case sched.feeder <- &scheduledJob{ - job: job, + toSchedule := &scheduledJob{ + job: jobDetail, trigger: trigger, priority: nextRunTime, - }: - return nil - case <-ctx.Done(): - return ctx.Err() } + err = sched.queue.Push(toSchedule) + if err == nil { + logger.Debugf("Successfully added job %s.", jobDetail.jobKey) + if sched.IsStarted() { + sched.reset(ctx) + } + } + return err } // Start starts the StdScheduler execution loop. @@ -198,55 +213,46 @@ func (sched *StdScheduler) IsStarted() bool { } // GetJobKeys returns the keys of all of the scheduled jobs. -func (sched *StdScheduler) GetJobKeys() []int { - sched.mtx.Lock() - defer sched.mtx.Unlock() - - keys := make([]int, 0, sched.queue.Size()) - for _, scheduled := range sched.queue.ScheduledJobs() { - keys = append(keys, scheduled.Job().Key()) +func (sched *StdScheduler) GetJobKeys() []*JobKey { + scheduledJobs := sched.queue.ScheduledJobs() + keys := make([]*JobKey, 0, len(scheduledJobs)) + for _, scheduled := range scheduledJobs { + keys = append(keys, scheduled.JobDetail().jobKey) } - return keys } // GetScheduledJob returns the ScheduledJob with the specified key. -func (sched *StdScheduler) GetScheduledJob(key int) (ScheduledJob, error) { - sched.mtx.Lock() - defer sched.mtx.Unlock() - - for _, scheduled := range sched.queue.ScheduledJobs() { - if scheduled.Job().Key() == key { +func (sched *StdScheduler) GetScheduledJob(jobKey *JobKey) (ScheduledJob, error) { + if jobKey == nil { + return nil, errors.New("jobKey is nil") + } + scheduledJobs := sched.queue.ScheduledJobs() + for _, scheduled := range scheduledJobs { + if scheduled.JobDetail().jobKey.Equals(jobKey) { return scheduled, nil } } - - return nil, errors.New("no Job with the given Key found") + return nil, errors.New("no job with the given key found") } // DeleteJob removes the Job with the specified key if present. -func (sched *StdScheduler) DeleteJob(ctx context.Context, key int) error { - sched.mtx.Lock() - defer sched.mtx.Unlock() - - for i, scheduled := range sched.queue.ScheduledJobs() { - if scheduled.Job().Key() == key { - _, err := sched.queue.Remove(i) - if err == nil { - sched.reset(ctx) - } - return err +func (sched *StdScheduler) DeleteJob(ctx context.Context, jobKey *JobKey) error { + if jobKey == nil { + return errors.New("jobKey is nil") + } + _, err := sched.queue.Remove(jobKey) + if err == nil { + logger.Debugf("Successfully deleted job %s.", jobKey) + if sched.IsStarted() { + sched.reset(ctx) } } - - return errors.New("no Job with the given Key found") + return err } // Clear removes all of the scheduled jobs. func (sched *StdScheduler) Clear() error { - sched.mtx.Lock() - defer sched.mtx.Unlock() - // reset the job queue return sched.queue.Clear() } @@ -269,25 +275,28 @@ func (sched *StdScheduler) Stop() { func (sched *StdScheduler) startExecutionLoop(ctx context.Context) { defer sched.wg.Done() for { - if sched.queueLen() == 0 { + if sched.queue.Size() == 0 { select { case <-sched.interrupt: + logger.Trace("Interrupted in empty queue.") case <-ctx.Done(): logger.Info("Exit the empty execution loop.") return } } else { - t := time.NewTimer(sched.calculateNextTick()) + timer := time.NewTimer(sched.calculateNextTick()) select { - case <-t.C: + case <-timer.C: + logger.Trace("Tick.") sched.executeAndReschedule(ctx) case <-sched.interrupt: - t.Stop() + logger.Trace("Interrupted waiting for next tick.") + timer.Stop() case <-ctx.Done(): logger.Info("Exit the execution loop.") - t.Stop() + timer.Stop() return } } @@ -306,7 +315,7 @@ func (sched *StdScheduler) startWorkers(ctx context.Context) { case <-ctx.Done(): return case scheduled := <-sched.dispatch: - scheduled.Job().Execute(ctx) + executeWithRetries(ctx, scheduled.JobDetail()) } } }() @@ -314,23 +323,23 @@ func (sched *StdScheduler) startWorkers(ctx context.Context) { } } -func (sched *StdScheduler) queueLen() int { - sched.mtx.Lock() - defer sched.mtx.Unlock() - - return sched.queue.Size() -} - func (sched *StdScheduler) calculateNextTick() time.Duration { - sched.mtx.Lock() - defer sched.mtx.Unlock() - if sched.queue.Size() > 0 { scheduledJob, err := sched.queue.Head() if err != nil { - logger.Warnf("Failed to calculate next tick: %s", err) + logger.Warnf("Failed to calculate next tick for %s, err: %s", + scheduledJob.JobDetail().jobKey, err) } else { - return time.Duration(parkTime(scheduledJob.NextRunTime())) + var nextTick int64 + nextRunTime := scheduledJob.NextRunTime() + now := NowNano() + if nextRunTime > now { + nextTick = nextRunTime - now + } + nextTickDuration := time.Duration(nextTick) + logger.Debugf("Next tick for %s in %s.", scheduledJob.JobDetail().jobKey, + nextTickDuration) + return nextTickDuration } } return sched.opts.OutdatedThreshold @@ -338,33 +347,33 @@ func (sched *StdScheduler) calculateNextTick() time.Duration { func (sched *StdScheduler) executeAndReschedule(ctx context.Context) { // return if the job queue is empty - if sched.queueLen() == 0 { + if sched.queue.Size() == 0 { logger.Debug("Job queue is empty.") return } // fetch a job for processing - sched.mtx.Lock() scheduled, err := sched.queue.Pop() if err != nil { logger.Errorf("Failed to fetch a job from the queue: %s", err) - sched.mtx.Unlock() return } - sched.mtx.Unlock() + // try rescheduling the job immediately + sched.rescheduleJob(ctx, scheduled) + now := NowNano() // check if the job is due to be processed - if scheduled.NextRunTime() > NowNano() { - logger.Tracef("Job %d is not due to run yet.", scheduled.Job().Key()) + if scheduled.NextRunTime() > now { + logger.Tracef("Job %s is not due to run yet.", scheduled.JobDetail().jobKey) return } // execute the job - if sched.jobIsUpToDate(scheduled) { - logger.Debugf("Job %d is about to be executed.", scheduled.Job().Key()) + if sched.jobIsUpToDate(scheduled, now) { + logger.Debugf("Job %s is about to be executed.", scheduled.JobDetail().jobKey) switch { case sched.opts.BlockingExecution: - scheduled.Job().Execute(ctx) + executeWithRetries(ctx, scheduled.JobDetail()) case sched.opts.WorkerLimit > 0: select { case sched.dispatch <- scheduled: @@ -375,37 +384,58 @@ func (sched *StdScheduler) executeAndReschedule(ctx context.Context) { sched.wg.Add(1) go func() { defer sched.wg.Done() - scheduled.Job().Execute(ctx) + executeWithRetries(ctx, scheduled.JobDetail()) }() } } else { - logger.Debugf("Job %d skipped as outdated %d.", scheduled.Job().Key(), - scheduled.NextRunTime()) + logger.Debugf("Job %s skipped as outdated %s.", scheduled.JobDetail().jobKey, + time.Duration(now-scheduled.NextRunTime())) } +} - // reschedule the job - sched.rescheduleJob(ctx, scheduled) +func executeWithRetries(ctx context.Context, jobDetail *JobDetail) { + err := jobDetail.job.Execute(ctx) + if err == nil { + return + } + for i := 0; i < jobDetail.opts.MaxRetries; i++ { + timer := time.NewTimer(jobDetail.opts.RetryInterval) + select { + case <-timer.C: + case <-ctx.Done(): + timer.Stop() + break + } + logger.Tracef("Job %s retry %d", jobDetail.jobKey, i) + err = jobDetail.job.Execute(ctx) + if err == nil { + break + } + } + if err != nil { + logger.Warnf("Job %s terminated with error: %s", jobDetail.jobKey, err) + } } func (sched *StdScheduler) rescheduleJob(ctx context.Context, job ScheduledJob) { nextRunTime, err := job.Trigger().NextFireTime(job.NextRunTime()) if err != nil { - logger.Infof("Job %d got out the execution loop: %s.", job.Job().Key(), err) + logger.Infof("Job %s exited the execution loop: %s.", job.JobDetail().jobKey, err) return } select { case <-ctx.Done(): case sched.feeder <- &scheduledJob{ - job: job.Job(), + job: job.JobDetail(), trigger: job.Trigger(), priority: nextRunTime, }: } } -func (sched *StdScheduler) jobIsUpToDate(job ScheduledJob) bool { - return job.NextRunTime() > NowNano()-sched.opts.OutdatedThreshold.Nanoseconds() +func (sched *StdScheduler) jobIsUpToDate(job ScheduledJob, now int64) bool { + return job.NextRunTime() > now-sched.opts.OutdatedThreshold.Nanoseconds() } func (sched *StdScheduler) startFeedReader(ctx context.Context) { @@ -413,16 +443,13 @@ func (sched *StdScheduler) startFeedReader(ctx context.Context) { for { select { case scheduled := <-sched.feeder: - func() { - sched.mtx.Lock() - defer sched.mtx.Unlock() - - if err := sched.queue.Push(scheduled); err != nil { - logger.Errorf("Failed to schedule job %d.", scheduled.Job().Key()) - } else { - sched.reset(ctx) - } - }() + if err := sched.queue.Push(scheduled); err != nil { + logger.Errorf("Failed to reschedule job %s, err: %s", + scheduled.JobDetail().jobKey, err) + } else { + logger.Tracef("Successfully rescheduled job %s", scheduled.JobDetail().jobKey) + sched.reset(ctx) + } case <-ctx.Done(): logger.Info("Exit the feed reader.") return @@ -437,11 +464,3 @@ func (sched *StdScheduler) reset(ctx context.Context) { default: } } - -func parkTime(ts int64) int64 { - now := NowNano() - if ts > now { - return ts - now - } - return 0 -} diff --git a/quartz/scheduler_test.go b/quartz/scheduler_test.go index 7072fe6..421137a 100644 --- a/quartz/scheduler_test.go +++ b/quartz/scheduler_test.go @@ -2,6 +2,7 @@ package quartz_test import ( "context" + "fmt" "net/http" "runtime" "sync/atomic" @@ -16,44 +17,61 @@ func TestScheduler(t *testing.T) { defer cancel() sched := quartz.NewStdScheduler() - var jobKeys [4]int + var jobKeys [4]*quartz.JobKey shellJob := quartz.NewShellJob("ls -la") shellJob.Description() - jobKeys[0] = shellJob.Key() + jobKeys[0] = quartz.NewJobKey("shellJob") request, err := http.NewRequest(http.MethodGet, "https://worldtimeapi.org/api/timezone/utc", nil) assertEqual(t, err, nil) - curlJob := quartz.NewCurlJob(request) + + handlerOk := struct{ httpHandlerMock }{} + handlerOk.doFunc = func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: 200, + Request: request, + }, nil + } + curlJob := quartz.NewCurlJobWithOptions(request, quartz.CurlJobOptions{HTTPClient: handlerOk}) curlJob.Description() - jobKeys[1] = curlJob.Key() + jobKeys[1] = quartz.NewJobKey("curlJob") errShellJob := quartz.NewShellJob("ls -z") - jobKeys[2] = errShellJob.Key() + jobKeys[2] = quartz.NewJobKey("errShellJob") request, err = http.NewRequest(http.MethodGet, "http://", nil) assertEqual(t, err, nil) errCurlJob := quartz.NewCurlJob(request) - jobKeys[3] = errCurlJob.Key() + jobKeys[3] = quartz.NewJobKey("errCurlJob") sched.Start(ctx) assertEqual(t, sched.IsStarted(), true) - sched.ScheduleJob(ctx, shellJob, quartz.NewSimpleTrigger(time.Millisecond*800)) - sched.ScheduleJob(ctx, curlJob, quartz.NewRunOnceTrigger(time.Millisecond)) - sched.ScheduleJob(ctx, errShellJob, quartz.NewRunOnceTrigger(time.Millisecond)) - sched.ScheduleJob(ctx, errCurlJob, quartz.NewSimpleTrigger(time.Millisecond*800)) + + err = sched.ScheduleJob(ctx, quartz.NewJobDetail(shellJob, jobKeys[0]), + quartz.NewSimpleTrigger(time.Millisecond*700)) + assertEqual(t, err, nil) + err = sched.ScheduleJob(ctx, quartz.NewJobDetail(curlJob, jobKeys[1]), + quartz.NewRunOnceTrigger(time.Millisecond)) + assertEqual(t, err, nil) + err = sched.ScheduleJob(ctx, quartz.NewJobDetail(errShellJob, jobKeys[2]), + quartz.NewRunOnceTrigger(time.Millisecond)) + assertEqual(t, err, nil) + err = sched.ScheduleJob(ctx, quartz.NewJobDetail(errCurlJob, jobKeys[3]), + quartz.NewSimpleTrigger(time.Millisecond*800)) + assertEqual(t, err, nil) time.Sleep(time.Second) scheduledJobKeys := sched.GetJobKeys() - assertEqual(t, scheduledJobKeys, []int{3668896347, 2787962474}) + assertEqual(t, scheduledJobKeys, []*quartz.JobKey{jobKeys[0], jobKeys[3]}) _, err = sched.GetScheduledJob(jobKeys[0]) assertEqual(t, err, nil) - err = sched.DeleteJob(ctx, shellJob.Key()) + err = sched.DeleteJob(ctx, jobKeys[0]) // shellJob key assertEqual(t, err, nil) - nonExistentJobKey := 1111 + nonExistentJobKey := quartz.NewJobKey("NA") _, err = sched.GetScheduledJob(nonExistentJobKey) assertNotEqual(t, err, nil) @@ -62,7 +80,7 @@ func TestScheduler(t *testing.T) { scheduledJobKeys = sched.GetJobKeys() assertEqual(t, len(scheduledJobKeys), 1) - assertEqual(t, scheduledJobKeys, []int{2787962474}) + assertEqual(t, scheduledJobKeys, []*quartz.JobKey{jobKeys[3]}) _ = sched.Clear() assertEqual(t, len(sched.GetJobKeys()), 0) @@ -100,7 +118,7 @@ func TestSchedulerBlockingSemantics(t *testing.T) { sched.Start(ctx) var n int64 - sched.ScheduleJob(ctx, + timerJob := quartz.NewJobDetail( quartz.NewFunctionJob(func(ctx context.Context) (bool, error) { atomic.AddInt64(&n, 1) timer := time.NewTimer(time.Hour) @@ -112,8 +130,16 @@ func TestSchedulerBlockingSemantics(t *testing.T) { return true, nil } }), - quartz.NewSimpleTrigger(20*time.Millisecond)) - + quartz.NewJobKey("timerJob"), + ) + err := sched.ScheduleJob( + ctx, + timerJob, + quartz.NewSimpleTrigger(20*time.Millisecond), + ) + if err != nil { + t.Fatalf("Failed to schedule job, err: %s", err) + } ticker := time.NewTicker(100 * time.Millisecond) <-ticker.C if atomic.LoadInt64(&n) == 0 { @@ -211,8 +237,11 @@ func TestSchedulerCancel(t *testing.T) { } for i := 0; i < 100; i++ { - if err := sched.ScheduleJob(ctx, - quartz.NewFunctionJob(hourJob), + functionJob := quartz.NewJobDetail(quartz.NewFunctionJob(hourJob), + quartz.NewJobKey(fmt.Sprintf("functionJob_%d", i))) + if err := sched.ScheduleJob( + ctx, + functionJob, quartz.NewSimpleTrigger(100*time.Millisecond), ); err != nil { t.Errorf("could not add job %d, %s", i, err.Error())