Skip to content

Commit

Permalink
Handle start time & repeats (#7)
Browse files Browse the repository at this point in the history
* Add support for delayed start.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Remove unused struct for etcd mutex.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Add support for counter (repeats).

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Add custom job delete logic.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Update rhythm/spec_test.go

Co-authored-by: Josh van Leeuwen <me@joshvanl.dev>
Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Update counting/etcd.go

Co-authored-by: Josh van Leeuwen <me@joshvanl.dev>
Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Addressed comments.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Simplify next calculation for constant delay.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Using fmt.Errorf instead of errors.Errorf

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

---------

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Co-authored-by: Josh van Leeuwen <me@joshvanl.dev>
  • Loading branch information
artursouza and JoshVanL authored Mar 21, 2024
1 parent 7059bf0 commit c0f44b7
Show file tree
Hide file tree
Showing 17 changed files with 704 additions and 123 deletions.
50 changes: 38 additions & 12 deletions .examples/cron_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"fmt"
"log"
"math/rand"
"os"
"os/signal"
"strconv"
Expand Down Expand Up @@ -48,9 +49,18 @@ func main() {
cron, err := etcdcron.New(
etcdcron.WithNamespace(namespace),
etcdcron.WithPartitioning(p),
etcdcron.WithTriggerFunc(func(ctx context.Context, metadata map[string]string, payload *anypb.Any) error {
etcdcron.WithTriggerFunc(func(ctx context.Context, metadata map[string]string, payload *anypb.Any) (etcdcron.TriggerResult, error) {
if metadata["failure"] == "yes" {
// Failure does not trigger the errorsHandler() callback. It just skips the counter update.
return etcdcron.Failure, nil
}
if metadata["stop"] == "random" {
if rand.Int()%3 == 0 {
return etcdcron.Delete, nil
}
}
log.Printf("Trigger from pid %d: %s\n", os.Getpid(), string(payload.Value))
return nil
return etcdcron.OK, nil
}),
)
if err != nil {
Expand All @@ -72,33 +82,49 @@ func main() {
wg.Done()
}()

now := time.Now()
if os.Getenv("ADD") == "1" {
cron.AddJob(ctx, etcdcron.Job{
Name: "every-2s-dFG3F3DSGSGds",
Rhythm: "*/2 * * * * *",
StartTime: time.Time{}, // even seconds
Payload: &anypb.Any{Value: []byte("ev 2s even")},
Rhythm: "@every 2s",
StartTime: now,
Payload: &anypb.Any{Value: []byte("ev 2s from now")},
})
cron.AddJob(ctx, etcdcron.Job{
Name: "every-2s-b34w5y5hbwthjs",
Rhythm: "*/2 * * * * *",
StartTime: time.Time{}.Add(time.Second), // odd seconds
Payload: &anypb.Any{Value: []byte("ev 2s odd")},
Rhythm: "@every 2s",
StartTime: now.Add(time.Second), // odd seconds
Payload: &anypb.Any{Value: []byte("ev 2s from now+1s")},
})
cron.AddJob(ctx, etcdcron.Job{
Name: "every-10s-bnsf45354wbdsnd",
Rhythm: "*/10 * * * * *",
Payload: &anypb.Any{Value: []byte("ev 10s")},
})
cron.AddJob(ctx, etcdcron.Job{
Name: "every-3s-mdhgm764324rqdg",
Rhythm: "*/3 * * * * *",
Payload: &anypb.Any{Value: []byte("ev 3s")},
Name: "every-3s-mdhgm764324rqdg",
Rhythm: "@every 3s",
StartTime: time.Now().Add(10 * time.Second),
Payload: &anypb.Any{Value: []byte("waits 10s then ev 3s")},
})
cron.AddJob(ctx, etcdcron.Job{
Name: "every-4s-vdafbrtjnysh245",
Rhythm: "*/4 * * * * *",
Payload: &anypb.Any{Value: []byte("ev 4s")},
Repeats: 3, // Only triggers 3 times
Payload: &anypb.Any{Value: []byte("ev 4s 3 times only")},
})
cron.AddJob(ctx, etcdcron.Job{
Name: "every-4s-nmdjfgx35u7jfsgjgsf",
Rhythm: "*/4 * * * * *",
Repeats: 3, // Only triggers 3 times
Metadata: map[string]string{"failure": "yes"},
Payload: &anypb.Any{Value: []byte("ev 4s never expires because it returns a failure condition")},
})
cron.AddJob(ctx, etcdcron.Job{
Name: "every-1s-agdg42y645ydfdha",
Rhythm: "@every 1s",
Metadata: map[string]string{"stop": "random"},
Payload: &anypb.Any{Value: []byte("ev 1s with random stop")},
})
cron.AddJob(ctx, etcdcron.Job{
Name: "every-5s-adjbg43q5rbafbr44",
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ c, _ := etcdclientv3.New(etcdclientv3.Config{
Endpoints: []string{"etcd-host1:2379", "etcd-host2:2379"},
})
cron, _ := etcdcron.New(
WithEtcdMutexBuilder(c),
WithEtcdClient(c),
WithNamespace("my-example"), // multi-tenancy
WithTriggerFunc(func(ctx context.Context, triggerType string, payload *anypb.Any) error {
log.Printf("Trigger from pid %d: %s %s\n", os.Getpid(), triggerType, string(payload.Value))
Expand Down
74 changes: 74 additions & 0 deletions counting/counting_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright (c) 2024 Diagrid Inc.
Licensed under the MIT License.
*/

package counting

import (
"context"
"testing"
"time"

"github.com/diagridio/go-etcd-cron/partitioning"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
etcdclientv3 "go.etcd.io/etcd/client/v3"
)

const defaultEtcdEndpoint = "127.0.0.1:2379"

func TestCounterTTL(t *testing.T) {
ctx := context.TODO()
organizer := partitioning.NewOrganizer(randomNamespace(), partitioning.NoPartitioning())
etcdClient, err := etcdclientv3.New(etcdclientv3.Config{
Endpoints: []string{defaultEtcdEndpoint},
})
require.NoError(t, err)

key := organizer.CounterPath(0, "count")
// This counter will expire keys 1s after their next scheduled trigger.
counter := NewEtcdCounter(etcdClient, key, time.Second)

value, updated, err := counter.Add(ctx, 1, time.Now().Add(time.Second))
require.NoError(t, err)
assert.True(t, updated)
assert.Equal(t, 1, value)

value, updated, err = counter.Add(ctx, 2, time.Now().Add(time.Second))
require.NoError(t, err)
assert.True(t, updated)
assert.Equal(t, 3, value)

time.Sleep(time.Second)

value, updated, err = counter.Add(ctx, -4, time.Now().Add(time.Second))
require.NoError(t, err)
assert.True(t, updated)
assert.Equal(t, -1, value)

// Counter expires 1 second after the next scheduled trigger (in this test's config)
time.Sleep(3 * time.Second)

// Counter should have expired but the in-memory value continues.
// Even if key is expired in the db, a new operation will set it again, with a new TTL.
value, updated, err = counter.Add(ctx, 0, time.Now().Add(time.Second))
require.NoError(t, err)
assert.True(t, updated)
assert.Equal(t, -1, value)

// Counter expires 1 second after the next scheduled trigger.
time.Sleep(3 * time.Second)

// A new instance will start from 0 since the db record is expired.
counter = NewEtcdCounter(etcdClient, key, time.Second)
value, updated, err = counter.Add(ctx, 0, time.Now().Add(time.Second))
require.NoError(t, err)
assert.True(t, updated)
assert.Equal(t, 0, value)
}

func randomNamespace() string {
return uuid.New().String()
}
79 changes: 79 additions & 0 deletions counting/etcd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright (c) 2024 Diagrid Inc.
Licensed under the MIT License.
*/

package counting

import (
"context"
"fmt"
"strconv"
"time"

etcdclientv3 "go.etcd.io/etcd/client/v3"
)

type Counter interface {
// Applies by the given delta (+ or -) and return the updated value.
// Count has a ttl calculated using the next tick's time.
// Returns (updated value, true if value was updated in memory, err if any error happened)
// It is possible that the value is updated but an error occurred while trying to persist it.
Add(context.Context, int, time.Time) (int, bool, error)
}

// It keeps a cache of the value and updates async.
// It works assuming there cannot be two concurrent writes to the same key.
// Concurrency is handled at the job level, which makes this work.
type etcdcounter struct {
etcdclient *etcdclientv3.Client
key string

loaded bool
value int
ttlOffset time.Duration
}

func NewEtcdCounter(c *etcdclientv3.Client, key string, ttlOffset time.Duration) Counter {
return &etcdcounter{
etcdclient: c,
key: key,
ttlOffset: ttlOffset,
}
}

func (c *etcdcounter) Add(ctx context.Context, delta int, next time.Time) (int, bool, error) {
if !c.loaded {
// First, load the key's value.
res, err := c.etcdclient.KV.Get(ctx, c.key)
if err != nil {
return 0, false, err
}
if len(res.Kvs) == 0 {
c.value = 0
c.loaded = true
} else {
if res.Kvs[0].Value == nil {
return 0, false, fmt.Errorf("nil value for key %s", c.key)
}
if len(res.Kvs[0].Value) == 0 {
return 0, false, fmt.Errorf("empty value for key %s", c.key)
}

c.value, err = strconv.Atoi(string(res.Kvs[0].Value))
if err != nil {
return 0, false, err
}
}
}

c.value += delta
// Create a lease
ttl := time.Until(next.Add(c.ttlOffset))
lease, err := c.etcdclient.Grant(ctx, int64(ttl.Seconds()))
if err != nil {
return c.value, true, err
}
_, err = c.etcdclient.KV.Put(ctx, c.key, strconv.Itoa(c.value), etcdclientv3.WithLease(lease.ID))
return c.value, true, err
}
Loading

0 comments on commit c0f44b7

Please sign in to comment.