-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
160 lines (128 loc) · 3.13 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package main
import (
"context"
"database/sql"
"sync"
"time"
_ "github.com/lib/pq"
"github.com/sirupsen/logrus"
"github.com/Gurpartap/lifecycle-go"
"github.com/Gurpartap/lifecycle-go/example/providers"
)
// Config carries the static settings the example application is started with.
type Config struct {
	// StopTimeout is the duration main hands to the lifecycle's Run call.
	StopTimeout time.Duration

	// DatabaseURL is the connection string main passes to the database provider.
	DatabaseURL string

	// Addr looks like a listen address; nothing in this file reads it.
	Addr string
}
// App bundles the configuration and the shared dependencies that main wires
// together.
type App struct {
	// config holds the static settings the app was built with.
	config Config

	// lifecycle is the hook registry main appends to and then runs.
	lifecycle *lifecycle.Lifecycle

	// logger is shared by main and by the cron jobs it starts.
	logger *logrus.Logger

	// db is the database handle passed to the cron jobs.
	db *sql.DB
}
func main() {
app := App{
config: Config{
StopTimeout: 60 * time.Second,
DatabaseURL: "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable",
Addr: ":3000",
},
lifecycle: &lifecycle.Lifecycle{},
}
app.logger = providers.Logger()
app.db = providers.DB(app.lifecycle, app.logger, "postgres", app.config.DatabaseURL)
var stoppedSignals []<-chan struct{}
app.lifecycle.Append(&lifecycle.Hook{
OnStart: func(ctx context.Context) error {
app.logger.Infoln("starting cron jobs…")
// the jobs started can select on ctx.Done to determine when to stop
stoppedSignals = append(stoppedSignals, []<-chan struct{}{
_every(ctx, 1*time.Minute, sendEmails(app.logger, app.db)),
_every(ctx, 5*time.Minute, doHousekeepingTasks(app.logger, app.db)),
}...)
app.logger.Printf("started cron jobs")
return nil
},
OnStop: func(ctx context.Context) error {
app.logger.Println("stopping cron jobs…")
// select on ctx.Done to kill on stop timeout
select {
case <-_whenAllStopped(stoppedSignals):
break
case <-ctx.Done():
app.logger.Warnln("timed out trying to stop cron jobs")
}
return nil
},
})
app.lifecycle.Run(app.logger, app.config.StopTimeout)
}
// sendEmails builds the job body for the email cron. The returned function
// logs that it is starting, returns early when ctx has already been cancelled,
// and otherwise stands in for real work with a two-second sleep before logging
// completion. db is accepted but not used by the placeholder body.
func sendEmails(logger *logrus.Logger, db *sql.DB) func(ctx context.Context) {
	job := func(ctx context.Context) {
		logger.Infoln("sending emails queued in db…")

		// A stop request that arrived before the work began ends the run here.
		if ctx.Err() != nil {
			return
		}

		// Placeholder for the actual work.
		time.Sleep(2 * time.Second)
		logger.Infoln("emails sent")
	}
	return job
}
// doHousekeepingTasks builds the job body for the housekeeping cron. The
// returned function logs that it is starting, returns early when ctx has
// already been cancelled, and otherwise stands in for real work with a
// five-second sleep before logging completion. db is accepted but not used by
// the placeholder body.
func doHousekeepingTasks(logger *logrus.Logger, db *sql.DB) func(ctx context.Context) {
	job := func(ctx context.Context) {
		logger.Infoln("deleting stale data from db…")

		// A stop request that arrived before the work began ends the run here.
		if ctx.Err() != nil {
			return
		}

		// Placeholder for the actual work.
		time.Sleep(5 * time.Second)
		logger.Infoln("clean up done")
	}
	return job
}
// _every runs fn on a background goroutine: once right away and then once per
// interval, until ctx is cancelled.
//
// The returned channel is closed once the goroutine has exited, so any number
// of receivers (or none) can observe the stop without blocking the goroutine.
//
// The call itself never blocks. If ctx is already cancelled when _every is
// called, fn is not run and the returned channel is closed promptly. When a
// tick and a cancellation are pending at the same time, cancellation wins and
// fn is not started again.
//
// interval must be positive; time.NewTicker panics otherwise. The ticker is
// created before the goroutine starts, so ticks are measured from the call to
// _every and not from the end of the first run.
func _every(ctx context.Context, interval time.Duration, fn func(ctx context.Context)) <-chan struct{} {
	stopped := make(chan struct{})
	ticker := time.NewTicker(interval)

	go func() {
		// Deferred calls run in reverse order: release the ticker first,
		// then announce that the goroutine is gone.
		defer close(stopped)
		defer ticker.Stop()

		// Initial run, skipped when a stop was requested before we got here.
		if ctx.Err() != nil {
			return
		}
		fn(ctx)

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// select picks randomly among ready cases, so re-check
				// for cancellation before starting another run.
				if ctx.Err() != nil {
					return
				}
				fn(ctx)
			}
		}
	}()

	return stopped
}
// _whenAllStopped merges several stop signals into one. The returned channel
// receives a single value after every channel in stoppedSignals has yielded a
// value (or been closed). With an empty slice the value is delivered right
// away.
func _whenAllStopped(stoppedSignals []<-chan struct{}) <-chan struct{} {
	var pending sync.WaitGroup
	pending.Add(len(stoppedSignals))

	// One watcher per signal, so the order in which they fire does not matter.
	for _, sig := range stoppedSignals {
		go func(sig <-chan struct{}) {
			defer pending.Done()
			<-sig
		}(sig)
	}

	// Capacity 1 lets the notifier finish even if nobody ever receives.
	allStopped := make(chan struct{}, 1)
	go func() {
		pending.Wait()
		allStopped <- struct{}{}
	}()

	return allStopped
}