From 04fed7ec182172129d9910e1b640b2141771bfd3 Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Wed, 13 Mar 2024 10:23:06 -0700 Subject: [PATCH] Persist and partition job (#3) * WIP: change lock to be per job. Signed-off-by: Artur Souza * Lock per job + namespace handing. Signed-off-by: Artur Souza * Partitioning of jobs. Signed-off-by: Artur Souza * job store. Signed-off-by: Artur Souza * GH workflow to test. Signed-off-by: Artur Souza * Remove dependabot automerge workflow. Signed-off-by: Artur Souza * Record job using protobuf. Signed-off-by: Artur Souza * Handle TTL. Signed-off-by: Artur Souza * Some random fixes. Signed-off-by: Artur Souza * Fix TestLocalTimezone flakiness. Signed-off-by: Artur Souza * Fix more flaky tests. Signed-off-by: Artur Souza * Addressed comments. Signed-off-by: Artur Souza * Stop via context only. Signed-off-by: Artur Souza * Remove update routine. Signed-off-by: Artur Souza * nit: comment Signed-off-by: Artur Souza * Fix detected race conditions. Signed-off-by: Artur Souza * Update RADME.md and docs.go Signed-off-by: Artur Souza * Addressing more comments. Signed-off-by: Artur Souza * Reduce locking contention for cron dist mutex reuse. Signed-off-by: Artur Souza * Does not try to recover from panic. Signed-off-by: Artur Souza * Allow test to take longer. Signed-off-by: Artur Souza --------- Signed-off-by: Artur Souza --- .github/workflows/build.yml | 45 ++++ .github/workflows/dependabot.yml | 17 -- Makefile | 45 ++++ README.md | 47 +++- collector.go | 115 +++++++++ cron.go | 356 +++++++++++++++---------- cron_test.go | 431 ++++++++++++++++++++++--------- doc.go | 82 ++++-- etcd.go | 27 +- examples/cron_example.go | 141 ++++++++-- go.mod | 30 ++- go.sum | 50 ++-- job.pb.go | 183 +++++++++++++ local_mutexer.go | 60 +++++ mutex_cache.go | 64 +++++ organizer.go | 42 +++ partition.go | 90 +++++++ partition_test.go | 155 +++++++++++ proto/job.proto | 21 ++ store.go | 203 +++++++++++++++ 20 files changed, 1841 insertions(+), 363 deletions(-) create mode 100644 .github/workflows/build.yml delete mode 100644 .github/workflows/dependabot.yml create mode 100644 Makefile create mode 100644 collector.go create mode 100644 job.pb.go create mode 100644 local_mutexer.go create mode 100644 mutex_cache.go create mode 100644 organizer.go create mode 100644 partition.go create mode 100644 partition_test.go create mode 100644 proto/job.proto create mode 100644 store.go diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..7d96519 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,45 @@ +# Copyright (c) 2024 Diagrid Inc. +# Licensed under the MIT License. 
+ +name: "Build and testing" + +on: + push: + branches: + - master + pull_request: + branches: + - master + +jobs: + test: + name: Build and Test + runs-on: ubuntu-latest + timeout-minutes: 30 + services: + etcd: + image: quay.io/coreos/etcd:v3.5.5 + env: + ETCD_ADVERTISE_CLIENT_URLS: http://0.0.0.0:2379 + ETCD_LISTEN_CLIENT_URLS: http://0.0.0.0:2379 + ports: + - 2379:2379 + env: + GOOS: linux + GOARCH: amd64 + GOPROXY: https://proxy.golang.org + steps: + - name: Check out code + uses: actions/checkout@v4 + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version-file: 'go.mod' + cache: 'false' + - name: Check mod tidy + run: | + go mod tidy + git diff --exit-code ./go.mod # check no changes + git diff --exit-code ./go.sum # check no changes + - name: Run test + run: go test -v -timeout 300s --race \ No newline at end of file diff --git a/.github/workflows/dependabot.yml b/.github/workflows/dependabot.yml deleted file mode 100644 index 2e5ee58..0000000 --- a/.github/workflows/dependabot.yml +++ /dev/null @@ -1,17 +0,0 @@ -name: Dependabot auto-approve -on: pull_request - -permissions: - # Mandatory for both the auto-merge enabling and approval steps - pull-requests: write - # Mandatory for the auto-merge enabling step - contents: write - -jobs: - dependabot: - runs-on: ubuntu-latest - steps: - - name: Automatically merge Dependabot PRs - uses: scalingo/ghaction-dependabot-automerge@v1 - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..0717710 --- /dev/null +++ b/Makefile @@ -0,0 +1,45 @@ +################################################################################ +# Target: modtidy # +################################################################################ +.PHONY: modtidy +modtidy: + go mod tidy + +################################################################################ +# Target: gen-proto # +################################################################################ +PROTOC ?=protoc +PROTOC_VERSION = 3.21.12 +PROTOBUF_SUITE_VERSION = 21.12 +PROTOC_GEN_GO_VERSION = v1.28.1 + +PROTOC_GEN_GO_GRPC_VERSION = 1.2.0 + +PROTOS:=$(shell ls proto) +PROTO_PREFIX:=github.com/diagridio/go-etcd-cron + +.PHONY: check-proto-version +check-proto-version: ## Checking the version of proto related tools + @test "$(shell protoc --version)" = "libprotoc $(PROTOC_VERSION)" \ + || { echo "please use protoc $(PROTOC_VERSION) (protobuf $(PROTOBUF_SUITE_VERSION)) to generate proto"; exit 1; } + + @test "$(shell protoc-gen-go-grpc --version)" = "protoc-gen-go-grpc $(PROTOC_GEN_GO_GRPC_VERSION)" \ + || { echo "please use protoc-gen-go-grpc $(PROTOC_GEN_GO_GRPC_VERSION) to generate proto"; exit 1; } + + @test "$(shell protoc-gen-go --version 2>&1)" = "protoc-gen-go $(PROTOC_GEN_GO_VERSION)" \ + || { echo "please use protoc-gen-go $(PROTOC_GEN_GO_VERSION) to generate proto"; exit 1; } + +# Generate archive files for each binary +# $(1): the binary name to be archived +define genProtoc +.PHONY: gen-proto-$(1) +gen-proto-$(1): + $(PROTOC) --go_out=. --go_opt=module=$(PROTO_PREFIX) --go-grpc_out=. 
--go-grpc_opt=require_unimplemented_servers=false,module=$(PROTO_PREFIX) ./proto/$(1) +endef + +$(foreach ITEM,$(PROTOS),$(eval $(call genProtoc,$(ITEM)))) + +GEN_PROTOS:=$(foreach ITEM,$(PROTOS),gen-proto-$(ITEM)) + +.PHONY: gen-proto +gen-proto: check-proto-version $(GEN_PROTOS) modtidy \ No newline at end of file diff --git a/README.md b/README.md index b78bc67..3f523d3 100644 --- a/README.md +++ b/README.md @@ -10,11 +10,16 @@ This package aims at implementing a distributed and fault tolerant cron in order * Run an identical process on several hosts * Each of these process instantiate a subset of the cron jobs -* Ensure only one of these processes executes a job +* Allow concurrent instances to execute the same job +* Ensure only one of these processes can trigger a job * Number of cron jobs can scale by increasing the number of hosts -* Eventualy, a trigger can be missed due to change in leadership for a cron job +* Jobs are persisted and loaded from etcd +* Jobs can have a TTL and be auto-deleted +* Jobs can have a delayed start +* Jobs can have a max run count +* Cron can be used for multiple tenants, via namespacing -## Etcd Initialization +## Getting started By default the library creates an etcd client on `127.0.0.1:2379` @@ -22,13 +27,19 @@ By default the library creates an etcd client on `127.0.0.1:2379` c, _ := etcdcron.NewEtcdMutexBuilder(clientv3.Config{ Endpoints: []string{"etcd-host1:2379", "etcd-host2:2379"}, }) -cron, _ := etcdcron.New(WithEtcdMutexBuilder(c)) +cron, _ := etcdcron.New( + WithEtcdMutexBuilder(c), + WithNamespace("my-example"), // multi-tenancy + WithTriggerFunc(func(ctx context.Context, triggerType string, payload *anypb.Any) error { + log.Printf("Trigger from pid %d: %s %s\n", os.Getpid(), triggerType, string(payload.Value)) + return nil + }), + ) cron.AddJob(Job{ Name: "job0", Rhythm: "*/2 * * * * *", - Func: func(ctx context.Context) error { - // Handler - }, + Type: "my-job-type", + Payload: &anypb.Any{Value: []byte("hello every 2s")}, }) ``` @@ -46,17 +57,31 @@ etcdErrorsHandler := func(ctx context.Context, job etcdcron.Job, err error) { cron, _ := etcdcron.New( WithErrorsHandler(errorsHandler), WithEtcdErrorsHandler(etcdErrorsHandler), + WithTriggerFunc(func(ctx context.Context, triggerType string, payload *anypb.Any) error { + log.Printf("Trigger from pid %d: %s %s\n", os.Getpid(), triggerType, string(payload.Value)) + return nil + }), ) -cron.AddJob(Job{ +cron.AddJob(context.TODO(), Job{ Name: "job0", Rhythm: "*/2 * * * * *", - Func: func(ctx context.Context) error { - // Handler - }, + Type: "my-job-type", + Payload: &anypb.Any{Value: []byte("hello every 2s")}, }) ``` +## Tests + +Pre-requisites to run the tests locally: +- Run etcd locally via one of the options below: + - Locally: [Install etcd](https://etcd.io/docs/v3.4/install/) then run `etcd --logger=zap` + - Docker: [Running a single node etcd](https://etcd.io/docs/v3.5/op-guide/container/#running-a-single-node-etcd-1) + +```bash +go test -v --race +``` + ## History This is a fork of [https://github.com/Scalingo/go-etcd-cron](https://github.com/Scalingo/go-etcd-cron), which had been based on [https://github.com/robfig/cron](https://github.com/robfig/cron). diff --git a/collector.go b/collector.go new file mode 100644 index 0000000..c90f0ec --- /dev/null +++ b/collector.go @@ -0,0 +1,115 @@ +/* +Copyright (c) 2024 Diagrid Inc. +Licensed under the MIT License. 
+*/
+
+package etcdcron
+
+import (
+	"context"
+	"sync"
+	"time"
+)
+
+// Collector garbage collects items after a globally configured TTL.
+type Collector struct {
+	ttl        int64 // time to wait to perform collection
+	bufferTime int64 // arbitrary delay to allow buffering of operations
+
+	running         bool
+	mutex           sync.RWMutex
+	operations      []*collectorEntry
+	changed         chan bool
+	runWaitingGroup sync.WaitGroup
+}
+
+type collectorEntry struct {
+	expiration int64
+	op         func(ctx context.Context)
+}
+
+func NewCollector(ttl int64, bufferTime int64) *Collector {
+	return &Collector{
+		ttl:        ttl,
+		bufferTime: bufferTime,
+		running:    false,
+		changed:    make(chan bool),
+		operations: []*collectorEntry{},
+	}
+}
+
+func (c *Collector) Start(ctx context.Context) {
+	if c.running {
+		return
+	}
+
+	c.running = true
+
+	doIt := func(ctx context.Context) {
+		c.mutex.Lock()
+		defer c.mutex.Unlock()
+
+		now := time.Now().Unix()
+		nextStartIndex := -1
+		for i, o := range c.operations {
+			if o.expiration <= now {
+				o.op(ctx)
+			} else {
+				nextStartIndex = i
+				break
+			}
+		}
+
+		if nextStartIndex >= 0 {
+			c.operations = c.operations[nextStartIndex:]
+			return
+		}
+		c.operations = []*collectorEntry{}
+	}
+
+	waitTimeForNext := func() time.Duration {
+		c.mutex.RLock()
+		defer c.mutex.RUnlock()
+
+		now := time.Now().Unix()
+		if len(c.operations) > 0 {
+			diff := c.operations[0].expiration - now
+			if diff <= 0 {
+				return 0
+			}
+
+			return time.Duration(diff)
+		}
+
+		// Some arbitrarily large number that gives us certainty that some record will be added.
+		return 24 * time.Hour
+	}
+
+	c.runWaitingGroup.Add(1)
+	go func(ctx context.Context) {
+		for {
+			doIt(ctx)
+			select {
+			case <-time.After(waitTimeForNext() + time.Duration(c.bufferTime)):
+				continue
+			case <-c.changed:
+				continue
+			case <-ctx.Done():
+				c.runWaitingGroup.Done()
+				return
+			}
+		}
+	}(ctx)
+}
+
+func (c *Collector) Add(op func(ctx context.Context)) {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+	c.operations = append(c.operations, &collectorEntry{
+		expiration: time.Now().Unix() + c.ttl,
+		op:         op,
+	})
+}
+
+func (c *Collector) Wait() {
+	c.runWaitingGroup.Wait()
+}
diff --git a/cron.go b/cron.go
index d80d34a..4e096e9 100644
--- a/cron.go
+++ b/cron.go
@@ -1,40 +1,51 @@
+/*
+Copyright (c) 2024 Diagrid Inc.
+Licensed under the MIT License.
+*/
+
 package etcdcron
 
 import (
 	"context"
 	"fmt"
-	"google.golang.org/protobuf/types/known/anypb"
 	"log"
-	"regexp"
-	"runtime/debug"
 	"sort"
 	"strings"
 	"sync"
 	"time"
 
-	"github.com/iancoleman/strcase"
 	"github.com/pkg/errors"
 	etcdclient "go.etcd.io/etcd/client/v3"
+	anypb "google.golang.org/protobuf/types/known/anypb"
 )
 
 const (
 	defaultEtcdEndpoint = "127.0.0.1:2379"
+	defaultNamespace    = "etcd_cron"
 )
 
 // Cron keeps track of any number of entries, invoking the associated func as
 // specified by the schedule. It may be started, stopped, and the entries may
 // be inspected while running.
 type Cron struct {
-	entries           []*Entry
-	entriesMutex      sync.RWMutex
-	stop              chan struct{}
-	add               chan *Entry
-	snapshot          chan []*Entry
-	etcdErrorsHandler func(context.Context, Job, error)
-	errorsHandler     func(context.Context, Job, error)
-	funcCtx           func(context.Context, Job) context.Context
-	running           bool
-	etcdclient        EtcdMutexBuilder
+	namespace              string
+	pendingOperations      []func(context.Context) *Entry
+	pendingOperationsMutex sync.RWMutex
+	liveOperation          chan func(ctx context.Context) *Entry
+	entries                map[string]*Entry
+	entriesMutex           sync.RWMutex
+	snapshot               chan []*Entry
+	etcdErrorsHandler      func(context.Context, Job, error)
+	errorsHandler          func(context.Context, Job, error)
+	funcCtx                func(context.Context, Job) context.Context
+	triggerFunc            func(context.Context, string, *anypb.Any) error
+	running                bool
+	runWaitingGroup        sync.WaitGroup
+	etcdclient             EtcdMutexBuilder
+	jobStore               JobStore
+	organizer              Organizer
+	partitioning           Partitioning
+	collector              *Collector
 }
 
 // Job contains 3 mandatory options to define a job
@@ -43,31 +54,12 @@ type Job struct {
 	Name string
 	// Cron-formatted rhythm (ie. 0,10,30 1-5 0 * * *)
 	Rhythm string
-	// Routine method
-	Func func(context.Context) error
-
-	Repeats  int32
-	DueTime  string
-	TTL      string
-	Data     *anypb.Any
-	Metadata map[string]string
-}
-
-func (j Job) Run(ctx context.Context) error {
-	return j.Func(ctx)
-}
-
-var (
-	nonAlphaNumerical = regexp.MustCompile("[^a-z0-9_]")
-)
-
-func (j Job) canonicalName() string {
-	return strcase.ToSnake(
-		nonAlphaNumerical.ReplaceAllString(
-			strings.ToLower(j.Name),
-			"_",
-		),
-	)
+	// The type of trigger that the client understands
+	Type string
+	// The payload containing all the information for the trigger
+	Payload *anypb.Any
+	// Optional number of seconds until this job expires (if > 0)
+	TTL int64
 }
 
 // The Schedule describes a job's duty cycle.
@@ -92,6 +84,9 @@ type Entry struct {
 
 	// The Job to run.
 	Job Job
+
+	// Prefix for the ticker mutex
+	distMutexPrefix string
 }
 
 // byTime is a wrapper for sorting the entry array by time
@@ -133,24 +128,51 @@ func WithEtcdMutexBuilder(b EtcdMutexBuilder) CronOpt {
 	})
 }
 
+func WithJobStore(s JobStore) CronOpt {
+	return CronOpt(func(cron *Cron) {
+		cron.jobStore = s
+	})
+}
+
 func WithFuncCtx(f func(context.Context, Job) context.Context) CronOpt {
 	return CronOpt(func(cron *Cron) {
 		cron.funcCtx = f
 	})
 }
 
+func WithTriggerFunc(f func(context.Context, string, *anypb.Any) error) CronOpt {
+	return CronOpt(func(cron *Cron) {
+		cron.triggerFunc = f
+	})
+}
+
+func WithNamespace(n string) CronOpt {
+	return CronOpt(func(cron *Cron) {
+		cron.namespace = n
+	})
+}
+
+func WithPartitioning(p Partitioning) CronOpt {
+	return CronOpt(func(cron *Cron) {
+		cron.partitioning = p
+	})
+}
+
 // New returns a new Cron job runner.
func New(opts ...CronOpt) (*Cron, error) { cron := &Cron{ - entries: nil, - add: make(chan *Entry), - stop: make(chan struct{}), - snapshot: make(chan []*Entry), - running: false, + pendingOperations: []func(context.Context) *Entry{}, + liveOperation: make(chan func(context.Context) *Entry), + entries: map[string]*Entry{}, + snapshot: make(chan []*Entry), + running: false, } for _, opt := range opts { opt(cron) } + if cron.partitioning == nil { + cron.partitioning = NoPartitioning() + } if cron.etcdclient == nil { etcdClient, err := NewEtcdMutexBuilder(etcdclient.Config{ Endpoints: []string{defaultEtcdEndpoint}, @@ -170,17 +192,53 @@ func New(opts ...CronOpt) (*Cron, error) { log.Printf("[etcd-cron] error when handling '%v' job: %v", j.Name, err) } } + if cron.namespace == "" { + cron.namespace = defaultNamespace + } + cron.organizer = NewOrganizer(cron.namespace, cron.partitioning) + if cron.jobStore == nil { + cron.jobStore = cron.etcdclient.NewJobStore( + cron.organizer, + cron.partitioning, + func(ctx context.Context, j Job) error { + return cron.scheduleJob(j) + }, + func(ctx context.Context, s string) error { + cron.killJob(s) + return nil + }) + } + + cron.collector = NewCollector(int64(time.Hour), int64(time.Minute)) return cron, nil } -// AddFunc adds a Job to the Cron to be run on the given schedule. -func (c *Cron) AddJob(job Job) error { - schedule, err := Parse(job.Rhythm) - if err != nil { - return err +// AddJob adds a Job. +func (c *Cron) AddJob(ctx context.Context, job Job) error { + if c.jobStore == nil { + return fmt.Errorf("cannot persist job: no job store configured") } - c.Schedule(schedule, job) - return nil + return c.jobStore.Put(ctx, job) +} + +// DeleteJob removes a job. +func (c *Cron) DeleteJob(ctx context.Context, jobName string) error { + if c.jobStore == nil { + return fmt.Errorf("cannot delete job: no job store configured") + } + return c.jobStore.Delete(ctx, jobName) +} + +func (c *Cron) killJob(name string) { + c.appendOperation(func(ctx context.Context) *Entry { + _, ok := c.entries[name] + if !ok { + return nil + } + + delete(c.entries, name) + return nil + }) } // GetJob retrieves a job by name. @@ -188,111 +246,160 @@ func (c *Cron) GetJob(jobName string) *Job { c.entriesMutex.RLock() defer c.entriesMutex.RUnlock() - for _, entry := range c.entries { - if entry.Job.Name == jobName { - return &entry.Job - } + entry, ok := c.entries[jobName] + if !ok || (entry == nil) { + return nil } - return nil -} - -// DeleteJob deletes a job by name. -func (c *Cron) DeleteJob(jobName string) error { - c.entriesMutex.Lock() - defer c.entriesMutex.Unlock() - var updatedEntries []*Entry - found := false - for _, entry := range c.entries { - if entry.Job.Name == jobName { - found = true - continue - } - // Keep the entries that don't match the specified jobName - updatedEntries = append(updatedEntries, entry) - } - if !found { - return fmt.Errorf("job not found: %s", jobName) - } - c.entries = updatedEntries - return nil + return &entry.Job } func (c *Cron) ListJobsByPrefix(prefix string) []*Job { - c.entriesMutex.RLock() - defer c.entriesMutex.RUnlock() - var appJobs []*Job + c.entriesMutex.RLock() for _, entry := range c.entries { if strings.HasPrefix(entry.Job.Name, prefix) { // Job belongs to the specified app_id appJobs = append(appJobs, &entry.Job) } } + c.entriesMutex.RUnlock() return appJobs } // Schedule adds a Job to the Cron to be run on the given schedule. 
-func (c *Cron) Schedule(schedule Schedule, job Job) { +func (c *Cron) scheduleJob(job Job) error { + s, err := Parse(job.Rhythm) + if err != nil { + return err + } + + return c.schedule(s, job) +} + +// Schedule adds a Job to the Cron to be run on the given schedule. +func (c *Cron) schedule(schedule Schedule, job Job) error { + partitionId := c.partitioning.CalculatePartitionId(job.Name) + if !c.partitioning.CheckPartitionLeader(partitionId) { + // It means the partitioning changed and persisted jobs are in the wrong partition now. + return fmt.Errorf("host does not own partition %d", partitionId) + } + entry := &Entry{ - Schedule: schedule, - Job: job, + Schedule: schedule, + Job: job, + Prev: time.Unix(0, 0), + distMutexPrefix: c.organizer.TicksPath(partitionId) + "/", } + + c.appendOperation(func(ctx context.Context) *Entry { + c.entries[entry.Job.Name] = entry + return entry + }) + return nil +} + +func (c *Cron) appendOperation(op func(ctx context.Context) *Entry) { if !c.running { - c.entriesMutex.Lock() - c.entries = append(c.entries, entry) - c.entriesMutex.Unlock() + c.pendingOperationsMutex.Lock() + defer c.pendingOperationsMutex.Unlock() + + c.pendingOperations = append(c.pendingOperations, op) return } - c.add <- entry + c.liveOperation <- op } // Entries returns a snapshot of the cron entries. func (c *Cron) Entries() []*Entry { if c.running { - c.entriesMutex.RLock() - defer c.entriesMutex.RUnlock() - c.snapshot <- nil x := <-c.snapshot return x } - return c.entrySnapshot() + + c.entriesMutex.RLock() + defer c.entriesMutex.RUnlock() + entries := []*Entry{} + for _, e := range c.entries { + entries = append(entries, &Entry{ + Schedule: e.Schedule, + Next: e.Next, + Prev: e.Prev, + Job: e.Job, + }) + } + return entries } // Start the cron scheduler in its own go-routine. -func (c *Cron) Start(ctx context.Context) { +func (c *Cron) Start(ctx context.Context) error { + err := c.jobStore.Start(ctx) + if err != nil { + return err + } + c.collector.Start(ctx) c.running = true + c.runWaitingGroup.Add(1) go c.run(ctx) + return nil } // Run the scheduler.. this is private just due to the need to synchronize // access to the 'running' state variable. func (c *Cron) run(ctx context.Context) { + localMutexer := NewMutexer(c.collector) + mutexStore := NewMutexStore(c.etcdclient, c.collector) // Figure out the next activation times for each entry. now := time.Now().Local() - for _, entry := range c.entries { - entry.Next = entry.Schedule.Next(now) + + entries := []*Entry{} + + // Pending operations only matter before running, ignored afterwards. + c.pendingOperationsMutex.Lock() + c.entriesMutex.Lock() + for _, op := range c.pendingOperations { + newEntry := op(ctx) + if newEntry != nil { + newEntry.Next = newEntry.Schedule.Next(now) + } } + for _, e := range c.entries { + entries = append(entries, e) + } + c.entriesMutex.Unlock() + c.pendingOperations = []func(context.Context) *Entry{} + c.pendingOperationsMutex.Unlock() for { - // Determine the next entry to run. - sort.Sort(byTime(c.entries)) + sort.Sort(byTime(entries)) var effective time.Time - if len(c.entries) == 0 || c.entries[0].Next.IsZero() { + if len(entries) == 0 || entries[0].Next.IsZero() { // If there are no entries yet, just sleep - it still handles new entries // and stop requests. 
effective = now.AddDate(10, 0, 0) } else { - effective = c.entries[0].Next + effective = entries[0].Next } select { + case op := <-c.liveOperation: + c.entriesMutex.Lock() + newEntry := op(ctx) + if newEntry != nil { + newEntry.Next = newEntry.Schedule.Next(now) + } + entries = []*Entry{} + for _, e := range c.entries { + entries = append(entries, e) + } + c.entriesMutex.Unlock() + case now = <-time.After(effective.Sub(now)): // Run every entry whose next time was this effective time. - for _, e := range c.entries { + for _, e := range entries { if e.Next != effective { break } @@ -300,31 +407,25 @@ func (c *Cron) run(ctx context.Context) { e.Next = e.Schedule.Next(effective) go func(ctx context.Context, e *Entry) { - defer func() { - r := recover() - if r != nil { - err, ok := r.(error) - if !ok { - err = fmt.Errorf("%v", r) - } - err = fmt.Errorf("panic: %v, stacktrace: %s", err, string(debug.Stack())) - go c.errorsHandler(ctx, e.Job, err) - } - }() - if c.funcCtx != nil { ctx = c.funcCtx(ctx, e.Job) } - m, err := c.etcdclient.NewMutex(fmt.Sprintf("etcd_cron/%s/", e.Job.canonicalName())) + tickLock := e.distMutexPrefix + fmt.Sprintf("%v", effective.Unix()) + m, err := mutexStore.Get(tickLock) if err != nil { go c.etcdErrorsHandler(ctx, e.Job, errors.Wrapf(err, "fail to create etcd mutex for job '%v'", e.Job.Name)) return } + lockCtx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() + // Local mutex is needed to avoid race condition on reusing the etcd mutex object. + localMutex := localMutexer.Get(tickLock) + localMutex.Lock() err = m.Lock(lockCtx) + localMutex.Unlock() if err == context.DeadlineExceeded { return } else if err != nil { @@ -332,44 +433,39 @@ func (c *Cron) run(ctx context.Context) { return } - err = e.Job.Run(ctx) + err = c.triggerFunc(ctx, e.Job.Type, e.Job.Payload) if err != nil { go c.errorsHandler(ctx, e.Job, err) return } + // Cannot unlock because it can open a chance for double trigger since two instances + // can have a clock skew and compete for the lock at slight different windows. + // So, we keep the lock during its ttl }(ctx, e) } continue - case newEntry := <-c.add: - c.entries = append(c.entries, newEntry) - newEntry.Next = newEntry.Schedule.Next(now) - case <-c.snapshot: - c.snapshot <- c.entrySnapshot() + c.snapshot <- c.entrySnapshot(entries) - case <-c.stop: + case <-ctx.Done(): + c.runWaitingGroup.Done() return } - - // 'now' should be updated after newEntry and snapshot cases. - now = time.Now().Local() } } -// Stop the cron scheduler. -func (c *Cron) Stop() { - c.stop <- struct{}{} +// Wait the cron to stop after context is cancelled. +func (c *Cron) Wait() { + c.runWaitingGroup.Wait() + c.jobStore.Wait() c.running = false } // entrySnapshot returns a copy of the current cron entry list. -func (c *Cron) entrySnapshot() []*Entry { - c.entriesMutex.RLock() - defer c.entriesMutex.RUnlock() - +func (c *Cron) entrySnapshot(input []*Entry) []*Entry { entries := []*Entry{} - for _, e := range c.entries { + for _, e := range input { entries = append(entries, &Entry{ Schedule: e.Schedule, Next: e.Next, diff --git a/cron_test.go b/cron_test.go index 0eafbf7..513e50c 100644 --- a/cron_test.go +++ b/cron_test.go @@ -1,11 +1,21 @@ +/* +Copyright (c) 2024 Diagrid Inc. +Licensed under the MIT License. 
+*/ + package etcdcron import ( "context" "fmt" "sync" + "sync/atomic" "testing" "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + anypb "google.golang.org/protobuf/types/known/anypb" ) // Many tests schedule a job for every second, and then wait at most a second @@ -15,16 +25,17 @@ const ONE_SECOND = 1*time.Second + 200*time.Millisecond // Start and stop cron with no entries. func TestNoEntries(t *testing.T) { - cron, err := New() + cron, err := New(WithNamespace(randomNamespace())) if err != nil { t.Fatal("unexpected error") } - cron.Start(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) + cron.Start(ctx) select { case <-time.After(ONE_SECOND): t.FailNow() - case <-stop(cron): + case <-stop(cron, cancel): } } @@ -33,19 +44,22 @@ func TestStopCausesJobsToNotRun(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) - cron, err := New() + cron, err := New( + WithNamespace(randomNamespace()), + WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { + wg.Done() + return nil + })) if err != nil { t.Fatal("unexpected error") } - cron.Start(context.Background()) - cron.Stop() - cron.AddJob(Job{ + ctx, cancel := context.WithCancel(context.Background()) + cron.Start(ctx) + cancel() + cron.Wait() + cron.AddJob(ctx, Job{ Name: "test-stop", Rhythm: "* * * * * ?", - Func: func(ctx context.Context) error { - wg.Done() - return nil - }, }) select { @@ -59,23 +73,37 @@ func TestStopCausesJobsToNotRun(t *testing.T) { // Add a job, start cron, expect it runs. func TestAddBeforeRunning(t *testing.T) { wg := &sync.WaitGroup{} + calledAlready := false wg.Add(1) - cron, err := New() + cron, err := New( + WithNamespace(randomNamespace()), + WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { + if calledAlready { + return nil + } + + calledAlready = true + wg.Done() + return nil + })) if err != nil { t.Fatal("unexpected error") } - cron.AddJob(Job{ + ctx, cancel := context.WithCancel(context.Background()) + cron.AddJob(ctx, Job{ Name: "test-add-before-running", - Rhythm: "* * * * * ?", - Func: func(context.Context) error { wg.Done(); return nil }, + Rhythm: "* * * * * *", }) - cron.Start(context.Background()) - defer cron.Stop() + cron.Start(ctx) + defer func() { + cancel() + cron.Wait() + }() // Give cron 2 seconds to run our job (which is always activated). select { - case <-time.After(ONE_SECOND): + case <-time.After(2 * ONE_SECOND): t.FailNow() case <-wait(wg): } @@ -84,26 +112,31 @@ func TestAddBeforeRunning(t *testing.T) { // Start cron, add a job, expect it runs. func TestAddWhileRunning(t *testing.T) { wg := &sync.WaitGroup{} - wg.Add(1) + wg.Add(2) - cron, err := New() + cron, err := New( + WithNamespace(randomNamespace()), + WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { + wg.Done() + return nil + })) if err != nil { t.Fatal("unexpected error") } - cron.Start(context.Background()) - defer cron.Stop() + ctx, cancel := context.WithCancel(context.Background()) + cron.Start(ctx) + defer func() { + cancel() + cron.Wait() + }() - cron.AddJob(Job{ + cron.AddJob(ctx, Job{ Name: "test-run", Rhythm: "* * * * * ?", - Func: func(context.Context) error { - wg.Done() - return nil - }, }) select { - case <-time.After(ONE_SECOND): + case <-time.After(2 * ONE_SECOND): t.FailNow() case <-wait(wg): } @@ -112,32 +145,89 @@ func TestAddWhileRunning(t *testing.T) { // Test timing with Entries. 
 func TestSnapshotEntries(t *testing.T) {
 	wg := &sync.WaitGroup{}
-	wg.Add(1)
+	wg.Add(2)
 
-	cron, err := New()
+	cron, err := New(
+		WithNamespace(randomNamespace()),
+		WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error {
+			wg.Done()
+			return nil
+		}))
 	if err != nil {
 		t.Fatal("unexpected error")
 	}
-	cron.AddJob(Job{
+	ctx, cancel := context.WithCancel(context.Background())
+	cron.AddJob(ctx, Job{
 		Name:   "test-snapshot-entries",
 		Rhythm: "@every 2s",
-		Func: func(context.Context) error {
-			wg.Done()
-			return nil
-		},
 	})
-	cron.Start(context.Background())
-	defer cron.Stop()
+	cron.Start(ctx)
+	defer func() {
+		cancel()
+		cron.Wait()
+	}()
 
-	// Cron should fire in 2 seconds. After 1 second, call Entries.
+	// After 1 second, call Entries.
 	select {
 	case <-time.After(ONE_SECOND):
 		cron.Entries()
 	}
 
-	// Even though Entries was called, the cron should fire at the 2 second mark.
+	// Even though Entries was called, the cron should still fire twice within the next 3 seconds.
 	select {
-	case <-time.After(ONE_SECOND):
+	case <-time.After(3 * ONE_SECOND):
 		t.FailNow()
 	case <-wait(wg):
 	}
 }
 
+// Test delayed add after cron starts for a while.
+func TestDelayedAdd(t *testing.T) {
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+	called := false
+
+	cron, err := New(
+		WithNamespace(randomNamespace()),
+		WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error {
+			if s == "noop" {
+				return nil
+			}
+			if called {
+				t.Fatal("cannot call twice")
+			}
+			called = true
+			wg.Done()
+			return nil
+		}))
+	if err != nil {
+		t.Fatal("unexpected error")
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	cron.AddJob(ctx, Job{
+		Name:   "test-noop",
+		Rhythm: "@every 1s",
+		Type:   "noop",
+	})
+
+	cron.Start(ctx)
+	defer func() {
+		cancel()
+		cron.Wait()
+	}()
+
+	// Artificial delay before adding another record.
+	time.Sleep(10 * time.Second)
+
+	cron.AddJob(ctx, Job{
+		Name:   "test-ev-2s",
+		Rhythm: "@every 2s",
+	})
+
+	// Event should be called only once within the next few seconds.
+ select { + case <-time.After(3 * ONE_SECOND): t.FailNow() case <-wait(wg): } @@ -151,36 +241,47 @@ func TestMultipleEntries(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(2) - cron, err := New() + cron, err := New( + WithNamespace(randomNamespace()), + WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { + if s == "return-nil" { + return nil + } + + wg.Done() + return nil + })) if err != nil { t.Fatal("unexpected error") } - cron.AddJob(Job{ + ctx, cancel := context.WithCancel(context.Background()) + cron.AddJob(ctx, Job{ Name: "test-multiple-1", Rhythm: "0 0 0 1 1 ?", - Func: func(context.Context) error { return nil }, + Type: "return-nil", }) - cron.AddJob(Job{ + cron.AddJob(ctx, Job{ Name: "test-multiple-2", Rhythm: "* * * * * ?", - Func: func(context.Context) error { wg.Done(); return nil }, }) - cron.AddJob(Job{ + cron.AddJob(ctx, Job{ Name: "test-multiple-3", Rhythm: "0 0 0 31 12 ?", - Func: func(context.Context) error { return nil }, + Type: "return-nil", }) - cron.AddJob(Job{ + cron.AddJob(ctx, Job{ Name: "test-multiple-4", Rhythm: "* * * * * ?", - Func: func(context.Context) error { wg.Done(); return nil }, }) - cron.Start(context.Background()) - defer cron.Stop() + cron.Start(ctx) + defer func() { + cancel() + cron.Wait() + }() select { - case <-time.After(ONE_SECOND): + case <-time.After(2 * ONE_SECOND): t.FailNow() case <-wait(wg): } @@ -191,28 +292,40 @@ func TestRunningJobTwice(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(2) - cron, err := New() + cron, err := New( + WithNamespace(randomNamespace()), + WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { + if s == "return-nil" { + return nil + } + + wg.Done() + return nil + })) if err != nil { t.Fatal("unexpected error") } - cron.AddJob(Job{ + ctx, cancel := context.WithCancel(context.Background()) + cron.AddJob(ctx, Job{ Name: "test-twice-1", Rhythm: "0 0 0 1 1 ?", - Func: func(context.Context) error { return nil }, + Type: "return-nil", }) - cron.AddJob(Job{ + cron.AddJob(ctx, Job{ Name: "test-twice-2", Rhythm: "0 0 0 31 12 ?", - Func: func(context.Context) error { return nil }, + Type: "return-nil", }) - cron.AddJob(Job{ + cron.AddJob(ctx, Job{ Name: "test-twice-3", Rhythm: "* * * * * ?", - Func: func(context.Context) error { wg.Done(); return nil }, }) - cron.Start(context.Background()) - defer cron.Stop() + cron.Start(ctx) + defer func() { + cancel() + cron.Wait() + }() select { case <-time.After(2 * ONE_SECOND): @@ -225,32 +338,44 @@ func TestRunningMultipleSchedules(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(2) - cron, err := New() + cron, err := New( + WithNamespace(randomNamespace()), + WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { + if s == "return-nil" { + return nil + } + + wg.Done() + return nil + })) if err != nil { t.Fatal("unexpected error") } - cron.AddJob(Job{ + ctx, cancel := context.WithCancel(context.Background()) + cron.AddJob(ctx, Job{ Name: "test-mschedule-1", Rhythm: "0 0 0 1 1 ?", - Func: func(context.Context) error { return nil }, + Type: "return-nil", }) - cron.AddJob(Job{ + cron.AddJob(ctx, Job{ Name: "test-mschedule-2", Rhythm: "0 0 0 31 12 ?", - Func: func(context.Context) error { return nil }, + Type: "return-nil", }) - cron.AddJob(Job{ + cron.AddJob(ctx, Job{ Name: "test-mschedule-3", Rhythm: "* * * * * ?", - Func: func(context.Context) error { wg.Done(); return nil }, }) - cron.Schedule(Every(time.Minute), Job{Name: "test-mschedule-4", Func: func(context.Context) error { return nil }}) - 
cron.Schedule(Every(time.Second), Job{Name: "test-mschedule-5", Func: func(context.Context) error { wg.Done(); return nil }}) - cron.Schedule(Every(time.Hour), Job{Name: "test-mschedule-6", Func: func(context.Context) error { return nil }}) - - cron.Start(context.Background()) - defer cron.Stop() + cron.schedule(Every(time.Minute), Job{Name: "test-mschedule-4", Type: "return-nil"}) + cron.schedule(Every(time.Second), Job{Name: "test-mschedule-5"}) + cron.schedule(Every(time.Hour), Job{Name: "test-mschedule-6", Type: "return-nil"}) + + cron.Start(ctx) + defer func() { + cancel() + cron.Wait() + }() select { case <-time.After(2 * ONE_SECOND): @@ -262,30 +387,39 @@ func TestRunningMultipleSchedules(t *testing.T) { // Test that the cron is run in the local time zone (as opposed to UTC). func TestLocalTimezone(t *testing.T) { wg := &sync.WaitGroup{} + called := atomic.Int32{} wg.Add(1) now := time.Now().Local() spec := fmt.Sprintf("%d %d %d %d %d ?", - now.Second()+1, now.Minute(), now.Hour(), now.Day(), now.Month()) - - cron, err := New() + now.Second()+2, now.Minute(), now.Hour(), now.Day(), now.Month()) + + cron, err := New( + WithNamespace(randomNamespace()), + WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { + if called.Add(1) > 1 { + return nil + } + wg.Done() + return nil + })) if err != nil { t.Fatal("unexpected error") } - cron.AddJob(Job{ + ctx, cancel := context.WithCancel(context.Background()) + cron.AddJob(ctx, Job{ Name: "test-local", Rhythm: spec, - Func: func(context.Context) error { - wg.Done() - return nil - }, }) - cron.Start(context.Background()) - defer cron.Stop() + cron.Start(ctx) + defer func() { + cancel() + cron.Wait() + }() select { - case <-time.After(ONE_SECOND): + case <-time.After(3 * ONE_SECOND): t.FailNow() case <-wait(wg): } @@ -294,47 +428,55 @@ func TestLocalTimezone(t *testing.T) { // Simple test using Runnables. 
func TestJob(t *testing.T) { wg := &sync.WaitGroup{} + calledAlready := false wg.Add(1) - cron, err := New() + cron, err := New( + WithNamespace(randomNamespace()), + WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { + if calledAlready { + return nil + } + calledAlready = true + wg.Done() + return nil + })) if err != nil { t.Fatal("unexpected error") } - cron.AddJob(Job{ + ctx, cancel := context.WithCancel(context.Background()) + cron.AddJob(ctx, Job{ Name: "job0", Rhythm: "0 0 0 30 Feb ?", - Func: func(context.Context) error { wg.Done(); return nil }, }) - cron.AddJob(Job{ + cron.AddJob(ctx, Job{ Name: "job1", Rhythm: "0 0 0 1 1 ?", - Func: func(context.Context) error { wg.Done(); return nil }, }) - cron.AddJob(Job{ + cron.AddJob(ctx, Job{ Name: "job2", Rhythm: "* * * * * ?", - Func: func(context.Context) error { wg.Done(); return nil }, }) - cron.AddJob(Job{ + cron.AddJob(ctx, Job{ Name: "job3", Rhythm: "1 0 0 1 1 ?", - Func: func(context.Context) error { wg.Done(); return nil }, }) - cron.Schedule(Every(5*time.Second+5*time.Nanosecond), Job{ + cron.schedule(Every(5*time.Second+5*time.Nanosecond), Job{ Name: "job4", - Func: func(context.Context) error { wg.Done(); return nil }, }) - cron.Schedule(Every(5*time.Minute), Job{ + cron.schedule(Every(5*time.Minute), Job{ Name: "job5", - Func: func(context.Context) error { wg.Done(); return nil }, }) - cron.Start(context.Background()) - defer cron.Stop() + cron.Start(ctx) + defer func() { + cancel() + cron.Wait() + }() select { - case <-time.After(ONE_SECOND): + case <-time.After(2 * ONE_SECOND): t.FailNow() case <-wait(wg): } @@ -361,31 +503,45 @@ func TestCron_Parallel(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(2) - cron1, err := New() + cron1, err := New( + WithNamespace(randomNamespace()), + WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { + wg.Done() + return nil + })) if err != nil { t.Fatal("unexpected error") } - defer cron1.Stop() + ctx1, cancel1 := context.WithCancel(context.Background()) + defer func() { + cancel1() + cron1.Wait() + }() - cron2, err := New() + cron2, err := New( + WithNamespace(randomNamespace()), + WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { + wg.Done() + return nil + })) if err != nil { t.Fatal("unexpected error") } - defer cron2.Stop() + ctx2, cancel2 := context.WithCancel(context.Background()) + defer func() { + cancel2() + cron2.Wait() + }() job := Job{ Name: "test-parallel", Rhythm: "* * * * * ?", - Func: func(context.Context) error { - wg.Done() - return nil - }, } - cron1.AddJob(job) - cron2.AddJob(job) + cron1.AddJob(ctx1, job) + cron2.AddJob(ctx2, job) - cron1.Start(context.Background()) - cron2.Start(context.Background()) + cron1.Start(ctx1) + cron2.Start(ctx2) select { case <-time.After(time.Duration(2) * ONE_SECOND): @@ -394,6 +550,46 @@ func TestCron_Parallel(t *testing.T) { } } +// Test job expires. 
+func TestTTL(t *testing.T) {
+	wg := &sync.WaitGroup{}
+	wg.Add(5)
+
+	firedOnce := atomic.Bool{}
+
+	cron, err := New(
+		WithNamespace(randomNamespace()),
+		WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error {
+			firedOnce.Store(true)
+			wg.Done()
+			return nil
+		}))
+	if err != nil {
+		t.Fatal("unexpected error")
+	}
+	ctx, cancel := context.WithCancel(context.Background())
+	cron.AddJob(ctx, Job{
+		Name:   "test-twice-3",
+		Rhythm: "* * * * * ?",
+		TTL:    2,
+	})
+
+	cron.Start(ctx)
+	defer func() {
+		cancel()
+		cron.Wait()
+	}()
+
+	select {
+	case <-time.After(6 * ONE_SECOND):
+		// Success, it means it did not consume all the workgroup count because the job expired.
+		assert.True(t, firedOnce.Load())
+	case <-wait(wg):
+		// Fails because TTL should delete the job and make it stop consuming the workgroup count.
+		t.FailNow()
+	}
+}
+
 func wait(wg *sync.WaitGroup) chan bool {
 	ch := make(chan bool)
 	go func() {
@@ -403,11 +599,16 @@ func wait(wg *sync.WaitGroup) chan bool {
 	return ch
 }
 
-func stop(cron *Cron) chan bool {
+func stop(cron *Cron, cancel context.CancelFunc) chan bool {
 	ch := make(chan bool)
 	go func() {
-		cron.Stop()
+		cancel()
+		cron.Wait()
 		ch <- true
 	}()
 	return ch
 }
+
+func randomNamespace() string {
+	return uuid.New().String()
+}
diff --git a/doc.go b/doc.go
index 0e975ff..0f17300 100644
--- a/doc.go
+++ b/doc.go
@@ -1,28 +1,64 @@
 /*
 Package cron implements a cron spec parser and job runner.
 
-Usage
+# Usage
 
-Callers may register Funcs to be invoked on a given schedule. Cron will run
+Callers register a single callback function and cron provides context via `type` and `payload`. Cron will run
 them in their own goroutines.
 
-	c := cron.New()
-	c.AddFunc("0 30 * * * *", func() { fmt.Println("Every hour on the half hour") })
-	c.AddFunc("@hourly", func() { fmt.Println("Every hour") })
-	c.AddFunc("@every 1h30m", func() { fmt.Println("Every hour thirty") })
-	c.Start()
+	c := cron.New(
+		cron.WithTriggerFunc(func(ctx context.Context, triggerType string, payload *anypb.Any) error {
+			log.Printf("Trigger from pid %d: %s %s\n", os.Getpid(), triggerType, string(payload.Value))
+			return nil
+		}),
+	)
+	c.AddJob(ctx, etcdcron.Job{
+		Name:    "job-100",
+		Rhythm:  "*\/2 * * * * *",
+		Type:    "stdout",
+		Payload: &anypb.Any{Value: []byte("Hello every 2s")},
+	})
+	c.AddJob(ctx, etcdcron.Job{
+		Name:    "job-101",
+		Rhythm:  "0 30 * * * *",
+		Type:    "stdout",
+		Payload: &anypb.Any{Value: []byte("Every hour on the half hour")},
+	})
+	c.AddJob(ctx, etcdcron.Job{
+		Name:    "job-102",
+		Rhythm:  "@hourly",
+		Type:    "stdout",
+		Payload: &anypb.Any{Value: []byte("Every hour")},
+	})
+	c.AddJob(ctx, etcdcron.Job{
+		Name:    "job-103",
+		Rhythm:  "@every 1h30m",
+		Type:    "stdout",
+		Payload: &anypb.Any{Value: []byte("Every hour thirty")},
+	})
+
+	ctx, cancel := context.WithCancel(context.Background())
+	c.Start(ctx)
 	..
-	// Funcs are invoked in their own goroutine, asynchronously.
+	// Jobs are invoked in their own goroutine, asynchronously.
 	...
-	// Funcs may also be added to a running Cron
-	c.AddFunc("@daily", func() { fmt.Println("Every day") })
+	// Jobs may also be added to a running Cron
+	c.AddJob(ctx, etcdcron.Job{
+		Name:    "job-103",
+		Rhythm:  "@daily",
+		Type:    "stdout",
+		Payload: &anypb.Any{Value: []byte("Every day")},
+	})
 	..
 	// Inspect the cron job entries' next and previous run times.
 	inspect(c.Entries())
 	..
-	c.Stop()  // Stop the scheduler (does not stop any jobs already running).
 
-CRON Expression Format
+	// Stop the scheduler via context (it can cancel jobs already running).
+ cancel() + c.Wait() + +# CRON Expression Format A cron expression represents a set of times, using 6 space-separated fields. @@ -38,7 +74,7 @@ A cron expression represents a set of times, using 6 space-separated fields. Note: Month and Day-of-week field values are case insensitive. "SUN", "Sun", and "sun" are equally accepted. -Special Characters +# Special Characters Asterisk ( * ) @@ -70,7 +106,7 @@ Question mark ( ? ) Question mark may be used instead of '*' for leaving either day-of-month or day-of-week blank. -Predefined schedules +# Predefined schedules You may use one of several pre-defined schedules in place of a cron expression. @@ -82,12 +118,12 @@ You may use one of several pre-defined schedules in place of a cron expression. @daily (or @midnight) | Run once a day, midnight | 0 0 0 * * * @hourly | Run once an hour, beginning of hour | 0 0 * * * * -Intervals +# Intervals You may also schedule a job to execute at fixed intervals. This is supported by formatting the cron spec like this: - @every + @every where "duration" is a string accepted by time.ParseDuration (http://golang.org/pkg/time/#ParseDuration). @@ -99,7 +135,7 @@ Note: The interval does not take the job runtime into account. For example, if a job takes 3 minutes to run, and it is scheduled to run every 5 minutes, it will have only 2 minutes of idle time between each run. -Time zones +# Time zones All interpretation and scheduling is done in the machine's local time zone (as provided by the Go time package (http://www.golang.org/pkg/time). @@ -107,7 +143,7 @@ provided by the Go time package (http://www.golang.org/pkg/time). Be aware that jobs scheduled during daylight-savings leap-ahead transitions will not be run! -Thread safety +# Thread safety Since the Cron service runs concurrently with the calling code, some amount of care must be taken to ensure proper synchronization. @@ -115,15 +151,15 @@ care must be taken to ensure proper synchronization. All cron methods are designed to be correctly synchronized as long as the caller ensures that invocations have a clear happens-before ordering between them. -Implementation +# Implementation Cron entries are stored in an array, sorted by their next activation time. Cron sleeps until the next job is due to be run. Upon waking: - - it runs each entry that is active on that second - - it calculates the next run times for the jobs that were run - - it re-sorts the array of entries by next activation time. - - it goes to sleep until the soonest job. + - it runs each entry that is active on that second + - it calculates the next run times for the jobs that were run + - it re-sorts the array of entries by next activation time. + - it goes to sleep until the soonest job. */ package etcdcron diff --git a/etcd.go b/etcd.go index c42eea8..54c2581 100644 --- a/etcd.go +++ b/etcd.go @@ -1,3 +1,8 @@ +/* +Copyright (c) 2024 Diagrid Inc. +Licensed under the MIT License. 
+*/ + package etcdcron import ( @@ -11,11 +16,17 @@ type DistributedMutex interface { IsOwner() etcdclient.Cmp Key() string Lock(ctx context.Context) error + TryLock(ctx context.Context) error Unlock(ctx context.Context) error } type EtcdMutexBuilder interface { NewMutex(pfx string) (DistributedMutex, error) + NewJobStore( + organizer Organizer, + partitioning Partitioning, + putCallback func(context.Context, Job) error, + deleteCallback func(context.Context, string) error) JobStore } type etcdMutexBuilder struct { @@ -31,13 +42,19 @@ func NewEtcdMutexBuilder(config etcdclient.Config) (EtcdMutexBuilder, error) { } func (c etcdMutexBuilder) NewMutex(pfx string) (DistributedMutex, error) { - // As each task iteration lock name is unique, we don't really care about unlocking it - // So the etcd lease will alst 10 minutes, it ensures that even if another server - // clock is ill-configured (with a maximum span of 10 minutes), it won't execute the task - // twice. - session, err := concurrency.NewSession(c.Client, concurrency.WithTTL(60*10)) + // We keep the lock per run, reusing the lock over multiple iterations. + // If we lose the lock, another instance will take it. + session, err := concurrency.NewSession(c.Client) if err != nil { return nil, err } return concurrency.NewMutex(session, pfx), nil } + +func (c etcdMutexBuilder) NewJobStore( + organizer Organizer, + p Partitioning, + putCallback func(context.Context, Job) error, + deleteCallback func(context.Context, string) error) JobStore { + return NewEtcdJobStore(c.Client, organizer, p, putCallback, deleteCallback) +} diff --git a/examples/cron_example.go b/examples/cron_example.go index 6a57c48..5582edb 100644 --- a/examples/cron_example.go +++ b/examples/cron_example.go @@ -1,38 +1,131 @@ +/* +Copyright (c) 2024 Diagrid Inc. +Licensed under the MIT License. 
+*/ + package main import ( "context" - "errors" + "fmt" "log" "os" - "time" + "os/signal" + "strconv" + "sync" + "syscall" - "github.com/Scalingo/go-etcd-cron" + etcdcron "github.com/diagridio/go-etcd-cron" + "google.golang.org/protobuf/types/known/anypb" ) func main() { - log.Println("starting") - cron, err := etcdcron.New() + hostId, err := strconv.Atoi(os.Getenv("HOST_ID")) if err != nil { - log.Fatal("fail to create etcd-cron", err) + hostId = 0 + } + numHosts, err := strconv.Atoi(os.Getenv("NUM_HOSTS")) + if err != nil { + numHosts = 1 + } + numPartitions, err := strconv.Atoi(os.Getenv("NUM_PARTITIONS")) + if err != nil { + numPartitions = 1 + } + namespace := os.Getenv("NAMESPACE") + if namespace == "" { + namespace = "example" + } + + log.Printf("starting hostId=%d for total of %d hosts and %d partitions", hostId, numHosts, numPartitions) + + p, err := etcdcron.NewPartitioning(numPartitions, numHosts, hostId) + if err != nil { + log.Fatal("fail to create partitioning", err) } - cron.AddJob(etcdcron.Job{ - Name: "test", - Rhythm: "*/4 * * * * *", - Func: func(ctx context.Context) error { - // Use default logging of etcd-cron - return errors.New("Horrible Error") - }, - }) - cron.AddJob(etcdcron.Job{ - Name: "test-v2", - Rhythm: "*/10 * * * * *", - Func: func(ctx context.Context) error { - log.Println("Every 10 seconds from", os.Getpid()) + cron, err := etcdcron.New( + etcdcron.WithNamespace(namespace), + etcdcron.WithPartitioning(p), + etcdcron.WithTriggerFunc(func(ctx context.Context, triggerType string, payload *anypb.Any) error { + log.Printf("Trigger from pid %d: %s %s\n", os.Getpid(), triggerType, string(payload.Value)) return nil - }, - }) - cron.Start(context.Background()) - time.Sleep(100 * time.Second) - cron.Stop() + }), + ) + if err != nil { + log.Fatal("fail to create etcd-cron", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + signalChannel := make(chan os.Signal, 1) + signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM) + var wg sync.WaitGroup + // Start a goroutine to listen for signals + go func() { + // Wait for a signal + sig := <-signalChannel + fmt.Println("\nReceived signal:", sig) + + // Clean up and notify the main goroutine to exit + cancel() + wg.Done() + }() + + if os.Getenv("ADD") == "1" { + cron.AddJob(ctx, etcdcron.Job{ + Name: "every-2s-b34w5y5hbwthjs", + Rhythm: "*/2 * * * * *", + Type: "stdout", // can be anything the client wants + Payload: &anypb.Any{Value: []byte("ev 2s")}, + }) + cron.AddJob(ctx, etcdcron.Job{ + Name: "every-10s-bnsf45354wbdsnd", + Rhythm: "*/10 * * * * *", + Type: "stdout", // can be anything the client wants + Payload: &anypb.Any{Value: []byte("ev 10s")}, + }) + cron.AddJob(ctx, etcdcron.Job{ + Name: "every-3s-mdhgm764324rqdg", + Rhythm: "*/3 * * * * *", + Type: "stdout", // can be anything the client wants + Payload: &anypb.Any{Value: []byte("ev 3s")}, + }) + cron.AddJob(ctx, etcdcron.Job{ + Name: "every-4s-vdafbrtjnysh245", + Rhythm: "*/4 * * * * *", + Type: "stdout", // can be anything the client wants + Payload: &anypb.Any{Value: []byte("ev 4s")}, + }) + cron.AddJob(ctx, etcdcron.Job{ + Name: "every-5s-adjbg43q5rbafbr44", + Rhythm: "*/5 * * * * *", + Type: "stdout", // can be anything the client wants + Payload: &anypb.Any{Value: []byte("ev 5s")}, + }) + cron.AddJob(ctx, etcdcron.Job{ + Name: "every-6s-abadfh52jgdyj467", + Rhythm: "*/6 * * * * *", + Type: "stdout", // can be anything the client wants + Payload: &anypb.Any{Value: []byte("ev 6s")}, + }) + cron.AddJob(ctx, etcdcron.Job{ + Name: 
"every-7s-bndasfbn4q55fgn", + Rhythm: "*/7 * * * * *", + Type: "stdout", // can be anything the client wants + Payload: &anypb.Any{Value: []byte("ev 7s")}, + }) + cron.AddJob(ctx, etcdcron.Job{ + Name: "every-1s-then-expire-hadfh452erhh", + Rhythm: "*/1 * * * * *", + Type: "stdout", // can be anything the client wants + TTL: 10, + Payload: &anypb.Any{Value: []byte("ev 1s then expires after 10s")}, + }) + } + cron.Start(ctx) + + // Wait for graceful shutdown on interrupt signal + wg.Add(1) + wg.Wait() + + fmt.Println("Program gracefully terminated.") } diff --git a/go.mod b/go.mod index 203fdfe..89dd6c8 100644 --- a/go.mod +++ b/go.mod @@ -1,28 +1,34 @@ -module github.com/Scalingo/go-etcd-cron +module github.com/diagridio/go-etcd-cron go 1.20 require ( - github.com/iancoleman/strcase v0.3.0 github.com/pkg/errors v0.9.1 go.etcd.io/etcd/client/v3 v3.5.12 ) +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + require ( github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/protobuf v1.5.3 // indirect - go.etcd.io/etcd/api/v3 v3.5.12 // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/google/uuid v1.6.0 + github.com/stretchr/testify v1.9.0 + go.etcd.io/etcd/api/v3 v3.5.12 go.etcd.io/etcd/client/pkg/v3 v3.5.12 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.26.0 // indirect - golang.org/x/net v0.20.0 // indirect - golang.org/x/sys v0.16.0 // indirect + go.uber.org/zap v1.27.0 // indirect + golang.org/x/net v0.22.0 // indirect + golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect - google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect - google.golang.org/grpc v1.61.0 // indirect - google.golang.org/protobuf v1.32.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20240304212257-790db918fca8 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 // indirect + google.golang.org/grpc v1.62.1 // indirect + google.golang.org/protobuf v1.33.0 ) diff --git a/go.sum b/go.sum index 7150b60..585757e 100644 --- a/go.sum +++ b/go.sum @@ -3,22 +3,23 @@ github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03V github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= 
+github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI= -github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.etcd.io/etcd/api/v3 v3.5.12 h1:W4sw5ZoU2Juc9gBWuLk5U6fHfNVyY1WC5g9uiXZio/c= @@ -27,11 +28,11 @@ go.etcd.io/etcd/client/pkg/v3 v3.5.12 h1:EYDL6pWwyOsylrQyLp2w+HkQ46ATiOvoEdMarin go.etcd.io/etcd/client/pkg/v3 v3.5.12/go.mod h1:seTzl2d9APP8R5Y2hFL3NVlD6qC/dOT+3kvrqPyTas4= go.etcd.io/etcd/client/v3 v3.5.12 h1:v5lCPXn1pf1Uu3M4laUE2hp/geOTc5uPcYYsNe1lDxg= go.etcd.io/etcd/client/v3 v3.5.12/go.mod h1:tSbBCakoWmmddL+BKVAJHa9km+O/E+bumDe9mSbPiqw= -go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= -go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -41,16 +42,16 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= -golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= +golang.org/x/net v0.22.0 
h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= +golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= -golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= @@ -63,18 +64,15 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 h1:wpZ8pe2x1Q3f2KyT5f8oP/fa9rHAKgFPr/HZdNuS+PQ= -google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:J7XzRzVy1+IPwWHZUzoD0IccYZIrXILAQpc+Qy9CMhY= -google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17 h1:JpwMPBpFN3uKhdaekDpiNlImDdkUAyiJ6ez/uxGaUSo= -google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:0xJLfVdJqpAPl8tDg1ujOCGzx6LFLttXT5NhllGOXY4= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA= -google.golang.org/grpc v1.61.0 h1:TOvOcuXn30kRao+gfcvsebNEa5iZIiLkisYEkf7R7o0= -google.golang.org/grpc v1.61.0/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= -google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/genproto/googleapis/api v0.0.0-20240304212257-790db918fca8 h1:8eadJkXbwDEMNwcB5O0s5Y5eCfyuCLdvaiOIaGTrWmQ= +google.golang.org/genproto/googleapis/api v0.0.0-20240304212257-790db918fca8/go.mod h1:O1cOfN1Cy6QEYr7VxtjOyP5AdAuR0aJ/MYZaaof623Y= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 h1:IR+hp6ypxjH24bkMfEJ0yHR21+gwPWdV+/IBrPQyn3k= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8/go.mod h1:UCOku4NytXMJuLQE5VuqA5lX3PcHCBo8pxNyvkf4xBs= 
+google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= +google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/job.pb.go b/job.pb.go new file mode 100644 index 0000000..3ab7a53 --- /dev/null +++ b/job.pb.go @@ -0,0 +1,183 @@ +// +//Copyright (c) 2024 Diagrid Inc. +//Licensed under the MIT License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.12 +// source: proto/job.proto + +package etcdcron + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + anypb "google.golang.org/protobuf/types/known/anypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// JobRecord is the record persisted on Etcd. +type JobRecord struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Rhythm string `protobuf:"bytes,2,opt,name=rhythm,proto3" json:"rhythm,omitempty"` + Type string `protobuf:"bytes,3,opt,name=type,proto3" json:"type,omitempty"` + Payload *anypb.Any `protobuf:"bytes,4,opt,name=payload,proto3" json:"payload,omitempty"` +} + +func (x *JobRecord) Reset() { + *x = JobRecord{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_job_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *JobRecord) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*JobRecord) ProtoMessage() {} + +func (x *JobRecord) ProtoReflect() protoreflect.Message { + mi := &file_proto_job_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use JobRecord.ProtoReflect.Descriptor instead. 
+func (*JobRecord) Descriptor() ([]byte, []int) { + return file_proto_job_proto_rawDescGZIP(), []int{0} +} + +func (x *JobRecord) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *JobRecord) GetRhythm() string { + if x != nil { + return x.Rhythm + } + return "" +} + +func (x *JobRecord) GetType() string { + if x != nil { + return x.Type + } + return "" +} + +func (x *JobRecord) GetPayload() *anypb.Any { + if x != nil { + return x.Payload + } + return nil +} + +var File_proto_job_proto protoreflect.FileDescriptor + +var file_proto_job_proto_rawDesc = []byte{ + 0x0a, 0x0f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6a, 0x6f, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x12, 0x08, 0x65, 0x74, 0x63, 0x64, 0x63, 0x72, 0x6f, 0x6e, 0x1a, 0x19, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x7b, 0x0a, 0x09, 0x4a, 0x6f, 0x62, 0x52, 0x65, 0x63, + 0x6f, 0x72, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x68, 0x79, 0x74, 0x68, + 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x68, 0x79, 0x74, 0x68, 0x6d, 0x12, + 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, + 0x79, 0x70, 0x65, 0x12, 0x2e, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, + 0x6f, 0x61, 0x64, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x64, 0x69, 0x61, 0x67, 0x72, 0x69, 0x64, 0x69, 0x6f, 0x2f, 0x67, 0x6f, 0x2d, 0x65, + 0x74, 0x63, 0x64, 0x2d, 0x63, 0x72, 0x6f, 0x6e, 0x3b, 0x65, 0x74, 0x63, 0x64, 0x63, 0x72, 0x6f, + 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proto_job_proto_rawDescOnce sync.Once + file_proto_job_proto_rawDescData = file_proto_job_proto_rawDesc +) + +func file_proto_job_proto_rawDescGZIP() []byte { + file_proto_job_proto_rawDescOnce.Do(func() { + file_proto_job_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_job_proto_rawDescData) + }) + return file_proto_job_proto_rawDescData +} + +var file_proto_job_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_proto_job_proto_goTypes = []interface{}{ + (*JobRecord)(nil), // 0: etcdcron.JobRecord + (*anypb.Any)(nil), // 1: google.protobuf.Any +} +var file_proto_job_proto_depIdxs = []int32{ + 1, // 0: etcdcron.JobRecord.payload:type_name -> google.protobuf.Any + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_proto_job_proto_init() } +func file_proto_job_proto_init() { + if File_proto_job_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_job_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*JobRecord); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + 
RawDescriptor: file_proto_job_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_proto_job_proto_goTypes, + DependencyIndexes: file_proto_job_proto_depIdxs, + MessageInfos: file_proto_job_proto_msgTypes, + }.Build() + File_proto_job_proto = out.File + file_proto_job_proto_rawDesc = nil + file_proto_job_proto_goTypes = nil + file_proto_job_proto_depIdxs = nil +} diff --git a/local_mutexer.go b/local_mutexer.go new file mode 100644 index 0000000..ab5e75b --- /dev/null +++ b/local_mutexer.go @@ -0,0 +1,60 @@ +/* +Copyright (c) 2024 Diagrid Inc. +Licensed under the MIT License. +*/ + +package etcdcron + +import ( + "context" + "sync" +) + +// Mutexer locks and unlocks mutexes locally based on key, with garbage collection method. +type Mutexer struct { + mutex sync.RWMutex + + mutexes map[string]*sync.RWMutex + collector *Collector +} + +func NewMutexer(collector *Collector) *Mutexer { + return &Mutexer{ + mutexes: map[string]*sync.RWMutex{}, + collector: collector, + } +} + +func (o *Mutexer) Get(key string) *sync.RWMutex { + // Optimistic read lock. + o.mutex.RLock() + m, ok := o.mutexes[key] + o.mutex.RUnlock() + if !ok { + // Now we need to check again after getting lock, common pattern. + o.mutex.Lock() + defer o.mutex.Unlock() + m, ok = o.mutexes[key] + if !ok { + m = &sync.RWMutex{} + o.collector.Add(func(ctx context.Context) { + o.Delete(key) + }) + o.mutexes[key] = m + return m + } + } + + return m +} + +func (o *Mutexer) Delete(keys ...string) { + o.mutex.Lock() + defer o.mutex.Unlock() + for _, key := range keys { + _, ok := o.mutexes[key] + if ok { + delete(o.mutexes, key) + } + } +} diff --git a/mutex_cache.go b/mutex_cache.go new file mode 100644 index 0000000..95effdf --- /dev/null +++ b/mutex_cache.go @@ -0,0 +1,64 @@ +/* +Copyright (c) 2024 Diagrid Inc. +Licensed under the MIT License. +*/ + +package etcdcron + +import ( + "context" + "sync" +) + +// MutexStore allows reuse of the same dist mutex in Etcd for a given key. +type MutexStore struct { + lock sync.RWMutex + cache map[string]DistributedMutex + mutexBuilder EtcdMutexBuilder + collector *Collector +} + +func NewMutexStore(mutexBuilder EtcdMutexBuilder, collector *Collector) *MutexStore { + return &MutexStore{ + cache: map[string]DistributedMutex{}, + mutexBuilder: mutexBuilder, + collector: collector, + } +} + +func (m *MutexStore) Get(key string) (DistributedMutex, error) { + m.lock.RLock() + mutex := m.cache[key] + m.lock.RUnlock() + if mutex != nil { + return mutex, nil + } + + m.lock.Lock() + defer m.lock.Unlock() + mutex = m.cache[key] + if mutex != nil { + return mutex, nil + } + + mutex, err := m.mutexBuilder.NewMutex(key) + if err != nil { + return nil, err + } + m.collector.Add(func(ctx context.Context) { + m.Delete(key) + }) + m.cache[key] = mutex + return mutex, nil +} + +func (m *MutexStore) Delete(keys ...string) { + m.lock.Lock() + defer m.lock.Unlock() + for _, key := range keys { + _, ok := m.cache[key] + if ok { + delete(m.cache, key) + } + } +} diff --git a/organizer.go b/organizer.go new file mode 100644 index 0000000..744cf50 --- /dev/null +++ b/organizer.go @@ -0,0 +1,42 @@ +/* +Copyright (c) 2024 Diagrid Inc. +Licensed under the MIT License. +*/ + +package etcdcron + +import ( + "path/filepath" + "strconv" +) + +// The Organizer decides about key locations. 
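+// Jobs live under "<namespace>/partitions/<partitionId>/jobs/<jobName>" and tick
+// state under "<namespace>/partitions/<partitionId>/ticks", so all of a partition's
+// jobs share one key prefix that the store can load and watch as a unit.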
+type Organizer interface { + JobPath(jobName string) string + JobsPath(partitionId int) string + TicksPath(partitionId int) string +} + +type organizer struct { + partitioning Partitioning + namespace string +} + +func NewOrganizer(namespace string, p Partitioning) Organizer { + return &organizer{ + partitioning: p, + namespace: namespace, + } +} + +func (o *organizer) JobPath(jobName string) string { + return filepath.Join(o.namespace, "partitions", strconv.Itoa(o.partitioning.CalculatePartitionId(jobName)), "jobs", jobName) +} + +func (o *organizer) JobsPath(partitionId int) string { + return filepath.Join(o.namespace, "partitions", strconv.Itoa(partitionId), "jobs") +} + +func (o *organizer) TicksPath(partitionId int) string { + return filepath.Join(o.namespace, "partitions", strconv.Itoa(partitionId), "ticks") +} diff --git a/partition.go b/partition.go new file mode 100644 index 0000000..37f3805 --- /dev/null +++ b/partition.go @@ -0,0 +1,90 @@ +/* +Copyright (c) 2024 Diagrid Inc. +Licensed under the MIT License. +*/ + +package etcdcron + +import ( + "fmt" + "hash/fnv" +) + +// The Partitioning splits the jobs between partitions. +type Partitioning interface { + CalculatePartitionId(key string) int + ListPartitions() []int + CheckPartitionLeader(partitionId int) bool +} + +type partitioning struct { + // Number of hosts to compete to leadership of partitions. + numHosts int + // Maximum number of virtual partitions to split the jobs. + numVirtualPartitions int + // This hostId (must be between 0 and numHosts) + hostId int + // Partitions that this instance will try to become a leader for. + myPartitions []int +} + +// NoPartitioning will make any host a leader of the only partition. +func NoPartitioning() Partitioning { + p, _ := NewPartitioning(1, 1, 0) + return p +} + +// NewPartitioning holds the logic to hold job partitioning +func NewPartitioning(numVirtualPartitions int, numHosts int, hostId int) (Partitioning, error) { + if numVirtualPartitions < numHosts { + return nil, fmt.Errorf("cannot have more hosts than partitions") + } + if hostId < 0 { + return nil, fmt.Errorf("hostId cannot be zero") + } + if hostId >= numHosts { + return nil, fmt.Errorf("hostId cannot be greater or equal to %d", numHosts) + } + partitions := []int{} + for i := 0; i < numVirtualPartitions; i++ { + if checkPartitionLeader(i, numVirtualPartitions, hostId, numHosts) { + partitions = append(partitions, i) + } + } + return &partitioning{ + numVirtualPartitions: numVirtualPartitions, + numHosts: numHosts, + hostId: hostId, + myPartitions: partitions, + }, nil +} + +func (p *partitioning) CalculatePartitionId(key string) int { + // Changing this algorithm is a breaking change and stored data is no longer valid. + hash := fnv.New32a() + hash.Write([]byte(key)) + return int(uint32(hash.Sum32()) % uint32(p.numVirtualPartitions)) +} + +func (p *partitioning) ListPartitions() []int { + return p.myPartitions +} + +func (p *partitioning) CheckPartitionLeader(partitionId int) bool { + return checkPartitionLeader(partitionId, p.numVirtualPartitions, p.hostId, p.numHosts) +} + +func checkPartitionLeader(partitionId, numVirtualPartitions, hostId, numHosts int) bool { + // First, make sure partitionId is within range. + sanitizedPartitionId := positiveMod(partitionId, numVirtualPartitions) + // Now, check if the mods match. 
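+	// Example: with 3 hosts, host 1 leads every partition whose id mod 3 == 1 (1, 4, 7, ...).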
+ return positiveMod(hostId, numHosts) == positiveMod(sanitizedPartitionId, numHosts) +} + +func positiveMod(key, max int) int { + result := key % max + if result < 0 { + result += max + } + return result +} diff --git a/partition_test.go b/partition_test.go new file mode 100644 index 0000000..aca4209 --- /dev/null +++ b/partition_test.go @@ -0,0 +1,155 @@ +/* +Copyright (c) 2024 Diagrid Inc. +Licensed under the MIT License. +*/ + +package etcdcron + +import ( + "math/rand" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPartitionCalculation(t *testing.T) { + // If this test fails, it means the partitioning logic changes and it is a breaking change. + // Users cannot run or delete their persisted jobs if this fails. + numVirtualPartitions := 53 + numHosts := 1 + p, _ := NewPartitioning(numVirtualPartitions, numHosts, 0) + assert.Equal(t, 14, p.CalculatePartitionId("1")) + assert.Equal(t, 49, p.CalculatePartitionId("10")) +} + +func TestPartitionLeadershipWith3Hosts(t *testing.T) { + numVirtualPartitions := 53 + numHosts := 3 + p0, _ := NewPartitioning(numVirtualPartitions, numHosts, 0) + p1, _ := NewPartitioning(numVirtualPartitions, numHosts, 1) + p2, _ := NewPartitioning(numVirtualPartitions, numHosts, 2) + + assert.True(t, p0.CheckPartitionLeader(0)) + assert.False(t, p0.CheckPartitionLeader(1)) + assert.False(t, p0.CheckPartitionLeader(2)) + + assert.False(t, p1.CheckPartitionLeader(0)) + assert.True(t, p1.CheckPartitionLeader(1)) + assert.False(t, p1.CheckPartitionLeader(2)) + + assert.False(t, p2.CheckPartitionLeader(0)) + assert.False(t, p2.CheckPartitionLeader(1)) + assert.True(t, p2.CheckPartitionLeader(2)) + + p3, p3Err := NewPartitioning(numVirtualPartitions, numHosts, 3) + assert.Nil(t, p3) + assert.NotNil(t, p3Err) + + p4, p4Err := NewPartitioning(numVirtualPartitions, numHosts, 4) + assert.Nil(t, p4) + assert.NotNil(t, p4Err) + + pNeg, pNegErr := NewPartitioning(numVirtualPartitions, numHosts, -1) + assert.Nil(t, pNeg) + assert.NotNil(t, pNegErr) +} + +func TestPartitionLeadershipDist(t *testing.T) { + + tryAllHostsAndPartitions := func(numVirtParts, numHosts int) []int { + count := make([]int, numHosts) + for hostId := 0; hostId < numHosts; hostId++ { + p, _ := NewPartitioning(numVirtParts, numHosts, hostId) + for partitionId := 0; partitionId < numVirtParts; partitionId++ { + if p.CheckPartitionLeader(partitionId) { + count[hostId] = count[hostId] + 1 + } + } + } + + return count + } + + t.Run("No partitioning", func(t *testing.T) { + count := tryAllHostsAndPartitions(1, 1) + assert.Equal(t, 1, len(count)) + assert.Equal(t, 1, count[0]) + }) + + t.Run("53 partitions and 3 hosts", func(t *testing.T) { + count := tryAllHostsAndPartitions(53, 3) + assert.Equal(t, 3, len(count)) + assert.Equal(t, 18, count[0]) + assert.Equal(t, 18, count[1]) + assert.Equal(t, 17, count[2]) + }) + + t.Run("31 partitions and 3 hosts", func(t *testing.T) { + count := tryAllHostsAndPartitions(31, 3) + assert.Equal(t, 3, len(count)) + assert.Equal(t, 11, count[0]) + assert.Equal(t, 10, count[1]) + assert.Equal(t, 10, count[2]) + }) + + t.Run("53 partitions and 1 host", func(t *testing.T) { + count := tryAllHostsAndPartitions(53, 1) + assert.Equal(t, 1, len(count)) + assert.Equal(t, 53, count[0]) + }) + + t.Run("3 partitions and 3 hosts", func(t *testing.T) { + count := tryAllHostsAndPartitions(3, 3) + assert.Equal(t, 3, len(count)) + assert.Equal(t, 1, count[0]) + assert.Equal(t, 1, count[1]) + assert.Equal(t, 1, 
count[2]) + }) +} + +func TestPartitionIdWithinRange(t *testing.T) { + seed := int64(7) // Fixed seed value for reproducibility + randSource := rand.NewSource(seed) + randGen := rand.New(randSource) + numSamples := 1000 + + numVirtualPartitions := 53 + p, _ := NewPartitioning(53, 1, 0) + maxPartitionId := numVirtualPartitions - 1 + + for i := 0; i < numSamples; i++ { + randomUUID, err := uuid.NewRandomFromReader(randGen) + require.NoError(t, err) + partitionId := p.CalculatePartitionId(randomUUID.String()) + assert.GreaterOrEqual(t, partitionId, 0) + assert.LessOrEqual(t, partitionId, maxPartitionId) + } +} + +func TestNoPartitioningSingleLeader(t *testing.T) { + partitionIdStart := -10 + partitionIdEnd := 30 + + p := NoPartitioning() + for partitionId := partitionIdStart; partitionId <= partitionIdEnd; partitionId++ { + assert.True(t, p.CheckPartitionLeader(partitionId)) + } +} + +func TestNoPartitioningSinglePartition(t *testing.T) { + seed := int64(7) // Fixed seed value for reproducibility + randSource := rand.NewSource(seed) + randGen := rand.New(randSource) + numSamples := 1000 + + p := NoPartitioning() + + for i := 0; i < numSamples; i++ { + randomUUID, err := uuid.NewRandomFromReader(randGen) + require.NoError(t, err) + partitionId := p.CalculatePartitionId(randomUUID.String()) + assert.Equal(t, 0, partitionId) + } +} diff --git a/proto/job.proto b/proto/job.proto new file mode 100644 index 0000000..1d2b9d7 --- /dev/null +++ b/proto/job.proto @@ -0,0 +1,21 @@ +/* +Copyright (c) 2024 Diagrid Inc. +Licensed under the MIT License. +*/ + +syntax = "proto3"; + +package etcdcron; + +import "google/protobuf/any.proto"; + +option go_package = "github.com/diagridio/go-etcd-cron;etcdcron"; + +// JobRecord is the record persisted as value in Etcd (not the key). +// Namespace is handled by prefixing the key in the database. +message JobRecord { + string name = 1; + string rhythm = 2; + string type = 3; + google.protobuf.Any payload = 4; +} \ No newline at end of file diff --git a/store.go b/store.go new file mode 100644 index 0000000..cda1eaf --- /dev/null +++ b/store.go @@ -0,0 +1,203 @@ +/* +Copyright (c) 2024 Diagrid Inc. +Licensed under the MIT License. +*/ + +package etcdcron + +import ( + "context" + "fmt" + "log" + "path/filepath" + "sync" + + "github.com/pkg/errors" + "go.etcd.io/etcd/api/v3/mvccpb" + etcdclient "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/mirror" + "google.golang.org/protobuf/proto" +) + +// The JobStore persists and reads jobs from Etcd. 
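+// NewEtcdJobStore loads the jobs of the partitions this host leads and keeps watching
+// them, invoking the put/delete callbacks on changes; NoStore skips persistence and
+// only forwards puts and deletes to the callbacks.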
+type JobStore interface { + Start(ctx context.Context) error + Put(ctx context.Context, job Job) error + Delete(ctx context.Context, jobName string) error + Wait() +} + +type etcdStore struct { + runWaitingGroup sync.WaitGroup + etcdClient *etcdclient.Client + kvStore etcdclient.KV + partitioning Partitioning + organizer Organizer + putCallback func(context.Context, Job) error + deleteCallback func(context.Context, string) error +} + +type noStore struct { + putCallback func(context.Context, Job) error + deleteCallback func(context.Context, string) error +} + +func NoStore( + putCallback func(context.Context, Job) error, + deleteCallback func(context.Context, string) error) JobStore { + return &noStore{ + putCallback: putCallback, + deleteCallback: deleteCallback, + } +} + +func (s *noStore) Start(ctx context.Context) error { + return nil +} + +func (s *noStore) Put(ctx context.Context, job Job) error { + s.putCallback(ctx, job) + return nil +} + +func (s *noStore) Delete(ctx context.Context, jobName string) error { + s.deleteCallback(ctx, jobName) + return nil +} + +func (s *noStore) Wait() {} + +func NewEtcdJobStore( + client *etcdclient.Client, + organizer Organizer, + partitioning Partitioning, + putCallback func(context.Context, Job) error, + deleteCallback func(context.Context, string) error) JobStore { + return &etcdStore{ + etcdClient: client, + kvStore: etcdclient.NewKV(client), + partitioning: partitioning, + organizer: organizer, + putCallback: putCallback, + deleteCallback: deleteCallback, + } +} + +func (s *etcdStore) Start(ctx context.Context) error { + for _, partitionId := range s.partitioning.ListPartitions() { + // TODO(artursouza): parallelize this per partition. + partitionPrefix := s.organizer.JobsPath(partitionId) + "/" + partitionSyncer := mirror.NewSyncer(s.etcdClient, partitionPrefix, 0) + rc, errc := partitionSyncer.SyncBase(ctx) + + for r := range rc { + for _, kv := range r.Kvs { + err := s.notifyPut(ctx, kv, s.putCallback) + if err != nil { + return err + } + } + } + + err := <-errc + if err != nil { + return err + } + + s.sync(ctx, partitionPrefix, partitionSyncer) + } + + return nil +} + +func (s *etcdStore) Put(ctx context.Context, job Job) error { + opts := []etcdclient.OpOption{} + if job.TTL > 0 { + // Create a lease + lease, err := s.etcdClient.Grant(ctx, job.TTL) + if err != nil { + return errors.Errorf("failed to create lease to save job %s: %v", job.Name, err) + } + + opts = append(opts, etcdclient.WithLease(lease.ID)) + } + + record := &JobRecord{ + Name: job.Name, + Rhythm: job.Rhythm, + Type: job.Type, + Payload: job.Payload, + } + bytes, err := proto.Marshal(record) + if err != nil { + return err + } + _, err = s.kvStore.Put( + ctx, + s.organizer.JobPath(job.Name), + string(bytes), + opts..., + ) + return err +} + +func (s *etcdStore) Delete(ctx context.Context, jobName string) error { + _, err := s.kvStore.Delete( + ctx, + s.organizer.JobPath(jobName)) + return err +} + +func (s *etcdStore) Wait() { + s.runWaitingGroup.Wait() +} + +func (s *etcdStore) notifyPut(ctx context.Context, kv *mvccpb.KeyValue, callback func(context.Context, Job) error) error { + record := JobRecord{} + err := proto.Unmarshal(kv.Value, &record) + if err != nil { + return fmt.Errorf("could not unmarshal job for key %s: %v", string(kv.Key), err) + } + if record.GetName() == "" || record.GetRhythm() == "" { + return fmt.Errorf("could not deserialize job for key %s", string(kv.Key)) + } + + return callback(ctx, Job{ + Name: record.Name, + Rhythm: record.Rhythm, + Type: 
record.Type, + Payload: record.Payload, + }) +} + +func (s *etcdStore) notifyDelete(ctx context.Context, name string, callback func(context.Context, string) error) error { + return callback(ctx, name) +} + +func (s *etcdStore) sync(ctx context.Context, prefix string, syncer mirror.Syncer) { + s.runWaitingGroup.Add(1) + go func() { + log.Printf("Started sync for path: %s\n", prefix) + wc := syncer.SyncUpdates(ctx) + for { + select { + case <-ctx.Done(): + s.runWaitingGroup.Done() + return + case wr := <-wc: + for _, ev := range wr.Events { + t := ev.Type + switch t { + case mvccpb.PUT: + s.notifyPut(ctx, ev.Kv, s.putCallback) + case mvccpb.DELETE: + _, name := filepath.Split(string(ev.Kv.Key)) + s.notifyDelete(ctx, name, s.deleteCallback) + default: + log.Printf("Unknown etcd event type: %v", t.String()) + } + } + } + } + }() +}
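
For orientation, a rough sketch (not part of the patch) of how the new pieces fit together, using only the constructors introduced above. The etcd endpoint, namespace, and host counts are illustrative assumptions; the shipped examples/cron_example.go wires these through the cron itself rather than calling the store directly.

package main

import (
	"context"
	"log"
	"os"
	"os/signal"

	etcdcron "github.com/diagridio/go-etcd-cron"
	etcdclient "go.etcd.io/etcd/client/v3"
)

func main() {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	// 53 virtual partitions spread across 3 hosts; this process is host 0.
	partitioning, err := etcdcron.NewPartitioning(53, 3, 0)
	if err != nil {
		log.Fatal(err)
	}

	client, err := etcdclient.New(etcdclient.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatal(err)
	}

	organizer := etcdcron.NewOrganizer("example-ns", partitioning)
	store := etcdcron.NewEtcdJobStore(client, organizer, partitioning,
		func(ctx context.Context, j etcdcron.Job) error {
			log.Printf("job stored or loaded: %s (%s)", j.Name, j.Rhythm)
			return nil
		},
		func(ctx context.Context, name string) error {
			log.Printf("job deleted: %s", name)
			return nil
		})

	// Loads existing jobs for this host's partitions, then watches for updates
	// until the context is canceled.
	if err := store.Start(ctx); err != nil {
		log.Fatal(err)
	}
	store.Wait()
}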