Skip to content

Commit

Permalink
feat: introduce matchers to filter jobs (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Feb 9, 2024
1 parent b6a0644 commit c5ad3de
Show file tree
Hide file tree
Showing 12 changed files with 358 additions and 16 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ type Scheduler interface {
// ScheduleJob schedules a job using a specified trigger.
ScheduleJob(jobDetail *JobDetail, trigger Trigger) error

// GetJobKeys returns the keys of all of the scheduled jobs.
GetJobKeys() []*JobKey
// GetJobKeys returns the keys of scheduled jobs.
// For a job key to be returned, the job must satisfy all of the
// matchers specified.
// Given no matchers, it returns the keys of all scheduled jobs.
GetJobKeys(...Matcher[ScheduledJob]) []*JobKey

// GetScheduledJob returns the scheduled job with the specified key.
GetScheduledJob(jobKey *JobKey) (ScheduledJob, error)
Expand Down
16 changes: 13 additions & 3 deletions examples/queue/file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func main() {

<-ctx.Done()

scheduledJobs := jobQueue.ScheduledJobs()
scheduledJobs := jobQueue.ScheduledJobs(nil)
jobNames := make([]string, 0, len(scheduledJobs))
for _, job := range scheduledJobs {
jobNames = append(jobNames, job.JobDetail().JobKey().String())
Expand Down Expand Up @@ -289,7 +289,7 @@ func (jq *jobQueue) Remove(jobKey *quartz.JobKey) (quartz.ScheduledJob, error) {
}

// ScheduledJobs returns the slice of all scheduled jobs in the queue.
func (jq *jobQueue) ScheduledJobs() []quartz.ScheduledJob {
func (jq *jobQueue) ScheduledJobs(matchers []quartz.Matcher[quartz.ScheduledJob]) []quartz.ScheduledJob {
jq.mtx.Lock()
defer jq.mtx.Unlock()
logger.Trace("ScheduledJobs")
Expand All @@ -303,7 +303,7 @@ func (jq *jobQueue) ScheduledJobs() []quartz.ScheduledJob {
data, err := os.ReadFile(fmt.Sprintf("%s/%s", dataFolder, file.Name()))
if err == nil {
job, err := unmarshal(data)
if err == nil {
if err == nil && isMatch(job, matchers) {
jobs = append(jobs, job)
}
}
Expand All @@ -312,6 +312,16 @@ func (jq *jobQueue) ScheduledJobs() []quartz.ScheduledJob {
return jobs
}

func isMatch(job quartz.ScheduledJob, matchers []quartz.Matcher[quartz.ScheduledJob]) bool {
for _, matcher := range matchers {
// require all matchers to match the job
if !matcher.IsMatch(job) {
return false
}
}
return true
}

// Size returns the size of the job queue.
func (jq *jobQueue) Size() int {
jq.mtx.Lock()
Expand Down
54 changes: 54 additions & 0 deletions matcher/job_group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//nolint:dupl
package matcher

import (
"github.com/reugn/go-quartz/quartz"
)

// JobGroup implements the quartz.Matcher interface with the type argument
// quartz.ScheduledJob, matching jobs by their group name.
// It has public fields to allow predicate pushdown in custom quartz.JobQueue
// implementations.
type JobGroup struct {
Operator *StringOperator // uses a pointer to compare with standard operators
Pattern string
}

var _ quartz.Matcher[quartz.ScheduledJob] = (*JobGroup)(nil)

// NewJobGroup returns a new JobGroup matcher given the string operator and pattern.
func NewJobGroup(operator *StringOperator, pattern string) quartz.Matcher[quartz.ScheduledJob] {
return &JobGroup{
Operator: operator,
Pattern: pattern,
}
}

// JobGroupEquals returns a new JobGroup, matching jobs whose group name is identical
// to the given string pattern.
func JobGroupEquals(pattern string) quartz.Matcher[quartz.ScheduledJob] {
return NewJobGroup(&StringEquals, pattern)
}

// JobGroupStartsWith returns a new JobGroup, matching jobs whose group name starts
// with the given string pattern.
func JobGroupStartsWith(pattern string) quartz.Matcher[quartz.ScheduledJob] {
return NewJobGroup(&StringStartsWith, pattern)
}

// JobGroupEndsWith returns a new JobGroup, matching jobs whose group name ends
// with the given string pattern.
func JobGroupEndsWith(pattern string) quartz.Matcher[quartz.ScheduledJob] {
return NewJobGroup(&StringEndsWith, pattern)
}

// JobGroupContains returns a new JobGroup, matching jobs whose group name contains
// the given string pattern.
func JobGroupContains(pattern string) quartz.Matcher[quartz.ScheduledJob] {
return NewJobGroup(&StringContains, pattern)
}

// IsMatch evaluates JobGroup matcher on the given job.
func (g *JobGroup) IsMatch(job quartz.ScheduledJob) bool {
return (*g.Operator)(job.JobDetail().JobKey().Group(), g.Pattern)
}
114 changes: 114 additions & 0 deletions matcher/job_matcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package matcher_test

import (
"context"
"testing"

"github.com/reugn/go-quartz/internal/assert"
"github.com/reugn/go-quartz/job"
"github.com/reugn/go-quartz/matcher"
"github.com/reugn/go-quartz/quartz"
)

func TestMatcher_JobAll(t *testing.T) {
sched := quartz.NewStdScheduler()

dummy := job.NewFunctionJob(func(_ context.Context) (bool, error) {
return true, nil
})
cron, err := quartz.NewCronTrigger("@daily")
assert.IsNil(t, err)

jobKeys := []*quartz.JobKey{
quartz.NewJobKey("job_monitor"),
quartz.NewJobKey("job_update"),
quartz.NewJobKeyWithGroup("job_monitor", "group_monitor"),
quartz.NewJobKeyWithGroup("job_update", "group_update"),
}

for _, jobKey := range jobKeys {
err := sched.ScheduleJob(quartz.NewJobDetail(dummy, jobKey), cron)
assert.IsNil(t, err)
}
sched.Start(context.Background())

assert.Equal(t, len(sched.GetJobKeys(matcher.JobActive())), 4)
assert.Equal(t, len(sched.GetJobKeys(matcher.JobPaused())), 0)

assert.Equal(t, len(sched.GetJobKeys(matcher.JobGroupEquals(quartz.DefaultGroup))), 2)
assert.Equal(t, len(sched.GetJobKeys(matcher.JobGroupContains("_"))), 2)
assert.Equal(t, len(sched.GetJobKeys(matcher.JobGroupStartsWith("group_"))), 2)
assert.Equal(t, len(sched.GetJobKeys(matcher.JobGroupEndsWith("_update"))), 1)

assert.Equal(t, len(sched.GetJobKeys(matcher.JobNameEquals("job_monitor"))), 2)
assert.Equal(t, len(sched.GetJobKeys(matcher.JobNameContains("_"))), 4)
assert.Equal(t, len(sched.GetJobKeys(matcher.JobNameStartsWith("job_"))), 4)
assert.Equal(t, len(sched.GetJobKeys(matcher.JobNameEndsWith("_update"))), 2)

// multiple matchers
assert.Equal(t, len(sched.GetJobKeys(
matcher.JobNameEquals("job_monitor"),
matcher.JobGroupEquals(quartz.DefaultGroup),
matcher.JobActive(),
)), 1)

assert.Equal(t, len(sched.GetJobKeys(
matcher.JobNameEquals("job_monitor"),
matcher.JobGroupEquals(quartz.DefaultGroup),
matcher.JobPaused(),
)), 0)

// no matchers
assert.Equal(t, len(sched.GetJobKeys()), 4)

err = sched.PauseJob(quartz.NewJobKey("job_monitor"))
assert.IsNil(t, err)

assert.Equal(t, len(sched.GetJobKeys(matcher.JobActive())), 3)
assert.Equal(t, len(sched.GetJobKeys(matcher.JobPaused())), 1)

sched.Stop()
}

func TestMatcher_JobSwitchType(t *testing.T) {
tests := []struct {
name string
m quartz.Matcher[quartz.ScheduledJob]
}{
{
name: "job-active",
m: matcher.JobActive(),
},
{
name: "job-group-equals",
m: matcher.JobGroupEquals("group1"),
},
{
name: "job-name-contains",
m: matcher.JobNameContains("name"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
switch jm := tt.m.(type) {
case *matcher.JobStatus:
assert.Equal(t, jm.Suspended, false)
case *matcher.JobGroup:
if jm.Operator != &matcher.StringEquals {
t.Fatal("JobGroup unexpected operator")
}
case *matcher.JobName:
if jm.Operator != &matcher.StringContains {
t.Fatal("JobName unexpected operator")
}
default:
t.Fatal("Unexpected matcher type")
}
})
}
}

func TestMatcher_CustomStringOperator(t *testing.T) {
var op matcher.StringOperator = func(_, _ string) bool { return true }
assert.NotEqual(t, matcher.NewJobGroup(&op, "group1"), nil)
}
54 changes: 54 additions & 0 deletions matcher/job_name.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//nolint:dupl
package matcher

import (
"github.com/reugn/go-quartz/quartz"
)

// JobName implements the quartz.Matcher interface with the type argument
// quartz.ScheduledJob, matching jobs by their name.
// It has public fields to allow predicate pushdown in custom quartz.JobQueue
// implementations.
type JobName struct {
Operator *StringOperator // uses a pointer to compare with standard operators
Pattern string
}

var _ quartz.Matcher[quartz.ScheduledJob] = (*JobName)(nil)

// NewJobName returns a new JobName matcher given the string operator and pattern.
func NewJobName(operator *StringOperator, pattern string) quartz.Matcher[quartz.ScheduledJob] {
return &JobName{
Operator: operator,
Pattern: pattern,
}
}

// JobNameEquals returns a new JobName, matching jobs whose name is identical
// to the given string pattern.
func JobNameEquals(pattern string) quartz.Matcher[quartz.ScheduledJob] {
return NewJobName(&StringEquals, pattern)
}

// JobNameStartsWith returns a new JobName, matching jobs whose name starts
// with the given string pattern.
func JobNameStartsWith(pattern string) quartz.Matcher[quartz.ScheduledJob] {
return NewJobName(&StringStartsWith, pattern)
}

// JobNameEndsWith returns a new JobName, matching jobs whose name ends
// with the given string pattern.
func JobNameEndsWith(pattern string) quartz.Matcher[quartz.ScheduledJob] {
return NewJobName(&StringEndsWith, pattern)
}

// JobNameContains returns a new JobName, matching jobs whose name contains
// the given string pattern.
func JobNameContains(pattern string) quartz.Matcher[quartz.ScheduledJob] {
return NewJobName(&StringContains, pattern)
}

// IsMatch evaluates JobName matcher on the given job.
func (n *JobName) IsMatch(job quartz.ScheduledJob) bool {
return (*n.Operator)(job.JobDetail().JobKey().Name(), n.Pattern)
}
30 changes: 30 additions & 0 deletions matcher/job_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package matcher

import (
"github.com/reugn/go-quartz/quartz"
)

// JobStatus implements the quartz.Matcher interface with the type argument
// quartz.ScheduledJob, matching jobs by their status.
// It has public fields to allow predicate pushdown in custom quartz.JobQueue
// implementations.
type JobStatus struct {
Suspended bool
}

var _ quartz.Matcher[quartz.ScheduledJob] = (*JobStatus)(nil)

// JobActive returns a matcher to match active jobs.
func JobActive() quartz.Matcher[quartz.ScheduledJob] {
return &JobStatus{false}
}

// JobPaused returns a matcher to match paused jobs.
func JobPaused() quartz.Matcher[quartz.ScheduledJob] {
return &JobStatus{true}
}

// IsMatch evaluates JobStatus matcher on the given job.
func (s *JobStatus) IsMatch(job quartz.ScheduledJob) bool {
return job.JobDetail().Options().Suspended == s.Suspended
}
18 changes: 18 additions & 0 deletions matcher/string_operator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package matcher

import "strings"

// StringOperator is a function to equate two strings.
type StringOperator func(string, string) bool

// String operators.
var (
StringEquals StringOperator = stringsEqual
StringStartsWith StringOperator = strings.HasPrefix
StringEndsWith StringOperator = strings.HasSuffix
StringContains StringOperator = strings.Contains
)

func stringsEqual(source, target string) bool {
return source == target
}
3 changes: 2 additions & 1 deletion quartz/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import (
)

// Job represents an interface to be implemented by structs which
// represent a 'job' to be performed.
// represent a task to be performed.
// Some Job implementations can be found in the job package.
type Job interface {
// Execute is called by a Scheduler when the Trigger associated
// with this job fires.
Expand Down
10 changes: 10 additions & 0 deletions quartz/job_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,13 @@ func (jobKey *JobKey) Equals(that *JobKey) bool {
return jobKey.name == that.name &&
jobKey.group == that.group
}

// Name returns the name of the JobKey.
func (jobKey *JobKey) Name() string {
return jobKey.name
}

// Group returns the group of the JobKey.
func (jobKey *JobKey) Group() string {
return jobKey.group
}
10 changes: 10 additions & 0 deletions quartz/matcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package quartz

// Matcher represents a predicate (boolean-valued function) of one argument.
// Matchers can be used in various Scheduler API methods to select the entities
// that should be operated.
// Standard Matcher implementations are located in the matcher package.
type Matcher[T any] interface {
// IsMatch evaluates this matcher on the given argument.
IsMatch(T) bool
}
Loading

0 comments on commit c5ad3de

Please sign in to comment.