Skip to content

Commit

Permalink
event handler: Add k8s events for Alerts
Browse files Browse the repository at this point in the history
Improve logging by including the event and alert data in the log and
emit the same message as events on the respective alert to make the
message visible on the alert it belogs to.

Signed-off-by: Sunny <darkowlzz@protonmail.com>
  • Loading branch information
darkowlzz committed Jun 6, 2023
1 parent 7695097 commit f650b00
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 68 deletions.
130 changes: 77 additions & 53 deletions internal/server/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,45 +35,57 @@ import (

eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/fluxcd/pkg/masktoken"
"github.com/go-logr/logr"

apiv1 "github.com/fluxcd/notification-controller/api/v1"
apiv1beta3 "github.com/fluxcd/notification-controller/api/v1beta3"
"github.com/fluxcd/notification-controller/internal/notifier"
)

func loggerWithEventInvolvedObject(log logr.Logger, event *eventv1.Event) logr.Logger {
return log.WithValues("eventInvolvedObject", event.InvolvedObject)
}

func loggerWithAlert(log logr.Logger, alert *apiv1beta3.Alert) logr.Logger {
return log.WithValues("alert", client.ObjectKeyFromObject(alert))
}

func involvedObjectString(o corev1.ObjectReference) string {
return fmt.Sprintf("%s/%s/%s/%s", o.APIVersion, o.Kind, o.Namespace, o.Name)
}

func crossNSObjectRefString(o apiv1.CrossNamespaceObjectReference) string {
return fmt.Sprintf("%s/%s/%s/%s", o.APIVersion, o.Kind, o.Namespace, o.Name)
}

func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
event := r.Context().Value(eventContextKey{}).(*eventv1.Event)
logger := loggerWithEventInvolvedObject(s.logger, event)

ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second)
defer cancel()

alerts, err := s.getAllAlertsForEvent(ctx, event)
if err != nil {
s.logger.Error(err, "failed to get alerts for the event: %w", err)
logger.Error(err, "failed to get alerts for the event: %w", err)
}

if len(alerts) == 0 {
s.logger.Info("Discarding event, no alerts found for the involved object",
"reconciler kind", event.InvolvedObject.Kind,
"name", event.InvolvedObject.Name,
"namespace", event.InvolvedObject.Namespace)
logger.Info("discarding event, no alerts found for the involved object")
w.WriteHeader(http.StatusAccepted)
return
}

s.logger.Info(fmt.Sprintf("Dispatching event: %s", event.Message),
"reconciler kind", event.InvolvedObject.Kind,
"name", event.InvolvedObject.Name,
"namespace", event.InvolvedObject.Namespace)
logger.Info("dispatching event", "message", event.Message)

