Skip to content

Commit

Permalink
Reduce logging repeated code in event server
Browse files Browse the repository at this point in the history
Signed-off-by: Matheus Pimenta <matheuscscp@gmail.com>
  • Loading branch information
matheuscscp committed Jun 12, 2023
1 parent 56a77b7 commit 7f27249
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 73 deletions.
91 changes: 33 additions & 58 deletions internal/server/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/yaml"

eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
Expand All @@ -43,14 +45,15 @@ import (
func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
event := r.Context().Value(eventContextKey{}).(*eventv1.Event)
eventLogger := log.FromContext(r.Context())

ctx, cancel := context.WithTimeout(r.Context(), 15*time.Second)
defer cancel()

var allAlerts apiv1beta2.AlertList
err := s.kubeClient.List(ctx, &allAlerts)
if err != nil {
s.logger.Error(err, "listing alerts failed")
eventLogger.Error(err, "listing alerts failed")
w.WriteHeader(http.StatusBadRequest)
return
}
Expand All @@ -59,6 +62,9 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
alerts := make([]apiv1beta2.Alert, 0)
each_alert:
for _, alert := range allAlerts.Items {
alertLogger := eventLogger.WithValues("alert", client.ObjectKeyFromObject(&alert))
ctx := log.IntoContext(ctx, alertLogger)

// skip suspended and not ready alerts
isReady := conditions.IsReady(&alert)
if alert.Spec.Suspend || !isReady {
Expand All @@ -75,7 +81,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
break
}
} else {
s.logger.Error(err, fmt.Sprintf("failed to compile inclusion regex: %s", inclusionRegex))
alertLogger.Error(err, fmt.Sprintf("failed to compile inclusion regex: %s", inclusionRegex))
}
}
if !include {
Expand All @@ -91,7 +97,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
continue each_alert
}
} else {
s.logger.Error(err, fmt.Sprintf("failed to compile exclusion regex: %s", exclusionRegex))
alertLogger.Error(err, fmt.Sprintf("failed to compile exclusion regex: %s", exclusionRegex))
}
}
}
Expand All @@ -109,27 +115,24 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
}

if len(alerts) == 0 {
s.logger.Info("Discarding event, no alerts found for the involved object",
"reconciler kind", event.InvolvedObject.Kind,
"name", event.InvolvedObject.Name,
"namespace", event.InvolvedObject.Namespace)
eventLogger.Info("Discarding event, no alerts found for the involved object")
w.WriteHeader(http.StatusAccepted)
return
}

s.logger.Info(fmt.Sprintf("Dispatching event: %s", event.Message),
"reconciler kind", event.InvolvedObject.Kind,
"name", event.InvolvedObject.Name,
"namespace", event.InvolvedObject.Namespace)
eventLogger.Info(fmt.Sprintf("Dispatching event: %s", event.Message))

// dispatch notifications
for _, alert := range alerts {
alertLogger := eventLogger.WithValues("alert", client.ObjectKeyFromObject(&alert))
ctx := log.IntoContext(ctx, alertLogger)

// verify if event comes from a different namespace
if s.noCrossNamespaceRefs && event.InvolvedObject.Namespace != alert.Namespace {
accessDenied := fmt.Errorf(
"alert '%s/%s' can't process event from '%s/%s/%s', cross-namespace references have been blocked",
alert.Namespace, alert.Name, event.InvolvedObject.Kind, event.InvolvedObject.Namespace, event.InvolvedObject.Name)
s.logger.Error(accessDenied, "Discarding event, access denied to cross-namespace sources")
alertLogger.Error(accessDenied, "Discarding event, access denied to cross-namespace sources")
continue
}

Expand All @@ -138,10 +141,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)

err = s.kubeClient.Get(ctx, providerName, &provider)
if err != nil {
s.logger.Error(err, "failed to read provider",
"reconciler kind", apiv1beta2.ProviderKind,
"name", providerName.Name,
"namespace", providerName.Namespace)
alertLogger.Error(err, "failed to read provider")
continue
}

Expand All @@ -161,10 +161,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)

err = s.kubeClient.Get(ctx, secretName, &secret)
if err != nil {
s.logger.Error(err, "failed to read secret",
"reconciler kind", apiv1beta2.ProviderKind,
"name", providerName.Name,
"namespace", providerName.Namespace)
alertLogger.Error(err, "failed to read secret")
continue
}

