Skip to content

Commit

Permalink
Merge pull request #506 from aryan9600/indexing
Browse files Browse the repository at this point in the history
index receivers using webhook path as key
  • Loading branch information
hiddeco authored May 3, 2023
2 parents f798c76 + 00fab66 commit 4effb15
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 34 deletions.
7 changes: 7 additions & 0 deletions internal/controllers/receiver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/fluxcd/pkg/runtime/predicates"

apiv1 "github.com/fluxcd/notification-controller/api/v1"
"github.com/fluxcd/notification-controller/internal/server"
)

// ReceiverReconciler reconciles a Receiver object
Expand All @@ -62,6 +63,12 @@ func (r *ReceiverReconciler) SetupWithManager(mgr ctrl.Manager) error {
}

func (r *ReceiverReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts ReceiverReconcilerOptions) error {
// This index is used to list Receivers by their webhook path after the receiver server
// gets a request.
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &apiv1.Receiver{},
server.WebhookPathIndexKey, server.IndexReceiverWebhookPath); err != nil {
return err
}
recoverPanic := true
return ctrl.NewControllerManagedBy(mgr).
For(&apiv1.Receiver{}, builder.WithPredicates(
Expand Down
4 changes: 3 additions & 1 deletion internal/controllers/receiver_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ func TestReceiverReconciler_EventHandler(t *testing.T) {
timeout := 30 * time.Second
resultR := &apiv1.Receiver{}

receiverServer := server.NewReceiverServer("127.0.0.1:56788", logf.Log, k8sClient)
// Use the client from the manager as the server handler needs to list objects from the cache
// which the "live" k8s client does not have access to.
receiverServer := server.NewReceiverServer("127.0.0.1:56788", logf.Log, testEnv.GetClient())
receiverMdlw := middleware.New(middleware.Config{
Recorder: prommetrics.NewRecorder(prommetrics.Config{
Prefix: "gotk_receiver",
Expand Down
11 changes: 6 additions & 5 deletions internal/server/receiver_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func Test_handlePayload(t *testing.T) {
Conditions: []metav1.Condition{{Type: meta.StalledCondition, Status: metav1.ConditionFalse}},
},
},
expectedResponseCode: http.StatusNotFound,
expectedResponseCode: http.StatusServiceUnavailable,
},
{
name: "suspended receiver ignored",
Expand All @@ -337,7 +337,7 @@ func Test_handlePayload(t *testing.T) {
Conditions: []metav1.Condition{{Type: meta.ReadyCondition, Status: metav1.ConditionTrue}},
},
},
expectedResponseCode: http.StatusNotFound,
expectedResponseCode: http.StatusServiceUnavailable,
},
{
name: "missing apiVersion in resource",
Expand Down Expand Up @@ -372,7 +372,7 @@ func Test_handlePayload(t *testing.T) {
"token": []byte("token"),
},
},
expectedResponseCode: http.StatusBadRequest,
expectedResponseCode: http.StatusInternalServerError,
},
{
name: "resource by name not found",
Expand Down Expand Up @@ -406,7 +406,7 @@ func Test_handlePayload(t *testing.T) {
"token": []byte("token"),
},
},
expectedResponseCode: http.StatusBadRequest,
expectedResponseCode: http.StatusInternalServerError,
},
{
name: "annotating resources by label match",
Expand Down Expand Up @@ -563,7 +563,7 @@ func Test_handlePayload(t *testing.T) {
"token": []byte("token"),
},
},
expectedResponseCode: http.StatusBadRequest,
expectedResponseCode: http.StatusInternalServerError,
},
{
name: "resource matchLabels is ignored if name is not *",
Expand Down Expand Up @@ -641,6 +641,7 @@ func Test_handlePayload(t *testing.T) {
}

builder.WithObjects(tt.resources...)
builder.WithIndex(&apiv1.Receiver{}, WebhookPathIndexKey, IndexReceiverWebhookPath)

if tt.secret != nil {
builder.WithObjects(tt.secret)
Expand Down
74 changes: 46 additions & 28 deletions internal/server/receiver_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -43,6 +44,10 @@ import (
apiv1 "github.com/fluxcd/notification-controller/api/v1"
)

var (
WebhookPathIndexKey = ".metadata.webhookPath"
)

// defaultFluxAPIVersions is a map of Flux API kinds to their API versions.
var defaultFluxAPIVersions = map[string]string{
"Bucket": "source.toolkit.fluxcd.io/v1beta2",
Expand All @@ -53,6 +58,16 @@ var defaultFluxAPIVersions = map[string]string{
"ImageRepository": "image.toolkit.fluxcd.io/v1beta2",
}

// IndexReceiverWebhookPath is a client.IndexerFunc that returns the Receiver's
// webhook path, if present in its status.
func IndexReceiverWebhookPath(o client.Object) []string {
receiver := o.(*apiv1.Receiver)
if receiver.Status.WebhookPath != "" {
return []string{receiver.Status.WebhookPath}
}
return nil
}

func (s *ReceiverServer) handlePayload() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
Expand All @@ -61,50 +76,53 @@ func (s *ReceiverServer) handlePayload() func(w http.ResponseWriter, r *http.Req
s.logger.Info(fmt.Sprintf("handling request: %s", digest))

var allReceivers apiv1.ReceiverList
err := s.kubeClient.List(ctx, &allReceivers)
err := s.kubeClient.List(ctx, &allReceivers, client.MatchingFields{
WebhookPathIndexKey: r.RequestURI,
}, client.Limit(1))
if err != nil {
s.logger.Error(err, "unable to list receivers")
w.WriteHeader(http.StatusBadRequest)
w.WriteHeader(http.StatusInternalServerError)
return
}

receivers := make([]apiv1.Receiver, 0)
for _, receiver := range allReceivers.Items {
if !receiver.Spec.Suspend &&
conditions.IsReady(&receiver) &&
receiver.Status.WebhookPath == fmt.Sprintf("%s%s", apiv1.ReceiverWebhookPath, digest) {
receivers = append(receivers, receiver)
}
}

if len(receivers) == 0 {
if len(allReceivers.Items) == 0 {
w.WriteHeader(http.StatusNotFound)
return
}

withErrors := false
for _, receiver := range receivers {
logger := s.logger.WithValues(
"reconciler kind", apiv1.ReceiverKind,
"name", receiver.Name,
"namespace", receiver.Namespace)
receiver := allReceivers.Items[0]
logger := s.logger.WithValues(
"reconciler kind", apiv1.ReceiverKind,
"name", receiver.Name,
"namespace", receiver.Namespace)

if err := s.validate(ctx, receiver, r); err != nil {
logger.Error(err, "unable to validate payload")
withErrors = true
continue
if receiver.Spec.Suspend || !conditions.IsReady(&receiver) {
err := errors.New("unable to process request")
if receiver.Spec.Suspend {
logger.Error(err, "receiver is suspended")
} else {
logger.Error(err, "receiver is not ready")
}
w.WriteHeader(http.StatusServiceUnavailable)
return
}

for _, resource := range receiver.Spec.Resources {
if err := s.requestReconciliation(ctx, logger, resource, receiver.Namespace); err != nil {
logger.Error(err, "unable to process resource")
withErrors = true
}
if err := s.validate(ctx, receiver, r); err != nil {
logger.Error(err, "unable to validate payload")
w.WriteHeader(http.StatusBadRequest)
return
}

var withErrors bool
for _, resource := range receiver.Spec.Resources {
if err := s.requestReconciliation(ctx, logger, resource, receiver.Namespace); err != nil {
logger.Error(err, "unable to request reconciliation")
withErrors = true
}
}

if withErrors {
w.WriteHeader(http.StatusBadRequest)
w.WriteHeader(http.StatusInternalServerError)
} else {
w.WriteHeader(http.StatusOK)
}
Expand Down

0 comments on commit 4effb15

Please sign in to comment.