diff --git a/notifier/mockstore.go b/notifier/mockstore.go index c97debc20d..98fdb455ed 100644 --- a/notifier/mockstore.go +++ b/notifier/mockstore.go @@ -10,6 +10,7 @@ import ( type MockStore struct { Notifications_ func(ctx context.Context, id uuid.UUID, page *Page) ([]Notification, Page, error) PutNotifications_ func(ctx context.Context, opts PutOpts) error + PutReceipt_ func(ctx context.Context, updater string, r Receipt) error DeleteNotitfications_ func(ctx context.Context, id uuid.UUID) error Receipt_ func(ctx context.Context, id uuid.UUID) (Receipt, error) ReceiptByUOID_ func(ctx context.Context, id uuid.UUID) (Receipt, error) @@ -41,6 +42,14 @@ func (m *MockStore) PutNotifications(ctx context.Context, opts PutOpts) error { return m.PutNotifications_(ctx, opts) } +// PutReceipt allows for the caller to directly add a receipt to the store +// without notifications being created. +// +// After this method returns all methods on the Receipter interface must work accordingly. +func (m *MockStore) PutReceipt(ctx context.Context, updater string, r Receipt) error { + return m.PutReceipt_(ctx, updater, r) +} + // DeleteNotifications garbage collects all notifications associated // with a notification id. // diff --git a/notifier/postgres/e2e_test.go b/notifier/postgres/e2e_test.go index 2574b6149f..6446dec893 100644 --- a/notifier/postgres/e2e_test.go +++ b/notifier/postgres/e2e_test.go @@ -91,6 +91,7 @@ func (e *e2e) Run(t *testing.T) { {"SetDelivered", e.SetDelivered}, {"SetDeliveryFailed", e.SetDeliveryFailed}, {"SetDeleted", e.SetDeleted}, + {"PutReceipt", e.PutReceipt}, } for i := range subtests { subtest := subtests[i] @@ -262,3 +263,38 @@ func (e *e2e) SetDeleted(t *testing.T) { t.Fatal(cmp.Diff(receipt.Status, notifier.Deleted)) } } + +// PutReceipt will confirm a receipt can be directly placed into the +// the database. +func (e *e2e) PutReceipt(t *testing.T) { + defer func() { + e.failed = t.Failed() + }() + noteID := uuid.New() + UOID := uuid.New() + r := notifier.Receipt{ + NotificationID: noteID, + UOID: UOID, + Status: notifier.Delivered, + } + err := e.store.PutReceipt(e.ctx, "test-updater", r) + if err != nil { + t.Fatalf("failed to put receipt: %v", err) + } + + rPrime, err := e.store.Receipt(e.ctx, noteID) + if err != nil { + t.Fatal(err) + } + if !cmp.Equal(rPrime, r, cmpopts.IgnoreFields(rPrime, "TS")) { + t.Fatal(cmp.Diff(rPrime, r)) + } + + rPrime, err = e.store.ReceiptByUOID(e.ctx, UOID) + if err != nil { + t.Fatal(err) + } + if !cmp.Equal(rPrime, r, cmpopts.IgnoreFields(rPrime, "TS")) { + t.Fatal(cmp.Diff(rPrime, r)) + } +} diff --git a/notifier/postgres/putnotifications.go b/notifier/postgres/putnotifications.go index e2a777bd85..075a157064 100644 --- a/notifier/postgres/putnotifications.go +++ b/notifier/postgres/putnotifications.go @@ -31,7 +31,7 @@ func putNotifications(ctx context.Context, pool *pgxpool.Pool, opts notifier.Put insertReceipt = `INSERT INTO receipt (notification_id, uo_id, status, ts) VALUES ($1, $2, 'created', CURRENT_TIMESTAMP);` insertUpdateOperation = ` INSERT INTO notifier_update_operation (updater, uo_id, ts) - VALUES ($1, $2, $3) + VALUES ($1, $2, CURRENT_TIMESTAMP) ` ) tx, err := pool.Begin(ctx) @@ -64,7 +64,7 @@ func putNotifications(ctx context.Context, pool *pgxpool.Pool, opts notifier.Put } // update known update operations - _, err = tx.Exec(ctx, insertUpdateOperation, opts.Updater, opts.UpdateID, time.Now()) + _, err = tx.Exec(ctx, insertUpdateOperation, opts.Updater, opts.UpdateID) if err != nil { return clairerror.ErrPutNotifications{opts.NotificationID, err} } diff --git a/notifier/postgres/putreceipt.go b/notifier/postgres/putreceipt.go new file mode 100644 index 0000000000..d2d22ecb6a --- /dev/null +++ b/notifier/postgres/putreceipt.go @@ -0,0 +1,59 @@ +package postgres + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v4/pgxpool" + "github.com/quay/clair/v4/notifier" +) + +func putReceipt(ctx context.Context, pool *pgxpool.Pool, updater string, r notifier.Receipt) error { + const ( + insertNotification = `INSERT INTO notification (id) VALUES ($1);` + insertReceipt = `INSERT INTO receipt (notification_id, uo_id, status, ts) VALUES ($1, $2, $3, CURRENT_TIMESTAMP);` + insertUpdateOperation = ` + INSERT INTO notifier_update_operation (updater, uo_id, ts) + VALUES ($1, $2, CURRENT_TIMESTAMP) + ` + ) + tx, err := pool.Begin(ctx) + if err != nil { + return fmt.Errorf("failed to create tx: %v", err) + } + defer tx.Rollback(ctx) + + tag, err := tx.Exec(ctx, insertNotification, r.NotificationID) + if err != nil { + return fmt.Errorf("failed to insert notification id: %v", err) + } + if tag.RowsAffected() == 0 { + return fmt.Errorf("insert of notification id had no effect") + } + + tag, err = tx.Exec(ctx, insertUpdateOperation, updater, r.UOID) + if err != nil { + return fmt.Errorf("failed to insert update operation id: %v", err) + } + if tag.RowsAffected() == 0 { + return fmt.Errorf("insert of update operation had no effect") + } + + tag, err = tx.Exec(ctx, + insertReceipt, + r.NotificationID, + r.UOID, + r.Status, + ) + if err != nil { + return fmt.Errorf("failed to insert receipt: %v", err) + } + if tag.RowsAffected() == 0 { + return fmt.Errorf("insert of receipt had no effect") + } + err = tx.Commit(ctx) + if err != nil { + return fmt.Errorf("failed to commit tx: %v", err) + } + return nil +} diff --git a/notifier/postgres/receipt.go b/notifier/postgres/receipt.go index f2dcac6cdb..43e9ceb153 100644 --- a/notifier/postgres/receipt.go +++ b/notifier/postgres/receipt.go @@ -16,12 +16,13 @@ import ( // if the receipt does not exist a ErrNoReceipt is returned func receipt(ctx context.Context, pool *pgxpool.Pool, id uuid.UUID) (notifier.Receipt, error) { const ( - query = `SELECT notification_id, status, ts FROM receipt WHERE notification_id = $1` + query = `SELECT uo_id, notification_id, status, ts FROM receipt WHERE notification_id = $1` ) var r notifier.Receipt row := pool.QueryRow(ctx, query, id.String()) err := row.Scan( + &r.UOID, &r.NotificationID, &r.Status, &r.TS, diff --git a/notifier/postgres/receiptbyuoid.go b/notifier/postgres/receiptbyuoid.go index 395c59d869..4b3c5d9902 100644 --- a/notifier/postgres/receiptbyuoid.go +++ b/notifier/postgres/receiptbyuoid.go @@ -16,12 +16,13 @@ import ( // if the receipt does not exist a ErrNoReceipt is returned func receiptByUOID(ctx context.Context, pool *pgxpool.Pool, id uuid.UUID) (notifier.Receipt, error) { const ( - query = `SELECT notification_id, status, ts FROM receipt WHERE uo_id = $1` + query = `SELECT uo_id, notification_id, status, ts FROM receipt WHERE uo_id = $1` ) var r notifier.Receipt row := pool.QueryRow(ctx, query, id.String()) err := row.Scan( + &r.UOID, &r.NotificationID, &r.Status, &r.TS, diff --git a/notifier/postgres/store.go b/notifier/postgres/store.go index c376f118bf..8290863851 100644 --- a/notifier/postgres/store.go +++ b/notifier/postgres/store.go @@ -37,6 +37,10 @@ func (s *Store) PutNotifications(ctx context.Context, opts notifier.PutOpts) err return putNotifications(ctx, s.pool, opts) } +func (s *Store) PutReceipt(ctx context.Context, updater string, r notifier.Receipt) error { + return putReceipt(ctx, s.pool, updater, r) +} + // DeleteNotifications garbage collects all notifications associated // with a notification id. // diff --git a/notifier/processor.go b/notifier/processor.go index 7fc1a2ae9b..311fa72a06 100644 --- a/notifier/processor.go +++ b/notifier/processor.go @@ -128,7 +128,18 @@ func (p *Processor) create(ctx context.Context, e Event, prev uuid.UUID) error { log.Debug().Int("added", len(added.VulnerableManifests)).Int("removed", len(removed.VulnerableManifests)).Msg("affected manifest counts") if len(added.VulnerableManifests) == 0 && len(removed.VulnerableManifests) == 0 { - log.Debug().Msg("0 affected manifests. will not create notifications.") + // directly add a "delivered" receipt, this will stop subsequent processing + // of this update operation and also avoid delivery attempts. + r := Receipt{ + NotificationID: uuid.New(), + UOID: e.uo.Ref, + Status: Delivered, + } + log.Debug().Str("update_operation", e.uo.Ref.String()).Msg("no affected manifests for update operation, setting to delivered.") + err := p.store.PutReceipt(ctx, e.uo.Updater, r) + if err != nil { + return fmt.Errorf("failed to put receipt: %v", err) + } return nil } @@ -189,6 +200,7 @@ func (p *Processor) safe(ctx context.Context, e Event) (bool, uuid.UUID) { Str("updater", e.updater). Str("UOID", uoid). Logger() + // confirm we are not making duplicate notifications var errNoReceipt clairerror.ErrNoReceipt _, err := p.store.ReceiptByUOID(ctx, e.uo.Ref) diff --git a/notifier/receipt.go b/notifier/receipt.go index d77dd77d9c..9d14916bce 100644 --- a/notifier/receipt.go +++ b/notifier/receipt.go @@ -22,6 +22,8 @@ const ( // Receipt represents the current status of a notification type Receipt struct { + // The update operation associated with this receipt + UOID uuid.UUID // the id a client may use to retrieve a set of notifications NotificationID uuid.UUID // the current status of the notification diff --git a/notifier/store.go b/notifier/store.go index 2f8e665c13..3f7675a740 100644 --- a/notifier/store.go +++ b/notifier/store.go @@ -53,6 +53,11 @@ type Notificationer interface { // successful persistence of notifications in such a way that Receipter.Created() // returns the persisted notification id. PutNotifications(ctx context.Context, opts PutOpts) error + // PutReceipt allows for the caller to directly add a receipt to the store + // without notifications being created. + // + // After this method returns all methods on the Receipter interface must work accordingly. + PutReceipt(ctx context.Context, updater string, r Receipt) error // DeleteNotifications garbage collects all notifications associated // with a notification id. //