-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathservice.go
335 lines (294 loc) · 9.42 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
package service
import (
"context"
"database/sql"
"fmt"
"net/http"
"os"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v4/pgxpool"
_ "github.com/jackc/pgx/v4/stdlib"
pgdl "github.com/quay/claircore/pkg/distlock/postgres"
"github.com/remind101/migrate"
"github.com/rs/zerolog"
"github.com/quay/clair/v4/indexer"
"github.com/quay/clair/v4/matcher"
"github.com/quay/clair/v4/notifier"
namqp "github.com/quay/clair/v4/notifier/amqp"
"github.com/quay/clair/v4/notifier/keymanager"
"github.com/quay/clair/v4/notifier/migrations"
"github.com/quay/clair/v4/notifier/postgres"
"github.com/quay/clair/v4/notifier/stomp"
"github.com/quay/clair/v4/notifier/webhook"
)
// Fixed worker counts used when the notifier subsystem is started.
const (
	// processors is the number of notifier processors started by New.
	processors = 4
	// deliveries is the number of deliverers created for the configured
	// delivery mechanism (webhook, AMQP, or STOMP).
	deliveries = 4
)
// Service describes the notifier functionality exposed by ClairV4.
//
// It is kept as an interface so that remote clients can supply their own
// implementation in addition to the local one defined in this package.
type Service interface {
	// Notifications retrieves an optionally paginated set of notifications
	// for the given notification ID.
	Notifications(ctx context.Context, id uuid.UUID, page *notifier.Page) ([]notifier.Notification, notifier.Page, error)

	// DeleteNotifications deletes the notifications stored under the
	// provided notification ID.
	DeleteNotifications(ctx context.Context, id uuid.UUID) error

	// KeyStore returns the notifier's KeyStore.
	KeyStore(ctx context.Context) notifier.KeyStore

	// KeyManager returns the notifier's KeyManager.
	KeyManager(ctx context.Context) *keymanager.Manager
}
// Compile-time check that *service satisfies the Service interface.
var _ Service = (*service)(nil)

