Skip to content

Commit

Permalink
config: swap notifier config structs
Browse files Browse the repository at this point in the history
This change cuts over to use the structs in the config package instead
of structs defined in the various notifier packages. Because the
notifiers were implemented as just keeping a pointer to their config
around, this requires some involved changes.

Signed-off-by: Hank Donnay <hdonnay@redhat.com>
  • Loading branch information
hdonnay committed Nov 3, 2021
1 parent 9ca1e8b commit c6b2d3c
Show file tree
Hide file tree
Showing 19 changed files with 342 additions and 444 deletions.
20 changes: 8 additions & 12 deletions config/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@ import (
"net/url"
"strings"
"time"

"github.com/quay/clair/v4/notifier/amqp"
"github.com/quay/clair/v4/notifier/stomp"
"github.com/quay/clair/v4/notifier/webhook"
)

// Notifier provides Clair Notifier node configuration
type Notifier struct {
// Only one of the following should be provided in the configuration
//
// Configures the notifier for webhook delivery
Webhook *Webhook `yaml:"webhook" json:"webhook"`
// Configures the notifier for AMQP delivery.
AMQP *AMQP `yaml:"amqp" json:"amqp"`
// Configures the notifier for STOMP delivery.
STOMP *STOMP `yaml:"stomp" json:"stomp"`
// A Postgres connection string.
//
// Formats:
Expand Down Expand Up @@ -54,14 +58,6 @@ type Notifier struct {
// For a machine-consumption use case, it may be easier to instead have the
// notifier push all the data.
DisableSummary bool `yaml:"disable_summary" json:"disable_summary"`
// Only one of the following should be provided in the configuration
//
// Configures the notifier for webhook delivery
Webhook *webhook.Config `yaml:"webhook" json:"webhook"`
// Configures the notifier for AMQP delivery.
AMQP *amqp.Config `yaml:"amqp" json:"amqp"`
// Configures the notifier for STOMP delivery.
STOMP *stomp.Config `yaml:"stomp" json:"stomp"`
// A "true" or "false" value
//
// Whether Notifier nodes handle migrations to their database.
Expand Down
153 changes: 49 additions & 104 deletions notifier/amqp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,135 +3,80 @@ package amqp
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"net/url"
"os"
"strings"
)

type TLS struct {
// The filesystem path where a root CA can be read.
RootCA string `yaml:"root_ca"`
// The filesystem path where a tls certificate can be read.
Cert string `yaml:"cert"`
// The filesystem path where a tls private key can be read.
Key string `yaml:"key"`
}
"github.com/quay/clair/v4/config"
)

