Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unit tests for event forwarding #145

Merged
merged 5 commits into from
Feb 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 184 additions & 10 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2020 The Flux authors
Copyright 2020, 2021 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -17,11 +17,18 @@ limitations under the License.
package controllers

import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"path/filepath"
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -30,7 +37,11 @@ import (
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

notificationv1 "github.com/fluxcd/notification-controller/api/v1beta1"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/recorder"

notifyv1 "github.com/fluxcd/notification-controller/api/v1beta1"
"github.com/fluxcd/notification-controller/internal/server"
// +kubebuilder:scaffold:imports
)

Expand Down Expand Up @@ -64,15 +75,8 @@ var _ = BeforeSuite(func(done Done) {
Expect(err).ToNot(HaveOccurred())
Expect(cfg).ToNot(BeNil())

err = notificationv1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())

err = notificationv1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())

err = notificationv1.AddToScheme(scheme.Scheme)
err = notifyv1.AddToScheme(scheme.Scheme)
Expect(err).NotTo(HaveOccurred())

// +kubebuilder:scaffold:scheme

k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
Expand All @@ -87,3 +91,173 @@ var _ = AfterSuite(func() {
err := testEnv.Stop()
Expect(err).ToNot(HaveOccurred())
})

var _ = Describe("Event handlers", func() {

var (
namespace = "default"
rcvServer *httptest.Server
providerName = "test-provider"
provider notifyv1.Provider
stopCh chan struct{}
req *http.Request
)

// This sets up the minimal objects so that we can test the
// events handling.
BeforeEach(func() {
ctx := context.Background()

// We're not testing the provider, but this is a way to know
// whether events have been handled.
rcvServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
req = r
w.WriteHeader(200)
}))

provider = notifyv1.Provider{
Spec: notifyv1.ProviderSpec{
Type: "generic",
Address: rcvServer.URL,
},
}
provider.Name = providerName
provider.Namespace = namespace
By("Creating provider")
Expect(k8sClient.Create(ctx, &provider)).To(Succeed())

By("Creating and starting event server")
// TODO let OS assign port number
eventServer := server.NewEventServer("127.0.0.1:56789", logf.Log, k8sClient)
stopCh = make(chan struct{})
go eventServer.ListenAndServe(stopCh)
})

AfterEach(func() {
req = nil
rcvServer.Close()
close(stopCh)
Expect(k8sClient.Delete(context.Background(), &provider)).To(Succeed())
})

// The following test "templates" will create the alert, then
// serialise the event and post it to the event server. They
// differ on what's expected to happen to the event.

var (
alert notifyv1.Alert
event recorder.Event
)

JustBeforeEach(func() {
alert.Name = "test-alert"
alert.Namespace = namespace
Expect(k8sClient.Create(context.Background(), &alert)).To(Succeed())
// the event server won't dispatch to an alert if it has
// not been marked "ready"
meta.SetResourceCondition(&alert, meta.ReadyCondition, metav1.ConditionTrue, meta.ReconciliationSucceededReason, "artificially set to ready")
Expect(k8sClient.Status().Update(context.Background(), &alert)).To(Succeed())
})

AfterEach(func() {
Expect(k8sClient.Delete(context.Background(), &alert)).To(Succeed())
})

testSent := func() {
buf := &bytes.Buffer{}
Expect(json.NewEncoder(buf).Encode(&event)).To(Succeed())
res, err := http.Post("http://localhost:56789/", "application/json", buf)
Expect(err).ToNot(HaveOccurred())
Expect(res.StatusCode).To(Equal(202)) // event_server responds with 202 Accepted
}

testForwarded := func() {
Eventually(func() bool {
return req == nil
}, "2s", "0.1s").Should(BeFalse())
}

testFiltered := func() {
// The event_server does forwarding in a goroutine, after
// responding to the POST of the event. This makes it
// difficult to know whether the provider has filtered the
// event, or just not run the goroutine yet. For now, I'll use
// a timeout (and Consistently so it can fail early)
Consistently(func() bool {
return req == nil
}, "1s", "0.1s").Should(BeTrue())
}

Describe("event forwarding", func() {
BeforeEach(func() {
alert = notifyv1.Alert{}
alert.Spec = notifyv1.AlertSpec{
ProviderRef: meta.LocalObjectReference{
Name: providerName,
},
EventSeverity: "info",
EventSources: []notifyv1.CrossNamespaceObjectReference{
{
Kind: "Bucket",
Name: "hyacinth",
Namespace: "default",
},
},
}
event = recorder.Event{
InvolvedObject: corev1.ObjectReference{
Kind: "Bucket",
Name: "hyacinth",
Namespace: "default",
},
Severity: "info",
Timestamp: metav1.Now(),
Message: "well that happened",
Reason: "event-happened",
ReportingController: "source-controller",
}
})

Context("matching by source", func() {
It("forwards when source is a match", func() {
testSent()
testForwarded()
})
It("drops event when source Kind does not match", func() {
event.InvolvedObject.Kind = "GitRepository"
testSent()
testFiltered()
})
It("drops event when source name does not match", func() {
event.InvolvedObject.Name = "slop"
testSent()
testFiltered()
})
It("drops event when source namespace does not match", func() {
event.InvolvedObject.Namespace = "all-buckets"
testSent()
testFiltered()
})
})

Context("filtering by ExclusionList", func() {
BeforeEach(func() {
alert.Spec.ExclusionList = []string{
"doesnotoccur", // not intended to match
"well",
}
})

It("forwards event that is not matched", func() {
event.Message = "not excluded"
testSent()
testForwarded()
})

It("drops event that is matched by exclusion", func() {
testSent()
testFiltered()
})
})
})
})
10 changes: 3 additions & 7 deletions internal/server/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,30 +67,26 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)

// find matching alerts
alerts := make([]v1beta1.Alert, 0)
each_alert:
for _, alert := range allAlerts.Items {
// skip suspended and not ready alerts
isReady := apimeta.IsStatusConditionTrue(alert.Status.Conditions, meta.ReadyCondition)
if alert.Spec.Suspend || !isReady {
continue
continue each_alert
}

// skip alert if the message matches a regex from the exclusion list
var skip bool
if len(alert.Spec.ExclusionList) > 0 {
for _, exp := range alert.Spec.ExclusionList {
if r, err := regexp.Compile(exp); err == nil {
if r.Match([]byte(event.Message)) {
skip = true
break
continue each_alert
}
} else {
s.logger.Error(err, fmt.Sprintf("failed to compile regex: %s", exp))
}
}
}
if skip {
continue
}

// filter alerts by object and severity
for _, source := range alert.Spec.EventSources {
Expand Down
3 changes: 1 addition & 2 deletions internal/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ func NewEventServer(port string, logger logr.Logger, kubeClient client.Client) *

// ListenAndServe starts the HTTP server on the specified port
func (s *EventServer) ListenAndServe(stopCh <-chan struct{}) {
mux := http.DefaultServeMux

mux := http.NewServeMux()
mux.HandleFunc("/", s.handleEvent())

srv := &http.Server{
Expand Down
File renamed without changes.