forked from vgarvardt/gue
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdoc.go
122 lines (99 loc) · 2.53 KB
/
doc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
/*
Package gue implements Golang queues on top of PostgreSQL.
It uses transaction-level locks for concurrent work.
# PostgreSQL drivers
Package supports several PostgreSQL drivers using adapter interface internally.
Currently, adapters for the following drivers have been implemented:
- github.com/jackc/pgx/v4
- github.com/jackc/pgx/v5
- github.com/lib/pq
# Usage
Here is a complete example showing worker setup for pgx/v4 and two jobs enqueued, one with a delay:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"golang.org/x/sync/errgroup"
"github.com/vgarvardt/gue/v5"
"github.com/vgarvardt/gue/v5/adapter/pgxv5"
)
type printNameArgs struct {
Name string
}
func main() {
printName := func(j *gue.Job) error {
var args printNameArgs
if err := json.Unmarshal(j.Args, &args); err != nil {
return err
}
fmt.Printf("Hello %s!\n", args.Name)
return nil
}
pgxCfg, err := pgxpool.ParseConfig(os.Getenv("DATABASE_URL"))
if err != nil {
log.Fatal(err)
}
pgxPool, err := pgxpool.NewWithConfig(context.Background(), pgxCfg)
if err != nil {
log.Fatal(err)
}
defer pgxPool.Close()
poolAdapter := pgxv5.NewConnPool(pgxPool)
gc, err := gue.NewClient(poolAdapter)
if err != nil {
log.Fatal(err)
}
wm := gue.WorkMap{
"PrintName": printName,
}
// create a pool w/ 2 workers
workers, err := gue.NewWorkerPool(gc, wm, 2, gue.WithPoolQueue("name_printer"))
if err != nil {
log.Fatal(err)
}
ctx, shutdown := context.WithCancel(context.Background())
// work jobs in goroutine
g, gctx := errgroup.WithContext(ctx)
g.Go(func() error {
err := workers.Run(gctx)
if err != nil {
// In a real-world applications, use a better way to shut down
// application on unrecoverable error. E.g. fx.Shutdowner from
// go.uber.org/fx module.
log.Fatal(err)
}
return err
})
args, err := json.Marshal(printNameArgs{Name: "vgarvardt"})
if err != nil {
log.Fatal(err)
}
j := &gue.Job{
Type: "PrintName",
Args: args,
}
if err := gc.Enqueue(context.Background(), j); err != nil {
log.Fatal(err)
}
j := &gue.Job{
Type: "PrintName",
RunAt: time.Now().UTC().Add(30 * time.Second), // delay 30 seconds
Args: args,
}
if err := gc.Enqueue(context.Background(), j); err != nil {
log.Fatal(err)
}
time.Sleep(30 * time.Second) // wait for while
// send shutdown signal to worker
shutdown()
if err := g.Wait(); err != nil {
log.Fatal(err)
}
}
*/
package gue