// Exchange are the required fields necessary to check
// the existence of an Exchange
// Exchange are the required fields necessary to check the existence of an
// Exchange.
//
// For more details see: https://godoc.org/github.com/streadway/amqp#Channel.ExchangeDeclarePassive
type Exchange struct {
type exchange struct {
// The name of the exchange
Name string `yaml:"name"`
Name string
// The type of the exchange. Typically:
// "direct"
// "fanout"
// "topic"
// "headers"
Type string `yaml:"type"`
Type string
// Whether the exchange survives server restarts
Durable bool `yaml:"durability"`
Durable bool
// Whether bound consumers define the lifecycle of the Exchange.
AutoDelete bool `yaml:"auto_delete"`
}

// Config provides configuration for an AMQP deliverer.
type Config struct {
// Configures the AMQP delivery to deliver notifications directly to
// the configured Exchange.
//
// If true "Callback" is ignored.
// If false a notifier.Callback is delivered to the queue and clients
// utilize the pagination API to retrieve.
Direct bool
// Specifies the number of notifications delivered in single AMQP message
// when Direct is true.
//
// Ignored if Direct is not true
// If 0 or 1 is provided no rollup occurs and each notification is delivered
// separately.
Rollup int
// The AMQP exchange notifications will be delivered to.
// A passive declare is performed and if the exchange does not exist
// the declare will fail.
Exchange Exchange `yaml:"exchange"`
// The routing key used to route notifications to the desired queue.
RoutingKey string `yaml:"routing_key"`
// The callback url where notifications are retrieved.
Callback string
callback url.URL
// A list of AMQP compliant URI scheme. see: https://www.rabbitmq.com/uri-spec.html
// example: "amqp://user:pass@host:10000/vhost"
//
// The first successful connection will be used by the amqp deliverer.
//
// If "amqps://" broker URI schemas are provided the TLS configuration below is required.
URIs []string `yaml:"uris"`
TLS *TLS `yaml:"tls"`
tls *tls.Config
AutoDelete bool
}

// Validate confirms configuration is valid and fills in private members
// with parsed values on success.
func (c *Config) Validate() (Config, error) {
conf := *c
if c.Exchange.Type == "" {
return conf, fmt.Errorf("AMQP config requires the exchange.type field")
func loadTLSConfig(c *config.AMQP) (*tls.Config, error) {
var cfg tls.Config
usingTLS := false
for _, u := range c.URIs {
if strings.HasPrefix(u, "amqps://") {
usingTLS = true
break
}
}
if !usingTLS {
return &cfg, nil
}
if c.RoutingKey == "" {
return conf, fmt.Errorf("AMQP config requires the routing key field")

if c.TLS.Cert == "" || c.TLS.Key == "" {
return nil, errors.New("both tls cert and key are required")
}
for _, uri := range c.URIs {
if strings.HasPrefix(uri, "amqps://") {
if c.TLS.RootCA == "" {
return conf, fmt.Errorf("amqps:// broker requires tls_root_ca")
if c.TLS.RootCA != "" {
if c.TLS.RootCA != "" {
p, err := x509.SystemCertPool()
if err != nil {
return nil, err
}
if c.TLS.Cert == "" {
return conf, fmt.Errorf("amqps:// broker requires tls_cert")
ca, err := os.ReadFile(c.TLS.RootCA)
if err != nil {
return nil, fmt.Errorf("failed to read tls root ca: %w", err)
}
if c.TLS.Key == "" {
return conf, fmt.Errorf("amqps:// broker requires tls_key")
if !p.AppendCertsFromPEM(ca) {
return nil, errors.New("unable to add certificate to pool")
}
cfg.RootCAs = p
}
}

if c.TLS != nil {
if c.TLS.Cert == "" || c.TLS.Key == "" {
return conf, fmt.Errorf("both tls cert and key are required")
}
TLS := tls.Config{}
if pool, err := x509.SystemCertPool(); err != nil {
TLS.RootCAs = x509.NewCertPool()
} else {
TLS.RootCAs = pool
}

if c.TLS.RootCA != "" {
var err error
ca := []byte{}
if ca, err = ioutil.ReadFile(c.TLS.RootCA); err != nil {
return conf, fmt.Errorf("failed to read tls root ca: %v", err)
}
TLS.RootCAs.AppendCertsFromPEM(ca)
}

var err error
var cert tls.Certificate
if cert, err = tls.LoadX509KeyPair(c.TLS.Cert, c.TLS.Key); err != nil {
return conf, fmt.Errorf("failed to read x509 cert and key pair: %v", err)
}
TLS.Certificates = append(TLS.Certificates, cert)
conf.tls = &TLS
cert, err := tls.LoadX509KeyPair(c.TLS.Cert, c.TLS.Key)
if err != nil {
return nil, fmt.Errorf("failed to read x509 cert and key pair: %w", err)
}
cfg.Certificates = append(cfg.Certificates, cert)

if !c.Direct {
callback, err := url.Parse(c.Callback)
if err != nil {
return conf, fmt.Errorf("failed to parse callback url")
}
conf.callback = *callback
return &cfg, nil
}

func exchangeFrom(c *config.AMQP) exchange {
return exchange{
Name: c.Exchange.Name,
Type: c.Exchange.Type,
Durable: c.Exchange.Durable,
AutoDelete: c.Exchange.AutoDelete,
}
return conf, nil
}
63 changes: 46 additions & 17 deletions notifier/amqp/deliverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"context"
"encoding/json"
"fmt"
"net/url"
"path"

"github.com/google/uuid"
samqp "github.com/streadway/amqp"

clairerror "github.com/quay/clair/v4/clair-error"
"github.com/quay/clair/v4/config"
"github.com/quay/clair/v4/notifier"
samqp "github.com/streadway/amqp"
)

// Deliverer is an AMQP deliverer which publishes a notifier.Callback to the
Expand All @@ -19,27 +22,53 @@ import (
// Administrators should configure the Exchange, Queue, and Bindings before starting
// this deliverer.
type Deliverer struct {
conf Config
fo *failOver
callback *url.URL
fo failOver
routingKey string
exchange exchange
rollup int
direct bool
}

func New(conf Config) (*Deliverer, error) {
var c Config
var err error
if c, err = conf.Validate(); err != nil {
func New(conf *config.AMQP) (*Deliverer, error) {
var d Deliverer
if err := d.load(conf); err != nil {
return nil, err
}
fo := &failOver{
Config: c,
return &d, nil
}

func (d *Deliverer) load(conf *config.AMQP) error {
var err error
if !conf.Direct {
d.callback, err = url.Parse(conf.Callback)
if err != nil {
return err
}
}
d.fo.tls, err = loadTLSConfig(conf)
if err != nil {
return err
}

// Copy everything else out of the config:
d.direct = conf.Direct
d.rollup = conf.Rollup
d.exchange = exchangeFrom(conf)
d.routingKey = conf.RoutingKey
d.fo.uris = make([]*url.URL, len(conf.URIs))
for i, u := range conf.URIs {
d.fo.uris[i], err = url.Parse(u)
if err != nil {
return err
}
}
return &Deliverer{
conf: c,
fo: fo,
}, nil
d.fo.exchange = &d.exchange
return nil
}

func (d *Deliverer) Name() string {
return fmt.Sprintf("amqp-%s", d.conf.Exchange.Name)
return fmt.Sprintf("amqp-%s", d.exchange.Name)
}

func (d *Deliverer) Deliver(ctx context.Context, nID uuid.UUID) error {
Expand All @@ -55,7 +84,7 @@ func (d *Deliverer) Deliver(ctx context.Context, nID uuid.UUID) error {
}
defer ch.Close()

callback := d.conf.callback
callback := *d.callback
callback.Path = path.Join(callback.Path, nID.String())

cb := notifier.Callback{
Expand All @@ -72,8 +101,8 @@ func (d *Deliverer) Deliver(ctx context.Context, nID uuid.UUID) error {
Body: b,
}
err = ch.Publish(
d.conf.Exchange.Name,
d.conf.RoutingKey,
d.exchange.Name,
d.routingKey,
false,
false,
msg,
Expand Down
24 changes: 13 additions & 11 deletions notifier/amqp/deliverer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"golang.org/x/sync/errgroup"

"github.com/google/uuid"
"github.com/quay/clair/v4/config"
"github.com/quay/claircore/test/integration"
"github.com/quay/zlog"
samqp "github.com/streadway/amqp"
)

Expand All @@ -23,22 +25,22 @@ const (
// callback is successfully delivered to the amqp broker.
func TestDeliverer(t *testing.T) {
integration.Skip(t)
ctx := zlog.Test(context.Background(), t)
const (
callback = "http://clair-notifier/notifier/api/v1/notifications"
)
var (
uri = os.Getenv("RABBITMQ_CONNECTION_STRING")
queueAndKey = uuid.New().String()
// our test assumes a default exchange
exchange = Exchange{
Name: "",
Type: "direct",
Durable: true,
AutoDelete: false,
}
conf = Config{
Callback: callback,
Exchange: exchange,
conf = config.AMQP{
Callback: callback,
Exchange: config.Exchange{
Name: "",
Type: "direct",
Durable: true,
AutoDelete: false,
},
RoutingKey: queueAndKey,
}
)
Expand Down Expand Up @@ -85,13 +87,13 @@ func TestDeliverer(t *testing.T) {
for i := 0; i < 4; i++ {
g.Go(func() error {
noteID := uuid.New()
d, err := New(conf)
d, err := New(&conf)
if err != nil {
return fmt.Errorf("could not create deliverer: %v", err)
}
// we simply need to check for an error. amqp
// will error if message cannot be delivered to broker
err = d.Deliver(context.TODO(), noteID)
err = d.Deliver(ctx, noteID)
if err != nil {
return fmt.Errorf("failed to deliver message: %v", err)
}
Expand Down
Loading

0 comments on commit c6b2d3c

Please sign in to comment.