Gate receiver names behind a feature flag
Alertmanager doesn't actually have a feature flag system yet, so this
invents the universe, mostly by cribbing off the Prometheus model.

Here we introduce a new CLI flag `--enable-feature` that we parse into
a new `featureConfigs` struct. We also update the PipelineBuilder to
take that flag and pass it through to the metrics infrastructure. This
also required a bit of deduplication so that every place that needs the
correct label set (with or without the receiver name) looks it up the
same way, and updating the tests to use that same mechanism.

Signed-off-by: sinkingpoint <colin@quirl.co.nz>
sinkingpoint committed Jun 22, 2023
1 parent b240edc commit 769396f
Showing 3 changed files with 97 additions and 59 deletions.
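For readers skimming the diff below, here is a condensed sketch of the mechanism the commit message describes: values collected from `--enable-feature` are folded into a `featureConfigs` struct whose booleans gate behaviour downstream. The struct, method, and option names are taken from the diff; the surrounding wiring (kingpin parsing, promlog logging) is simplified, so treat this as an illustration rather than the committed code.

```go
package main

import (
	"fmt"
	"log"
)

// featureConfigs mirrors the struct added in cmd/alertmanager/main.go:
// one boolean per known feature name.
type featureConfigs struct {
	enableReceiverNamesInMetrics bool
}

// setFeatureListOptions walks the values collected from --enable-feature
// and flips the matching booleans, warning on unknown names.
func (c *featureConfigs) setFeatureListOptions(features []string) error {
	for _, feature := range features {
		switch feature {
		case "receiver-names-in-metrics":
			c.enableReceiverNamesInMetrics = true
		default:
			log.Printf("unknown option for --enable-feature: %q", feature)
		}
	}
	return nil
}

func main() {
	// Roughly what happens when Alertmanager is started with
	// --enable-feature=receiver-names-in-metrics.
	cfg := featureConfigs{}
	if err := cfg.setFeatureListOptions([]string{"receiver-names-in-metrics"}); err != nil {
		log.Fatal(err)
	}
	fmt.Println("receiver names in metrics:", cfg.enableReceiverNamesInMetrics) // true
}
```

The resulting boolean is then handed to `notify.NewPipelineBuilder`, which is where the metric label set actually changes (see notify/notify.go below).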
27 changes: 26 additions & 1 deletion cmd/alertmanager/main.go
@@ -196,6 +196,23 @@ func main() {
os.Exit(run())
}

type featureConfigs struct {
enableReceiverNamesInMetrics bool
}

func (c *featureConfigs) setFeatureListOptions(logger log.Logger, features []string) error {
for _, feature := range features {
switch feature {
case "receiver-names-in-metrics":
c.enableReceiverNamesInMetrics = true
default:
logger.Log("msg", "Unknown option for --enable-feature", "option", feature)
}
}

return nil
}

func run() int {
if os.Getenv("DEBUG") != "" {
runtime.SetBlockProfileRate(20)
@@ -231,6 +248,7 @@ func run() int {
tlsConfigFile = kingpin.Flag("cluster.tls-config", "[EXPERIMENTAL] Path to config yaml file that can enable mutual TLS within the gossip protocol.").Default("").String()
allowInsecureAdvertise = kingpin.Flag("cluster.allow-insecure-public-advertise-address-discovery", "[EXPERIMENTAL] Allow alertmanager to discover and listen on a public IP address.").Bool()
label = kingpin.Flag("cluster.label", "The cluster label is an optional string to include on each packet and stream. It uniquely identifies the cluster and prevents cross-communication issues when sending gossip messages.").Default("").String()
featureFlags = kingpin.Flag("enable-feature", "Comma separated feature names to enable. Valid options: receiver-names-in-metrics").Default("").Strings()
)

promlogflag.AddFlags(kingpin.CommandLine, &promlogConfig)
@@ -245,6 +263,12 @@ func run() int {
level.Info(logger).Log("msg", "Starting Alertmanager", "version", version.Info())
level.Info(logger).Log("build_context", version.BuildContext())

featureConfig := featureConfigs{}
if err := featureConfig.setFeatureListOptions(logger, *featureFlags); err != nil {
fmt.Fprintln(os.Stderr, fmt.Errorf("Error parsing feature list: %w", err))
os.Exit(1)
}

err := os.MkdirAll(*dataDir, 0o777)
if err != nil {
level.Error(logger).Log("msg", "Unable to create data directory", "err", err)
@@ -421,7 +445,7 @@ func run() int {
)

dispMetrics := dispatch.NewDispatcherMetrics(false, prometheus.DefaultRegisterer)
pipelineBuilder := notify.NewPipelineBuilder(prometheus.DefaultRegisterer)
pipelineBuilder := notify.NewPipelineBuilder(prometheus.DefaultRegisterer, featureConfig.enableReceiverNamesInMetrics)
configLogger := log.With(logger, "component", "configuration")
configCoordinator := config.NewCoordinator(
*configFile,
@@ -493,6 +517,7 @@ func run() int {
notificationLog,
pipelinePeer,
)

configuredReceivers.Set(float64(len(activeReceivers)))
configuredIntegrations.Set(float64(integrationsNum))

109 changes: 67 additions & 42 deletions notify/notify.go
@@ -251,74 +251,91 @@ type Metrics struct {
numNotificationRequestsTotal *prometheus.CounterVec
numNotificationRequestsFailedTotal *prometheus.CounterVec
notificationLatencySeconds *prometheus.HistogramVec

enableReceiverNamesInMetrics bool
}

func NewMetrics(r prometheus.Registerer) *Metrics {
func NewMetrics(r prometheus.Registerer, enableReceiverNamesInMetrics bool) *Metrics {
labels := []string{"integration"}
if enableReceiverNamesInMetrics {
labels = append(labels, "receiver_name")
}

m := &Metrics{
numNotifications: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "notifications_total",
Help: "The total number of attempted notifications.",
}, []string{"integration", "receiver_name"}),
}, labels),
numTotalFailedNotifications: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "notifications_failed_total",
Help: "The total number of failed notifications.",
}, []string{"integration", "reason", "receiver_name"}),
}, append(labels, "reason")),
numNotificationRequestsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "notification_requests_total",
Help: "The total number of attempted notification requests.",
}, []string{"integration", "receiver_name"}),
}, labels),
numNotificationRequestsFailedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "alertmanager",
Name: "notification_requests_failed_total",
Help: "The total number of failed notification requests.",
}, []string{"integration", "receiver_name"}),
}, labels),
notificationLatencySeconds: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "alertmanager",
Name: "notification_latency_seconds",
Help: "The latency of notifications in seconds.",
Buckets: []float64{1, 5, 10, 15, 20},
}, []string{"integration", "receiver_name"}),
}
for _, integration := range []string{
"email",
"msteams",
"pagerduty",
"wechat",
"pushover",
"slack",
"opsgenie",
"webhook",
"victorops",
"sns",
"telegram",
} {
m.numNotifications.WithLabelValues(integration)
m.numNotificationRequestsTotal.WithLabelValues(integration)
m.numNotificationRequestsFailedTotal.WithLabelValues(integration)
m.notificationLatencySeconds.WithLabelValues(integration)

for _, reason := range possibleFailureReasonCategory {
m.numTotalFailedNotifications.WithLabelValues(integration, reason)
}, labels),
enableReceiverNamesInMetrics: enableReceiverNamesInMetrics,
}

// If we aren't including receiver names then initialise the possible label combinations.
// We don't have receiver names here, so if we are including them then we can't do this here.
if !enableReceiverNamesInMetrics {
for _, integration := range []string{
"email",
"msteams",
"pagerduty",
"wechat",
"pushover",
"slack",
"opsgenie",
"webhook",
"victorops",
"sns",
"telegram",
} {
m.numNotifications.WithLabelValues(integration)
m.numNotificationRequestsTotal.WithLabelValues(integration)
m.numNotificationRequestsFailedTotal.WithLabelValues(integration)
m.notificationLatencySeconds.WithLabelValues(integration)

for _, reason := range possibleFailureReasonCategory {
m.numTotalFailedNotifications.WithLabelValues(integration, reason)
}
}
}

r.MustRegister(
m.numNotifications, m.numTotalFailedNotifications,
m.numNotificationRequestsTotal, m.numNotificationRequestsFailedTotal,
m.notificationLatencySeconds,
)

return m
}

type PipelineBuilder struct {
metrics *Metrics
metrics *Metrics
enableReceiverNamesInMetrics bool
}

func NewPipelineBuilder(r prometheus.Registerer) *PipelineBuilder {
func NewPipelineBuilder(r prometheus.Registerer, enableReceiverNamesInMetrics bool) *PipelineBuilder {
return &PipelineBuilder{
metrics: NewMetrics(r),
metrics: NewMetrics(r, enableReceiverNamesInMetrics),
enableReceiverNamesInMetrics: enableReceiverNamesInMetrics,
}
}

@@ -341,7 +358,7 @@ func (pb *PipelineBuilder) New(
ss := NewMuteStage(silencer)

for name := range receivers {
st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics)
st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics, pb.enableReceiverNamesInMetrics)
rs[name] = MultiStage{ms, is, tas, tms, ss, st}
}
return rs
@@ -354,6 +371,7 @@ func createReceiverStage(
wait func() time.Duration,
notificationLog NotificationLog,
metrics *Metrics,
enableReceiverNamesInMetrics bool,
) Stage {
var fs FanoutStage
for i := range integrations {
@@ -651,30 +669,37 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al
// RetryStage notifies via passed integration with exponential backoff until it
// succeeds. It aborts if the context is canceled or timed out.
type RetryStage struct {
integration Integration
groupName string
metrics *Metrics
integration Integration
groupName string
metrics *Metrics
metricLabels []string
}

// NewRetryStage returns a new instance of a RetryStage.
func NewRetryStage(i Integration, groupName string, metrics *Metrics) *RetryStage {
labels := []string{i.Name()}
if metrics.enableReceiverNamesInMetrics {
labels = append(labels, i.receiverName)
}

return &RetryStage{
integration: i,
groupName: groupName,
metrics: metrics,
integration: i,
groupName: groupName,
metrics: metrics,
metricLabels: labels,
}
}

func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
r.metrics.numNotifications.WithLabelValues(r.integration.Name(), r.integration.receiverName).Inc()
r.metrics.numNotifications.WithLabelValues(r.metricLabels...).Inc()
ctx, alerts, err := r.exec(ctx, l, alerts...)

failureReason := DefaultReason.String()
if err != nil {
if e, ok := errors.Cause(err).(*ErrorWithReason); ok {
failureReason = e.Reason.String()
}
r.metrics.numTotalFailedNotifications.WithLabelValues(r.integration.Name(), failureReason, r.integration.receiverName).Inc()
r.metrics.numTotalFailedNotifications.WithLabelValues(append(r.metricLabels, failureReason)...).Inc()
}
return ctx, alerts, err
}
@@ -731,10 +756,10 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale
case <-tick.C:
now := time.Now()
retry, err := r.integration.Notify(ctx, sent...)
r.metrics.notificationLatencySeconds.WithLabelValues(r.integration.Name(), r.integration.receiverName).Observe(time.Since(now).Seconds())
r.metrics.numNotificationRequestsTotal.WithLabelValues(r.integration.Name(), r.integration.receiverName).Inc()
r.metrics.notificationLatencySeconds.WithLabelValues(r.metricLabels...).Observe(time.Since(now).Seconds())
r.metrics.numNotificationRequestsTotal.WithLabelValues(r.metricLabels...).Inc()
if err != nil {
r.metrics.numNotificationRequestsFailedTotal.WithLabelValues(r.integration.Name(), r.integration.receiverName).Inc()
r.metrics.numNotificationRequestsFailedTotal.WithLabelValues(r.metricLabels...).Inc()
if !retry {
return ctx, alerts, errors.Wrapf(err, "%s/%s: notify retry canceled due to unrecoverable error after %d attempts", r.groupName, r.integration.String(), i)
}
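The notify.go changes above boil down to computing the label names once in `NewMetrics` and the label values once per `RetryStage`, so that every increment can splat the same slice. Below is a minimal, self-contained sketch of that pattern; the integration name, receiver name, and failure reason are made up for illustration, and only the metric and label names match the diff.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	enableReceiverNames := true

	// Label names are chosen once, as NewMetrics does above: "receiver_name"
	// is only part of the schema when the feature is enabled.
	labels := []string{"integration"}
	if enableReceiverNames {
		labels = append(labels, "receiver_name")
	}

	notifications := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "alertmanager", Name: "notifications_total",
		Help: "The total number of attempted notifications.",
	}, labels)
	failed := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "alertmanager", Name: "notifications_failed_total",
		Help: "The total number of failed notifications.",
	}, append(labels, "reason"))

	// Label values are also computed once, as NewRetryStage now does, so
	// every call site just splats the same slice.
	metricLabels := []string{"webhook"} // hypothetical integration name
	if enableReceiverNames {
		metricLabels = append(metricLabels, "team-x") // hypothetical receiver name
	}

	notifications.WithLabelValues(metricLabels...).Inc()
	// "clientError" is an illustrative failure reason, appended last to match
	// the label order declared above.
	failed.WithLabelValues(append(metricLabels, "clientError")...).Inc()

	fmt.Println(metricLabels) // [webhook team-x] when the feature is enabled
}
```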
20 changes: 4 additions & 16 deletions notify/notify_test.go
@@ -391,10 +391,7 @@ func TestRetryStageWithError(t *testing.T) {
}),
rs: sendResolved(false),
}
r := RetryStage{
integration: i,
metrics: NewMetrics(prometheus.NewRegistry()),
}
r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), false))

alerts := []*types.Alert{
{
@@ -447,10 +444,7 @@ func TestRetryStageWithErrorCode(t *testing.T) {
}),
rs: sendResolved(false),
}
r := RetryStage{
integration: i,
metrics: NewMetrics(prometheus.NewRegistry()),
}
r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), false))

alerts := []*types.Alert{
{
@@ -483,10 +477,7 @@ func TestRetryStageNoResolved(t *testing.T) {
}),
rs: sendResolved(false),
}
r := RetryStage{
integration: i,
metrics: NewMetrics(prometheus.NewRegistry()),
}
r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), false))

alerts := []*types.Alert{
{
@@ -537,10 +528,7 @@ func TestRetryStageSendResolved(t *testing.T) {
}),
rs: sendResolved(true),
}
r := RetryStage{
integration: i,
metrics: NewMetrics(prometheus.NewRegistry()),
}
r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), false))

alerts := []*types.Alert{
{
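For completeness, here is a rough sketch of the user-visible difference once everything above is wired together: with the flag off the series carry only an `integration` label, and with it on they also carry `receiver_name`. The registry setup, integration, and receiver values are hypothetical, and the printed lines are approximations rather than output captured from a running Alertmanager.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// dump registers one counter vec, increments a single series, and prints every
// series it exposes, so the shapes with and without the feature can be compared.
func dump(withReceiverNames bool) {
	labels := []string{"integration"}
	values := []string{"webhook"} // hypothetical integration
	if withReceiverNames {
		labels = append(labels, "receiver_name")
		values = append(values, "team-x") // hypothetical receiver
	}

	reg := prometheus.NewRegistry()
	c := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "alertmanager", Name: "notifications_total",
		Help: "The total number of attempted notifications.",
	}, labels)
	reg.MustRegister(c)
	c.WithLabelValues(values...).Inc()

	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		for _, m := range mf.GetMetric() {
			fmt.Print(mf.GetName())
			for _, lp := range m.GetLabel() {
				fmt.Printf(" %s=%q", lp.GetName(), lp.GetValue())
			}
			fmt.Println()
		}
	}
}

func main() {
	dump(false) // alertmanager_notifications_total integration="webhook"
	dump(true)  // alertmanager_notifications_total integration="webhook" receiver_name="team-x"
}
```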