// Dispatch notifications.
for _, alert := range alerts {
if err := s.dispatchNotification(ctx, event, alert); err != nil {
s.logger.Error(err, "failed to dispatch notification to provider",
"reconciler kind", apiv1beta3.ProviderKind,
"name", alert.Spec.ProviderRef.Name,
"namespace", alert.Namespace)
if err := s.dispatchNotification(ctx, event, &alert); err != nil {
alogger := loggerWithAlert(logger, &alert)
alogger.Error(err, "failed to dispatch notification")
s.Eventf(&alert, corev1.EventTypeWarning, "NotificationDispatchFailed",
"failed to dispatch notification for %s: %s", involvedObjectString(event.InvolvedObject), err)
}
}

Expand Down Expand Up @@ -102,17 +114,17 @@ func (s *EventServer) filterAlertsForEvent(ctx context.Context, alerts []apiv1be
continue
}
// Check if the event matches any of the alert sources.
if !s.eventMatchesAlertSources(ctx, event, alert) {
if !s.eventMatchesAlertSources(ctx, event, &alert) {
continue
}
// Check if the event message is allowed for the alert based on the
// inclusion list.
if !s.messageIsIncluded(event.Message, alert.Spec.InclusionList) {
if !s.messageIsIncluded(event.Message, &alert) {
continue
}
// Check if the event message is allowed for the alert based on the
// exclusion list.
if s.messageIsExcluded(event.Message, alert.Spec.ExclusionList) {
if s.messageIsExcluded(event.Message, &alert) {
continue
}
results = append(results, alert)
Expand All @@ -122,61 +134,67 @@ func (s *EventServer) filterAlertsForEvent(ctx context.Context, alerts []apiv1be

// eventMatchesAlertSources returns if a given event matches with any of the
// alert sources.
func (s *EventServer) eventMatchesAlertSources(ctx context.Context, event *eventv1.Event, alert apiv1beta3.Alert) bool {
func (s *EventServer) eventMatchesAlertSources(ctx context.Context, event *eventv1.Event, alert *apiv1beta3.Alert) bool {
for _, source := range alert.Spec.EventSources {
if source.Namespace == "" {
source.Namespace = alert.Namespace
}
if s.eventMatchesAlert(ctx, event, source, alert.Spec.EventSeverity) {
if s.eventMatchesAlertSource(ctx, event, alert, source) {
return true
}
}
return false
}

// messageIsIncluded returns if the given message matches with the inclusion
// rules.
func (s *EventServer) messageIsIncluded(msg string, inclusionList []string) bool {
if len(inclusionList) == 0 {
// messageIsIncluded returns if the given message matches with the given alert's
// inclusion rules.
func (s *EventServer) messageIsIncluded(msg string, alert *apiv1beta3.Alert) bool {
logger := loggerWithAlert(s.logger, alert)
if len(alert.Spec.InclusionList) == 0 {
return true
}

for _, exp := range inclusionList {
for _, exp := range alert.Spec.InclusionList {
if r, err := regexp.Compile(exp); err == nil {
if r.Match([]byte(msg)) {
return true
}
} else {
// TODO: Record event on the respective Alert object.
s.logger.Error(err, fmt.Sprintf("failed to compile inclusion regex: %s", exp))
logger.Error(err, fmt.Sprintf("failed to compile inclusion regex: %s", exp))
s.Eventf(alert, corev1.EventTypeWarning,
"InvalidConfig", "failed to compile inclusion regex: %s", exp)
}
}
return false
}

// messageIsExcluded returns if the given message matches with the exclusion
// rules.
func (s *EventServer) messageIsExcluded(msg string, exclusionList []string) bool {
if len(exclusionList) == 0 {
// messageIsExcluded returns if the given message matches with the given alert's
// exclusion rules.
func (s *EventServer) messageIsExcluded(msg string, alert *apiv1beta3.Alert) bool {
logger := loggerWithAlert(s.logger, alert)
if len(alert.Spec.ExclusionList) == 0 {
return false
}

for _, exp := range exclusionList {
for _, exp := range alert.Spec.ExclusionList {
if r, err := regexp.Compile(exp); err == nil {
if r.Match([]byte(msg)) {
return true
}
} else {
// TODO: Record event on the respective Alert object.
s.logger.Error(err, fmt.Sprintf("failed to compile exclusion regex: %s", exp))
logger.Error(err, fmt.Sprintf("failed to compile exclusion regex: %s", exp))
s.Eventf(alert, corev1.EventTypeWarning, "InvalidConfig",
"failed to compile exclusion regex: %s", exp)
}
}
return false
}

// dispatchNotification constructs and sends notification from the given event
// and alert data.
func (s *EventServer) dispatchNotification(ctx context.Context, event *eventv1.Event, alert apiv1beta3.Alert) error {
func (s *EventServer) dispatchNotification(ctx context.Context, event *eventv1.Event, alert *apiv1beta3.Alert) error {
logger := loggerWithEventInvolvedObject(loggerWithAlert(s.logger, alert), event)

sender, notification, token, timeout, err := s.getNotificationParams(ctx, event, alert)
if err != nil {
return err
Expand All @@ -196,11 +214,9 @@ func (s *EventServer) dispatchNotification(ctx context.Context, event *eventv1.E
} else {
err = errors.New(maskedErrStr)
}
// TODO: Record failed event on the associated Alert object.
s.logger.Error(err, "failed to send notification",
"reconciler kind", event.InvolvedObject.Kind,
"name", event.InvolvedObject.Name,
"namespace", event.InvolvedObject.Namespace)
logger.Error(err, "failed to send notification")
s.Eventf(alert, corev1.EventTypeWarning, "NotificationDispatchFailed",
"failed to send notification for %s: %s", involvedObjectString(event.InvolvedObject), err)
}
}(sender, *notification)

Expand All @@ -211,12 +227,12 @@ func (s *EventServer) dispatchNotification(ctx context.Context, event *eventv1.E
// event and alert, and returns a notifier, event, token and timeout for sending
// the notification. The returned event is a mutated form of the input event
// based on the alert configuration.
func (s *EventServer) getNotificationParams(ctx context.Context, event *eventv1.Event, alert apiv1beta3.Alert) (notifier.Interface, *eventv1.Event, string, time.Duration, error) {
func (s *EventServer) getNotificationParams(ctx context.Context, event *eventv1.Event, alert *apiv1beta3.Alert) (notifier.Interface, *eventv1.Event, string, time.Duration, error) {
// Check if event comes from a different namespace.
if s.noCrossNamespaceRefs && event.InvolvedObject.Namespace != alert.Namespace {
accessDenied := fmt.Errorf(
"alert '%s/%s' can't process event from '%s/%s/%s', cross-namespace references have been blocked",
alert.Namespace, alert.Name, event.InvolvedObject.Kind, event.InvolvedObject.Namespace, event.InvolvedObject.Name)
"alert '%s/%s' can't process event from '%s', cross-namespace references have been blocked",
alert.Namespace, alert.Name, involvedObjectString(event.InvolvedObject))
return nil, nil, "", 0, fmt.Errorf("discarding event, access denied to cross-namespace sources: %w", accessDenied)
}

Expand Down Expand Up @@ -331,9 +347,11 @@ func createNotifier(ctx context.Context, kubeClient client.Client, provider apiv
return sender, token, nil
}

// eventMatchesAlert returns if a given event matches with the given alert
// eventMatchesAlertSource returns if a given event matches with the given alert
// source configuration and severity.
func (s *EventServer) eventMatchesAlert(ctx context.Context, event *eventv1.Event, source apiv1.CrossNamespaceObjectReference, severity string) bool {
func (s *EventServer) eventMatchesAlertSource(ctx context.Context, event *eventv1.Event, alert *apiv1beta3.Alert, source apiv1.CrossNamespaceObjectReference) bool {
logger := loggerWithEventInvolvedObject(loggerWithAlert(s.logger, alert), event)

// No match if the event and source don't have the same namespace and kind.
if event.InvolvedObject.Namespace != source.Namespace ||
event.InvolvedObject.Kind != source.Kind {
Expand All @@ -342,6 +360,7 @@ func (s *EventServer) eventMatchesAlert(ctx context.Context, event *eventv1.Even

// No match if the alert severity doesn't match the event severity and
// the alert severity isn't info.
severity := alert.Spec.EventSeverity
if event.Severity != severity && severity != eventv1.EventSeverityInfo {
return false
}
Expand All @@ -367,22 +386,29 @@ func (s *EventServer) eventMatchesAlert(ctx context.Context, event *eventv1.Even
Namespace: event.InvolvedObject.Namespace,
Name: event.InvolvedObject.Name,
}, &obj); err != nil {
s.logger.Error(err, "error getting object", "kind", event.InvolvedObject.Kind,
"name", event.InvolvedObject.Name, "apiVersion", event.InvolvedObject.APIVersion)
logger.Error(err, "error getting source object %s", involvedObjectString(event.InvolvedObject))
s.Eventf(alert, corev1.EventTypeWarning, "SourceFetchFailed",
"error getting source object %s", involvedObjectString(event.InvolvedObject))
return false
}

sel, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: source.MatchLabels,
})
if err != nil {
s.logger.Error(err, fmt.Sprintf("error using matchLabels from event source '%s'", source.Name))
logger.Error(err, fmt.Sprintf("error using matchLabels from event source %s", crossNSObjectRefString(source)))
s.Eventf(alert, corev1.EventTypeWarning, "InvalidConfig",
"error using matchLabels from event source %s", crossNSObjectRefString(source))
return false
}

return sel.Matches(labels.Set(obj.GetLabels()))
}

// enhanceEventWithAlertMetadata enhances the event with Alert metadata.
func (s *EventServer) enhanceEventWithAlertMetadata(event *eventv1.Event, alert apiv1beta3.Alert) {
func (s *EventServer) enhanceEventWithAlertMetadata(event *eventv1.Event, alert *apiv1beta3.Alert) {
logger := loggerWithEventInvolvedObject(loggerWithAlert(s.logger, alert), event)

meta := event.Metadata
if meta == nil {
meta = make(map[string]string)
Expand All @@ -392,11 +418,9 @@ func (s *EventServer) enhanceEventWithAlertMetadata(event *eventv1.Event, alert
if _, alreadyPresent := meta[key]; !alreadyPresent {
meta[key] = value
} else {
s.logger.Info("metadata key found in the existing set of metadata",
"reconciler kind", apiv1beta3.AlertKind,
"name", alert.Name,
"namespace", alert.Namespace,
"key", key)
logger.Info("metadata key found in the existing set of metadata", "key", key)
s.Eventf(alert, corev1.EventTypeWarning, "MetadataAppendFailed",
"metadata key found in the existing set of metadata for '%s' in %s", key, involvedObjectString(event.InvolvedObject))
}
}

Expand Down
39 changes: 28 additions & 11 deletions internal/server/event_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
log "sigs.k8s.io/controller-runtime/pkg/log"

Expand Down Expand Up @@ -273,8 +274,9 @@ func TestFilterAlertsForEvent(t *testing.T) {
builder := fakeclient.NewClientBuilder().WithScheme(scheme)
builder.WithObjects(testProvider)
eventServer := EventServer{
kubeClient: builder.Build(),
logger: log.Log,
kubeClient: builder.Build(),
logger: log.Log,
EventRecorder: record.NewFakeRecorder(32),
}

result := eventServer.filterAlertsForEvent(context.TODO(), alerts, testEvent)
Expand Down Expand Up @@ -356,11 +358,12 @@ func TestDispatchNotification(t *testing.T) {
builder := fakeclient.NewClientBuilder().WithScheme(scheme)
builder.WithObjects(provider)
eventServer := EventServer{
kubeClient: builder.Build(),
logger: log.Log,
kubeClient: builder.Build(),
logger: log.Log,
EventRecorder: record.NewFakeRecorder(32),
}

err := eventServer.dispatchNotification(context.TODO(), testEvent, *alert)
err := eventServer.dispatchNotification(context.TODO(), testEvent, alert)
g.Expect(err != nil).To(Equal(tt.wantErr))
})
}
Expand Down Expand Up @@ -501,9 +504,10 @@ func TestGetNotificationParams(t *testing.T) {
kubeClient: builder.Build(),
logger: log.Log,
noCrossNamespaceRefs: tt.noCrossNSRefs,
EventRecorder: record.NewFakeRecorder(32),
}

_, n, _, _, err := eventServer.getNotificationParams(context.TODO(), event, *alert)
_, n, _, _, err := eventServer.getNotificationParams(context.TODO(), event, alert)
g.Expect(err != nil).To(Equal(tt.wantErr))
if tt.alertSummary != "" {
g.Expect(n.Metadata["summary"]).To(Equal(tt.alertSummary))
Expand Down Expand Up @@ -880,18 +884,31 @@ func TestEventMatchesAlert(t *testing.T) {
}

eventServer := EventServer{
kubeClient: builder.Build(),
logger: log.Log,
kubeClient: builder.Build(),
logger: log.Log,
EventRecorder: record.NewFakeRecorder(32),
}
alert := &apiv1beta3.Alert{
ObjectMeta: metav1.ObjectMeta{
Name: "test-alert",
Namespace: "test-ns",
},
Spec: apiv1beta3.AlertSpec{
EventSeverity: tt.severity,
},
}

result := eventServer.eventMatchesAlert(context.TODO(), tt.event, tt.source, tt.severity)
result := eventServer.eventMatchesAlertSource(context.TODO(), tt.event, alert, tt.source)
g.Expect(result).To(Equal(tt.wantResult))
})
}
}

func TestEnhanceEventWithAlertMetadata(t *testing.T) {
s := &EventServer{logger: log.Log}
s := &EventServer{
logger: log.Log,
EventRecorder: record.NewFakeRecorder(32),
}

for name, tt := range map[string]struct {
event eventv1.Event
Expand Down Expand Up @@ -963,7 +980,7 @@ func TestEnhanceEventWithAlertMetadata(t *testing.T) {
t.Run(name, func(t *testing.T) {
g := NewGomegaWithT(t)

s.enhanceEventWithAlertMetadata(&tt.event, tt.alert)
s.enhanceEventWithAlertMetadata(&tt.event, &tt.alert)
g.Expect(tt.event.Metadata).To(BeEquivalentTo(tt.expectedMetadata))
})
}
Expand Down
5 changes: 4 additions & 1 deletion internal/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/sethvargo/go-limiter/httplimit"
"github.com/slok/go-http-metrics/middleware"
"github.com/slok/go-http-metrics/middleware/std"
kuberecorder "k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"

eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
Expand All @@ -48,14 +49,16 @@ type EventServer struct {
logger logr.Logger
kubeClient client.Client
noCrossNamespaceRefs bool
kuberecorder.EventRecorder
}

// NewEventServer returns an HTTP server that handles events
func NewEventServer(port string, logger logr.Logger, kubeClient client.Client, noCrossNamespaceRefs bool) *EventServer {
func NewEventServer(port string, logger logr.Logger, kubeClient client.Client, eventRecorder kuberecorder.EventRecorder, noCrossNamespaceRefs bool) *EventServer {
return &EventServer{
port: port,
logger: logger.WithName("event-server"),
kubeClient: kubeClient,
EventRecorder: eventRecorder,
noCrossNamespaceRefs: noCrossNamespaceRefs,
}
}
Expand Down
Loading

0 comments on commit f650b00

Please sign in to comment.