diff --git a/.examples/cron_example.go b/.examples/cron_example.go index 8dae85d..b904e2b 100644 --- a/.examples/cron_example.go +++ b/.examples/cron_example.go @@ -9,6 +9,7 @@ import ( "context" "fmt" "log" + "math/rand" "os" "os/signal" "strconv" @@ -48,9 +49,18 @@ func main() { cron, err := etcdcron.New( etcdcron.WithNamespace(namespace), etcdcron.WithPartitioning(p), - etcdcron.WithTriggerFunc(func(ctx context.Context, metadata map[string]string, payload *anypb.Any) error { + etcdcron.WithTriggerFunc(func(ctx context.Context, metadata map[string]string, payload *anypb.Any) (etcdcron.TriggerResult, error) { + if metadata["failure"] == "yes" { + // Failure does not trigger the errorsHandler() callback. It just skips the counter update. + return etcdcron.Failure, nil + } + if metadata["stop"] == "random" { + if rand.Int()%3 == 0 { + return etcdcron.Delete, nil + } + } log.Printf("Trigger from pid %d: %s\n", os.Getpid(), string(payload.Value)) - return nil + return etcdcron.OK, nil }), ) if err != nil { @@ -72,18 +82,19 @@ func main() { wg.Done() }() + now := time.Now() if os.Getenv("ADD") == "1" { cron.AddJob(ctx, etcdcron.Job{ Name: "every-2s-dFG3F3DSGSGds", - Rhythm: "*/2 * * * * *", - StartTime: time.Time{}, // even seconds - Payload: &anypb.Any{Value: []byte("ev 2s even")}, + Rhythm: "@every 2s", + StartTime: now, + Payload: &anypb.Any{Value: []byte("ev 2s from now")}, }) cron.AddJob(ctx, etcdcron.Job{ Name: "every-2s-b34w5y5hbwthjs", - Rhythm: "*/2 * * * * *", - StartTime: time.Time{}.Add(time.Second), // odd seconds - Payload: &anypb.Any{Value: []byte("ev 2s odd")}, + Rhythm: "@every 2s", + StartTime: now.Add(time.Second), // odd seconds + Payload: &anypb.Any{Value: []byte("ev 2s from now+1s")}, }) cron.AddJob(ctx, etcdcron.Job{ Name: "every-10s-bnsf45354wbdsnd", @@ -91,14 +102,29 @@ func main() { Payload: &anypb.Any{Value: []byte("ev 10s")}, }) cron.AddJob(ctx, etcdcron.Job{ - Name: "every-3s-mdhgm764324rqdg", - Rhythm: "*/3 * * * * *", - Payload: &anypb.Any{Value: []byte("ev 3s")}, + Name: "every-3s-mdhgm764324rqdg", + Rhythm: "@every 3s", + StartTime: time.Now().Add(10 * time.Second), + Payload: &anypb.Any{Value: []byte("waits 10s then ev 3s")}, }) cron.AddJob(ctx, etcdcron.Job{ Name: "every-4s-vdafbrtjnysh245", Rhythm: "*/4 * * * * *", - Payload: &anypb.Any{Value: []byte("ev 4s")}, + Repeats: 3, // Only triggers 3 times + Payload: &anypb.Any{Value: []byte("ev 4s 3 times only")}, + }) + cron.AddJob(ctx, etcdcron.Job{ + Name: "every-4s-nmdjfgx35u7jfsgjgsf", + Rhythm: "*/4 * * * * *", + Repeats: 3, // Only triggers 3 times + Metadata: map[string]string{"failure": "yes"}, + Payload: &anypb.Any{Value: []byte("ev 4s never expires because it returns a failure condition")}, + }) + cron.AddJob(ctx, etcdcron.Job{ + Name: "every-1s-agdg42y645ydfdha", + Rhythm: "@every 1s", + Metadata: map[string]string{"stop": "random"}, + Payload: &anypb.Any{Value: []byte("ev 1s with random stop")}, }) cron.AddJob(ctx, etcdcron.Job{ Name: "every-5s-adjbg43q5rbafbr44", diff --git a/README.md b/README.md index 9823bb9..6b16834 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ c, _ := etcdclientv3.New(etcdclientv3.Config{ Endpoints: []string{"etcd-host1:2379", "etcd-host2:2379"}, }) cron, _ := etcdcron.New( - WithEtcdMutexBuilder(c), + WithEtcdClient(c), WithNamespace("my-example"), // multi-tenancy WithTriggerFunc(func(ctx context.Context, triggerType string, payload *anypb.Any) error { log.Printf("Trigger from pid %d: %s %s\n", os.Getpid(), triggerType, string(payload.Value)) diff --git a/counting/counting_test.go b/counting/counting_test.go new file mode 100644 index 0000000..cb26514 --- /dev/null +++ b/counting/counting_test.go @@ -0,0 +1,74 @@ +/* +Copyright (c) 2024 Diagrid Inc. +Licensed under the MIT License. +*/ + +package counting + +import ( + "context" + "testing" + "time" + + "github.com/diagridio/go-etcd-cron/partitioning" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + etcdclientv3 "go.etcd.io/etcd/client/v3" +) + +const defaultEtcdEndpoint = "127.0.0.1:2379" + +func TestCounterTTL(t *testing.T) { + ctx := context.TODO() + organizer := partitioning.NewOrganizer(randomNamespace(), partitioning.NoPartitioning()) + etcdClient, err := etcdclientv3.New(etcdclientv3.Config{ + Endpoints: []string{defaultEtcdEndpoint}, + }) + require.NoError(t, err) + + key := organizer.CounterPath(0, "count") + // This counter will expire keys 1s after their next scheduled trigger. + counter := NewEtcdCounter(etcdClient, key, time.Second) + + value, updated, err := counter.Add(ctx, 1, time.Now().Add(time.Second)) + require.NoError(t, err) + assert.True(t, updated) + assert.Equal(t, 1, value) + + value, updated, err = counter.Add(ctx, 2, time.Now().Add(time.Second)) + require.NoError(t, err) + assert.True(t, updated) + assert.Equal(t, 3, value) + + time.Sleep(time.Second) + + value, updated, err = counter.Add(ctx, -4, time.Now().Add(time.Second)) + require.NoError(t, err) + assert.True(t, updated) + assert.Equal(t, -1, value) + + // Counter expires 1 second after the next scheduled trigger (in this test's config) + time.Sleep(3 * time.Second) + + // Counter should have expired but the in-memory value continues. + // Even if key is expired in the db, a new operation will set it again, with a new TTL. + value, updated, err = counter.Add(ctx, 0, time.Now().Add(time.Second)) + require.NoError(t, err) + assert.True(t, updated) + assert.Equal(t, -1, value) + + // Counter expires 1 second after the next scheduled trigger. + time.Sleep(3 * time.Second) + + // A new instance will start from 0 since the db record is expired. + counter = NewEtcdCounter(etcdClient, key, time.Second) + value, updated, err = counter.Add(ctx, 0, time.Now().Add(time.Second)) + require.NoError(t, err) + assert.True(t, updated) + assert.Equal(t, 0, value) +} + +func randomNamespace() string { + return uuid.New().String() +} diff --git a/counting/etcd.go b/counting/etcd.go new file mode 100644 index 0000000..32b2902 --- /dev/null +++ b/counting/etcd.go @@ -0,0 +1,79 @@ +/* +Copyright (c) 2024 Diagrid Inc. +Licensed under the MIT License. +*/ + +package counting + +import ( + "context" + "fmt" + "strconv" + "time" + + etcdclientv3 "go.etcd.io/etcd/client/v3" +) + +type Counter interface { + // Applies by the given delta (+ or -) and return the updated value. + // Count has a ttl calculated using the next tick's time. + // Returns (updated value, true if value was updated in memory, err if any error happened) + // It is possible that the value is updated but an error occurred while trying to persist it. + Add(context.Context, int, time.Time) (int, bool, error) +} + +// It keeps a cache of the value and updates async. +// It works assuming there cannot be two concurrent writes to the same key. +// Concurrency is handled at the job level, which makes this work. +type etcdcounter struct { + etcdclient *etcdclientv3.Client + key string + + loaded bool + value int + ttlOffset time.Duration +} + +func NewEtcdCounter(c *etcdclientv3.Client, key string, ttlOffset time.Duration) Counter { + return &etcdcounter{ + etcdclient: c, + key: key, + ttlOffset: ttlOffset, + } +} + +func (c *etcdcounter) Add(ctx context.Context, delta int, next time.Time) (int, bool, error) { + if !c.loaded { + // First, load the key's value. + res, err := c.etcdclient.KV.Get(ctx, c.key) + if err != nil { + return 0, false, err + } + if len(res.Kvs) == 0 { + c.value = 0 + c.loaded = true + } else { + if res.Kvs[0].Value == nil { + return 0, false, fmt.Errorf("nil value for key %s", c.key) + } + if len(res.Kvs[0].Value) == 0 { + return 0, false, fmt.Errorf("empty value for key %s", c.key) + } + + c.value, err = strconv.Atoi(string(res.Kvs[0].Value)) + if err != nil { + return 0, false, err + } + } + } + + c.value += delta + // Create a lease + ttl := time.Until(next.Add(c.ttlOffset)) + lease, err := c.etcdclient.Grant(ctx, int64(ttl.Seconds())) + if err != nil { + return c.value, true, err + } + _, err = c.etcdclient.KV.Put(ctx, c.key, strconv.Itoa(c.value), etcdclientv3.WithLease(lease.ID)) + return c.value, true, err +} diff --git a/cron.go b/cron.go index c64c75a..f8560a0 100644 --- a/cron.go +++ b/cron.go @@ -18,6 +18,7 @@ import ( anypb "google.golang.org/protobuf/types/known/anypb" "github.com/diagridio/go-etcd-cron/collector" + "github.com/diagridio/go-etcd-cron/counting" "github.com/diagridio/go-etcd-cron/locking" "github.com/diagridio/go-etcd-cron/partitioning" "github.com/diagridio/go-etcd-cron/rhythm" @@ -43,7 +44,7 @@ type Cron struct { etcdErrorsHandler func(context.Context, Job, error) errorsHandler func(context.Context, Job, error) funcCtx func(context.Context, Job) context.Context - triggerFunc func(context.Context, map[string]string, *anypb.Any) error + triggerFunc TriggerFunction running bool runWaitingGroup sync.WaitGroup etcdclient *etcdclient.Client @@ -53,6 +54,16 @@ type Cron struct { collector collector.Collector } +type TriggerFunction func(ctx context.Context, metadata map[string]string, payload *anypb.Any) (TriggerResult, error) + +type TriggerResult int + +const ( + OK TriggerResult = iota + Failure + Delete +) + // Entry consists of a schedule and the func to execute on that schedule. type Entry struct { // The schedule on which this job should be run. @@ -71,6 +82,21 @@ type Entry struct { // Prefix for the ticker mutex distMutexPrefix string + + // Counter if has limit on number of triggers + counter counting.Counter +} + +func (e *Entry) tick(now time.Time) { + e.Prev = e.Next + start := e.Job.StartTime.Truncate(time.Second) + + if start.After(now) { + e.Next = start + return + } + + e.Next = e.Schedule.Next(start, now) } // byTime is a wrapper for sorting the entry array by time @@ -124,7 +150,7 @@ func WithFuncCtx(f func(context.Context, Job) context.Context) CronOpt { }) } -func WithTriggerFunc(f func(context.Context, map[string]string, *anypb.Any) error) CronOpt { +func WithTriggerFunc(f TriggerFunction) CronOpt { return CronOpt(func(cron *Cron) { cron.triggerFunc = f }) @@ -258,11 +284,19 @@ func (c *Cron) schedule(schedule rhythm.Schedule, job *Job) error { return fmt.Errorf("host does not own partition %d", partitionId) } + var counter counting.Counter + if job.Repeats > 0 { + counterKey := c.organizer.CounterPath(partitionId, job.Name) + // Needs to count the number of invocations + // TODO(artursouza): get ttl param for counter instead of hardcode it. + counter = counting.NewEtcdCounter(c.etcdclient, counterKey, 48*time.Hour) + } + entry := &Entry{ Schedule: schedule, Job: *job, - Prev: time.Unix(0, 0), distMutexPrefix: c.organizer.TicksPath(partitionId) + "/", + counter: counter, } c.appendOperation(func(ctx context.Context) *Entry { @@ -335,7 +369,7 @@ func (c *Cron) run(ctx context.Context) { for _, op := range c.pendingOperations { newEntry := op(ctx) if newEntry != nil { - newEntry.Next = newEntry.Schedule.Next(now) + newEntry.tick(now) } } for _, e := range c.entries { @@ -362,7 +396,7 @@ func (c *Cron) run(ctx context.Context) { c.entriesMutex.Lock() newEntry := op(ctx) if newEntry != nil { - newEntry.Next = newEntry.Schedule.Next(now) + newEntry.tick(now) } entries = []*Entry{} for _, e := range c.entries { @@ -377,9 +411,9 @@ func (c *Cron) run(ctx context.Context) { break } e.Prev = e.Next - e.Next = e.Schedule.Next(effective) + e.tick(effective) - go func(ctx context.Context, e *Entry) { + go func(ctx context.Context, e *Entry, next time.Time) { if c.funcCtx != nil { ctx = c.funcCtx(ctx, e.Job) } @@ -406,15 +440,53 @@ func (c *Cron) run(ctx context.Context) { return } - err = c.triggerFunc(ctx, e.Job.Metadata, e.Job.Payload) + result, err := c.triggerFunc(ctx, e.Job.Metadata, e.Job.Payload) if err != nil { go c.errorsHandler(ctx, e.Job, err) return } + + if result == Delete { + // Job must be deleted. + // This is handy if client wants to have a custom logic to decide if job is over. + // One example, is having a more efficient way to count number of invocations. + err = c.DeleteJob(ctx, e.Job.Name) + if err != nil { + go c.errorsHandler(ctx, e.Job, err) + } + + // No need to check (and delete) a counter since every counter has a TTL. + return + } + + increment := 1 + if result == Failure { + // Does not increment counter, but refreshes the TTL. + increment = 0 + } + + if e.counter != nil { + // Needs to check number of triggers + count, updated, err := e.counter.Add(ctx, increment, next) + if err != nil { + go c.errorsHandler(ctx, e.Job, err) + // No need to abort if updating the count failed. + // The count solution is not transactional anyway. + } + + if updated { + if count >= int(e.Job.Repeats) { + err = c.DeleteJob(ctx, e.Job.Name) + if err != nil { + go c.errorsHandler(ctx, e.Job, err) + } + } + } + } // Cannot unlock because it can open a chance for double trigger since two instances // can have a clock skew and compete for the lock at slight different windows. // So, we keep the lock during its ttl - }(ctx, e) + }(ctx, e, e.Next) } continue diff --git a/cron_test.go b/cron_test.go index 6e01242..bd6189b 100644 --- a/cron_test.go +++ b/cron_test.go @@ -22,7 +22,7 @@ import ( // Many tests schedule a job for every second, and then wait at most a second // for it to run. This amount is just slightly larger than 1 second to // compensate for a few milliseconds of runtime. -const ONE_SECOND = 1*time.Second + 200*time.Millisecond +const ONE_SECOND_PLUS_SOME = 1*time.Second + 200*time.Millisecond // Start and stop cron with no entries. func TestNoEntries(t *testing.T) { @@ -34,7 +34,7 @@ func TestNoEntries(t *testing.T) { cron.Start(ctx) select { - case <-time.After(ONE_SECOND): + case <-time.After(ONE_SECOND_PLUS_SOME): t.FailNow() case <-stop(cron, cancel): } @@ -47,9 +47,9 @@ func TestStopCausesJobsToNotRun(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) (TriggerResult, error) { wg.Done() - return nil + return OK, nil })) if err != nil { t.Fatal("unexpected error") @@ -64,7 +64,7 @@ func TestStopCausesJobsToNotRun(t *testing.T) { }) select { - case <-time.After(ONE_SECOND): + case <-time.After(ONE_SECOND_PLUS_SOME): // No job ran! case <-wait(wg): t.FailNow() @@ -79,14 +79,14 @@ func TestAddBeforeRunning(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) (TriggerResult, error) { if calledAlready { - return nil + return OK, nil } calledAlready = true wg.Done() - return nil + return OK, nil })) if err != nil { t.Fatal("unexpected error") @@ -104,12 +104,154 @@ func TestAddBeforeRunning(t *testing.T) { // Give cron 2 seconds to run our job (which is always activated). select { - case <-time.After(2 * ONE_SECOND): + case <-time.After(2 * ONE_SECOND_PLUS_SOME): t.FailNow() case <-wait(wg): } } +// Add jobs with delayed start. +func TestDelayedStart(t *testing.T) { + calledCount1 := atomic.Int32{} + calledCount2 := atomic.Int32{} + + cron, err := New( + WithNamespace(randomNamespace()), + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) (TriggerResult, error) { + if m["id"] == "one" { + calledCount1.Add(1) + } + if m["id"] == "two" { + calledCount2.Add(1) + } + return OK, nil + })) + if err != nil { + t.Fatal("unexpected error") + } + ctx, cancel := context.WithCancel(context.Background()) + cron.AddJob(ctx, Job{ + Name: "test-delayed-start-1", + Rhythm: "* * * * * *", + Metadata: singleMetadata("id", "one"), + StartTime: time.Now().Add(5 * time.Second), + }) + cron.AddJob(ctx, Job{ + Name: "test-delayed-start-2", + Rhythm: "@every 1s", + Metadata: singleMetadata("id", "two"), + StartTime: time.Now().Add(5 * time.Second), + }) + cron.Start(ctx) + defer func() { + cancel() + cron.Wait() + }() + + time.Sleep(4 * time.Second) + assert.Equal(t, int32(0), calledCount1.Load()) + assert.Equal(t, int32(0), calledCount2.Load()) + + time.Sleep(5 * time.Second) + count1 := calledCount1.Load() + count2 := calledCount2.Load() + assert.True(t, (count1 == 4) || (count1 == 5), "count1 was: %d", count1) + assert.True(t, (count2 == 4) || (count2 == 5), "count2 was: %d", count2) +} + +// Job with repeat limit. +func TestRepeatLimit(t *testing.T) { + calledCount := atomic.Int32{} + + cron, err := New( + WithNamespace(randomNamespace()), + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) (TriggerResult, error) { + calledCount.Add(1) + return OK, nil + })) + if err != nil { + t.Fatal("unexpected error") + } + ctx, cancel := context.WithCancel(context.Background()) + cron.AddJob(ctx, Job{ + Name: "test-repeat-limit", + Rhythm: "* * * * * *", + Repeats: 3, + }) + cron.Start(ctx) + defer func() { + cancel() + cron.Wait() + }() + + time.Sleep(5 * time.Second) + assert.Equal(t, int32(3), calledCount.Load()) + assert.Nil(t, cron.GetJob("test-repeat-limit")) +} + +// Job with failure never increments the counter. +func TestFailureDoesNotIncrementCounter(t *testing.T) { + calledCount := atomic.Int32{} + + cron, err := New( + WithNamespace(randomNamespace()), + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) (TriggerResult, error) { + calledCount.Add(1) + return Failure, nil + })) + if err != nil { + t.Fatal("unexpected error") + } + ctx, cancel := context.WithCancel(context.Background()) + cron.AddJob(ctx, Job{ + Name: "test-failure-does-not-count", + Rhythm: "* * * * * *", + Repeats: 2, + }) + cron.Start(ctx) + defer func() { + cancel() + cron.Wait() + }() + + time.Sleep(3 * ONE_SECOND_PLUS_SOME) + assert.Equal(t, int32(3), calledCount.Load()) + assert.NotNil(t, cron.GetJob("test-failure-does-not-count")) +} + +// Job with custom limit. +func TestCustomLimit(t *testing.T) { + calledCount := atomic.Int32{} + maxCount := int32(2) + + cron, err := New( + WithNamespace(randomNamespace()), + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) (TriggerResult, error) { + calledCount.Add(1) + if calledCount.Load() == maxCount { + return Delete, nil + } + return OK, nil + })) + if err != nil { + t.Fatal("unexpected error") + } + ctx, cancel := context.WithCancel(context.Background()) + cron.AddJob(ctx, Job{ + Name: "test-custom-limit", + Rhythm: "* * * * * *", + }) + cron.Start(ctx) + defer func() { + cancel() + cron.Wait() + }() + + time.Sleep(2 * ONE_SECOND_PLUS_SOME) + assert.Equal(t, maxCount, calledCount.Load()) + assert.Nil(t, cron.GetJob("test-repeat-limit")) +} + // Start cron, add a job, expect it runs. func TestAddWhileRunning(t *testing.T) { wg := &sync.WaitGroup{} @@ -117,9 +259,9 @@ func TestAddWhileRunning(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) (TriggerResult, error) { wg.Done() - return nil + return OK, nil })) if err != nil { t.Fatal("unexpected error") @@ -137,7 +279,7 @@ func TestAddWhileRunning(t *testing.T) { }) select { - case <-time.After(2 * ONE_SECOND): + case <-time.After(2 * ONE_SECOND_PLUS_SOME): t.FailNow() case <-wait(wg): } @@ -150,9 +292,9 @@ func TestSnapshotEntries(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) (TriggerResult, error) { wg.Done() - return nil + return OK, nil })) if err != nil { t.Fatal("unexpected error") @@ -170,13 +312,13 @@ func TestSnapshotEntries(t *testing.T) { // After 1 second, call Entries. select { - case <-time.After(ONE_SECOND): + case <-time.After(ONE_SECOND_PLUS_SOME): cron.Entries() } // Even though Entries was called, the cron should fire twice within 3 seconds (1 + 3). select { - case <-time.After(3 * ONE_SECOND): + case <-time.After(3 * ONE_SECOND_PLUS_SOME): t.FailNow() case <-wait(wg): } @@ -190,16 +332,16 @@ func TestDelayedAdd(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) (TriggerResult, error) { if m["op"] == "noop" { - return nil + return OK, nil } if called { t.Fatal("cannot call twice") } called = true wg.Done() - return nil + return OK, nil })) if err != nil { t.Fatal("unexpected error") @@ -228,7 +370,7 @@ func TestDelayedAdd(t *testing.T) { // Event should be called only once within 2 seconds. select { - case <-time.After(3 * ONE_SECOND): + case <-time.After(3 * ONE_SECOND_PLUS_SOME): t.FailNow() case <-wait(wg): } @@ -244,13 +386,13 @@ func TestMultipleEntries(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) (TriggerResult, error) { if m["op"] == "return-nil" { - return nil + return OK, nil } wg.Done() - return nil + return OK, nil })) if err != nil { t.Fatal("unexpected error") @@ -282,7 +424,7 @@ func TestMultipleEntries(t *testing.T) { }() select { - case <-time.After(2 * ONE_SECOND): + case <-time.After(2 * ONE_SECOND_PLUS_SOME): t.FailNow() case <-wait(wg): } @@ -295,13 +437,13 @@ func TestRunningJobTwice(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) (TriggerResult, error) { if m["op"] == "return-nil" { - return nil + return OK, nil } wg.Done() - return nil + return OK, nil })) if err != nil { t.Fatal("unexpected error") @@ -329,7 +471,7 @@ func TestRunningJobTwice(t *testing.T) { }() select { - case <-time.After(2 * ONE_SECOND): + case <-time.After(2 * ONE_SECOND_PLUS_SOME): t.FailNow() case <-wait(wg): } @@ -341,13 +483,13 @@ func TestRunningMultipleSchedules(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) (TriggerResult, error) { if m["op"] == "return-nil" { - return nil + return OK, nil } wg.Done() - return nil + return OK, nil })) if err != nil { t.Fatal("unexpected error") @@ -379,7 +521,7 @@ func TestRunningMultipleSchedules(t *testing.T) { }() select { - case <-time.After(2 * ONE_SECOND): + case <-time.After(2 * ONE_SECOND_PLUS_SOME): t.FailNow() case <-wait(wg): } @@ -397,12 +539,12 @@ func TestLocalTimezone(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) (TriggerResult, error) { if called.Add(1) > 1 { - return nil + return OK, nil } wg.Done() - return nil + return OK, nil })) if err != nil { t.Fatal("unexpected error") @@ -420,7 +562,7 @@ func TestLocalTimezone(t *testing.T) { }() select { - case <-time.After(3 * ONE_SECOND): + case <-time.After(3 * ONE_SECOND_PLUS_SOME): t.FailNow() case <-wait(wg): } @@ -434,13 +576,13 @@ func TestJob(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) (TriggerResult, error) { if calledAlready { - return nil + return OK, nil } calledAlready = true wg.Done() - return nil + return OK, nil })) if err != nil { t.Fatal("unexpected error") @@ -477,7 +619,7 @@ func TestJob(t *testing.T) { }() select { - case <-time.After(2 * ONE_SECOND): + case <-time.After(2 * ONE_SECOND_PLUS_SOME): t.FailNow() case <-wait(wg): } @@ -506,9 +648,9 @@ func TestCron_Parallel(t *testing.T) { cron1, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) (TriggerResult, error) { wg.Done() - return nil + return OK, nil })) if err != nil { t.Fatal("unexpected error") @@ -521,9 +663,9 @@ func TestCron_Parallel(t *testing.T) { cron2, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) (TriggerResult, error) { wg.Done() - return nil + return OK, nil })) if err != nil { t.Fatal("unexpected error") @@ -545,7 +687,7 @@ func TestCron_Parallel(t *testing.T) { cron2.Start(ctx2) select { - case <-time.After(time.Duration(2) * ONE_SECOND): + case <-time.After(time.Duration(2) * ONE_SECOND_PLUS_SOME): t.FailNow() case <-wait(wg): } @@ -560,10 +702,10 @@ func TestTTL(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) (TriggerResult, error) { firedOnce.Store(true) wg.Done() - return nil + return OK, nil })) if err != nil { t.Fatal("unexpected error") @@ -582,7 +724,7 @@ func TestTTL(t *testing.T) { }() select { - case <-time.After(6 * ONE_SECOND): + case <-time.After(6 * ONE_SECOND_PLUS_SOME): // Success, it means it did not consume all the workgroup count because the job expired. assert.True(t, firedOnce.Load()) case <-wait(wg): diff --git a/job.go b/job.go index 2ea6687..ad998f8 100644 --- a/job.go +++ b/job.go @@ -18,20 +18,26 @@ type Job struct { Name string // Cron-formatted rhythm (ie. 0,10,30 1-5 0 * * *) Rhythm string - // Any metadata that the client understands. + // Optional metadata that the client understands. Metadata map[string]string - // The payload containg all the information for the trigger + // Optional payload containg all the information for the trigger Payload *anypb.Any + // Optional start time for the first trigger of the schedule + Repeats int32 + // Optional start time for the first trigger of the schedule + StartTime time.Time // Optional number of seconds until this job expires (if > 0) TTL time.Duration } func (j *Job) toJobRecord() (*storage.JobRecord, storage.JobRecordOptions) { return &storage.JobRecord{ - Name: j.Name, - Rhythm: j.Rhythm, - Metadata: j.Metadata, - Payload: j.Payload, + Name: j.Name, + Rhythm: j.Rhythm, + Metadata: j.Metadata, + Payload: j.Payload, + Repeats: j.Repeats, + StartTimestamp: j.StartTime.Unix(), }, storage.JobRecordOptions{ TTL: j.TTL, } @@ -43,9 +49,11 @@ func jobFromJobRecord(r *storage.JobRecord) *Job { } return &Job{ - Name: r.Name, - Rhythm: r.Rhythm, - Metadata: r.Metadata, - Payload: r.Payload, + Name: r.Name, + Rhythm: r.Rhythm, + Metadata: r.Metadata, + Payload: r.Payload, + Repeats: r.Repeats, + StartTime: time.Unix(r.StartTimestamp, 0), } } diff --git a/locking/etcd.go b/locking/etcd.go index 59f56b7..6b34a8a 100644 --- a/locking/etcd.go +++ b/locking/etcd.go @@ -10,9 +10,6 @@ import ( etcdclient "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" - - "github.com/diagridio/go-etcd-cron/partitioning" - "github.com/diagridio/go-etcd-cron/storage" ) type DistributedMutex interface { @@ -22,15 +19,6 @@ type DistributedMutex interface { Unlock(ctx context.Context) error } -type EtcdMutexBuilder interface { - NewMutex(pfx string) (DistributedMutex, error) - NewJobStore( - organizer partitioning.Organizer, - partitioning partitioning.Partitioner, - putCallback func(context.Context, *storage.JobRecord) error, - deleteCallback func(context.Context, string) error) storage.JobStore -} - func NewDistributedMutex(etcdclient *etcdclient.Client, pfx string) (DistributedMutex, error) { // We keep the lock per run, reusing the lock over multiple iterations. // If we lose the lock, another instance will take it. diff --git a/partitioning/organizer.go b/partitioning/organizer.go index 249193a..502afad 100644 --- a/partitioning/organizer.go +++ b/partitioning/organizer.go @@ -15,6 +15,7 @@ type Organizer interface { JobPath(jobName string) string JobsPath(partitionId int) string TicksPath(partitionId int) string + CounterPath(partitionId int, name string) string } type organizer struct { @@ -40,3 +41,7 @@ func (o *organizer) JobsPath(partitionId int) string { func (o *organizer) TicksPath(partitionId int) string { return filepath.Join(o.namespace, "partitions", strconv.Itoa(partitionId), "ticks") } + +func (o *organizer) CounterPath(partitionId int, name string) string { + return filepath.Join(o.namespace, "partitions", strconv.Itoa(partitionId), "counts", name) +} diff --git a/proto/storage/job.proto b/proto/storage/job.proto index 98adcdc..98f0c9f 100644 --- a/proto/storage/job.proto +++ b/proto/storage/job.proto @@ -16,6 +16,8 @@ option go_package = "github.com/diagridio/go-etcd-cron/storage;storage"; message JobRecord { string name = 1; string rhythm = 2; - map metadata = 3; - google.protobuf.Any payload = 4; -} \ No newline at end of file + int64 startTimestamp = 3; // time that the schedule starts + int32 repeats = 4; // time total time the job should trigger + map metadata = 5; + google.protobuf.Any payload = 6; +} diff --git a/rhythm/constantdelay.go b/rhythm/constantdelay.go index 07f2c35..8c7ee04 100644 --- a/rhythm/constantdelay.go +++ b/rhythm/constantdelay.go @@ -20,6 +20,7 @@ func Every(duration time.Duration) ConstantDelaySchedule { if duration < time.Second { duration = time.Second } + return ConstantDelaySchedule{ Delay: duration - time.Duration(duration.Nanoseconds())%time.Second, } @@ -27,6 +28,29 @@ func Every(duration time.Duration) ConstantDelaySchedule { // Next returns the next time this should be run. // This rounds so that the next activation time will be on the second. -func (schedule ConstantDelaySchedule) Next(t time.Time) time.Time { - return t.Add(schedule.Delay - time.Duration(t.Nanosecond())*time.Nanosecond) +func (schedule ConstantDelaySchedule) Next(start, t time.Time) time.Time { + if start.IsZero() { + // schedule is not bound to a starting point + return t.Truncate(time.Second).Add(schedule.Delay) + } + + s := start.Truncate(time.Second) + + if t.Before(s) { + return s + } + + // Truncate to the second + effective := t.Truncate(time.Second) + + // Number of steps from start until now (truncated): + steps := int64(effective.Sub(s).Seconds()) / int64(schedule.Delay.Seconds()) + // Timestamp after taking all those steps: + next := s.Add(time.Duration(int64(schedule.Delay) * steps)) + if !next.After(t) { + // Off by one due to truncation, one more step needed in this case. + next = next.Add(schedule.Delay) + } + + return next } diff --git a/rhythm/constantdelay_test.go b/rhythm/constantdelay_test.go index 24a220f..350b53d 100644 --- a/rhythm/constantdelay_test.go +++ b/rhythm/constantdelay_test.go @@ -50,7 +50,67 @@ func TestConstantDelayNext(t *testing.T) { } for _, c := range tests { - actual := Every(c.delay).Next(getTime(c.time)) + actual := Every(c.delay).Next(time.Time{}, getTime(c.time)) + expected := getTime(c.expected) + if actual != expected { + t.Errorf("%s, \"%s\": (expected) %v != %v (actual)", c.time, c.delay, expected, actual) + } + } +} + +func TestConstantDelayFromStartNext(t *testing.T) { + tests := []struct { + time string + start string + delay time.Duration + expected string + }{ + // Simple cases + {"Mon Jul 9 14:45 2012", "Mon Jul 9 14:45 2012", 15*time.Minute + 50*time.Nanosecond, "Mon Jul 9 15:00 2012"}, + {"Mon Jul 9 14:59 2012", "Mon Jul 9 14:59 2012", 15 * time.Minute, "Mon Jul 9 15:14 2012"}, + {"Mon Jul 9 14:59:59 2012", "Mon Jul 9 14:59:59 2012", 15 * time.Minute, "Mon Jul 9 15:14:59 2012"}, + {"Mon Jul 9 14:59:59 2012", "Mon Jul 9 14:55:01 2012", 10 * time.Minute, "Mon Jul 9 15:05:01 2012"}, + {"Mon Jul 9 15:04:59 2012", "Mon Jul 9 14:55:01 2012", 10 * time.Minute, "Mon Jul 9 15:05:01 2012"}, + + // Simple case for running every second + {"Mon Jul 9 14:45 2015", "Mon Jul 9 14:45 2015", 1 * time.Second, "Mon Jul 9 14:45:01 2015"}, + {"Mon Jul 9 14:45:01 2015", "Mon Jul 9 14:45 2015", 1 * time.Second, "Mon Jul 9 14:45:02 2015"}, + + // Starts only in a distant future + {"Mon Jul 9 15:04:59 2012", "Mon Jul 9 12:05:01 2050", 10 * time.Minute, "Mon Jul 9 12:05:01 2050"}, + + // Wrap around hours + {"Mon Jul 9 15:45 2012", "Mon Jul 9 15:45 2012", 35 * time.Minute, "Mon Jul 9 16:20 2012"}, + {"Mon Jul 9 15:45 2012", "Mon Jul 9 15:44 2012", 35 * time.Minute, "Mon Jul 9 16:19 2012"}, + {"Mon Jul 9 15:45 2012", "Mon Jul 9 15:46 2012", 35 * time.Minute, "Mon Jul 9 15:46 2012"}, + + // Wrap around days + {"Mon Jul 9 23:46 2012", "Mon Jul 9 23:46 2012", 14 * time.Minute, "Tue Jul 10 00:00 2012"}, + {"Mon Jul 9 23:45 2012", "Mon Jul 9 23:45 2012", 35 * time.Minute, "Tue Jul 10 00:20 2012"}, + {"Mon Jul 9 23:35:51 2012", "Mon Jul 9 23:35:51 2012", 44*time.Minute + 24*time.Second, "Tue Jul 10 00:20:15 2012"}, + {"Mon Jul 9 23:35:51 2012", "Mon Jul 9 23:35:51 2012", 25*time.Hour + 44*time.Minute + 24*time.Second, "Thu Jul 11 01:20:15 2012"}, + + // Wrap around months + {"Mon Jul 9 23:35 2012", "Mon Jul 9 23:35 2012", 91*24*time.Hour + 25*time.Minute, "Thu Oct 9 00:00 2012"}, + + // Wrap around minute, hour, day, month, and year + {"Mon Dec 31 23:59:45 2012", "Mon Dec 31 23:59:45 2012", 15 * time.Second, "Tue Jan 1 00:00:00 2013"}, + + // Round to nearest second on the delay + {"Mon Jul 9 14:45 2012", "Mon Jul 9 14:45 2012", 15*time.Minute + 50*time.Nanosecond, "Mon Jul 9 15:00 2012"}, + + // Round up to 1 second if the duration is less. + {"Mon Jul 9 14:45:00 2012", "Mon Jul 9 14:45:00 2012", 15 * time.Millisecond, "Mon Jul 9 14:45:01 2012"}, + + // Round to nearest second when calculating the next time. + {"Mon Jul 9 14:45:00.006 2012", "Mon Jul 9 14:45:00.006 2012", 15 * time.Minute, "Mon Jul 9 15:00 2012"}, + + // Round to nearest second for both. + {"Mon Jul 9 14:45:00.007 2012", "Mon Jul 9 14:45:00.007 2012", 15*time.Minute + 50*time.Nanosecond, "Mon Jul 9 15:00 2012"}, + } + + for _, c := range tests { + actual := Every(c.delay).Next(getTime(c.start), getTime(c.time)) expected := getTime(c.expected) if actual != expected { t.Errorf("%s, \"%s\": (expected) %v != %v (actual)", c.time, c.delay, expected, actual) diff --git a/rhythm/parser_test.go b/rhythm/parser_test.go index 604262c..3534523 100644 --- a/rhythm/parser_test.go +++ b/rhythm/parser_test.go @@ -107,7 +107,7 @@ func TestSpecSchedule(t *testing.T) { expected Schedule }{ {"* 5 * * * *", &SpecSchedule{all(seconds), 1 << 5, all(hours), all(dom), all(months), all(dow)}}, - {"@every 5m", ConstantDelaySchedule{time.Duration(5) * time.Minute}}, + {"@every 5m", ConstantDelaySchedule{Delay: time.Duration(5) * time.Minute}}, } for _, c := range entries { diff --git a/rhythm/schedule.go b/rhythm/schedule.go index 2f30f48..4e79108 100644 --- a/rhythm/schedule.go +++ b/rhythm/schedule.go @@ -11,5 +11,6 @@ import "time" type Schedule interface { // Return the next activation time, later than the given time. // Next is invoked initially, and then each time the job is run. - Next(time.Time) time.Time + // The result must be after the given start time. + Next(start time.Time, now time.Time) time.Time } diff --git a/rhythm/spec.go b/rhythm/spec.go index 70a845f..08fb4f9 100644 --- a/rhythm/spec.go +++ b/rhythm/spec.go @@ -59,7 +59,15 @@ const ( // Next returns the next time this schedule is activated, greater than the given // time. If no time can be found to satisfy the schedule, return the zero time. -func (s *SpecSchedule) Next(t time.Time) time.Time { +func (s *SpecSchedule) Next(start, tick time.Time) time.Time { + // The cron logic only matters after the start time. + t := tick + if t.Before(start) { + // The cron logic does not guarantee delta from the start, since it is an + // schedule based on the absolute clock. + t = start + } + // General approach: // For Month, Day, Hour, Minute, Second: // Check if the time value matches. If yes, continue to the next field. diff --git a/rhythm/spec_test.go b/rhythm/spec_test.go index 4262bdf..73e7122 100644 --- a/rhythm/spec_test.go +++ b/rhythm/spec_test.go @@ -8,6 +8,8 @@ package rhythm import ( "testing" "time" + + "github.com/stretchr/testify/require" ) func TestActivation(t *testing.T) { @@ -66,7 +68,7 @@ func TestActivation(t *testing.T) { t.Error(err) continue } - actual := sched.Next(getTime(test.time).Add(-1 * time.Second)) + actual := sched.Next(time.Time{}, getTime(test.time).Add(-1*time.Second)) expected := getTime(test.time) if test.expected && expected != actual || !test.expected && expected == actual { t.Errorf("Fail evaluating %s on %s: (expected) %s != %s (actual)", @@ -84,6 +86,8 @@ func TestNext(t *testing.T) { {"Mon Jul 9 14:45 2012", "0 0/15 * * *", "Mon Jul 9 15:00 2012"}, {"Mon Jul 9 14:59 2012", "0 0/15 * * *", "Mon Jul 9 15:00 2012"}, {"Mon Jul 9 14:59:59 2012", "0 0/15 * * *", "Mon Jul 9 15:00 2012"}, + {"Mon Jul 9 15:00 2015", "0 0/15 * * *", "Mon Jul 9 15:15 2015"}, + {"Mon Jul 9 15:00:01 2015", "0 0/15 * * *", "Mon Jul 9 15:15 2015"}, // Wrap around hours {"Mon Jul 9 15:45 2012", "0 20-35/15 * * *", "Mon Jul 9 16:20 2012"}, @@ -132,7 +136,74 @@ func TestNext(t *testing.T) { t.Error(err) continue } - actual := sched.Next(getTime(c.time)) + actual := sched.Next(time.Time{}, getTime(c.time)) + expected := getTime(c.expected) + if !actual.Equal(expected) { + t.Errorf("%s, \"%s\": (expected) %v != %v (actual)", c.time, c.spec, expected, actual) + } + } +} + +func TestNextWithDelayedStart(t *testing.T) { + runs := []struct { + time, start, spec string + expected string + }{ + // Simple cases + {"Mon Jul 9 14:45 2012", "Mon Jul 9 14:45 2012", "0 0/15 * * *", "Mon Jul 9 15:00 2012"}, + {"Mon Jul 9 14:44 2013", "Mon Jul 9 14:45:01 2013", "0 0/15 * * *", "Mon Jul 9 15:00 2013"}, + {"Mon Jul 9 14:44 2011", "Mon Jul 9 14:44:59 2011", "0 0/15 * * *", "Mon Jul 9 14:45 2011"}, + {"Mon Jul 9 14:45 2014", "Mon Jul 9 15:00:01 2014", "0 0/15 * * *", "Mon Jul 9 15:15 2014"}, + {"Mon Jul 9 14:45 2015", "Mon Jul 9 15:01:00 2015", "0 0/15 * * *", "Mon Jul 9 15:15 2015"}, + {"Mon Jul 9 15:00 2016", "Mon Jul 9 15:00:00 2016", "0 0/15 * * *", "Mon Jul 9 15:15 2016"}, + {"Mon Jul 9 15:00 2012", "Mon Jul 9 15:00 2012", "0 0/15 * * *", "Mon Jul 9 15:15 2012"}, + {"Mon Jul 9 15:00:01 2012", "Mon Jul 9 15:00 2012", "0 0/15 * * *", "Mon Jul 9 15:15 2012"}, + + // Wrap around hours + {"Mon Jul 9 15:45 2012", "Mon Jul 9 15:45 2012", "0 20-35/15 * * *", "Mon Jul 9 16:20 2012"}, + + // Wrap around days + {"Mon Jul 9 23:46 2012", "Mon Jul 9 23:46 2012", "0 */15 * * *", "Tue Jul 10 00:00 2012"}, + {"Mon Jul 9 23:45 2012", "Mon Jul 9 23:45 2012", "0 20-35/15 * * *", "Tue Jul 10 00:20 2012"}, + {"Mon Jul 9 23:35:51 2012", "Mon Jul 9 23:35:51 2012", "15/35 20-35/15 * * *", "Tue Jul 10 00:20:15 2012"}, + {"Mon Jul 9 23:35:51 2012", "Mon Jul 9 23:35:51 2012", "15/35 20-35/15 1/2 * *", "Tue Jul 10 01:20:15 2012"}, + {"Mon Jul 9 23:35:51 2012", "Mon Jul 9 23:35:51 2012", "15/35 20-35/15 10-12 * *", "Tue Jul 10 10:20:15 2012"}, + + {"Mon Jul 9 23:35:51 2012", "Mon Jul 9 23:35:51 2012", "15/35 20-35/15 1/2 */2 * *", "Thu Jul 11 01:20:15 2012"}, + {"Mon Jul 9 23:35:51 2012", "Mon Jul 9 23:35:51 2012", "15/35 20-35/15 * 9-20 * *", "Wed Jul 10 00:20:15 2012"}, + {"Mon Jul 9 23:35:51 2012", "Mon Jul 9 23:35:51 2012", "15/35 20-35/15 * 9-20 Jul *", "Wed Jul 10 00:20:15 2012"}, + + // Wrap around months + {"Mon Jul 9 23:35 2012", "Mon Jul 9 23:35 2012", "0 0 0 9 Apr-Oct ?", "Thu Aug 9 00:00 2012"}, + {"Mon Jul 9 23:35 2012", "Mon Jul 9 23:35 2012", "0 0 0 */5 Apr,Aug,Oct Mon", "Mon Aug 6 00:00 2012"}, + {"Mon Jul 9 23:35 2012", "Mon Jul 9 23:35 2012", "0 0 0 */5 Oct Mon", "Mon Oct 1 00:00 2012"}, + + // Wrap around years + {"Mon Jul 9 23:35 2012", "Mon Jul 9 23:35 2012", "0 0 0 * Feb Mon", "Mon Feb 4 00:00 2013"}, + {"Mon Jul 9 23:35 2012", "Mon Jul 9 23:35 2012", "0 0 0 * Feb Mon/2", "Fri Feb 1 00:00 2013"}, + + // Wrap around minute, hour, day, month, and year + {"Mon Dec 31 23:59:45 2012", "Mon Dec 31 23:59:45 2012", "0 * * * * *", "Tue Jan 1 00:00:00 2013"}, + + // Leap year + {"Mon Jul 9 23:35 2012", "Mon Jul 9 23:35 2012", "0 0 0 29 Feb ?", "Mon Feb 29 00:00 2016"}, + + // Daylight savings time EST -> EDT + {"2012-03-11T00:00:00-0500", "2012-03-11T00:00:00-0500", "0 30 2 11 Mar ?", "2013-03-11T02:30:00-0400"}, + + // Daylight savings time EDT -> EST + {"2012-11-04T00:00:00-0400", "2012-11-04T00:00:00-0400", "0 30 2 04 Nov ?", "2012-11-04T02:30:00-0500"}, + {"2012-11-04T01:45:00-0400", "2012-11-04T01:45:00-0400", "0 30 1 04 Nov ?", "2012-11-04T01:30:00-0500"}, + + // Unsatisfiable + {"Mon Jul 9 23:35 2012", "Mon Jul 9 23:35 2012", "0 0 0 30 Feb ?", ""}, + {"Mon Jul 9 23:35 2012", "Mon Jul 9 23:35 2012", "0 0 0 31 Apr ?", ""}, + } + + for _, c := range runs { + sched, err := Parse(c.spec) + require.NoError(t, err) + actual := sched.Next(getTime(c.start), getTime(c.time)) expected := getTime(c.expected) if !actual.Equal(expected) { t.Errorf("%s, \"%s\": (expected) %v != %v (actual)", c.time, c.spec, expected, actual) diff --git a/storage/job.pb.go b/storage/job.pb.go index 738a2ec..aa914c0 100644 --- a/storage/job.pb.go +++ b/storage/job.pb.go @@ -32,10 +32,12 @@ type JobRecord struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` - Rhythm string `protobuf:"bytes,2,opt,name=rhythm,proto3" json:"rhythm,omitempty"` - Metadata map[string]string `protobuf:"bytes,3,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - Payload *anypb.Any `protobuf:"bytes,4,opt,name=payload,proto3" json:"payload,omitempty"` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Rhythm string `protobuf:"bytes,2,opt,name=rhythm,proto3" json:"rhythm,omitempty"` + StartTimestamp int64 `protobuf:"varint,3,opt,name=startTimestamp,proto3" json:"startTimestamp,omitempty"` // time that the schedule starts + Repeats int32 `protobuf:"varint,4,opt,name=repeats,proto3" json:"repeats,omitempty"` // time total time the job should trigger + Metadata map[string]string `protobuf:"bytes,5,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Payload *anypb.Any `protobuf:"bytes,6,opt,name=payload,proto3" json:"payload,omitempty"` } func (x *JobRecord) Reset() { @@ -84,6 +86,20 @@ func (x *JobRecord) GetRhythm() string { return "" } +func (x *JobRecord) GetStartTimestamp() int64 { + if x != nil { + return x.StartTimestamp + } + return 0 +} + +func (x *JobRecord) GetRepeats() int32 { + if x != nil { + return x.Repeats + } + return 0 +} + func (x *JobRecord) GetMetadata() map[string]string { if x != nil { return x.Metadata @@ -104,25 +120,30 @@ var file_proto_storage_job_proto_rawDesc = []byte{ 0x0a, 0x17, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2f, 0x6a, 0x6f, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xe2, 0x01, + 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa4, 0x02, 0x0a, 0x09, 0x4a, 0x6f, 0x62, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x68, 0x79, 0x74, 0x68, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x06, 0x72, 0x68, 0x79, 0x74, 0x68, 0x6d, 0x12, 0x3c, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, - 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x73, 0x74, 0x6f, 0x72, - 0x61, 0x67, 0x65, 0x2e, 0x4a, 0x6f, 0x62, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x2e, 0x4d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x6d, 0x65, 0x74, - 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x2e, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, - 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x07, 0x70, 0x61, - 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x3b, 0x0a, 0x0d, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, - 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, - 0x38, 0x01, 0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x64, 0x69, 0x61, 0x67, 0x72, 0x69, 0x64, 0x69, 0x6f, 0x2f, 0x67, 0x6f, 0x2d, 0x65, 0x74, - 0x63, 0x64, 0x2d, 0x63, 0x72, 0x6f, 0x6e, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x3b, - 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x06, 0x72, 0x68, 0x79, 0x74, 0x68, 0x6d, 0x12, 0x26, 0x0a, 0x0e, 0x73, 0x74, 0x61, 0x72, 0x74, + 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, + 0x0e, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, + 0x18, 0x0a, 0x07, 0x72, 0x65, 0x70, 0x65, 0x61, 0x74, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, + 0x52, 0x07, 0x72, 0x65, 0x70, 0x65, 0x61, 0x74, 0x73, 0x12, 0x3c, 0x0a, 0x08, 0x6d, 0x65, 0x74, + 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x73, 0x74, + 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e, 0x4a, 0x6f, 0x62, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x2e, + 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x6d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x2e, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, + 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x07, + 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x3b, 0x0a, 0x0d, 0x4d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, + 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x3a, 0x02, 0x38, 0x01, 0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, + 0x6f, 0x6d, 0x2f, 0x64, 0x69, 0x61, 0x67, 0x72, 0x69, 0x64, 0x69, 0x6f, 0x2f, 0x67, 0x6f, 0x2d, + 0x65, 0x74, 0x63, 0x64, 0x2d, 0x63, 0x72, 0x6f, 0x6e, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, + 0x65, 0x3b, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var (