Expand All @@ -191,10 +188,7 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
if h, ok := secret.Data["headers"]; ok {
err := yaml.Unmarshal(h, &headers)
if err != nil {
s.logger.Error(err, "failed to read headers from secret",
"reconciler kind", apiv1beta2.ProviderKind,
"name", providerName.Name,
"namespace", providerName.Namespace)
alertLogger.Error(err, "failed to read headers from secret")
continue
}
}
Expand All @@ -207,68 +201,51 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)

err = s.kubeClient.Get(ctx, secretName, &secret)
if err != nil {
s.logger.Error(err, "failed to read secret",
"reconciler kind", apiv1beta2.ProviderKind,
"name", providerName.Name,
"namespace", providerName.Namespace)
alertLogger.Error(err, "failed to read cert secret")
continue
}

caFile, ok := secret.Data["caFile"]
if !ok {
s.logger.Error(err, "failed to read secret key caFile",
"reconciler kind", apiv1beta2.ProviderKind,
"name", providerName.Name,
"namespace", providerName.Namespace)
alertLogger.Error(err, "failed to read secret key caFile")
continue
}

certPool = x509.NewCertPool()
ok = certPool.AppendCertsFromPEM(caFile)
if !ok {
s.logger.Error(err, "could not append to cert pool",
"reconciler kind", apiv1beta2.ProviderKind,
"name", providerName.Name,
"namespace", providerName.Namespace)
alertLogger.Error(err, "could not append to cert pool")
continue
}
}

if webhook == "" {
s.logger.Error(nil, "provider has no address",
"reconciler kind", apiv1beta2.ProviderKind,
"name", providerName.Name,
"namespace", providerName.Namespace)
alertLogger.Error(nil, "provider has no address")
continue
}

factory := notifier.NewFactory(webhook, proxy, username, provider.Spec.Channel, token, headers, certPool, password, string(provider.UID))
sender, err := factory.Notifier(provider.Spec.Type)
if err != nil {
s.logger.Error(err, "failed to initialize provider",
"reconciler kind", apiv1beta2.ProviderKind,
"name", providerName.Name,
"namespace", providerName.Namespace)
alertLogger.Error(err, "failed to initialize provider")
continue
}

notification := *event.DeepCopy()
s.enhanceEventWithAlertMetadata(&notification, alert)
s.enhanceEventWithAlertMetadata(ctx, &notification, alert)

go func(n notifier.Interface, e eventv1.Event) {
ctx, cancel := context.WithTimeout(context.Background(), provider.GetTimeout())
defer cancel()
ctx = log.IntoContext(ctx, alertLogger)
if err := n.Post(ctx, e); err != nil {
maskedErrStr, maskErr := masktoken.MaskTokenFromString(err.Error(), token)
if maskErr != nil {
err = maskErr
} else {
err = errors.New(maskedErrStr)
}
s.logger.Error(err, "failed to send notification",
"reconciler kind", event.InvolvedObject.Kind,
"name", event.InvolvedObject.Name,
"namespace", event.InvolvedObject.Namespace)
alertLogger.Error(err, "failed to send notification")
}
}(sender, notification)
}
Expand All @@ -278,6 +255,8 @@ func (s *EventServer) handleEvent() func(w http.ResponseWriter, r *http.Request)
}

func (s *EventServer) eventMatchesAlert(ctx context.Context, event *eventv1.Event, source apiv1.CrossNamespaceObjectReference, severity string) bool {
alertLogger := log.FromContext(ctx)

if event.InvolvedObject.Namespace == source.Namespace && event.InvolvedObject.Kind == source.Kind {
if event.Severity == severity || severity == eventv1.EventSeverityInfo {
labelMatch := true
Expand All @@ -291,15 +270,14 @@ func (s *EventServer) eventMatchesAlert(ctx context.Context, event *eventv1.Even
Namespace: event.InvolvedObject.Namespace,
Name: event.InvolvedObject.Name,
}, &obj); err != nil {
s.logger.Error(err, "error getting object", "kind", event.InvolvedObject.Kind,
"name", event.InvolvedObject.Name, "apiVersion", event.InvolvedObject.APIVersion)
alertLogger.Error(err, "error getting object")
}

sel, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: source.MatchLabels,
})
if err != nil {
s.logger.Error(err, fmt.Sprintf("error using matchLabels from event source '%s'", source.Name))
alertLogger.Error(err, fmt.Sprintf("error using matchLabels from event source '%s'", source.Name))
}

labelMatch = sel.Matches(labels.Set(obj.GetLabels()))
Expand All @@ -314,7 +292,7 @@ func (s *EventServer) eventMatchesAlert(ctx context.Context, event *eventv1.Even
return false
}

