Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: introduce JobDetail and JobKey for job scheduling #84

Merged
merged 1 commit into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ type Scheduler interface {
IsStarted() bool

// ScheduleJob schedules a job using a specified trigger.
ScheduleJob(ctx context.Context, job Job, trigger Trigger) error
ScheduleJob(ctx context.Context, jobDetail *JobDetail, trigger Trigger) error

// GetJobKeys returns the keys of all of the scheduled jobs.
GetJobKeys() []int
GetJobKeys() []*JobKey

// GetScheduledJob returns the scheduled job with the specified key.
GetScheduledJob(key int) (ScheduledJob, error)
GetScheduledJob(jobKey *JobKey) (ScheduledJob, error)

// DeleteJob removes the job with the specified key from the
// scheduler's execution queue.
DeleteJob(ctx context.Context, key int) error
DeleteJob(ctx context.Context, jobKey *JobKey) error

// Clear removes all of the scheduled jobs.
Clear() error
Expand Down Expand Up @@ -85,9 +85,6 @@ type Job interface {

// Description returns the description of the Job.
Description() string

// Key returns the unique key for the Job.
Key() int
}
```

Expand Down Expand Up @@ -159,9 +156,12 @@ func main() {
functionJob := quartz.NewFunctionJob(func(_ context.Context) (int, error) { return 42, nil })

// register jobs to scheduler
sched.ScheduleJob(ctx, shellJob, cronTrigger)
sched.ScheduleJob(ctx, curlJob, quartz.NewSimpleTrigger(time.Second*7))
sched.ScheduleJob(ctx, functionJob, quartz.NewSimpleTrigger(time.Second*5))
sched.ScheduleJob(ctx, quartz.NewJobDetail(shellJob, quartz.NewJobKey("shellJob")),
cronTrigger)
sched.ScheduleJob(ctx, quartz.NewJobDetail(curlJob, quartz.NewJobKey("curlJob")),
quartz.NewSimpleTrigger(time.Second*7))
sched.ScheduleJob(ctx, quartz.NewJobDetail(functionJob, quartz.NewJobKey("functionJob")),
quartz.NewSimpleTrigger(time.Second*5))

// stop scheduler
sched.Stop()
Expand Down
29 changes: 18 additions & 11 deletions examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,26 +42,30 @@ func sampleScheduler(ctx context.Context, wg *sync.WaitGroup) {
return
}

cronJob := PrintJob{"Cron job"}
cronJob := quartz.NewJobDetail(&PrintJob{"Cron job"}, quartz.NewJobKey("cronJob"))
sched.Start(ctx)

_ = sched.ScheduleJob(ctx, &PrintJob{"Ad hoc Job"}, quartz.NewRunOnceTrigger(time.Second*5))
_ = sched.ScheduleJob(ctx, &PrintJob{"First job"}, quartz.NewSimpleTrigger(time.Second*12))
_ = sched.ScheduleJob(ctx, &PrintJob{"Second job"}, quartz.NewSimpleTrigger(time.Second*6))
_ = sched.ScheduleJob(ctx, &PrintJob{"Third job"}, quartz.NewSimpleTrigger(time.Second*3))
_ = sched.ScheduleJob(ctx, &cronJob, cronTrigger)
runOnceJobDetail := quartz.NewJobDetail(&PrintJob{"Ad hoc Job"}, quartz.NewJobKey("runOnceJob"))
jobDetail1 := quartz.NewJobDetail(&PrintJob{"First job"}, quartz.NewJobKey("job1"))
jobDetail2 := quartz.NewJobDetail(&PrintJob{"Second job"}, quartz.NewJobKey("job2"))
jobDetail3 := quartz.NewJobDetail(&PrintJob{"Third job"}, quartz.NewJobKey("job3"))
_ = sched.ScheduleJob(ctx, runOnceJobDetail, quartz.NewRunOnceTrigger(time.Second*5))
_ = sched.ScheduleJob(ctx, jobDetail1, quartz.NewSimpleTrigger(time.Second*12))
_ = sched.ScheduleJob(ctx, jobDetail2, quartz.NewSimpleTrigger(time.Second*6))
_ = sched.ScheduleJob(ctx, jobDetail3, quartz.NewSimpleTrigger(time.Second*3))
_ = sched.ScheduleJob(ctx, cronJob, cronTrigger)

time.Sleep(time.Second * 10)

scheduledJob, err := sched.GetScheduledJob(cronJob.Key())
scheduledJob, err := sched.GetScheduledJob(cronJob.JobKey())
if err != nil {
fmt.Println(err)
return
}

fmt.Println(scheduledJob.Trigger().Description())
fmt.Println("Before delete: ", sched.GetJobKeys())
_ = sched.DeleteJob(ctx, cronJob.Key())
_ = sched.DeleteJob(ctx, cronJob.JobKey())
fmt.Println("After delete: ", sched.GetJobKeys())

time.Sleep(time.Second * 2)
Expand Down Expand Up @@ -89,9 +93,12 @@ func sampleJobs(ctx context.Context, wg *sync.WaitGroup) {
curlJob := quartz.NewCurlJob(request)
functionJob := quartz.NewFunctionJobWithDesc("42", func(_ context.Context) (int, error) { return 42, nil })

_ = sched.ScheduleJob(ctx, shellJob, cronTrigger)
_ = sched.ScheduleJob(ctx, curlJob, quartz.NewSimpleTrigger(time.Second*7))
_ = sched.ScheduleJob(ctx, functionJob, quartz.NewSimpleTrigger(time.Second*3))
shellJobDetail := quartz.NewJobDetail(shellJob, quartz.NewJobKey("shellJob"))
curlJobDetail := quartz.NewJobDetail(curlJob, quartz.NewJobKey("curlJob"))
functionJobDetail := quartz.NewJobDetail(functionJob, quartz.NewJobKey("functionJob"))
_ = sched.ScheduleJob(ctx, shellJobDetail, cronTrigger)
_ = sched.ScheduleJob(ctx, curlJobDetail, quartz.NewSimpleTrigger(time.Second*7))
_ = sched.ScheduleJob(ctx, functionJobDetail, quartz.NewSimpleTrigger(time.Second*3))

time.Sleep(time.Second * 10)

Expand Down
10 changes: 4 additions & 6 deletions examples/print_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@ type PrintJob struct {
desc string
}

var _ quartz.Job = (*PrintJob)(nil)

// Description returns the description of the PrintJob.
func (pj *PrintJob) Description() string {
return pj.desc
}

// Key returns the unique PrintJob key.
func (pj *PrintJob) Key() int {
return quartz.HashCode(pj.Description())
}

// Execute is called by a Scheduler when the Trigger associated with this job fires.
func (pj *PrintJob) Execute(_ context.Context) {
func (pj *PrintJob) Execute(_ context.Context) error {
fmt.Println("Executing " + pj.Description())
return nil
}
43 changes: 43 additions & 0 deletions examples/readme/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

import (
"context"
"net/http"
"time"

"github.com/reugn/go-quartz/quartz"
)

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// create scheduler
sched := quartz.NewStdScheduler()

// async start scheduler
sched.Start(ctx)

// create jobs
cronTrigger, _ := quartz.NewCronTrigger("1/5 * * * * *")
shellJob := quartz.NewShellJob("ls -la")

request, _ := http.NewRequest(http.MethodGet, "https://worldtimeapi.org/api/timezone/utc", nil)
curlJob := quartz.NewCurlJob(request)

functionJob := quartz.NewFunctionJob(func(_ context.Context) (int, error) { return 42, nil })

// register jobs to scheduler
sched.ScheduleJob(ctx, quartz.NewJobDetail(shellJob, quartz.NewJobKey("shellJob")),
cronTrigger)
sched.ScheduleJob(ctx, quartz.NewJobDetail(curlJob, quartz.NewJobKey("curlJob")),
quartz.NewSimpleTrigger(time.Second*7))
sched.ScheduleJob(ctx, quartz.NewJobDetail(functionJob, quartz.NewJobKey("functionJob")),
quartz.NewSimpleTrigger(time.Second*5))

// stop scheduler
sched.Stop()

// wait for all workers to exit
sched.Wait(ctx)
}
10 changes: 4 additions & 6 deletions quartz/function_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type FunctionJob[R any] struct {
jobStatus JobStatus
}

var _ Job = (*FunctionJob[any])(nil)

// NewFunctionJob returns a new FunctionJob without an explicit description.
func NewFunctionJob[R any](function Function[R]) *FunctionJob[R] {
return &FunctionJob[R]{
Expand All @@ -44,14 +46,9 @@ func (f *FunctionJob[R]) Description() string {
return f.desc
}

// Key returns the unique FunctionJob key.
func (f *FunctionJob[R]) Key() int {
return HashCode(fmt.Sprintf("%s:%p", f.desc, f.function))
}

// Execute is called by a Scheduler when the Trigger associated with this job fires.
// It invokes the held function, setting the results in Result and Error members.
func (f *FunctionJob[R]) Execute(ctx context.Context) {
func (f *FunctionJob[R]) Execute(ctx context.Context) error {
result, err := (*f.function)(ctx)
f.Lock()
if err != nil {
Expand All @@ -64,6 +61,7 @@ func (f *FunctionJob[R]) Execute(ctx context.Context) {
f.err = nil
}
f.Unlock()
return err
}

// Result returns the result of the FunctionJob.
Expand Down
11 changes: 6 additions & 5 deletions quartz/function_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ func TestFunctionJob(t *testing.T) {

sched := quartz.NewStdScheduler()
sched.Start(ctx)
sched.ScheduleJob(ctx, funcJob1, quartz.NewRunOnceTrigger(time.Millisecond*300))
sched.ScheduleJob(ctx, funcJob2, quartz.NewRunOnceTrigger(time.Millisecond*800))
sched.ScheduleJob(ctx, quartz.NewJobDetail(funcJob1, quartz.NewJobKey("funcJob1")),
quartz.NewRunOnceTrigger(time.Millisecond*300))
sched.ScheduleJob(ctx, quartz.NewJobDetail(funcJob2, quartz.NewJobKey("funcJob2")),
quartz.NewRunOnceTrigger(time.Millisecond*800))
time.Sleep(time.Second)
_ = sched.Clear()
sched.Stop()
Expand All @@ -44,7 +46,7 @@ func TestFunctionJob(t *testing.T) {
assertEqual(t, int(atomic.LoadInt32(&n)), 6)
}

func TestNewFunctionJobWithDescAndKey(t *testing.T) {
func TestNewFunctionJobWithDesc(t *testing.T) {
jobDesc := "test job"

funcJob1 := quartz.NewFunctionJobWithDesc(jobDesc, func(_ context.Context) (string, error) {
Expand All @@ -56,8 +58,7 @@ func TestNewFunctionJobWithDescAndKey(t *testing.T) {
})

assertEqual(t, funcJob1.Description(), jobDesc)
assertEqual(t, funcJob1.Key(), funcJob1.Key())
assertNotEqual(t, funcJob1.Key(), funcJob2.Key())
assertEqual(t, funcJob2.Description(), jobDesc)
}

func TestFunctionJobRespectsContext(t *testing.T) {
Expand Down
37 changes: 15 additions & 22 deletions quartz/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package quartz
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -11,22 +12,17 @@ import (
"strings"
"sync"
"sync/atomic"

"github.com/reugn/go-quartz/quartz/logger"
)

// Job represents an interface to be implemented by structs which
// represent a 'job' to be performed.
type Job interface {
// Execute is called by a Scheduler when the Trigger associated
// with this job fires.
Execute(context.Context)
Execute(context.Context) error

// Description returns the description of the Job.
Description() string

// Key returns the unique key for the Job.
Key() int
}

// JobStatus represents a Job status.
Expand Down Expand Up @@ -55,6 +51,8 @@ type ShellJob struct {
callback func(context.Context, *ShellJob)
}

var _ Job = (*ShellJob)(nil)

// NewShellJob returns a new ShellJob for the given command.
func NewShellJob(cmd string) *ShellJob {
return &ShellJob{
Expand All @@ -77,11 +75,6 @@ func (sh *ShellJob) Description() string {
return fmt.Sprintf("ShellJob: %s", sh.cmd)
}

// Key returns the unique ShellJob key.
func (sh *ShellJob) Key() int {
return HashCode(sh.Description())
}

var (
shellOnce = sync.Once{}
shellPath = "bash"
Expand All @@ -99,7 +92,7 @@ func getShell() string {
}

// Execute is called by a Scheduler when the Trigger associated with this job fires.
func (sh *ShellJob) Execute(ctx context.Context) {
func (sh *ShellJob) Execute(ctx context.Context) error {
shell := getShell()

var stdout, stderr bytes.Buffer
Expand All @@ -124,6 +117,7 @@ func (sh *ShellJob) Execute(ctx context.Context) {
if sh.callback != nil {
sh.callback(ctx, sh)
}
return nil
}

// ExitCode returns the exit code of the ShellJob.
Expand Down Expand Up @@ -167,6 +161,8 @@ type CurlJob struct {
callback func(context.Context, *CurlJob)
}

var _ Job = (*CurlJob)(nil)

// HTTPHandler sends an HTTP request and returns an HTTP response,
// following policy (such as redirects, cookies, auth) as configured
// on the implementing HTTP client.
Expand Down Expand Up @@ -204,11 +200,6 @@ func (cu *CurlJob) Description() string {
return fmt.Sprintf("CurlJob:\n%s", cu.description)
}

// Key returns the unique CurlJob key.
func (cu *CurlJob) Key() int {
return HashCode(cu.description)
}

// DumpResponse returns the response of the job in its HTTP/1.x wire
// representation.
// If body is true, DumpResponse also returns the body.
Expand Down Expand Up @@ -241,7 +232,7 @@ func formatRequest(r *http.Request) string {
}

// Execute is called by a Scheduler when the Trigger associated with this job fires.
func (cu *CurlJob) Execute(ctx context.Context) {
func (cu *CurlJob) Execute(ctx context.Context) error {
cu.Lock()
cu.request = cu.request.WithContext(ctx)
var err error
Expand All @@ -257,6 +248,7 @@ func (cu *CurlJob) Execute(ctx context.Context) {
if cu.callback != nil {
cu.callback(ctx, cu)
}
return nil
}

type isolatedJob struct {
Expand All @@ -265,15 +257,16 @@ type isolatedJob struct {
isRunning *atomic.Value
}

var _ Job = (*isolatedJob)(nil)

// Execute is called by a Scheduler when the Trigger associated with this job fires.
func (j *isolatedJob) Execute(ctx context.Context) {
func (j *isolatedJob) Execute(ctx context.Context) error {
if wasRunning := j.isRunning.Swap(true); wasRunning != nil && wasRunning.(bool) {
logger.Debugf("Executed job %d is running.", j.Job.Key())
return
return errors.New("job is running")
}
defer j.isRunning.Store(false)

j.Job.Execute(ctx)
return j.Job.Execute(ctx)
}

// NewIsolatedJob wraps a job object and ensures that only one
Expand Down
Loading