// service is the local, in-process implementation of the notifier Service.
type service struct {
	// store backs Notifications and DeleteNotifications.
	store notifier.Store
	// keystore is the value returned by KeyStore.
	keystore notifier.KeyStore
	// keymanager is the value returned by KeyManager.
	keymanager *keymanager.Manager
}
// Notifications looks up the notifications recorded under id by delegating
// to the underlying store. The page argument is forwarded unchanged, and the
// store's results are returned as-is.
func (s *service) Notifications(ctx context.Context, id uuid.UUID, page *notifier.Page) ([]notifier.Notification, notifier.Page, error) {
	notes, next, err := s.store.Notifications(ctx, id, page)
	return notes, next, err
}
// DeleteNotifications marks the notification identified by id as deleted
// in the underlying store and returns the store's error, if any.
func (s *service) DeleteNotifications(ctx context.Context, id uuid.UUID) error {
	err := s.store.SetDeleted(ctx, id)
	return err
}
// KeyStore returns the KeyStore this service was constructed with.
// The context argument is ignored.
func (s *service) KeyStore(_ context.Context) notifier.KeyStore {
	ks := s.keystore
	return ks
}
// KeyManager returns the key manager this service was constructed with.
// The context argument is ignored.
func (s *service) KeyManager(_ context.Context) *keymanager.Manager {
	mgr := s.keymanager
	return mgr
}
// Opts holds the configuration consumed by New when starting the
// notifier service.
type Opts struct {
	// PollInterval is handed to the poller created in New.
	PollInterval time.Duration
	// DeliveryInterval is handed to every delivery created for the
	// configured deliverer type.
	DeliveryInterval time.Duration
	// Migrations, when true, makes store initialization run the notifier
	// database migrations.
	Migrations bool
	// ConnString is the database connection string used for both the
	// connection pool and the migration handle.
	ConnString string
	// Matcher is the matcher service given to the poller and processors.
	// It is replaced by a mock when test mode is enabled.
	Matcher matcher.Service
	// Indexer is the indexer service given to the processors.
	// It is replaced by a mock when test mode is enabled.
	Indexer indexer.Service
	// DisableSummary is copied to each processor's NoSummary field.
	DisableSummary bool
	// Client is the HTTP client passed to webhook deliverers.
	Client *http.Client
	// Webhook, AMQP and STOMP select the delivery mechanism. New starts
	// deliverers for the first one that is non-nil, checked in this
	// order; if all are nil, no deliverers are started.
	Webhook *webhook.Config
	AMQP    *namqp.Config
	STOMP   *stomp.Config
}
// New kicks off the notifier subsystem and returns the local service
// wrapping it.
//
// In order, it initializes the store and lock pool, the key manager, the
// poller, a fixed number of processors, and finally the deliverers for
// the first of opts.Webhook, opts.AMQP, or opts.STOMP that is non-nil.
// If none of them is set, no deliverers are started.
//
// When the NOTIFIER_TEST_MODE environment variable is non-empty, the
// Matcher and Indexer in opts are replaced with test-mode mocks before
// the poller and processors are created.
//
// Canceling the ctx will kill any concurrent routines affiliated with
// the notifier.
//
// On failure a nil service and an error wrapping the underlying cause
// are returned.
func New(ctx context.Context, opts Opts) (*service, error) {
	log := zerolog.Ctx(ctx).With().
		Str("component", "notifier/service/Init").
		Logger()
	ctx = log.WithContext(ctx)

	// initialize store and dist lock pool
	store, keystore, lockPool, err := storeInit(ctx, opts)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize store and lockpool: %w", err)
	}

	// kick off key manager
	kmgr, err := keyManagerInit(ctx, keystore)
	if err != nil {
		// No poller, processor, or deliverer has been started yet, so
		// release the pool instead of leaking it.
		// NOTE(review): assumes a failed key manager initialization leaves
		// nothing running against the keystore — confirm.
		lockPool.Close()
		return nil, fmt.Errorf("failed to initialize key manager: %w", err)
	}

	// check for test mode
	if tm := os.Getenv("NOTIFIER_TEST_MODE"); tm != "" {
		log.Info().Str("interval", opts.PollInterval.String()).Msg("NOTIFIER TEST MODE ENABLED. NOTIFIER WILL CREATE TEST NOTIFICATIONS ON A SET INTERVAL")
		// Do not silently drop a test-mode setup failure.
		if err := testModeInit(ctx, &opts); err != nil {
			return nil, fmt.Errorf("failed to initialize test mode: %w", err)
		}
	}

	// kick off the poller
	log.Info().Str("interval", opts.PollInterval.String()).Msg("initializing poller")
	poller := notifier.NewPoller(opts.PollInterval, store, opts.Matcher)
	c := poller.Poll(ctx)

	// kick off the processors
	log.Info().Int("count", processors).Msg("initializing processors")
	for i := 0; i < processors; i++ {
		// processors only use try locks
		distLock := pgdl.NewPool(lockPool, 0)
		p := notifier.NewProcessor(
			i,
			distLock,
			opts.Indexer,
			opts.Matcher,
			store,
		)
		p.NoSummary = opts.DisableSummary
		p.Process(ctx, c)
	}

	// kick off configured deliverer type
	switch {
	case opts.Webhook != nil:
		if err := webhookDeliveries(ctx, opts, lockPool, store, kmgr); err != nil {
			return nil, err
		}
	case opts.AMQP != nil:
		if err := amqpDeliveries(ctx, opts, lockPool, store); err != nil {
			return nil, err
		}
	case opts.STOMP != nil:
		if err := stompDeliveries(ctx, opts, lockPool, store); err != nil {
			return nil, err
		}
	}

	return &service{
		store:      store,
		keymanager: kmgr,
		keystore:   keystore,
	}, nil
}
// testModeInit swaps the Matcher and Indexer in opts for mocks prepared
// by matcherForTestMode and indexerForTestMode, for use in testing mode.
//
// The ctx argument is currently unused and the returned error is always nil.
func testModeInit(ctx context.Context, opts *Opts) error {
	mockMatcher := new(matcher.Mock)
	matcherForTestMode(mockMatcher)
	opts.Matcher = mockMatcher

	mockIndexer := new(indexer.Mock)
	indexerForTestMode(mockIndexer)
	opts.Indexer = mockIndexer

	return nil
}
// storeInit connects to the database described by opts.ConnString and
// returns the notifier store, the key store, and the connection pool they
// share. The same pool is returned so callers can build distributed locks
// on top of it.
//
// When opts.Migrations is true the notifier migrations are applied through
// a separate database/sql handle, which is closed before storeInit returns.
//
// On any failure after the pool has been created, the pool is closed so
// its connections are not leaked, and all returned values except the
// error are nil.
func storeInit(ctx context.Context, opts Opts) (*postgres.Store, *postgres.KeyStore, *pgxpool.Pool, error) {
	log := zerolog.Ctx(ctx).With().
		Str("component", "notifier/service/storeInit").
		Logger()
	ctx = log.WithContext(ctx)

	cfg, err := pgxpool.ParseConfig(opts.ConnString)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("failed to parse ConnString: %w", err)
	}
	// Upper bound on pooled connections shared by the store, the key
	// store, and the distributed locks.
	cfg.MaxConns = 30
	pool, err := pgxpool.ConnectConfig(ctx, cfg)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("failed to create ConnPool: %w", err)
	}

	// The database/sql handle is only needed by the migrator.
	db, err := sql.Open("pgx", opts.ConnString)
	if err != nil {
		pool.Close()
		return nil, nil, nil, fmt.Errorf("failed to open db: %w", err)
	}
	defer db.Close()

	// do migrations if requested
	if opts.Migrations {
		log.Info().Msg("performing notifier migrations")
		migrator := migrate.NewPostgresMigrator(db)
		migrator.Table = migrations.MigrationTable
		if err := migrator.Exec(migrate.Up, migrations.Migrations...); err != nil {
			pool.Close()
			return nil, nil, nil, fmt.Errorf("failed to perform migrations: %w", err)
		}
	}

	log.Info().Msg("initializing notifier store")
	store := postgres.NewStore(pool)
	keystore := postgres.NewKeyStore(pool)
	return store, keystore, pool, nil
}
// keyManagerInit creates the notifier's key manager on top of keystore,
// using a logger tagged with this function's component name.
//
// It returns the manager on success, or nil and the error reported by
// keymanager.NewManager on failure.
func keyManagerInit(ctx context.Context, keystore notifier.KeyStore) (*keymanager.Manager, error) {
	logger := zerolog.Ctx(ctx).With().
		Str("component", "notifier/service/keyManagerInit").
		Logger()
	ctx = logger.WithContext(ctx)
	logger.Debug().Msg("initializing keymanager")

	manager, err := keymanager.NewManager(ctx, keystore)
	if err == nil {
		return manager, nil
	}
	return nil, err
}
// webhookDeliveries validates the webhook configuration in opts, builds
// one delivery per configured deliverer slot, and then calls Deliver on
// each of them with ctx.
//
// All deliveries are constructed before any is started, so a construction
// failure returns an error without starting any of them. Returned errors
// wrap the underlying cause.
func webhookDeliveries(ctx context.Context, opts Opts, lockPool *pgxpool.Pool, store notifier.Store, keymanager *keymanager.Manager) error {
	log := zerolog.Ctx(ctx).With().
		Str("component", "notifier/service/webhookInit").
		Logger()
	ctx = log.WithContext(ctx)
	log.Info().Int("count", deliveries).Msg("initializing webhook deliverers")

	conf, err := opts.Webhook.Validate()
	if err != nil {
		return fmt.Errorf("webhook validation failed: %w", err)
	}

	ds := make([]*notifier.Delivery, 0, deliveries)
	for i := 0; i < deliveries; i++ {
		// Each delivery gets its own lock handle over the shared pool.
		distLock := pgdl.NewPool(lockPool, 0)
		wh, err := webhook.New(conf, opts.Client, keymanager)
		if err != nil {
			return fmt.Errorf("failed to create webhook deliverer: %w", err)
		}
		ds = append(ds, notifier.NewDelivery(i, wh, opts.DeliveryInterval, store, distLock))
	}
	for _, d := range ds {
		d.Deliver(ctx)
	}
	return nil
}
// amqpDeliveries validates the AMQP configuration in opts, builds one
// delivery per configured deliverer slot, and then calls Deliver on each
// of them with ctx.
//
// If the validated configuration lists no broker URIs, a warning is logged
// and nil is returned without creating any deliverer. When conf.Direct is
// set, direct deliverers are created instead of the default ones.
//
// All deliveries are constructed before any is started, so a construction
// failure returns an error without starting any of them. Returned errors
// wrap the underlying cause.
func amqpDeliveries(ctx context.Context, opts Opts, lockPool *pgxpool.Pool, store notifier.Store) error {
	log := zerolog.Ctx(ctx).With().
		Str("component", "notifier/service/amqpInit").
		Logger()
	ctx = log.WithContext(ctx)

	conf, err := opts.AMQP.Validate()
	if err != nil {
		return fmt.Errorf("amqp validation failed: %w", err)
	}
	if len(conf.URIs) == 0 {
		log.Warn().Msg("amqp delivery was configured with no broker URIs to connect to. delivery of notifications will not occur.")
		return nil
	}

	ds := make([]*notifier.Delivery, 0, deliveries)
	for i := 0; i < deliveries; i++ {
		// Each delivery gets its own lock handle over the shared pool.
		distLock := pgdl.NewPool(lockPool, 0)
		if conf.Direct {
			q, err := namqp.NewDirectDeliverer(conf)
			if err != nil {
				return fmt.Errorf("failed to create AMQP direct deliverer: %w", err)
			}
			ds = append(ds, notifier.NewDelivery(i, q, opts.DeliveryInterval, store, distLock))
		} else {
			q, err := namqp.New(conf)
			if err != nil {
				return fmt.Errorf("failed to create AMQP deliverer: %w", err)
			}
			ds = append(ds, notifier.NewDelivery(i, q, opts.DeliveryInterval, store, distLock))
		}
	}
	for _, d := range ds {
		d.Deliver(ctx)
	}
	return nil
}
// stompDeliveries validates the STOMP configuration in opts, builds one
// delivery per configured deliverer slot, and then calls Deliver on each
// of them with ctx.
//
// If the validated configuration lists no broker URIs, a warning is logged
// and nil is returned without creating any deliverer. When conf.Direct is
// set, direct deliverers are created instead of the default ones.
//
// All deliveries are constructed before any is started, so a construction
// failure returns an error without starting any of them. Returned errors
// wrap the underlying cause.
func stompDeliveries(ctx context.Context, opts Opts, lockPool *pgxpool.Pool, store notifier.Store) error {
	log := zerolog.Ctx(ctx).With().
		Str("component", "notifier/service/stompInit").
		Logger()
	ctx = log.WithContext(ctx)

	conf, err := opts.STOMP.Validate()
	if err != nil {
		return fmt.Errorf("stomp validation failed: %w", err)
	}
	if len(conf.URIs) == 0 {
		log.Warn().Msg("stomp delivery was configured with no broker URIs to connect to. delivery of notifications will not occur.")
		return nil
	}

	ds := make([]*notifier.Delivery, 0, deliveries)
	for i := 0; i < deliveries; i++ {
		// Each delivery gets its own lock handle over the shared pool.
		distLock := pgdl.NewPool(lockPool, 0)
		if conf.Direct {
			q, err := stomp.NewDirectDeliverer(conf)
			if err != nil {
				return fmt.Errorf("failed to create STOMP direct deliverer: %w", err)
			}
			ds = append(ds, notifier.NewDelivery(i, q, opts.DeliveryInterval, store, distLock))
		} else {
			q, err := stomp.New(conf)
			if err != nil {
				return fmt.Errorf("failed to create STOMP deliverer: %w", err)
			}
			ds = append(ds, notifier.NewDelivery(i, q, opts.DeliveryInterval, store, distLock))
		}
	}
	for _, d := range ds {
		d.Deliver(ctx)
	}
	return nil
}