func (s *EventServer) enhanceEventWithAlertMetadata(event *eventv1.Event, alert apiv1beta2.Alert) {
func (s *EventServer) enhanceEventWithAlertMetadata(ctx context.Context, event *eventv1.Event, alert apiv1beta2.Alert) {
meta := event.Metadata
if meta == nil {
meta = make(map[string]string)
Expand All @@ -324,11 +302,8 @@ func (s *EventServer) enhanceEventWithAlertMetadata(event *eventv1.Event, alert
if _, alreadyPresent := meta[key]; !alreadyPresent {
meta[key] = value
} else {
s.logger.Info("metadata key found in the existing set of metadata",
"reconciler kind", apiv1beta2.AlertKind,
"name", alert.Name,
"namespace", alert.Namespace,
"key", key)
log.FromContext(ctx).
Info("metadata key found in the existing set of metadata", "key", key)
}
}

Expand Down
3 changes: 2 additions & 1 deletion internal/server/event_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package server

import (
"context"
"testing"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -99,7 +100,7 @@ func TestEnhanceEventWithAlertMetadata(t *testing.T) {
t.Run(name, func(t *testing.T) {
g := NewGomegaWithT(t)

s.enhanceEventWithAlertMetadata(&tt.event, tt.alert)
s.enhanceEventWithAlertMetadata(context.Background(), &tt.event, tt.alert)
g.Expect(tt.event.Metadata).To(BeEquivalentTo(tt.expectedMetadata))
})
}
Expand Down
31 changes: 17 additions & 14 deletions internal/server/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/slok/go-http-metrics/middleware"
"github.com/slok/go-http-metrics/middleware/std"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
)
Expand Down Expand Up @@ -68,8 +69,8 @@ func (s *EventServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Mid
var handler http.Handler = http.HandlerFunc(s.handleEvent())
for _, middleware := range []func(http.Handler) http.Handler{
limitMiddleware.Handle,
s.logRateLimitMiddleware,
s.cleanupMetadataMiddleware,
logRateLimitMiddleware,
s.eventMiddleware,
} {
handler = middleware(handler)
}
Expand Down Expand Up @@ -100,10 +101,12 @@ func (s *EventServer) ListenAndServe(stopCh <-chan struct{}, mdlw middleware.Mid
}
}

// cleanupMetadataMiddleware cleans up the metadata using cleanupMetadata() and
// eventMiddleware cleans up the event metadata using cleanupMetadata() and
// adds the cleaned event in the request context which can then be queried and
// used directly by the other http handlers.
func (s *EventServer) cleanupMetadataMiddleware(h http.Handler) http.Handler {
// used directly by the other http handlers. This middleware also adds a
// logger with the event's involved object's kind, name and namespace to the
// request context.
func (s *EventServer) eventMiddleware(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
Expand All @@ -124,10 +127,13 @@ func (s *EventServer) cleanupMetadataMiddleware(h http.Handler) http.Handler {

cleanupMetadata(event)

ctxWithEvent := context.WithValue(r.Context(), eventContextKey{}, event)
reqWithEvent := r.WithContext(ctxWithEvent)
eventLogger := s.logger.WithValues("eventInvolvedObject", event.InvolvedObject)

h.ServeHTTP(w, reqWithEvent)
enhancedCtx := context.WithValue(r.Context(), eventContextKey{}, event)
enhancedCtx = log.IntoContext(enhancedCtx, eventLogger)
enhancedReq := r.WithContext(enhancedCtx)

h.ServeHTTP(w, enhancedReq)
})
}

Expand Down Expand Up @@ -172,7 +178,7 @@ func (r *statusRecorder) WriteHeader(status int) {
r.ResponseWriter.WriteHeader(status)
}

func (s *EventServer) logRateLimitMiddleware(h http.Handler) http.Handler {
func logRateLimitMiddleware(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
recorder := &statusRecorder{
ResponseWriter: w,
Expand All @@ -181,11 +187,8 @@ func (s *EventServer) logRateLimitMiddleware(h http.Handler) http.Handler {
h.ServeHTTP(recorder, r)

if recorder.Status == http.StatusTooManyRequests {
event := r.Context().Value(eventContextKey{}).(*eventv1.Event)
s.logger.V(1).Info("Discarding event, rate limiting duplicate events",
"reconciler kind", event.InvolvedObject.Kind,
"name", event.InvolvedObject.Name,
"namespace", event.InvolvedObject.Namespace)
log.FromContext(r.Context()).V(1).
Info("Discarding event, rate limiting duplicate events")
}
})
}
Expand Down

0 comments on commit 7f27249

Please sign in to comment.