Skip to content

Commit

Permalink
Better handle of Dapr compatibility (#6)
Browse files Browse the repository at this point in the history
* Better handle TTL for Dapr compat.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Update .examples/cron_example.go

Co-authored-by: Josh van Leeuwen <me@joshvanl.dev>
Signed-off-by: Artur Souza <asouza.pro@gmail.com>

---------

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Co-authored-by: Josh van Leeuwen <me@joshvanl.dev>
  • Loading branch information
artursouza and JoshVanL authored Mar 15, 2024
1 parent 90ed93a commit 7059bf0
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 119 deletions.
29 changes: 15 additions & 14 deletions .examples/cron_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import (
"strconv"
"sync"
"syscall"
"time"

etcdcron "github.com/diagridio/go-etcd-cron"
"github.com/diagridio/go-etcd-cron/partitioning"
"google.golang.org/protobuf/types/known/anypb"
)

Expand All @@ -39,15 +41,15 @@ func main() {

log.Printf("starting hostId=%d for total of %d hosts and %d partitions", hostId, numHosts, numPartitions)

p, err := etcdcron.NewPartitioning(numPartitions, numHosts, hostId)
p, err := partitioning.NewPartitioning(numPartitions, numHosts, hostId)
if err != nil {
log.Fatal("fail to create partitioning", err)
}
cron, err := etcdcron.New(
etcdcron.WithNamespace(namespace),
etcdcron.WithPartitioning(p),
etcdcron.WithTriggerFunc(func(ctx context.Context, triggerType string, payload *anypb.Any) error {
log.Printf("Trigger from pid %d: %s %s\n", os.Getpid(), triggerType, string(payload.Value))
etcdcron.WithTriggerFunc(func(ctx context.Context, metadata map[string]string, payload *anypb.Any) error {
log.Printf("Trigger from pid %d: %s\n", os.Getpid(), string(payload.Value))
return nil
}),
)
Expand All @@ -72,51 +74,50 @@ func main() {

if os.Getenv("ADD") == "1" {
cron.AddJob(ctx, etcdcron.Job{
Name: "every-2s-b34w5y5hbwthjs",
Rhythm: "*/2 * * * * *",
Type: "stdout", // can be anything the client wants
Payload: &anypb.Any{Value: []byte("ev 2s")},
Name: "every-2s-dFG3F3DSGSGds",
Rhythm: "*/2 * * * * *",
StartTime: time.Time{}, // even seconds
Payload: &anypb.Any{Value: []byte("ev 2s even")},
})
cron.AddJob(ctx, etcdcron.Job{
Name: "every-2s-b34w5y5hbwthjs",
Rhythm: "*/2 * * * * *",
StartTime: time.Time{}.Add(time.Second), // odd seconds
Payload: &anypb.Any{Value: []byte("ev 2s odd")},
})
cron.AddJob(ctx, etcdcron.Job{
Name: "every-10s-bnsf45354wbdsnd",
Rhythm: "*/10 * * * * *",
Type: "stdout", // can be anything the client wants
Payload: &anypb.Any{Value: []byte("ev 10s")},
})
cron.AddJob(ctx, etcdcron.Job{
Name: "every-3s-mdhgm764324rqdg",
Rhythm: "*/3 * * * * *",
Type: "stdout", // can be anything the client wants
Payload: &anypb.Any{Value: []byte("ev 3s")},
})
cron.AddJob(ctx, etcdcron.Job{
Name: "every-4s-vdafbrtjnysh245",
Rhythm: "*/4 * * * * *",
Type: "stdout", // can be anything the client wants
Payload: &anypb.Any{Value: []byte("ev 4s")},
})
cron.AddJob(ctx, etcdcron.Job{
Name: "every-5s-adjbg43q5rbafbr44",
Rhythm: "*/5 * * * * *",
Type: "stdout", // can be anything the client wants
Payload: &anypb.Any{Value: []byte("ev 5s")},
})
cron.AddJob(ctx, etcdcron.Job{
Name: "every-6s-abadfh52jgdyj467",
Rhythm: "*/6 * * * * *",
Type: "stdout", // can be anything the client wants
Payload: &anypb.Any{Value: []byte("ev 6s")},
})
cron.AddJob(ctx, etcdcron.Job{
Name: "every-7s-bndasfbn4q55fgn",
Rhythm: "*/7 * * * * *",
Type: "stdout", // can be anything the client wants
Payload: &anypb.Any{Value: []byte("ev 7s")},
})
cron.AddJob(ctx, etcdcron.Job{
Name: "every-1s-then-expire-hadfh452erhh",
Rhythm: "*/1 * * * * *",
Type: "stdout", // can be anything the client wants
TTL: 10,
Payload: &anypb.Any{Value: []byte("ev 1s then expires after 10s")},
})
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ cron, _ := etcdcron.New(
cron.AddJob(Job{
Name: "job0",
Rhythm: "*/2 * * * * *",
Type: "my-job-type",
Metadata: map[string]string{
"type", "my-job-type"
},
Payload: &anypb.Any{Value: []byte("hello every 2s")},
})
```
Expand Down Expand Up @@ -69,7 +71,6 @@ cron, _ := etcdcron.New(
cron.AddJob(context.TODO(), Job{
Name: "job0",
Rhythm: "*/2 * * * * *",
Type: "my-job-type",
Payload: &anypb.Any{Value: []byte("hello every 2s")},
})
```
Expand Down
9 changes: 4 additions & 5 deletions cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Cron struct {
etcdErrorsHandler func(context.Context, Job, error)
errorsHandler func(context.Context, Job, error)
funcCtx func(context.Context, Job) context.Context
triggerFunc func(context.Context, string, *anypb.Any) error
triggerFunc func(context.Context, map[string]string, *anypb.Any) error
running bool
runWaitingGroup sync.WaitGroup
etcdclient *etcdclient.Client
Expand Down Expand Up @@ -124,7 +124,7 @@ func WithFuncCtx(f func(context.Context, Job) context.Context) CronOpt {
})
}

func WithTriggerFunc(f func(context.Context, string, *anypb.Any) error) CronOpt {
func WithTriggerFunc(f func(context.Context, map[string]string, *anypb.Any) error) CronOpt {
return CronOpt(func(cron *Cron) {
cron.triggerFunc = f
})
Expand Down Expand Up @@ -237,8 +237,7 @@ func (c *Cron) GetJob(jobName string) *Job {
return nil
}

//Avoids caller from modifying scheduler's job
return entry.Job.clone()
return &entry.Job
}

// Schedule adds a Job to the Cron to be run on the given schedule.
Expand Down Expand Up @@ -407,7 +406,7 @@ func (c *Cron) run(ctx context.Context) {
return
}

err = c.triggerFunc(ctx, e.Job.Type, e.Job.Payload)
err = c.triggerFunc(ctx, e.Job.Metadata, e.Job.Payload)
if err != nil {
go c.errorsHandler(ctx, e.Job, err)
return
Expand Down
86 changes: 46 additions & 40 deletions cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestStopCausesJobsToNotRun(t *testing.T) {

cron, err := New(
WithNamespace(randomNamespace()),
WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error {
WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error {
wg.Done()
return nil
}))
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestAddBeforeRunning(t *testing.T) {

cron, err := New(
WithNamespace(randomNamespace()),
WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error {
WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error {
if calledAlready {
return nil
}
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestAddWhileRunning(t *testing.T) {

cron, err := New(
WithNamespace(randomNamespace()),
WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error {
WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error {
wg.Done()
return nil
}))
Expand Down Expand Up @@ -150,7 +150,7 @@ func TestSnapshotEntries(t *testing.T) {

cron, err := New(
WithNamespace(randomNamespace()),
WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error {
WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error {
wg.Done()
return nil
}))
Expand Down Expand Up @@ -190,8 +190,8 @@ func TestDelayedAdd(t *testing.T) {

cron, err := New(
WithNamespace(randomNamespace()),
WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error {
if s == "noop" {
WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error {
if m["op"] == "noop" {
return nil
}
if called {
Expand All @@ -207,9 +207,9 @@ func TestDelayedAdd(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
cron.AddJob(ctx, Job{
Name: "test-noop",
Rhythm: "@every 1s",
Type: "noop",
Name: "test-noop",
Rhythm: "@every 1s",
Metadata: singleMetadata("op", "noop"),
})

cron.Start(ctx)
Expand Down Expand Up @@ -244,8 +244,8 @@ func TestMultipleEntries(t *testing.T) {

cron, err := New(
WithNamespace(randomNamespace()),
WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error {
if s == "return-nil" {
WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error {
if m["op"] == "return-nil" {
return nil
}

Expand All @@ -257,18 +257,18 @@ func TestMultipleEntries(t *testing.T) {
}
ctx, cancel := context.WithCancel(context.Background())
cron.AddJob(ctx, Job{
Name: "test-multiple-1",
Rhythm: "0 0 0 1 1 ?",
Type: "return-nil",
Name: "test-multiple-1",
Rhythm: "0 0 0 1 1 ?",
Metadata: singleMetadata("op", "return-nil"),
})
cron.AddJob(ctx, Job{
Name: "test-multiple-2",
Rhythm: "* * * * * ?",
})
cron.AddJob(ctx, Job{
Name: "test-multiple-3",
Rhythm: "0 0 0 31 12 ?",
Type: "return-nil",
Name: "test-multiple-3",
Rhythm: "0 0 0 31 12 ?",
Metadata: singleMetadata("op", "return-nil"),
})
cron.AddJob(ctx, Job{
Name: "test-multiple-4",
Expand All @@ -295,8 +295,8 @@ func TestRunningJobTwice(t *testing.T) {

cron, err := New(
WithNamespace(randomNamespace()),
WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error {
if s == "return-nil" {
WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error {
if m["op"] == "return-nil" {
return nil
}

Expand All @@ -308,14 +308,14 @@ func TestRunningJobTwice(t *testing.T) {
}
ctx, cancel := context.WithCancel(context.Background())
cron.AddJob(ctx, Job{
Name: "test-twice-1",
Rhythm: "0 0 0 1 1 ?",
Type: "return-nil",
Name: "test-twice-1",
Rhythm: "0 0 0 1 1 ?",
Metadata: singleMetadata("op", "return-nil"),
})
cron.AddJob(ctx, Job{
Name: "test-twice-2",
Rhythm: "0 0 0 31 12 ?",
Type: "return-nil",
Name: "test-twice-2",
Rhythm: "0 0 0 31 12 ?",
Metadata: singleMetadata("op", "return-nil"),
})
cron.AddJob(ctx, Job{
Name: "test-twice-3",
Expand All @@ -341,8 +341,8 @@ func TestRunningMultipleSchedules(t *testing.T) {

cron, err := New(
WithNamespace(randomNamespace()),
WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error {
if s == "return-nil" {
WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error {
if m["op"] == "return-nil" {
return nil
}

Expand All @@ -355,22 +355,22 @@ func TestRunningMultipleSchedules(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
cron.AddJob(ctx, Job{
Name: "test-mschedule-1",
Rhythm: "0 0 0 1 1 ?",
Type: "return-nil",
Name: "test-mschedule-1",
Rhythm: "0 0 0 1 1 ?",
Metadata: singleMetadata("op", "return-nil"),
})
cron.AddJob(ctx, Job{
Name: "test-mschedule-2",
Rhythm: "0 0 0 31 12 ?",
Type: "return-nil",
Name: "test-mschedule-2",
Rhythm: "0 0 0 31 12 ?",
Metadata: singleMetadata("op", "return-nil"),
})
cron.AddJob(ctx, Job{
Name: "test-mschedule-3",
Rhythm: "* * * * * ?",
})
cron.schedule(rhythm.Every(time.Minute), &Job{Name: "test-mschedule-4", Type: "return-nil"})
cron.schedule(rhythm.Every(time.Minute), &Job{Name: "test-mschedule-4", Metadata: singleMetadata("op", "return-nil")})
cron.schedule(rhythm.Every(time.Second), &Job{Name: "test-mschedule-5"})
cron.schedule(rhythm.Every(time.Hour), &Job{Name: "test-mschedule-6", Type: "return-nil"})
cron.schedule(rhythm.Every(time.Hour), &Job{Name: "test-mschedule-6", Metadata: singleMetadata("op", "return-nil")})

cron.Start(ctx)
defer func() {
Expand All @@ -397,7 +397,7 @@ func TestLocalTimezone(t *testing.T) {

cron, err := New(
WithNamespace(randomNamespace()),
WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error {
WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error {
if called.Add(1) > 1 {
return nil
}
Expand Down Expand Up @@ -434,7 +434,7 @@ func TestJob(t *testing.T) {

cron, err := New(
WithNamespace(randomNamespace()),
WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error {
WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error {
if calledAlready {
return nil
}
Expand Down Expand Up @@ -506,7 +506,7 @@ func TestCron_Parallel(t *testing.T) {

cron1, err := New(
WithNamespace(randomNamespace()),
WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error {
WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error {
wg.Done()
return nil
}))
Expand All @@ -521,7 +521,7 @@ func TestCron_Parallel(t *testing.T) {

cron2, err := New(
WithNamespace(randomNamespace()),
WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error {
WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error {
wg.Done()
return nil
}))
Expand Down Expand Up @@ -560,7 +560,7 @@ func TestTTL(t *testing.T) {

cron, err := New(
WithNamespace(randomNamespace()),
WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error {
WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error {
firedOnce.Store(true)
wg.Done()
return nil
Expand Down Expand Up @@ -613,3 +613,9 @@ func stop(cron *Cron, cancel context.CancelFunc) chan bool {
func randomNamespace() string {
return uuid.New().String()
}

func singleMetadata(key, value string) map[string]string {
m := map[string]string{}
m[key] = value
return m
}
Loading

0 comments on commit 7059bf0

Please sign in to comment.