diff --git a/README.md b/README.md index 1dd5b65..77e0946 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,8 @@ Simple in-memory job queue for Golang using worker-based dispatching Documentation here: https://godoc.org/github.com/mborders/artifex +Cron jobs use the robfig/cron library: https://godoc.org/github.com/robfig/cron + ## Example Usage ```go @@ -32,6 +34,14 @@ dt, err := d.DispatchEvery(func() { // Stop a given DispatchTicker dt.Stop() +// Returns a DispatchCron +dc, err := d.DispatchCron(func() { + // do something every 1s +}, "*/1 * * * * *") + +// Stop a given DispatchCron +dc.Stop() + // Stop a dispatcher and all its workers/tickers d.Stop() ``` diff --git a/dispatcher.go b/dispatcher.go index f61b95e..ea29fc0 100644 --- a/dispatcher.go +++ b/dispatcher.go @@ -2,6 +2,7 @@ package artifex import ( "errors" + "github.com/robfig/cron/v3" "time" ) @@ -12,6 +13,7 @@ type Dispatcher struct { maxQueue int workers []*Worker tickers []*DispatchTicker + crons []*DispatchCron workerPool chan chan Job jobQueue chan Job quit chan bool @@ -34,6 +36,7 @@ func NewDispatcher(maxWorkers int, maxQueue int) *Dispatcher { func (d *Dispatcher) Start() { d.workers = []*Worker{} d.tickers = []*DispatchTicker{} + d.crons = []*DispatchCron{} d.workerPool = make(chan chan Job, d.maxWorkers) d.jobQueue = make(chan Job, d.maxQueue) d.quit = make(chan bool) @@ -78,8 +81,13 @@ func (d *Dispatcher) Stop() { d.tickers[i].Stop() } + for i := range d.crons { + d.crons[i].Stop() + } + d.workers = []*Worker{} d.tickers = []*DispatchTicker{} + d.crons = []*DispatchCron{} d.quit <- true } @@ -134,6 +142,28 @@ func (d *Dispatcher) DispatchEvery(run func(), interval time.Duration) (*Dispatc return dt, nil } +// DispatchEvery pushes the given job into the job queue +// each time the cron definition is met +func (d *Dispatcher) DispatchCron(run func(), cronStr string) (*DispatchCron, error) { + if !d.active { + return nil, errors.New("dispatcher is not active") + } + + dc := &DispatchCron{cron: cron.New(cron.WithSeconds())} + d.crons = append(d.crons, dc) + + _, err := dc.cron.AddFunc(cronStr, func() { + d.jobQueue <- Job{Run: run} + }) + + if err != nil { + return nil, errors.New("invalid cron definition") + } + + dc.cron.Start() + return dc, nil +} + // DispatchTicker represents a dispatched job ticker // that executes on a given interval. This provides // a means for stopping the execution cycle from continuing. @@ -147,3 +177,14 @@ func (dt *DispatchTicker) Stop() { dt.ticker.Stop() dt.quit <- true } + +// DispatchCron represents a dispatched cron job +// that executes using cron expression formats. +type DispatchCron struct { + cron *cron.Cron +} + +// Stops ends the execution cycle for the given cron. +func (c *DispatchCron) Stop() { + c.cron.Stop() +} diff --git a/dispatcher_test.go b/dispatcher_test.go index 9588177..8e81e24 100644 --- a/dispatcher_test.go +++ b/dispatcher_test.go @@ -160,6 +160,10 @@ func TestDispatcher_Stop(t *testing.T) { }, time.Millisecond*100) assert.NotNil(t, err) + _, err = d.DispatchCron(func() { + }, "*/1 * * * * *") + assert.NotNil(t, err) + time.Sleep(time.Millisecond * 100) assert.Equal(t, 1, c) } @@ -209,3 +213,45 @@ func TestDispatcher_StopTwice(t *testing.T) { d.Stop() d.Stop() } + +func TestDispatcher_DispatchCron(t *testing.T) { + c := 0 + d := NewDispatcher(1, 3) + d.Start() + + _, err := d.DispatchCron(func() { + c++ + }, "*/1 * * * * *") + assert.Nil(t, err) + + time.Sleep(time.Millisecond * 3000) + assert.Equal(t, 3, c) +} + +func TestDispatcher_DispatchCron_Stop(t *testing.T) { + c := 0 + d := NewDispatcher(1, 3) + d.Start() + + _, err := d.DispatchCron(func() { + c++ + }, "*/1 * * * * *") + assert.Nil(t, err) + + time.Sleep(time.Millisecond * 3000) + d.Stop() + assert.Equal(t, 3, c) + + time.Sleep(time.Second * 1) + assert.Equal(t, 3, c) +} + +func TestDispatcher_DispatchCron_InvalidDefinition(t *testing.T) { + d := NewDispatcher(1, 3) + d.Start() + + _, err := d.DispatchCron(func() { + }, "foobar") + assert.NotNil(t, err) + assert.Equal(t, "invalid cron definition", err.Error()) +} \ No newline at end of file diff --git a/go.mod b/go.mod index 0685c30..c15b15d 100644 --- a/go.mod +++ b/go.mod @@ -2,5 +2,6 @@ module github.com/mborders/artifex require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/robfig/cron/v3 v3.0.1 github.com/stretchr/testify v1.3.0 ) diff --git a/go.sum b/go.sum index 4f89841..70f1c7d 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=