From 7059bf0676ff7a5a689ba3539c889a7600a5aebb Mon Sep 17 00:00:00 2001 From: Artur Souza Date: Fri, 15 Mar 2024 09:10:04 -0700 Subject: [PATCH] Better handle of Dapr compatibility (#6) * Better handle TTL for Dapr compat. Signed-off-by: Artur Souza * Update .examples/cron_example.go Co-authored-by: Josh van Leeuwen Signed-off-by: Artur Souza --------- Signed-off-by: Artur Souza Co-authored-by: Josh van Leeuwen --- .examples/cron_example.go | 29 ++++++------- README.md | 5 ++- cron.go | 9 ++-- cron_test.go | 86 +++++++++++++++++++++------------------ doc.go | 5 --- job.go | 34 ++++++---------- proto/storage/job.proto | 2 +- storage/job.pb.go | 66 +++++++++++++++++------------- storage/store.go | 5 ++- 9 files changed, 122 insertions(+), 119 deletions(-) diff --git a/.examples/cron_example.go b/.examples/cron_example.go index 5582edb..8dae85d 100644 --- a/.examples/cron_example.go +++ b/.examples/cron_example.go @@ -14,8 +14,10 @@ import ( "strconv" "sync" "syscall" + "time" etcdcron "github.com/diagridio/go-etcd-cron" + "github.com/diagridio/go-etcd-cron/partitioning" "google.golang.org/protobuf/types/known/anypb" ) @@ -39,15 +41,15 @@ func main() { log.Printf("starting hostId=%d for total of %d hosts and %d partitions", hostId, numHosts, numPartitions) - p, err := etcdcron.NewPartitioning(numPartitions, numHosts, hostId) + p, err := partitioning.NewPartitioning(numPartitions, numHosts, hostId) if err != nil { log.Fatal("fail to create partitioning", err) } cron, err := etcdcron.New( etcdcron.WithNamespace(namespace), etcdcron.WithPartitioning(p), - etcdcron.WithTriggerFunc(func(ctx context.Context, triggerType string, payload *anypb.Any) error { - log.Printf("Trigger from pid %d: %s %s\n", os.Getpid(), triggerType, string(payload.Value)) + etcdcron.WithTriggerFunc(func(ctx context.Context, metadata map[string]string, payload *anypb.Any) error { + log.Printf("Trigger from pid %d: %s\n", os.Getpid(), string(payload.Value)) return nil }), ) @@ -72,51 +74,50 @@ func main() { if os.Getenv("ADD") == "1" { cron.AddJob(ctx, etcdcron.Job{ - Name: "every-2s-b34w5y5hbwthjs", - Rhythm: "*/2 * * * * *", - Type: "stdout", // can be anything the client wants - Payload: &anypb.Any{Value: []byte("ev 2s")}, + Name: "every-2s-dFG3F3DSGSGds", + Rhythm: "*/2 * * * * *", + StartTime: time.Time{}, // even seconds + Payload: &anypb.Any{Value: []byte("ev 2s even")}, + }) + cron.AddJob(ctx, etcdcron.Job{ + Name: "every-2s-b34w5y5hbwthjs", + Rhythm: "*/2 * * * * *", + StartTime: time.Time{}.Add(time.Second), // odd seconds + Payload: &anypb.Any{Value: []byte("ev 2s odd")}, }) cron.AddJob(ctx, etcdcron.Job{ Name: "every-10s-bnsf45354wbdsnd", Rhythm: "*/10 * * * * *", - Type: "stdout", // can be anything the client wants Payload: &anypb.Any{Value: []byte("ev 10s")}, }) cron.AddJob(ctx, etcdcron.Job{ Name: "every-3s-mdhgm764324rqdg", Rhythm: "*/3 * * * * *", - Type: "stdout", // can be anything the client wants Payload: &anypb.Any{Value: []byte("ev 3s")}, }) cron.AddJob(ctx, etcdcron.Job{ Name: "every-4s-vdafbrtjnysh245", Rhythm: "*/4 * * * * *", - Type: "stdout", // can be anything the client wants Payload: &anypb.Any{Value: []byte("ev 4s")}, }) cron.AddJob(ctx, etcdcron.Job{ Name: "every-5s-adjbg43q5rbafbr44", Rhythm: "*/5 * * * * *", - Type: "stdout", // can be anything the client wants Payload: &anypb.Any{Value: []byte("ev 5s")}, }) cron.AddJob(ctx, etcdcron.Job{ Name: "every-6s-abadfh52jgdyj467", Rhythm: "*/6 * * * * *", - Type: "stdout", // can be anything the client wants Payload: &anypb.Any{Value: []byte("ev 6s")}, }) cron.AddJob(ctx, etcdcron.Job{ Name: "every-7s-bndasfbn4q55fgn", Rhythm: "*/7 * * * * *", - Type: "stdout", // can be anything the client wants Payload: &anypb.Any{Value: []byte("ev 7s")}, }) cron.AddJob(ctx, etcdcron.Job{ Name: "every-1s-then-expire-hadfh452erhh", Rhythm: "*/1 * * * * *", - Type: "stdout", // can be anything the client wants TTL: 10, Payload: &anypb.Any{Value: []byte("ev 1s then expires after 10s")}, }) diff --git a/README.md b/README.md index 3e622a6..9823bb9 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,9 @@ cron, _ := etcdcron.New( cron.AddJob(Job{ Name: "job0", Rhythm: "*/2 * * * * *", - Type: "my-job-type", + Metadata: map[string]string{ + "type", "my-job-type" + }, Payload: &anypb.Any{Value: []byte("hello every 2s")}, }) ``` @@ -69,7 +71,6 @@ cron, _ := etcdcron.New( cron.AddJob(context.TODO(), Job{ Name: "job0", Rhythm: "*/2 * * * * *", - Type: "my-job-type", Payload: &anypb.Any{Value: []byte("hello every 2s")}, }) ``` diff --git a/cron.go b/cron.go index a93426c..c64c75a 100644 --- a/cron.go +++ b/cron.go @@ -43,7 +43,7 @@ type Cron struct { etcdErrorsHandler func(context.Context, Job, error) errorsHandler func(context.Context, Job, error) funcCtx func(context.Context, Job) context.Context - triggerFunc func(context.Context, string, *anypb.Any) error + triggerFunc func(context.Context, map[string]string, *anypb.Any) error running bool runWaitingGroup sync.WaitGroup etcdclient *etcdclient.Client @@ -124,7 +124,7 @@ func WithFuncCtx(f func(context.Context, Job) context.Context) CronOpt { }) } -func WithTriggerFunc(f func(context.Context, string, *anypb.Any) error) CronOpt { +func WithTriggerFunc(f func(context.Context, map[string]string, *anypb.Any) error) CronOpt { return CronOpt(func(cron *Cron) { cron.triggerFunc = f }) @@ -237,8 +237,7 @@ func (c *Cron) GetJob(jobName string) *Job { return nil } - //Avoids caller from modifying scheduler's job - return entry.Job.clone() + return &entry.Job } // Schedule adds a Job to the Cron to be run on the given schedule. @@ -407,7 +406,7 @@ func (c *Cron) run(ctx context.Context) { return } - err = c.triggerFunc(ctx, e.Job.Type, e.Job.Payload) + err = c.triggerFunc(ctx, e.Job.Metadata, e.Job.Payload) if err != nil { go c.errorsHandler(ctx, e.Job, err) return diff --git a/cron_test.go b/cron_test.go index fb79205..6e01242 100644 --- a/cron_test.go +++ b/cron_test.go @@ -47,7 +47,7 @@ func TestStopCausesJobsToNotRun(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { wg.Done() return nil })) @@ -79,7 +79,7 @@ func TestAddBeforeRunning(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { if calledAlready { return nil } @@ -117,7 +117,7 @@ func TestAddWhileRunning(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { wg.Done() return nil })) @@ -150,7 +150,7 @@ func TestSnapshotEntries(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { wg.Done() return nil })) @@ -190,8 +190,8 @@ func TestDelayedAdd(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { - if s == "noop" { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { + if m["op"] == "noop" { return nil } if called { @@ -207,9 +207,9 @@ func TestDelayedAdd(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cron.AddJob(ctx, Job{ - Name: "test-noop", - Rhythm: "@every 1s", - Type: "noop", + Name: "test-noop", + Rhythm: "@every 1s", + Metadata: singleMetadata("op", "noop"), }) cron.Start(ctx) @@ -244,8 +244,8 @@ func TestMultipleEntries(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { - if s == "return-nil" { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { + if m["op"] == "return-nil" { return nil } @@ -257,18 +257,18 @@ func TestMultipleEntries(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) cron.AddJob(ctx, Job{ - Name: "test-multiple-1", - Rhythm: "0 0 0 1 1 ?", - Type: "return-nil", + Name: "test-multiple-1", + Rhythm: "0 0 0 1 1 ?", + Metadata: singleMetadata("op", "return-nil"), }) cron.AddJob(ctx, Job{ Name: "test-multiple-2", Rhythm: "* * * * * ?", }) cron.AddJob(ctx, Job{ - Name: "test-multiple-3", - Rhythm: "0 0 0 31 12 ?", - Type: "return-nil", + Name: "test-multiple-3", + Rhythm: "0 0 0 31 12 ?", + Metadata: singleMetadata("op", "return-nil"), }) cron.AddJob(ctx, Job{ Name: "test-multiple-4", @@ -295,8 +295,8 @@ func TestRunningJobTwice(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { - if s == "return-nil" { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { + if m["op"] == "return-nil" { return nil } @@ -308,14 +308,14 @@ func TestRunningJobTwice(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) cron.AddJob(ctx, Job{ - Name: "test-twice-1", - Rhythm: "0 0 0 1 1 ?", - Type: "return-nil", + Name: "test-twice-1", + Rhythm: "0 0 0 1 1 ?", + Metadata: singleMetadata("op", "return-nil"), }) cron.AddJob(ctx, Job{ - Name: "test-twice-2", - Rhythm: "0 0 0 31 12 ?", - Type: "return-nil", + Name: "test-twice-2", + Rhythm: "0 0 0 31 12 ?", + Metadata: singleMetadata("op", "return-nil"), }) cron.AddJob(ctx, Job{ Name: "test-twice-3", @@ -341,8 +341,8 @@ func TestRunningMultipleSchedules(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { - if s == "return-nil" { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { + if m["op"] == "return-nil" { return nil } @@ -355,22 +355,22 @@ func TestRunningMultipleSchedules(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cron.AddJob(ctx, Job{ - Name: "test-mschedule-1", - Rhythm: "0 0 0 1 1 ?", - Type: "return-nil", + Name: "test-mschedule-1", + Rhythm: "0 0 0 1 1 ?", + Metadata: singleMetadata("op", "return-nil"), }) cron.AddJob(ctx, Job{ - Name: "test-mschedule-2", - Rhythm: "0 0 0 31 12 ?", - Type: "return-nil", + Name: "test-mschedule-2", + Rhythm: "0 0 0 31 12 ?", + Metadata: singleMetadata("op", "return-nil"), }) cron.AddJob(ctx, Job{ Name: "test-mschedule-3", Rhythm: "* * * * * ?", }) - cron.schedule(rhythm.Every(time.Minute), &Job{Name: "test-mschedule-4", Type: "return-nil"}) + cron.schedule(rhythm.Every(time.Minute), &Job{Name: "test-mschedule-4", Metadata: singleMetadata("op", "return-nil")}) cron.schedule(rhythm.Every(time.Second), &Job{Name: "test-mschedule-5"}) - cron.schedule(rhythm.Every(time.Hour), &Job{Name: "test-mschedule-6", Type: "return-nil"}) + cron.schedule(rhythm.Every(time.Hour), &Job{Name: "test-mschedule-6", Metadata: singleMetadata("op", "return-nil")}) cron.Start(ctx) defer func() { @@ -397,7 +397,7 @@ func TestLocalTimezone(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { if called.Add(1) > 1 { return nil } @@ -434,7 +434,7 @@ func TestJob(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { if calledAlready { return nil } @@ -506,7 +506,7 @@ func TestCron_Parallel(t *testing.T) { cron1, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { wg.Done() return nil })) @@ -521,7 +521,7 @@ func TestCron_Parallel(t *testing.T) { cron2, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { wg.Done() return nil })) @@ -560,7 +560,7 @@ func TestTTL(t *testing.T) { cron, err := New( WithNamespace(randomNamespace()), - WithTriggerFunc(func(ctx context.Context, s string, p *anypb.Any) error { + WithTriggerFunc(func(ctx context.Context, m map[string]string, p *anypb.Any) error { firedOnce.Store(true) wg.Done() return nil @@ -613,3 +613,9 @@ func stop(cron *Cron, cancel context.CancelFunc) chan bool { func randomNamespace() string { return uuid.New().String() } + +func singleMetadata(key, value string) map[string]string { + m := map[string]string{} + m[key] = value + return m +} diff --git a/doc.go b/doc.go index 5bd481f..b0504b2 100644 --- a/doc.go +++ b/doc.go @@ -15,25 +15,21 @@ them in their own goroutines. c.AddJob(ctx, etcdcron.Job{ Name: "job-100", Rhythm: "*\/2 * * * * *", - Type: "stdout", Payload: &anypb.Any{Value: []byte("Hello every 2s")}, }) c.AddJob(ctx, etcdcron.Job{ Name: "job-101", Rhythm: "0 30 * * * *", - Type: "stdout", Payload: &anypb.Any{Value: []byte("Every hour on the half hour")}, }) c.AddJob(ctx, etcdcron.Job{ Name: "job-102", Rhythm: "@hourly", - Type: "stdout", Payload: &anypb.Any{Value: []byte("Every hour")}, }) c.AddJob(ctx, etcdcron.Job{ Name: "job-103", Rhythm: "@every 1h30m", - Type: "stdout", Payload: &anypb.Any{Value: []byte("Every hour thirty")}, }) @@ -46,7 +42,6 @@ them in their own goroutines. c.AddJob(ctx, etcdcron.Job{ Name: "job-103", Rhythm: "@daily", - Type: "stdout", Payload: &anypb.Any{Value: []byte("Every day")}, }) .. diff --git a/job.go b/job.go index 69c6503..2ea6687 100644 --- a/job.go +++ b/job.go @@ -6,6 +6,8 @@ Licensed under the MIT License. package etcdcron import ( + "time" + "github.com/diagridio/go-etcd-cron/storage" anypb "google.golang.org/protobuf/types/known/anypb" ) @@ -16,30 +18,20 @@ type Job struct { Name string // Cron-formatted rhythm (ie. 0,10,30 1-5 0 * * *) Rhythm string - // The type of trigger that client undertsands - Type string + // Any metadata that the client understands. + Metadata map[string]string // The payload containg all the information for the trigger Payload *anypb.Any // Optional number of seconds until this job expires (if > 0) - TTL int64 -} - -func (j *Job) clone() *Job { - return &Job{ - Name: j.Name, - Rhythm: j.Rhythm, - Type: j.Type, - Payload: j.Payload, - TTL: j.TTL, - } + TTL time.Duration } func (j *Job) toJobRecord() (*storage.JobRecord, storage.JobRecordOptions) { return &storage.JobRecord{ - Name: j.Name, - Rhythm: j.Rhythm, - Type: j.Type, - Payload: j.Payload, + Name: j.Name, + Rhythm: j.Rhythm, + Metadata: j.Metadata, + Payload: j.Payload, }, storage.JobRecordOptions{ TTL: j.TTL, } @@ -51,9 +43,9 @@ func jobFromJobRecord(r *storage.JobRecord) *Job { } return &Job{ - Name: r.Name, - Rhythm: r.Rhythm, - Type: r.Type, - Payload: r.Payload, + Name: r.Name, + Rhythm: r.Rhythm, + Metadata: r.Metadata, + Payload: r.Payload, } } diff --git a/proto/storage/job.proto b/proto/storage/job.proto index aafa677..98adcdc 100644 --- a/proto/storage/job.proto +++ b/proto/storage/job.proto @@ -16,6 +16,6 @@ option go_package = "github.com/diagridio/go-etcd-cron/storage;storage"; message JobRecord { string name = 1; string rhythm = 2; - string type = 3; + map metadata = 3; google.protobuf.Any payload = 4; } \ No newline at end of file diff --git a/storage/job.pb.go b/storage/job.pb.go index 02a1e14..738a2ec 100644 --- a/storage/job.pb.go +++ b/storage/job.pb.go @@ -32,10 +32,10 @@ type JobRecord struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` - Rhythm string `protobuf:"bytes,2,opt,name=rhythm,proto3" json:"rhythm,omitempty"` - Type string `protobuf:"bytes,3,opt,name=type,proto3" json:"type,omitempty"` - Payload *anypb.Any `protobuf:"bytes,4,opt,name=payload,proto3" json:"payload,omitempty"` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Rhythm string `protobuf:"bytes,2,opt,name=rhythm,proto3" json:"rhythm,omitempty"` + Metadata map[string]string `protobuf:"bytes,3,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Payload *anypb.Any `protobuf:"bytes,4,opt,name=payload,proto3" json:"payload,omitempty"` } func (x *JobRecord) Reset() { @@ -84,11 +84,11 @@ func (x *JobRecord) GetRhythm() string { return "" } -func (x *JobRecord) GetType() string { +func (x *JobRecord) GetMetadata() map[string]string { if x != nil { - return x.Type + return x.Metadata } - return "" + return nil } func (x *JobRecord) GetPayload() *anypb.Any { @@ -104,19 +104,25 @@ var file_proto_storage_job_proto_rawDesc = []byte{ 0x0a, 0x17, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2f, 0x6a, 0x6f, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x7b, 0x0a, - 0x09, 0x4a, 0x6f, 0x62, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, - 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, - 0x0a, 0x06, 0x72, 0x68, 0x79, 0x74, 0x68, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, - 0x72, 0x68, 0x79, 0x74, 0x68, 0x6d, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x2e, 0x0a, 0x07, 0x70, 0x61, - 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, - 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, - 0x79, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, 0x69, 0x61, 0x67, 0x72, 0x69, 0x64, - 0x69, 0x6f, 0x2f, 0x67, 0x6f, 0x2d, 0x65, 0x74, 0x63, 0x64, 0x2d, 0x63, 0x72, 0x6f, 0x6e, 0x2f, - 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x3b, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xe2, 0x01, + 0x0a, 0x09, 0x4a, 0x6f, 0x62, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, + 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, + 0x16, 0x0a, 0x06, 0x72, 0x68, 0x79, 0x74, 0x68, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x06, 0x72, 0x68, 0x79, 0x74, 0x68, 0x6d, 0x12, 0x3c, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x73, 0x74, 0x6f, 0x72, + 0x61, 0x67, 0x65, 0x2e, 0x4a, 0x6f, 0x62, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x2e, 0x4d, 0x65, + 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x6d, 0x65, 0x74, + 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x2e, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x07, 0x70, 0x61, + 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x1a, 0x3b, 0x0a, 0x0d, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, + 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, + 0x38, 0x01, 0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x64, 0x69, 0x61, 0x67, 0x72, 0x69, 0x64, 0x69, 0x6f, 0x2f, 0x67, 0x6f, 0x2d, 0x65, 0x74, + 0x63, 0x64, 0x2d, 0x63, 0x72, 0x6f, 0x6e, 0x2f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x3b, + 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -131,18 +137,20 @@ func file_proto_storage_job_proto_rawDescGZIP() []byte { return file_proto_storage_job_proto_rawDescData } -var file_proto_storage_job_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_proto_storage_job_proto_msgTypes = make([]protoimpl.MessageInfo, 2) var file_proto_storage_job_proto_goTypes = []interface{}{ (*JobRecord)(nil), // 0: storage.JobRecord - (*anypb.Any)(nil), // 1: google.protobuf.Any + nil, // 1: storage.JobRecord.MetadataEntry + (*anypb.Any)(nil), // 2: google.protobuf.Any } var file_proto_storage_job_proto_depIdxs = []int32{ - 1, // 0: storage.JobRecord.payload:type_name -> google.protobuf.Any - 1, // [1:1] is the sub-list for method output_type - 1, // [1:1] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name + 1, // 0: storage.JobRecord.metadata:type_name -> storage.JobRecord.MetadataEntry + 2, // 1: storage.JobRecord.payload:type_name -> google.protobuf.Any + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_proto_storage_job_proto_init() } @@ -170,7 +178,7 @@ func file_proto_storage_job_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_proto_storage_job_proto_rawDesc, NumEnums: 0, - NumMessages: 1, + NumMessages: 2, NumExtensions: 0, NumServices: 0, }, diff --git a/storage/store.go b/storage/store.go index 2b8232e..3498547 100644 --- a/storage/store.go +++ b/storage/store.go @@ -11,6 +11,7 @@ import ( "log" "path/filepath" "sync" + "time" "github.com/pkg/errors" "go.etcd.io/etcd/api/v3/mvccpb" @@ -31,7 +32,7 @@ type JobStore interface { // Optional params to store a job type JobRecordOptions struct { - TTL int64 + TTL time.Duration } type etcdStore struct { @@ -91,7 +92,7 @@ func (s *etcdStore) Put(ctx context.Context, job *JobRecord, options JobRecordOp opts := []etcdclient.OpOption{} if options.TTL > 0 { // Create a lease - lease, err := s.etcdClient.Grant(ctx, options.TTL) + lease, err := s.etcdClient.Grant(ctx, int64(options.TTL.Seconds())) if err != nil { return errors.Errorf("failed to create lease to save job %s: %v", job.Name, err) }