Skip to content

Commit

Permalink
notifier: do less work
Browse files Browse the repository at this point in the history
previously the notifer would attempt to create notifications for the
same update operation if no affected manifests were discovered.

with this patch we book keep the update operations we encountered and
short circuit the processing of notifications which do not produce
affected manifests.

a receipt in "delivered" status is written to the database directly for
update operations that do not produce affected manifests. this effectively
removes them from subsequent processing and avoides any delivery
attempts.

Signed-off-by: ldelossa <ldelossa@redhat.com>
  • Loading branch information
ldelossa authored and ldelossa committed Oct 21, 2020
1 parent 37f7791 commit 40abaa6
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 5 deletions.
9 changes: 9 additions & 0 deletions notifier/mockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
type MockStore struct {
Notifications_ func(ctx context.Context, id uuid.UUID, page *Page) ([]Notification, Page, error)
PutNotifications_ func(ctx context.Context, opts PutOpts) error
PutReceipt_ func(ctx context.Context, updater string, r Receipt) error
DeleteNotitfications_ func(ctx context.Context, id uuid.UUID) error
Receipt_ func(ctx context.Context, id uuid.UUID) (Receipt, error)
ReceiptByUOID_ func(ctx context.Context, id uuid.UUID) (Receipt, error)
Expand Down Expand Up @@ -41,6 +42,14 @@ func (m *MockStore) PutNotifications(ctx context.Context, opts PutOpts) error {
return m.PutNotifications_(ctx, opts)
}

// PutReceipt allows for the caller to directly add a receipt to the store
// without notifications being created.
//
// After this method returns all methods on the Receipter interface must work accordingly.
func (m *MockStore) PutReceipt(ctx context.Context, updater string, r Receipt) error {
return m.PutReceipt_(ctx, updater, r)
}

// DeleteNotifications garbage collects all notifications associated
// with a notification id.
//
Expand Down
36 changes: 36 additions & 0 deletions notifier/postgres/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (e *e2e) Run(t *testing.T) {
{"SetDelivered", e.SetDelivered},
{"SetDeliveryFailed", e.SetDeliveryFailed},
{"SetDeleted", e.SetDeleted},
{"PutReceipt", e.PutReceipt},
}
for i := range subtests {
subtest := subtests[i]
Expand Down Expand Up @@ -262,3 +263,38 @@ func (e *e2e) SetDeleted(t *testing.T) {
t.Fatal(cmp.Diff(receipt.Status, notifier.Deleted))
}
}

// PutReceipt will confirm a receipt can be directly placed into the
// the database.
func (e *e2e) PutReceipt(t *testing.T) {
defer func() {
e.failed = t.Failed()
}()
noteID := uuid.New()
UOID := uuid.New()
r := notifier.Receipt{
NotificationID: noteID,
UOID: UOID,
Status: notifier.Delivered,
}
err := e.store.PutReceipt(e.ctx, "test-updater", r)
if err != nil {
t.Fatalf("failed to put receipt: %v", err)
}

rPrime, err := e.store.Receipt(e.ctx, noteID)
if err != nil {
t.Fatal(err)
}
if !cmp.Equal(rPrime, r, cmpopts.IgnoreFields(rPrime, "TS")) {
t.Fatal(cmp.Diff(rPrime, r))
}

rPrime, err = e.store.ReceiptByUOID(e.ctx, UOID)
if err != nil {
t.Fatal(err)
}
if !cmp.Equal(rPrime, r, cmpopts.IgnoreFields(rPrime, "TS")) {
t.Fatal(cmp.Diff(rPrime, r))
}
}
4 changes: 2 additions & 2 deletions notifier/postgres/putnotifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func putNotifications(ctx context.Context, pool *pgxpool.Pool, opts notifier.Put
insertReceipt = `INSERT INTO receipt (notification_id, uo_id, status, ts) VALUES ($1, $2, 'created', CURRENT_TIMESTAMP);`
insertUpdateOperation = `
INSERT INTO notifier_update_operation (updater, uo_id, ts)
VALUES ($1, $2, $3)
VALUES ($1, $2, CURRENT_TIMESTAMP)
`
)
tx, err := pool.Begin(ctx)
Expand Down Expand Up @@ -64,7 +64,7 @@ func putNotifications(ctx context.Context, pool *pgxpool.Pool, opts notifier.Put
}

// update known update operations
_, err = tx.Exec(ctx, insertUpdateOperation, opts.Updater, opts.UpdateID, time.Now())
_, err = tx.Exec(ctx, insertUpdateOperation, opts.Updater, opts.UpdateID)
if err != nil {
return clairerror.ErrPutNotifications{opts.NotificationID, err}
}
Expand Down
59 changes: 59 additions & 0 deletions notifier/postgres/putreceipt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package postgres

import (
"context"
"fmt"

"github.com/jackc/pgx/v4/pgxpool"
"github.com/quay/clair/v4/notifier"
)

func putReceipt(ctx context.Context, pool *pgxpool.Pool, updater string, r notifier.Receipt) error {
const (
insertNotification = `INSERT INTO notification (id) VALUES ($1);`
insertReceipt = `INSERT INTO receipt (notification_id, uo_id, status, ts) VALUES ($1, $2, $3, CURRENT_TIMESTAMP);`
insertUpdateOperation = `
INSERT INTO notifier_update_operation (updater, uo_id, ts)
VALUES ($1, $2, CURRENT_TIMESTAMP)
`
)
tx, err := pool.Begin(ctx)
if err != nil {
return fmt.Errorf("failed to create tx: %v", err)
}
defer tx.Rollback(ctx)

tag, err := tx.Exec(ctx, insertNotification, r.NotificationID)
if err != nil {
return fmt.Errorf("failed to insert notification id: %v", err)
}
if tag.RowsAffected() == 0 {
return fmt.Errorf("insert of notification id had no effect")
}

tag, err = tx.Exec(ctx, insertUpdateOperation, updater, r.UOID)
if err != nil {
return fmt.Errorf("failed to insert update operation id: %v", err)
}
if tag.RowsAffected() == 0 {
return fmt.Errorf("insert of update operation had no effect")
}

tag, err = tx.Exec(ctx,
insertReceipt,
r.NotificationID,
r.UOID,
r.Status,
)
if err != nil {
return fmt.Errorf("failed to insert receipt: %v", err)
}
if tag.RowsAffected() == 0 {
return fmt.Errorf("insert of receipt had no effect")
}
err = tx.Commit(ctx)
if err != nil {
return fmt.Errorf("failed to commit tx: %v", err)
}
return nil
}
3 changes: 2 additions & 1 deletion notifier/postgres/receipt.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ import (
// if the receipt does not exist a ErrNoReceipt is returned
func receipt(ctx context.Context, pool *pgxpool.Pool, id uuid.UUID) (notifier.Receipt, error) {
const (
query = `SELECT notification_id, status, ts FROM receipt WHERE notification_id = $1`
query = `SELECT uo_id, notification_id, status, ts FROM receipt WHERE notification_id = $1`
)

var r notifier.Receipt
row := pool.QueryRow(ctx, query, id.String())
err := row.Scan(
&r.UOID,
&r.NotificationID,
&r.Status,
&r.TS,
Expand Down
3 changes: 2 additions & 1 deletion notifier/postgres/receiptbyuoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ import (
// if the receipt does not exist a ErrNoReceipt is returned
func receiptByUOID(ctx context.Context, pool *pgxpool.Pool, id uuid.UUID) (notifier.Receipt, error) {
const (
query = `SELECT notification_id, status, ts FROM receipt WHERE uo_id = $1`
query = `SELECT uo_id, notification_id, status, ts FROM receipt WHERE uo_id = $1`
)

var r notifier.Receipt
row := pool.QueryRow(ctx, query, id.String())
err := row.Scan(
&r.UOID,
&r.NotificationID,
&r.Status,
&r.TS,
Expand Down
4 changes: 4 additions & 0 deletions notifier/postgres/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (s *Store) PutNotifications(ctx context.Context, opts notifier.PutOpts) err
return putNotifications(ctx, s.pool, opts)
}

func (s *Store) PutReceipt(ctx context.Context, updater string, r notifier.Receipt) error {
return putReceipt(ctx, s.pool, updater, r)
}

// DeleteNotifications garbage collects all notifications associated
// with a notification id.
//
Expand Down
14 changes: 13 additions & 1 deletion notifier/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,18 @@ func (p *Processor) create(ctx context.Context, e Event, prev uuid.UUID) error {
log.Debug().Int("added", len(added.VulnerableManifests)).Int("removed", len(removed.VulnerableManifests)).Msg("affected manifest counts")

if len(added.VulnerableManifests) == 0 && len(removed.VulnerableManifests) == 0 {
log.Debug().Msg("0 affected manifests. will not create notifications.")
// directly add a "delivered" receipt, this will stop subsequent processing
// of this update operation and also avoid delivery attempts.
r := Receipt{
NotificationID: uuid.New(),
UOID: e.uo.Ref,
Status: Delivered,
}
log.Debug().Str("update_operation", e.uo.Ref.String()).Msg("no affected manifests for update operation, setting to delivered.")
err := p.store.PutReceipt(ctx, e.uo.Updater, r)
if err != nil {
return fmt.Errorf("failed to put receipt: %v", err)
}
return nil
}

Expand Down Expand Up @@ -189,6 +200,7 @@ func (p *Processor) safe(ctx context.Context, e Event) (bool, uuid.UUID) {
Str("updater", e.updater).
Str("UOID", uoid).
Logger()

// confirm we are not making duplicate notifications
var errNoReceipt clairerror.ErrNoReceipt
_, err := p.store.ReceiptByUOID(ctx, e.uo.Ref)
Expand Down
2 changes: 2 additions & 0 deletions notifier/receipt.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ const (

// Receipt represents the current status of a notification
type Receipt struct {
// The update operation associated with this receipt
UOID uuid.UUID
// the id a client may use to retrieve a set of notifications
NotificationID uuid.UUID
// the current status of the notification
Expand Down
5 changes: 5 additions & 0 deletions notifier/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ type Notificationer interface {
// successful persistence of notifications in such a way that Receipter.Created()
// returns the persisted notification id.
PutNotifications(ctx context.Context, opts PutOpts) error
// PutReceipt allows for the caller to directly add a receipt to the store
// without notifications being created.
//
// After this method returns all methods on the Receipter interface must work accordingly.
PutReceipt(ctx context.Context, updater string, r Receipt) error
// DeleteNotifications garbage collects all notifications associated
// with a notification id.
//
Expand Down

0 comments on commit 40abaa6

Please sign in to comment.