diff --git a/controllers/suite_test.go b/controllers/suite_test.go index 7d883b6a7..317b5ca77 100644 --- a/controllers/suite_test.go +++ b/controllers/suite_test.go @@ -1,5 +1,5 @@ /* -Copyright 2020 The Flux authors +Copyright 2020, 2021 The Flux authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -17,11 +17,18 @@ limitations under the License. package controllers import ( + "bytes" + "context" + "encoding/json" + "net/http" + "net/http/httptest" "path/filepath" "testing" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" @@ -30,7 +37,11 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" - notificationv1 "github.com/fluxcd/notification-controller/api/v1beta1" + "github.com/fluxcd/pkg/apis/meta" + "github.com/fluxcd/pkg/recorder" + + notifyv1 "github.com/fluxcd/notification-controller/api/v1beta1" + "github.com/fluxcd/notification-controller/internal/server" // +kubebuilder:scaffold:imports ) @@ -64,15 +75,8 @@ var _ = BeforeSuite(func(done Done) { Expect(err).ToNot(HaveOccurred()) Expect(cfg).ToNot(BeNil()) - err = notificationv1.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - - err = notificationv1.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - - err = notificationv1.AddToScheme(scheme.Scheme) + err = notifyv1.AddToScheme(scheme.Scheme) Expect(err).NotTo(HaveOccurred()) - // +kubebuilder:scaffold:scheme k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) @@ -87,3 +91,173 @@ var _ = AfterSuite(func() { err := testEnv.Stop() Expect(err).ToNot(HaveOccurred()) }) + +var _ = Describe("Event handlers", func() { + + var ( + namespace = "default" + rcvServer *httptest.Server + providerName = "test-provider" + provider notifyv1.Provider + stopCh chan struct{} + req *http.Request + ) + + // This sets up the minimal objects so that we can test the + // events handling. + BeforeEach(func() { + ctx := context.Background() + + // We're not testing the provider, but this is a way to know + // whether events have been handled. + rcvServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + req = r + w.WriteHeader(200) + })) + + provider = notifyv1.Provider{ + Spec: notifyv1.ProviderSpec{ + Type: "generic", + Address: rcvServer.URL, + }, + } + provider.Name = providerName + provider.Namespace = namespace + By("Creating provider") + Expect(k8sClient.Create(ctx, &provider)).To(Succeed()) + + By("Creating and starting event server") + // TODO let OS assign port number + eventServer := server.NewEventServer("127.0.0.1:56789", logf.Log, k8sClient) + stopCh = make(chan struct{}) + go eventServer.ListenAndServe(stopCh) + }) + + AfterEach(func() { + req = nil + rcvServer.Close() + close(stopCh) + Expect(k8sClient.Delete(context.Background(), &provider)).To(Succeed()) + }) + + // The following test "templates" will create the alert, then + // serialise the event and post it to the event server. They + // differ on what's expected to happen to the event. + + var ( + alert notifyv1.Alert + event recorder.Event + ) + + JustBeforeEach(func() { + alert.Name = "test-alert" + alert.Namespace = namespace + Expect(k8sClient.Create(context.Background(), &alert)).To(Succeed()) + // the event server won't dispatch to an alert if it has + // not been marked "ready" + meta.SetResourceCondition(&alert, meta.ReadyCondition, metav1.ConditionTrue, meta.ReconciliationSucceededReason, "artificially set to ready") + Expect(k8sClient.Status().Update(context.Background(), &alert)).To(Succeed()) + }) + + AfterEach(func() { + Expect(k8sClient.Delete(context.Background(), &alert)).To(Succeed()) + }) + + testSent := func() { + buf := &bytes.Buffer{} + Expect(json.NewEncoder(buf).Encode(&event)).To(Succeed()) + res, err := http.Post("http://localhost:56789/", "application/json", buf) + Expect(err).ToNot(HaveOccurred()) + Expect(res.StatusCode).To(Equal(202)) // event_server responds with 202 Accepted + } + + testForwarded := func() { + Eventually(func() bool { + return req == nil + }, "2s", "0.1s").Should(BeFalse()) + } + + testFiltered := func() { + // The event_server does forwarding in a goroutine, after + // responding to the POST of the event. This makes it + // difficult to know whether the provider has filtered the + // event, or just not run the goroutine yet. For now, I'll use + // a timeout (and Consistently so it can fail early) + Consistently(func() bool { + return req == nil + }, "1s", "0.1s").Should(BeTrue()) + } + + Describe("event forwarding", func() { + BeforeEach(func() { + alert = notifyv1.Alert{} + alert.Spec = notifyv1.AlertSpec{ + ProviderRef: meta.LocalObjectReference{ + Name: providerName, + }, + EventSeverity: "info", + EventSources: []notifyv1.CrossNamespaceObjectReference{ + { + Kind: "Bucket", + Name: "hyacinth", + Namespace: "default", + }, + }, + } + event = recorder.Event{ + InvolvedObject: corev1.ObjectReference{ + Kind: "Bucket", + Name: "hyacinth", + Namespace: "default", + }, + Severity: "info", + Timestamp: metav1.Now(), + Message: "well that happened", + Reason: "event-happened", + ReportingController: "source-controller", + } + }) + + Context("matching by source", func() { + It("forwards when source is a match", func() { + testSent() + testForwarded() + }) + It("drops event when source Kind does not match", func() { + event.InvolvedObject.Kind = "GitRepository" + testSent() + testFiltered() + }) + It("drops event when source name does not match", func() { + event.InvolvedObject.Name = "slop" + testSent() + testFiltered() + }) + It("drops event when source namespace does not match", func() { + event.InvolvedObject.Namespace = "all-buckets" + testSent() + testFiltered() + }) + }) + + Context("filtering by ExclusionList", func() { + BeforeEach(func() { + alert.Spec.ExclusionList = []string{ + "doesnotoccur", // not intended to match + "well", + } + }) + + It("forwards event that is not matched", func() { + event.Message = "not excluded" + testSent() + testForwarded() + }) + + It("drops event that is matched by exclusion", func() { + testSent() + testFiltered() + }) + }) + }) +}) diff --git a/internal/server/event_handlers.go b/internal/server/event_handlers.go index 0e6ca80dc..5e16b59db 100644 --- a/internal/server/event_handlers.go +++ b/internal/server/event_handlers.go @@ -67,30 +67,26 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request) // find matching alerts alerts := make([]v1beta1.Alert, 0) + each_alert: for _, alert := range allAlerts.Items { // skip suspended and not ready alerts isReady := apimeta.IsStatusConditionTrue(alert.Status.Conditions, meta.ReadyCondition) if alert.Spec.Suspend || !isReady { - continue + continue each_alert } // skip alert if the message matches a regex from the exclusion list - var skip bool if len(alert.Spec.ExclusionList) > 0 { for _, exp := range alert.Spec.ExclusionList { if r, err := regexp.Compile(exp); err == nil { if r.Match([]byte(event.Message)) { - skip = true - break + continue each_alert } } else { s.logger.Error(err, fmt.Sprintf("failed to compile regex: %s", exp)) } } } - if skip { - continue - } // filter alerts by object and severity for _, source := range alert.Spec.EventSources { diff --git a/internal/server/event_server.go b/internal/server/event_server.go index 9de858ed2..89d19cde7 100644 --- a/internal/server/event_server.go +++ b/internal/server/event_server.go @@ -44,8 +44,7 @@ func NewEventServer(port string, logger logr.Logger, kubeClient client.Client) * // ListenAndServe starts the HTTP server on the specified port func (s *EventServer) ListenAndServe(stopCh <-chan struct{}) { - mux := http.DefaultServeMux - + mux := http.NewServeMux() mux.HandleFunc("/", s.handleEvent()) srv := &http.Server{ diff --git a/internal/server/recevier_server.go b/internal/server/receiver_server.go similarity index 100% rename from internal/server/recevier_server.go rename to internal/server/receiver_server.go