Skip to content

Commit

Permalink
Persist and partition job (#3)
Browse files Browse the repository at this point in the history
* WIP: change lock to be per job.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Lock per job + namespace handing.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Partitioning of jobs.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* job store.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* GH workflow to test.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Remove dependabot automerge workflow.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Record job using protobuf.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Handle TTL.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Some random fixes.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Fix TestLocalTimezone flakiness.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Fix more flaky tests.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Addressed comments.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Stop via context only.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Remove update routine.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* nit: comment

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Fix detected race conditions.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Update RADME.md and docs.go

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Addressing more comments.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Reduce locking contention for cron dist mutex reuse.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Does not try to recover from panic.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Allow test to take longer.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

---------

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
  • Loading branch information
artursouza authored Mar 13, 2024
1 parent 28dd6ee commit 04fed7e
Show file tree
Hide file tree
Showing 20 changed files with 1,841 additions and 363 deletions.
45 changes: 45 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright (c) 2024 Diagrid Inc.
# Licensed under the MIT License.

name: "Build and testing"

on:
push:
branches:
- master
pull_request:
branches:
- master

jobs:
test:
name: Build and Test
runs-on: ubuntu-latest
timeout-minutes: 30
services:
etcd:
image: quay.io/coreos/etcd:v3.5.5
env:
ETCD_ADVERTISE_CLIENT_URLS: http://0.0.0.0:2379
ETCD_LISTEN_CLIENT_URLS: http://0.0.0.0:2379
ports:
- 2379:2379
env:
GOOS: linux
GOARCH: amd64
GOPROXY: https://proxy.golang.org
steps:
- name: Check out code
uses: actions/checkout@v4
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version-file: 'go.mod'
cache: 'false'
- name: Check mod tidy
run: |
go mod tidy
git diff --exit-code ./go.mod # check no changes
git diff --exit-code ./go.sum # check no changes
- name: Run test
run: go test -v -timeout 300s --race
17 changes: 0 additions & 17 deletions .github/workflows/dependabot.yml

This file was deleted.

45 changes: 45 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
################################################################################
# Target: modtidy #
################################################################################
.PHONY: modtidy
modtidy:
go mod tidy

################################################################################
# Target: gen-proto #
################################################################################
PROTOC ?=protoc
PROTOC_VERSION = 3.21.12
PROTOBUF_SUITE_VERSION = 21.12
PROTOC_GEN_GO_VERSION = v1.28.1

PROTOC_GEN_GO_GRPC_VERSION = 1.2.0

PROTOS:=$(shell ls proto)
PROTO_PREFIX:=github.com/diagridio/go-etcd-cron

.PHONY: check-proto-version
check-proto-version: ## Checking the version of proto related tools
@test "$(shell protoc --version)" = "libprotoc $(PROTOC_VERSION)" \
|| { echo "please use protoc $(PROTOC_VERSION) (protobuf $(PROTOBUF_SUITE_VERSION)) to generate proto"; exit 1; }

@test "$(shell protoc-gen-go-grpc --version)" = "protoc-gen-go-grpc $(PROTOC_GEN_GO_GRPC_VERSION)" \
|| { echo "please use protoc-gen-go-grpc $(PROTOC_GEN_GO_GRPC_VERSION) to generate proto"; exit 1; }

@test "$(shell protoc-gen-go --version 2>&1)" = "protoc-gen-go $(PROTOC_GEN_GO_VERSION)" \
|| { echo "please use protoc-gen-go $(PROTOC_GEN_GO_VERSION) to generate proto"; exit 1; }

# Generate archive files for each binary
# $(1): the binary name to be archived
define genProtoc
.PHONY: gen-proto-$(1)
gen-proto-$(1):
$(PROTOC) --go_out=. --go_opt=module=$(PROTO_PREFIX) --go-grpc_out=. --go-grpc_opt=require_unimplemented_servers=false,module=$(PROTO_PREFIX) ./proto/$(1)
endef

$(foreach ITEM,$(PROTOS),$(eval $(call genProtoc,$(ITEM))))

GEN_PROTOS:=$(foreach ITEM,$(PROTOS),gen-proto-$(ITEM))

.PHONY: gen-proto
gen-proto: check-proto-version $(GEN_PROTOS) modtidy
47 changes: 36 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,36 @@ This package aims at implementing a distributed and fault tolerant cron in order

* Run an identical process on several hosts
* Each of these process instantiate a subset of the cron jobs
* Ensure only one of these processes executes a job
* Allow concurrent instances to execute the same job
* Ensure only one of these processes can trigger a job
* Number of cron jobs can scale by increasing the number of hosts
* Eventualy, a trigger can be missed due to change in leadership for a cron job
* Jobs are persisted and loaded from etcd
* Jobs can have a TTL and be auto-deleted
* Jobs can have a delayed start
* Jobs can have a max run count
* Cron can be used for multiple tenants, via namespacing

## Etcd Initialization
## Getting started

By default the library creates an etcd client on `127.0.0.1:2379`

```go
c, _ := etcdcron.NewEtcdMutexBuilder(clientv3.Config{
Endpoints: []string{"etcd-host1:2379", "etcd-host2:2379"},
})
cron, _ := etcdcron.New(WithEtcdMutexBuilder(c))
cron, _ := etcdcron.New(
WithEtcdMutexBuilder(c),
WithNamespace("my-example"), // multi-tenancy
WithTriggerFunc(func(ctx context.Context, triggerType string, payload *anypb.Any) error {
log.Printf("Trigger from pid %d: %s %s\n", os.Getpid(), triggerType, string(payload.Value))
return nil
}),
)
cron.AddJob(Job{
Name: "job0",
Rhythm: "*/2 * * * * *",
Func: func(ctx context.Context) error {
// Handler
},
Type: "my-job-type",
Payload: &anypb.Any{Value: []byte("hello every 2s")},
})
```

Expand All @@ -46,17 +57,31 @@ etcdErrorsHandler := func(ctx context.Context, job etcdcron.Job, err error) {
cron, _ := etcdcron.New(
WithErrorsHandler(errorsHandler),
WithEtcdErrorsHandler(etcdErrorsHandler),
WithTriggerFunc(func(ctx context.Context, triggerType string, payload *anypb.Any) error {
log.Printf("Trigger from pid %d: %s %s\n", os.Getpid(), triggerType, string(payload.Value))
return nil
}),
)

cron.AddJob(Job{
cron.AddJob(context.TODO(), Job{
Name: "job0",
Rhythm: "*/2 * * * * *",
Func: func(ctx context.Context) error {
// Handler
},
Type: "my-job-type",
Payload: &anypb.Any{Value: []byte("hello every 2s")},
})
```

## Tests

Pre-requisites to run the tests locally:
- Run etcd locally via one of the options below:
- Locally: [Install etcd](https://etcd.io/docs/v3.4/install/) then run `etcd --logger=zap`
- Docker: [Running a single node etcd](https://etcd.io/docs/v3.5/op-guide/container/#running-a-single-node-etcd-1)

```bash
go test -v --race
```

## History

This is a fork of [https://github.com/Scalingo/go-etcd-cron](https://github.com/Scalingo/go-etcd-cron), which had been based on [https://github.com/robfig/cron](https://github.com/robfig/cron).
Expand Down
115 changes: 115 additions & 0 deletions collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
Copyright (c) 2024 Diagrid Inc.
Licensed under the MIT License.
*/

package etcdcron

import (
"context"
"sync"
"time"
)

// collector garbage collects items after a globally configured TTL.
type Collector struct {
ttl int64 // time to wait to perform collection
bufferTime int64 // arbitrary delay to allow buffering of operations

running bool
mutex sync.RWMutex
operations []*collectorEntry
changed chan bool
runWaitingGroup sync.WaitGroup
}

type collectorEntry struct {
expiration int64
op func(ctx context.Context)
}

func NewCollector(ttl int64, bufferTime int64) *Collector {
return &Collector{
ttl: ttl,
bufferTime: bufferTime,
running: false,
changed: make(chan bool),
operations: []*collectorEntry{},
}
}

func (c *Collector) Start(ctx context.Context) {
if c.running {
return
}

c.running = true

doIt := func(ctx context.Context) {
c.mutex.Lock()
defer c.mutex.Unlock()

now := time.Now().Unix()
nextStartIndex := -1
for i, o := range c.operations {
if o.expiration <= now {
o.op(ctx)
} else {
nextStartIndex = i
break
}
}

if nextStartIndex >= 0 {
c.operations = c.operations[nextStartIndex:]
return
}
c.operations = []*collectorEntry{}
}

waitTimeForNext := func() time.Duration {
c.mutex.RLock()
defer c.mutex.RUnlock()

now := time.Now().Unix()
if len(c.operations) > 0 {
diff := c.operations[0].expiration - now
if diff <= 0 {
return 0
}

return time.Duration(diff)
}

// Some arbitrarily large number that gives us certainty that some record will be added.
return 24 * time.Hour
}

c.runWaitingGroup.Add(1)
go func(ctx context.Context) {
for {
doIt(ctx)
select {
case <-time.After(waitTimeForNext() + time.Duration(c.bufferTime)):
continue
case <-c.changed:
continue
case <-ctx.Done():
c.runWaitingGroup.Done()
return
}
}
}(ctx)
}

func (c *Collector) Add(op func(ctx context.Context)) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.operations = append(c.operations, &collectorEntry{
expiration: time.Now().Unix() + c.ttl,
})
}

func (c *Collector) Wait() {
c.runWaitingGroup.Wait()
}
Loading

0 comments on commit 04fed7e

Please sign in to comment.