diff --git a/initialize/services.go b/initialize/services.go index 3f0a712adc..80bdc47f97 100644 --- a/initialize/services.go +++ b/initialize/services.go @@ -21,6 +21,9 @@ const ( // DefaultUpdatePeriod is the default period used for running updaters // within matcher processes. DefaultUpdatePeriod = 30 * time.Minute + // NotifierIssuer is the value used for the issuer claim of any outgoing + // HTTP requests the notifier makes, if PSK auth is configured. + NotifierIssuer = `clair-notifier` ) // Services will initialize the correct ClairCore services @@ -96,12 +99,17 @@ func (i *Init) Services() error { Msg: "notifier: failed to parse poll interval: " + err.Error(), } } + c, _, err := i.conf.Client(nil, notifierClaim) + if err != nil { + return err + } n, err := notifier.New(i.GlobalCTX, notifier.Opts{ DeliveryInterval: dInterval, ConnString: i.conf.Notifier.ConnString, Indexer: libI, Matcher: libV, + Client: c, Migrations: i.conf.Notifier.Migrations, PollInterval: pInterval, Webhook: i.conf.Notifier.Webhook, @@ -172,7 +180,7 @@ func (i *Init) Services() error { return fmt.Errorf("failed to initialize libvuln: %v", err) } // matcher mode needs a remote indexer client - c, auth, err := i.conf.Client(nil, intraservice) + c, auth, err := i.conf.Client(nil, intraserviceClaim) switch { case err != nil: return err @@ -192,7 +200,7 @@ func (i *Init) Services() error { i.Matcher = libV case config.NotifierMode: // notifier uses a remote indexer and matcher - c, auth, err := i.conf.Client(nil, intraservice) + c, auth, err := i.conf.Client(nil, intraserviceClaim) switch { case err != nil: return err @@ -229,12 +237,17 @@ func (i *Init) Services() error { Msg: "notifier: failed to parse poll interval: " + err.Error(), } } + c, _, err = i.conf.Client(nil, notifierClaim) + if err != nil { + return err + } n, err := notifier.New(i.GlobalCTX, notifier.Opts{ DeliveryInterval: dInterval, ConnString: i.conf.Notifier.ConnString, Indexer: remoteIndexer, Matcher: remoteMatcher, + Client: c, Migrations: i.conf.Notifier.Migrations, PollInterval: pInterval, Webhook: i.conf.Notifier.Webhook, @@ -257,4 +270,7 @@ func (i *Init) Services() error { return nil } -var intraservice = jwt.Claims{Issuer: httptransport.IntraserviceIssuer} +var ( + intraserviceClaim = jwt.Claims{Issuer: httptransport.IntraserviceIssuer} + notifierClaim = jwt.Claims{Issuer: NotifierIssuer} +) diff --git a/notifier/service/service.go b/notifier/service/service.go index 6dc9f15cb6..f0f4e2d5cd 100644 --- a/notifier/service/service.go +++ b/notifier/service/service.go @@ -3,12 +3,17 @@ package service import ( "context" "fmt" + "net/http" "os" "time" "github.com/google/uuid" "github.com/jackc/pgx/v4/pgxpool" "github.com/jmoiron/sqlx" + pgdl "github.com/quay/claircore/pkg/distlock/postgres" + "github.com/remind101/migrate" + "github.com/rs/zerolog" + "github.com/quay/clair/v4/indexer" "github.com/quay/clair/v4/matcher" "github.com/quay/clair/v4/notifier" @@ -18,9 +23,6 @@ import ( "github.com/quay/clair/v4/notifier/postgres" "github.com/quay/clair/v4/notifier/stomp" "github.com/quay/clair/v4/notifier/webhook" - pgdl "github.com/quay/claircore/pkg/distlock/postgres" - "github.com/remind101/migrate" - "github.com/rs/zerolog" ) const ( @@ -75,6 +77,7 @@ type Opts struct { ConnString string Matcher matcher.Service Indexer indexer.Service + Client *http.Client Webhook *webhook.Config AMQP *namqp.Config STOMP *stomp.Config @@ -109,12 +112,12 @@ func New(ctx context.Context, opts Opts) (*service, error) { } // kick off the poller - log.Info().Str("interval", opts.PollInterval.String()).Msg("intializing poller") + log.Info().Str("interval", opts.PollInterval.String()).Msg("initializing poller") poller := notifier.NewPoller(opts.PollInterval, store, opts.Matcher) c := poller.Poll(ctx) // kick off the processors - log.Info().Int("count", processors).Msg("intializing processors") + log.Info().Int("count", processors).Msg("initializing processors") for i := 0; i < processors; i++ { // processors only use try locks distLock := pgdl.NewLock(lockPool, 0) @@ -195,7 +198,7 @@ func storeInit(ctx context.Context, opts Opts) (*postgres.Store, *postgres.KeySt } } - log.Info().Msg("intializing notifier store") + log.Info().Msg("initializing notifier store") store := postgres.NewStore(pool) keystore := postgres.NewKeyStore(pool) return store, keystore, lockPool, nil @@ -220,7 +223,7 @@ func webhookDeliveries(ctx context.Context, opts Opts, lockPool *sqlx.DB, store Str("component", "notifier/service/webhookInit"). Logger() ctx = log.WithContext(ctx) - log.Info().Int("count", deliveries).Msg("intializing webhook deliverers") + log.Info().Int("count", deliveries).Msg("initializing webhook deliverers") conf, err := opts.Webhook.Validate() if err != nil { @@ -230,7 +233,7 @@ func webhookDeliveries(ctx context.Context, opts Opts, lockPool *sqlx.DB, store ds := make([]*notifier.Delivery, 0, deliveries) for i := 0; i < deliveries; i++ { distLock := pgdl.NewLock(lockPool, 0) - wh, err := webhook.New(conf, nil, keymanager) + wh, err := webhook.New(conf, opts.Client, keymanager) if err != nil { return fmt.Errorf("failed to create webhook deliverer: %v", err) }