From f1fcb41fb4e8a56f4fd4d02953bb2088ba215c4e Mon Sep 17 00:00:00 2001
From: Yulya Artyukhina
Date: Wed, 28 Jun 2023 10:15:19 +0200
Subject: [PATCH] Add "user_was_notified_of_alert_groups" metric (#2334)

This PR adds a new metric, "user_was_notified_of_alert_groups", to the Prometheus exporter. It counts how many
alert groups a user was notified of.

## Checklist

- [x] Unit, integration, and e2e (if applicable) tests updated
- [x] Documentation added (or `pr:no public docs` PR label added if not required)
- [x] `CHANGELOG.md` updated (or `pr:no changelog` PR label added if not required)

---------

Co-authored-by: Joey Orlando
---
 CHANGELOG.md                                  |   6 +
 docs/sources/insights-and-metrics/_index.md   |  46 +++++--
 engine/apps/alerts/tasks/notify_user.py       |  12 ++
 engine/apps/metrics_exporter/constants.py     |  13 ++
 engine/apps/metrics_exporter/helpers.py       |  54 ++++++++
 .../metrics_exporter/metrics_collectors.py    | 114 ++++++++++++----
 engine/apps/metrics_exporter/tasks.py         | 129 ++++++++++++++----
 .../apps/metrics_exporter/tests/conftest.py   |  48 ++++++-
 .../tests/test_calculation_metrics.py         |  78 ++++++++++-
 .../tests/test_metrics_collectors.py          |   9 +-
 .../tests/test_update_metics_cache.py         |  85 ++++++++++++
 engine/settings/prod_without_db.py            |   2 +
 12 files changed, 522 insertions(+), 74 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d756395694..75375d7a4e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## Unreleased
+
+### Added
+
+- Add metric "how many alert groups a user was notified of" to the Prometheus exporter ([#2334](https://github.com/grafana/oncall/pull/2334/))
+
 ## v1.3.2
 
 ### Changed

diff --git a/docs/sources/insights-and-metrics/_index.md b/docs/sources/insights-and-metrics/_index.md
index 143353e6d4..c9fbe9dd18 100644
--- a/docs/sources/insights-and-metrics/_index.md
+++ b/docs/sources/insights-and-metrics/_index.md
@@ -21,6 +21,8 @@ It is a gauge, and its name has the suffix `alert_groups_total`
 - Response time on alert groups for each integration (mean time between the start and first action of all alert groups
 for the last 7 days in selected period). It is a histogram, and its name has the suffix `alert_groups_response_time`
 with the histogram suffixes such as `_bucket`, `_sum` and `_count`
+- A total count of alert groups each user was notified of. It is a counter, and its name has the suffix
+`user_was_notified_of_alert_groups_total`
 
 You can find more information about metrics types in the
 [Prometheus documentation](https://prometheus.io/docs/concepts/metric_types).
@@ -31,15 +33,17 @@ To retrieve Prometheus metrics use PromQL. If you are not familiar with PromQL,
 
 OnCall application metrics are collected in preinstalled `grafanacloud_usage` datasource and are available for every
 cloud instance.
 
-Metrics have prefix `grafanacloud_oncall_instance`, e.g. `grafanacloud_oncall_instance_alert_groups_total` and
-`grafanacloud_oncall_instance_alert_groups_response_time_seconds_bucket`.
+Metrics have the prefix `grafanacloud_oncall_instance`, e.g. `grafanacloud_oncall_instance_alert_groups_total`,
+`grafanacloud_oncall_instance_alert_groups_response_time_seconds_bucket` and
+`grafanacloud_oncall_instance_user_was_notified_of_alert_groups_total`.
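+
+For example, a query like the following should show how many alert groups each user in a stack was notified of
+(the stack slug below is illustrative):
+
+```promql
+sum by (username) (grafanacloud_oncall_instance_user_was_notified_of_alert_groups_total{slug="test_stack"})
+```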
 
 ### For open source customers
 
 To collect OnCall application metrics you need to set up Prometheus and add it to your Grafana instance as a
 datasource. You can find more information about Prometheus setup in the
 [OSS documentation](https://github.com/grafana/oncall#readme)
 
-Metrics will have the prefix `oncall`, e.g. `oncall_alert_groups_total` and `oncall_alert_groups_response_time_seconds_bucket`.
+Metrics will have the prefix `oncall`, e.g. `oncall_alert_groups_total`, `oncall_alert_groups_response_time_seconds_bucket`
+and `oncall_user_was_notified_of_alert_groups_total`.
 
 Your metrics may also have additional labels, such as `pod`, `instance`, `container`, depending on your Prometheus
 setup.
 
@@ -86,17 +90,37 @@ in Grafana stack "test_stack":
 grafanacloud_oncall_instance_alert_groups_response_time_seconds_bucket{slug="test_stack", integration="Grafana Alerting", le="600"}
 ```
 
+### Metric Alert groups user was notified of
+
+This metric has the following labels:
+
+| Label Name | Description                      |
+|------------|:--------------------------------:|
+| `id`       | ID of Grafana instance (stack)   |
+| `slug`     | Slug of Grafana instance (stack) |
+| `org_id`   | ID of Grafana organization       |
+| `username` | Username of the user             |
+
+**Query example:**
+
+Get the number of alert groups the user with username "alex" was notified of in Grafana stack "test_stack":
+
+```promql
+grafanacloud_oncall_instance_user_was_notified_of_alert_groups_total{slug="test_stack", username="alex"}
+```
+
 ### Dashboard
 
-To import OnCall metrics dashboard go to `Administration` -> `Plugins` page, find OnCall in the plugins list, open
-`Dashboards` tab at the OnCall plugin settings page and click "Import" near "OnCall metrics". After that you can find
-the "OnCall metrics" dashboard in your dashboards list. In the datasource dropdown select your Prometheus datasource
-(for Cloud customers it's `grafanacloud_usage`). You can filter data by your Grafana instances, teams and integrations.
+You can find the "OnCall Metrics" dashboard in your dashboards list, in the `General` folder; it has the tag
+`oncall`. In the datasource dropdown select your Prometheus datasource (for Cloud customers it's `grafanacloud_usage`).
+You can filter data by your Grafana instances, teams and integrations.
+
+To re-import the OnCall Metrics dashboard, go to the `Administration` -> `Plugins` page, find OnCall in the plugins
+list, open the `Dashboards` tab on the OnCall plugin settings page and click "Re-import" next to "OnCall Metrics".
+After that you can find the "OnCall Metrics" dashboard in your dashboards list.
 
-To update the dashboard to the newest version go to `Dashboards` tab at the OnCall plugin settings page and click
-“Re-import”.
-Be aware: if you have made changes to the dashboard, they will be deleted after re-importing. To save your changes go
-to the dashboard settings, click "Save as" and save a copy of the dashboard.
+Be aware: if you have made changes to the dashboard, they will be lost after re-importing or after a plugin update.
+To save your changes, go to the "OnCall Metrics" dashboard settings, click "Save as" and save a copy of the dashboard.
## Insight Logs diff --git a/engine/apps/alerts/tasks/notify_user.py b/engine/apps/alerts/tasks/notify_user.py index 58422b9a73..538569ed78 100644 --- a/engine/apps/alerts/tasks/notify_user.py +++ b/engine/apps/alerts/tasks/notify_user.py @@ -9,6 +9,7 @@ from apps.alerts.constants import NEXT_ESCALATION_DELAY from apps.alerts.signals import user_notification_action_triggered_signal from apps.base.messaging import get_messaging_backend_from_id +from apps.metrics_exporter.helpers import metrics_update_user_cache from apps.phone_notifications.phone_backend import PhoneBackend from common.custom_celery_tasks import shared_dedicated_queue_retry_task @@ -186,6 +187,17 @@ def notify_user_task( notification_channel=notification_policy.notify_by, ) if log_record: # log_record is None if user notification policy step is unspecified + # if this is the first notification step, and user hasn't been notified for this alert group - update metric + if ( + log_record.type == UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED + and previous_notification_policy_pk is None + and not user.personal_log_records.filter( + type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED, + alert_group_id=alert_group_pk, + ).exists() + ): + metrics_update_user_cache(user) + log_record.save() if notify_user_task.request.retries == 0: transaction.on_commit(lambda: send_user_notification_signal.apply_async((log_record.pk,))) diff --git a/engine/apps/metrics_exporter/constants.py b/engine/apps/metrics_exporter/constants.py index 66619b16bb..000956271f 100644 --- a/engine/apps/metrics_exporter/constants.py +++ b/engine/apps/metrics_exporter/constants.py @@ -1,6 +1,8 @@ import datetime import typing +from django.conf import settings + class AlertGroupsTotalMetricsDict(typing.TypedDict): integration_name: str @@ -25,6 +27,14 @@ class AlertGroupsResponseTimeMetricsDict(typing.TypedDict): response_time: list +class UserWasNotifiedOfAlertGroupsMetricsDict(typing.TypedDict): + user_username: str + org_id: int + slug: str + id: int + counter: int + + class RecalculateMetricsTimer(typing.TypedDict): recalculate_timeout: int forced_started: bool @@ -35,6 +45,9 @@ class RecalculateOrgMetricsDict(typing.TypedDict): force: bool +METRICS_PREFIX = "oncall_" if settings.IS_OPEN_SOURCE else "grafanacloud_oncall_instance_" +USER_WAS_NOTIFIED_OF_ALERT_GROUPS = METRICS_PREFIX + "user_was_notified_of_alert_groups" + ALERT_GROUPS_TOTAL = "oncall_alert_groups_total" ALERT_GROUPS_RESPONSE_TIME = "oncall_alert_groups_response_time_seconds" diff --git a/engine/apps/metrics_exporter/helpers.py b/engine/apps/metrics_exporter/helpers.py index a8ebb066df..6c3b56d81c 100644 --- a/engine/apps/metrics_exporter/helpers.py +++ b/engine/apps/metrics_exporter/helpers.py @@ -17,8 +17,11 @@ METRICS_RECALCULATION_CACHE_TIMEOUT, METRICS_RECALCULATION_CACHE_TIMEOUT_DISPERSE, METRICS_RESPONSE_TIME_CALCULATION_PERIOD, + USER_WAS_NOTIFIED_OF_ALERT_GROUPS, AlertGroupsResponseTimeMetricsDict, AlertGroupsTotalMetricsDict, + RecalculateMetricsTimer, + UserWasNotifiedOfAlertGroupsMetricsDict, ) if typing.TYPE_CHECKING: @@ -46,6 +49,27 @@ def get_organization_ids(): return organizations_ids +def is_allowed_to_start_metrics_calculation(organization_id, force=False) -> bool: + """Check if metrics_cache_timer doesn't exist or if recalculation was started by force.""" + recalculate_timeout = get_metrics_recalculation_timeout() + metrics_cache_timer_key = get_metrics_cache_timer_key(organization_id) + metrics_cache_timer = 
cache.get(metrics_cache_timer_key) + if metrics_cache_timer: + if not force or metrics_cache_timer.get("forced_started", False): + return False + else: + metrics_cache_timer["forced_started"] = True + else: + metrics_cache_timer: RecalculateMetricsTimer = { + "recalculate_timeout": recalculate_timeout, + "forced_started": force, + } + + metrics_cache_timer["recalculate_timeout"] = recalculate_timeout + cache.set(metrics_cache_timer_key, metrics_cache_timer, timeout=recalculate_timeout) + return True + + def get_response_time_period() -> datetime.datetime: """Returns period for response time calculation""" return timezone.now() - METRICS_RESPONSE_TIME_CALCULATION_PERIOD @@ -87,6 +111,14 @@ def get_metric_alert_groups_response_time_key(organization_id) -> str: return f"{ALERT_GROUPS_RESPONSE_TIME}_{organization_id}" +def get_metric_user_was_notified_of_alert_groups_key(organization_id) -> str: + return f"{USER_WAS_NOTIFIED_OF_ALERT_GROUPS}_{organization_id}" + + +def get_metric_calculation_started_key(metric_name) -> str: + return f"calculation_started_for_{metric_name}" + + def metrics_update_integration_cache(integration: "AlertReceiveChannel") -> None: """Update integration data in metrics cache""" metrics_cache_timeout = get_metrics_cache_timeout(integration.organization_id) @@ -239,3 +271,25 @@ def metrics_update_alert_groups_response_time_cache(integrations_response_time, continue integration_response_time_metrics["response_time"].extend(integration_response_time) cache.set(metric_alert_groups_response_time_key, metric_alert_groups_response_time, timeout=metrics_cache_timeout) + + +def metrics_update_user_cache(user): + """Update "user_was_notified_of_alert_groups" metric cache.""" + metrics_cache_timeout = get_metrics_cache_timeout(user.organization_id) + metric_user_was_notified_key = get_metric_user_was_notified_of_alert_groups_key(user.organization_id) + metric_user_was_notified: typing.Dict[int, UserWasNotifiedOfAlertGroupsMetricsDict] = cache.get( + metric_user_was_notified_key, {} + ) + + metric_user_was_notified.setdefault( + user.id, + { + "user_username": user.username, + "org_id": user.organization.org_id, + "slug": user.organization.stack_slug, + "id": user.organization.stack_id, + "counter": 0, + }, + )["counter"] += 1 + + cache.set(metric_user_was_notified_key, metric_user_was_notified, timeout=metrics_cache_timeout) diff --git a/engine/apps/metrics_exporter/metrics_collectors.py b/engine/apps/metrics_exporter/metrics_collectors.py index a659b73973..34bad958b9 100644 --- a/engine/apps/metrics_exporter/metrics_collectors.py +++ b/engine/apps/metrics_exporter/metrics_collectors.py @@ -3,53 +3,77 @@ from django.core.cache import cache from prometheus_client import CollectorRegistry -from prometheus_client.metrics_core import GaugeMetricFamily, HistogramMetricFamily +from prometheus_client.metrics_core import CounterMetricFamily, GaugeMetricFamily, HistogramMetricFamily from apps.alerts.constants import AlertGroupState from apps.metrics_exporter.constants import ( ALERT_GROUPS_RESPONSE_TIME, ALERT_GROUPS_TOTAL, + USER_WAS_NOTIFIED_OF_ALERT_GROUPS, AlertGroupsResponseTimeMetricsDict, AlertGroupsTotalMetricsDict, RecalculateOrgMetricsDict, + UserWasNotifiedOfAlertGroupsMetricsDict, ) from apps.metrics_exporter.helpers import ( get_metric_alert_groups_response_time_key, get_metric_alert_groups_total_key, + get_metric_calculation_started_key, + get_metric_user_was_notified_of_alert_groups_key, get_metrics_cache_timer_key, get_organization_ids, ) -from apps.metrics_exporter.tasks 
import start_calculate_and_cache_metrics +from apps.metrics_exporter.tasks import start_calculate_and_cache_metrics, start_recalculation_for_new_metric application_metrics_registry = CollectorRegistry() RE_ALERT_GROUPS_TOTAL = re.compile(r"{}_(\d+)".format(ALERT_GROUPS_TOTAL)) RE_ALERT_GROUPS_RESPONSE_TIME = re.compile(r"{}_(\d+)".format(ALERT_GROUPS_RESPONSE_TIME)) +RE_USER_WAS_NOTIFIED_OF_ALERT_GROUPS = re.compile(r"{}_(\d+)".format(USER_WAS_NOTIFIED_OF_ALERT_GROUPS)) # https://github.com/prometheus/client_python#custom-collectors class ApplicationMetricsCollector: def __init__(self): self._buckets = (60, 300, 600, 3600, "+Inf") - self._labels = [ - "integration", - "team", + self._stack_labels = [ "org_id", "slug", "id", ] - self._labels_with_state = self._labels + ["state"] + self._integration_labels = [ + "integration", + "team", + ] + self._stack_labels + self._integration_labels_with_state = self._integration_labels + ["state"] + self._user_labels = ["username"] + self._stack_labels def collect(self): - alert_groups_total = GaugeMetricFamily(ALERT_GROUPS_TOTAL, "All alert groups", labels=self._labels_with_state) - alert_groups_response_time_seconds = HistogramMetricFamily( - ALERT_GROUPS_RESPONSE_TIME, "Users response time to alert groups in 7 days (seconds)", labels=self._labels - ) - org_ids = set(get_organization_ids()) - # alert groups total metrics + # alert groups total metric: gauge + alert_groups_total, missing_org_ids_1 = self._get_alert_groups_total_metric(org_ids) + # alert groups response time metrics: histogram + alert_groups_response_time_seconds, missing_org_ids_2 = self._get_response_time_metric(org_ids) + # user was notified of alert groups metrics: counter + user_was_notified, missing_org_ids_3 = self._get_user_was_notified_of_alert_groups_metric(org_ids) + + # update new metric gradually + missing_org_ids_3 = self._update_new_metric(USER_WAS_NOTIFIED_OF_ALERT_GROUPS, org_ids, missing_org_ids_3) + + # check for orgs missing any of the metrics or needing a refresh, start recalculation task for missing org ids + missing_org_ids = missing_org_ids_1 | missing_org_ids_2 | missing_org_ids_3 + self.recalculate_cache_for_missing_org_ids(org_ids, missing_org_ids) + + yield alert_groups_total + yield alert_groups_response_time_seconds + yield user_was_notified + + def _get_alert_groups_total_metric(self, org_ids): + alert_groups_total = GaugeMetricFamily( + ALERT_GROUPS_TOTAL, "All alert groups", labels=self._integration_labels_with_state + ) processed_org_ids = set() alert_groups_total_keys = [get_metric_alert_groups_total_key(org_id) for org_id in org_ids] org_ag_states: typing.Dict[str, typing.Dict[int, AlertGroupsTotalMetricsDict]] = cache.get_many( @@ -57,11 +81,11 @@ def collect(self): ) for org_key, ag_states in org_ag_states.items(): for integration, integration_data in ag_states.items(): - # Labels values should have the same order as _labels + # Labels values should have the same order as _integration_labels_with_state labels_values = [ integration_data["integration_name"], # integration integration_data["team_name"], # team - integration_data["org_id"], # org_id + integration_data["org_id"], # grafana org_id integration_data["slug"], # grafana instance slug integration_data["id"], # grafana instance id ] @@ -70,10 +94,15 @@ def collect(self): alert_groups_total.add_metric(labels_values + [state.value], integration_data[state.value]) org_id_from_key = RE_ALERT_GROUPS_TOTAL.match(org_key).groups()[0] processed_org_ids.add(int(org_id_from_key)) - # get missing 
orgs - missing_org_ids_1 = org_ids - processed_org_ids + missing_org_ids = org_ids - processed_org_ids + return alert_groups_total, missing_org_ids - # alert groups response time metrics + def _get_response_time_metric(self, org_ids): + alert_groups_response_time_seconds = HistogramMetricFamily( + ALERT_GROUPS_RESPONSE_TIME, + "Users response time to alert groups in 7 days (seconds)", + labels=self._integration_labels, + ) processed_org_ids = set() alert_groups_response_time_keys = [get_metric_alert_groups_response_time_key(org_id) for org_id in org_ids] org_ag_response_times: typing.Dict[str, typing.Dict[int, AlertGroupsResponseTimeMetricsDict]] = cache.get_many( @@ -81,11 +110,11 @@ def collect(self): ) for org_key, ag_response_time in org_ag_response_times.items(): for integration, integration_data in ag_response_time.items(): - # Labels values should have the same order as _labels + # Labels values should have the same order as _integration_labels labels_values = [ integration_data["integration_name"], # integration integration_data["team_name"], # team - integration_data["org_id"], # org_id + integration_data["org_id"], # grafana org_id integration_data["slug"], # grafana instance slug integration_data["id"], # grafana instance id ] @@ -99,11 +128,47 @@ def collect(self): alert_groups_response_time_seconds.add_metric(labels_values, buckets=buckets, sum_value=sum_value) org_id_from_key = RE_ALERT_GROUPS_RESPONSE_TIME.match(org_key).groups()[0] processed_org_ids.add(int(org_id_from_key)) - # get missing orgs - missing_org_ids_2 = org_ids - processed_org_ids + missing_org_ids = org_ids - processed_org_ids + return alert_groups_response_time_seconds, missing_org_ids - # check for orgs missing any of the metrics or needing a refresh - missing_org_ids = missing_org_ids_1 | missing_org_ids_2 + def _get_user_was_notified_of_alert_groups_metric(self, org_ids): + user_was_notified = CounterMetricFamily( + USER_WAS_NOTIFIED_OF_ALERT_GROUPS, "Number of alert groups user was notified of", labels=self._user_labels + ) + processed_org_ids = set() + user_was_notified_keys = [get_metric_user_was_notified_of_alert_groups_key(org_id) for org_id in org_ids] + org_users: typing.Dict[str, typing.Dict[int, UserWasNotifiedOfAlertGroupsMetricsDict]] = cache.get_many( + user_was_notified_keys + ) + for org_key, users in org_users.items(): + for user, user_data in users.items(): + # Labels values should have the same order as _user_labels + labels_values = [ + user_data["user_username"], # username + user_data["org_id"], # grafana org_id + user_data["slug"], # grafana instance slug + user_data["id"], # grafana instance id + ] + labels_values = list(map(str, labels_values)) + user_was_notified.add_metric(labels_values, user_data["counter"]) + org_id_from_key = RE_USER_WAS_NOTIFIED_OF_ALERT_GROUPS.match(org_key).groups()[0] + processed_org_ids.add(int(org_id_from_key)) + missing_org_ids = org_ids - processed_org_ids + return user_was_notified, missing_org_ids + + def _update_new_metric(self, metric_name, org_ids, missing_org_ids): + """ + This method is used for new metrics to calculate metrics gradually and avoid force recalculation for all orgs + """ + calculation_started_key = get_metric_calculation_started_key(metric_name) + is_calculation_started = cache.get(calculation_started_key) + if len(missing_org_ids) == len(org_ids) or is_calculation_started: + missing_org_ids = set() + if not is_calculation_started: + start_recalculation_for_new_metric.apply_async((metric_name,)) + return missing_org_ids + + def 
recalculate_cache_for_missing_org_ids(self, org_ids, missing_org_ids): cache_timer_for_org_keys = [get_metrics_cache_timer_key(org_id) for org_id in org_ids] cache_timers_for_org = cache.get_many(cache_timer_for_org_keys) recalculate_orgs: typing.List[RecalculateOrgMetricsDict] = [] @@ -114,9 +179,6 @@ def collect(self): if recalculate_orgs: start_calculate_and_cache_metrics.apply_async((recalculate_orgs,)) - yield alert_groups_total - yield alert_groups_response_time_seconds - def get_buckets_with_sum(self, values): """Put values in correct buckets and count values sum""" buckets_values = {str(key): 0 for key in self._buckets} diff --git a/engine/apps/metrics_exporter/tasks.py b/engine/apps/metrics_exporter/tasks.py index 605d111d3f..347c5a1cd5 100644 --- a/engine/apps/metrics_exporter/tasks.py +++ b/engine/apps/metrics_exporter/tasks.py @@ -3,7 +3,7 @@ from django.apps import apps from django.conf import settings from django.core.cache import cache -from django.db.models import Q +from django.db.models import Count, Q from apps.alerts.constants import AlertGroupState from apps.metrics_exporter.constants import ( @@ -11,17 +11,22 @@ METRICS_ORGANIZATIONS_IDS_CACHE_TIMEOUT, AlertGroupsResponseTimeMetricsDict, AlertGroupsTotalMetricsDict, - RecalculateMetricsTimer, RecalculateOrgMetricsDict, + UserWasNotifiedOfAlertGroupsMetricsDict, ) from apps.metrics_exporter.helpers import ( get_metric_alert_groups_response_time_key, get_metric_alert_groups_total_key, + get_metric_calculation_started_key, + get_metric_user_was_notified_of_alert_groups_key, get_metrics_cache_timer_key, get_metrics_recalculation_timeout, + get_organization_ids, get_organization_ids_from_db, get_response_time_period, + is_allowed_to_start_metrics_calculation, ) +from apps.user_management.models import User from common.custom_celery_tasks import shared_dedicated_queue_retry_task from common.database import get_random_readonly_database_key_if_present_otherwise_default @@ -40,9 +45,30 @@ def save_organizations_ids_in_cache(): def start_calculate_and_cache_metrics(metrics_to_recalculate: list[RecalculateOrgMetricsDict]): """Start calculation metrics for each object in metrics_to_recalculate""" for counter, recalculation_data in enumerate(metrics_to_recalculate): + if not is_allowed_to_start_metrics_calculation(**recalculation_data): + continue # start immediately if recalculation starting has been forced countdown = 0 if recalculation_data.get("force") else counter calculate_and_cache_metrics.apply_async(kwargs=recalculation_data, countdown=countdown) + calculate_and_cache_user_was_notified_metric.apply_async( + (recalculation_data["organization_id"],), countdown=countdown + ) + + +@shared_dedicated_queue_retry_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=0) +def start_recalculation_for_new_metric(metric_name): + TEN_MINUTES = 600 + calculation_started_key = get_metric_calculation_started_key(metric_name) + is_calculation_started = cache.get(calculation_started_key) + if is_calculation_started: + return + cache.set(calculation_started_key, True, timeout=TEN_MINUTES) + org_ids = set(get_organization_ids()) + countdown = 0 + for counter, organization_id in enumerate(org_ids): + if counter % 10 == 0: + countdown += 1 + calculate_and_cache_user_was_notified_metric.apply_async((organization_id,), countdown=countdown) @shared_dedicated_queue_retry_task( @@ -50,41 +76,31 @@ def start_calculate_and_cache_metrics(metrics_to_recalculate: list[RecalculateOr ) def calculate_and_cache_metrics(organization_id, 
force=False): """ - Calculate metrics for organization. - Before calculation checks if calculation has already been started to avoid too frequent launch or parallel launch + Calculate integrations metrics for organization. """ AlertGroup = apps.get_model("alerts", "AlertGroup") AlertReceiveChannel = apps.get_model("alerts", "AlertReceiveChannel") + Organization = apps.get_model("user_management", "Organization") + ONE_HOUR = 3600 TWO_HOURS = 7200 - recalculate_timeout = get_metrics_recalculation_timeout() - - # check if recalculation has been already started - metrics_cache_timer_key = get_metrics_cache_timer_key(organization_id) - metrics_cache_timer = cache.get(metrics_cache_timer_key) - if metrics_cache_timer: - if not force or metrics_cache_timer.get("forced_started", False): - return - else: - metrics_cache_timer["forced_started"] = True - else: - metrics_cache_timer: RecalculateMetricsTimer = { - "recalculate_timeout": recalculate_timeout, - "forced_started": force, - } - - metrics_cache_timer["recalculate_timeout"] = recalculate_timeout - cache.set(metrics_cache_timer_key, metrics_cache_timer, timeout=recalculate_timeout) + organization = Organization.objects.filter(pk=organization_id).first() + if not organization: + return integrations = ( AlertReceiveChannel.objects.using(get_random_readonly_database_key_if_present_otherwise_default()) .filter(~Q(integration=AlertReceiveChannel.INTEGRATION_MAINTENANCE) & Q(organization_id=organization_id)) - .select_related("organization", "team") + .select_related("team") ) response_time_period = get_response_time_period() + instance_slug = organization.stack_slug + instance_id = organization.stack_id + instance_org_id = organization.org_id + metric_alert_group_total: typing.Dict[int, AlertGroupsTotalMetricsDict] = {} metric_alert_group_response_time: typing.Dict[int, AlertGroupsResponseTimeMetricsDict] = {} @@ -96,8 +112,6 @@ def calculate_and_cache_metrics(organization_id, force=False): } for integration in integrations: - instance_slug = integration.organization.stack_slug - instance_id = integration.organization.stack_id # calculate states for state, alert_group_filter in states.items(): metric_alert_group_total.setdefault( @@ -106,7 +120,7 @@ def calculate_and_cache_metrics(organization_id, force=False): "integration_name": integration.emojized_verbal_name, "team_name": integration.team_name, "team_id": integration.team_id_or_no_team, - "org_id": integration.organization.org_id, + "org_id": instance_org_id, "slug": instance_slug, "id": instance_id, }, @@ -124,7 +138,7 @@ def calculate_and_cache_metrics(organization_id, force=False): "integration_name": integration.emojized_verbal_name, "team_name": integration.team_name, "team_id": integration.team_id_or_no_team, - "org_id": integration.organization.org_id, + "org_id": instance_org_id, "slug": instance_slug, "id": instance_id, "response_time": all_response_time_seconds, @@ -133,9 +147,68 @@ def calculate_and_cache_metrics(organization_id, force=False): metric_alert_groups_total_key = get_metric_alert_groups_total_key(organization_id) metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization_id) + recalculate_timeout = get_metrics_recalculation_timeout() metrics_cache_timeout = recalculate_timeout + TWO_HOURS cache.set(metric_alert_groups_total_key, metric_alert_group_total, timeout=metrics_cache_timeout) cache.set(metric_alert_groups_response_time_key, metric_alert_group_response_time, timeout=metrics_cache_timeout) - if 
metrics_cache_timer["forced_started"]: + if force: + metrics_cache_timer_key = get_metrics_cache_timer_key(organization_id) + metrics_cache_timer = cache.get(metrics_cache_timer_key) metrics_cache_timer["forced_started"] = False cache.set(metrics_cache_timer_key, metrics_cache_timer, timeout=recalculate_timeout - ONE_HOUR) + + +@shared_dedicated_queue_retry_task( + autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None +) +def calculate_and_cache_user_was_notified_metric(organization_id): + """ + Calculate metric "user_was_notified_of_alert_groups" for organization. + """ + UserNotificationPolicyLogRecord = apps.get_model("base", "UserNotificationPolicyLogRecord") + Organization = apps.get_model("user_management", "Organization") + + TWO_HOURS = 7200 + + organization = Organization.objects.filter(pk=organization_id).first() + if not organization: + return + + users = ( + User.objects.using(get_random_readonly_database_key_if_present_otherwise_default()) + .filter(organization_id=organization_id) + .annotate(num_logs=Count("personal_log_records")) + .filter(num_logs__gte=1) + ) + + instance_slug = organization.stack_slug + instance_id = organization.stack_id + instance_org_id = organization.org_id + + metric_user_was_notified: typing.Dict[int, UserWasNotifiedOfAlertGroupsMetricsDict] = {} + + for user in users: + + counter = ( + user.personal_log_records.filter(type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED) + .values("alert_group") + .distinct() + .count() + ) + + if counter == 0: # means that user has no successful notifications + continue + + metric_user_was_notified[user.id] = { + "user_username": user.username, + "org_id": instance_org_id, + "slug": instance_slug, + "id": instance_id, + "counter": counter, + } + + metric_user_was_notified_key = get_metric_user_was_notified_of_alert_groups_key(organization_id) + + recalculate_timeout = get_metrics_recalculation_timeout() + metrics_cache_timeout = recalculate_timeout + TWO_HOURS + cache.set(metric_user_was_notified_key, metric_user_was_notified, timeout=metrics_cache_timeout) diff --git a/engine/apps/metrics_exporter/tests/conftest.py b/engine/apps/metrics_exporter/tests/conftest.py index 19b45cdf1d..f82ee2115e 100644 --- a/engine/apps/metrics_exporter/tests/conftest.py +++ b/engine/apps/metrics_exporter/tests/conftest.py @@ -1,13 +1,22 @@ import pytest from django.core.cache import cache -from apps.metrics_exporter.constants import ALERT_GROUPS_RESPONSE_TIME, ALERT_GROUPS_TOTAL -from apps.metrics_exporter.helpers import get_metric_alert_groups_response_time_key, get_metric_alert_groups_total_key +from apps.metrics_exporter.constants import ( + ALERT_GROUPS_RESPONSE_TIME, + ALERT_GROUPS_TOTAL, + USER_WAS_NOTIFIED_OF_ALERT_GROUPS, +) +from apps.metrics_exporter.helpers import ( + get_metric_alert_groups_response_time_key, + get_metric_alert_groups_total_key, + get_metric_user_was_notified_of_alert_groups_key, +) METRICS_TEST_INTEGRATION_NAME = "Test integration" METRICS_TEST_ORG_ID = 123 # random number METRICS_TEST_INSTANCE_SLUG = "test_instance" METRICS_TEST_INSTANCE_ID = 292 # random number +METRICS_TEST_USER_USERNAME = "Alex" @pytest.fixture() @@ -17,6 +26,8 @@ def _mock_cache_get(key, *args, **kwargs): key = ALERT_GROUPS_TOTAL elif key.startswith(ALERT_GROUPS_RESPONSE_TIME): key = ALERT_GROUPS_RESPONSE_TIME + elif key.startswith(USER_WAS_NOTIFIED_OF_ALERT_GROUPS): + key = USER_WAS_NOTIFIED_OF_ALERT_GROUPS test_metrics = { ALERT_GROUPS_TOTAL: { 1: { @@ -43,6 +54,15 @@ def 
_mock_cache_get(key, *args, **kwargs): "response_time": [2, 10, 200, 650], } }, + USER_WAS_NOTIFIED_OF_ALERT_GROUPS: { + 1: { + "org_id": 1, + "slug": "Test stack", + "id": 1, + "user_username": "Alex", + "counter": 4, + } + }, } return test_metrics.get(key) @@ -106,3 +126,27 @@ def cache_get(key, *args, **kwargs): return cache_get return _make_cache_params + + +@pytest.fixture +def make_user_was_notified_metrics_cache_params(monkeypatch): + def _make_cache_params(user_id, organization_id): + metric_user_was_notified_key = get_metric_user_was_notified_of_alert_groups_key(organization_id) + + def cache_get(key, *args, **kwargs): + metrics_data = { + metric_user_was_notified_key: { + user_id: { + "org_id": METRICS_TEST_ORG_ID, + "slug": METRICS_TEST_INSTANCE_SLUG, + "id": METRICS_TEST_INSTANCE_ID, + "user_username": METRICS_TEST_USER_USERNAME, + "counter": 1, + } + }, + } + return metrics_data.get(key, {}) + + return cache_get + + return _make_cache_params diff --git a/engine/apps/metrics_exporter/tests/test_calculation_metrics.py b/engine/apps/metrics_exporter/tests/test_calculation_metrics.py index b992a34072..b46465467b 100644 --- a/engine/apps/metrics_exporter/tests/test_calculation_metrics.py +++ b/engine/apps/metrics_exporter/tests/test_calculation_metrics.py @@ -2,12 +2,13 @@ import pytest +from apps.base.models import UserNotificationPolicyLogRecord from apps.metrics_exporter.helpers import ( get_metric_alert_groups_response_time_key, get_metric_alert_groups_total_key, - get_metrics_cache_timer_key, + get_metric_user_was_notified_of_alert_groups_key, ) -from apps.metrics_exporter.tasks import calculate_and_cache_metrics +from apps.metrics_exporter.tasks import calculate_and_cache_metrics, calculate_and_cache_user_was_notified_metric @patch("apps.alerts.models.alert_group.MetricsCacheManager.metrics_update_state_cache_for_alert_group") @@ -44,7 +45,6 @@ def test_calculate_and_cache_metrics_task( make_alert(alert_group=alert_group_to_sil, raw_request_data={}) alert_group_to_sil.silence() - metrics_cache_timer_key = get_metrics_cache_timer_key(organization.id) metric_alert_groups_total_key = get_metric_alert_groups_total_key(organization.id) metric_alert_groups_response_time_key = get_metric_alert_groups_response_time_key(organization.id) @@ -98,18 +98,84 @@ def test_calculate_and_cache_metrics_task( with patch("apps.metrics_exporter.tasks.cache.set") as mock_cache_set: calculate_and_cache_metrics(organization.id) args = mock_cache_set.call_args_list - assert args[0].args[0] == metrics_cache_timer_key # check alert_groups_total metric cache - metric_alert_groups_total_values = args[1].args + metric_alert_groups_total_values = args[0].args assert metric_alert_groups_total_values[0] == metric_alert_groups_total_key assert metric_alert_groups_total_values[1] == expected_result_metric_alert_groups_total # check alert_groups_response_time metric cache - metric_alert_groups_response_time_values = args[2].args + metric_alert_groups_response_time_values = args[1].args assert metric_alert_groups_response_time_values[0] == metric_alert_groups_response_time_key for integration_id, values in metric_alert_groups_response_time_values[1].items(): assert len(values["response_time"]) == METRICS_RESPONSE_TIME_LEN # set response time to expected result because it is calculated on fly expected_result_metric_alert_groups_response_time[integration_id]["response_time"] = values["response_time"] assert metric_alert_groups_response_time_values[1] == expected_result_metric_alert_groups_response_time + + 
+@patch("apps.alerts.models.alert_group.MetricsCacheManager.metrics_update_state_cache_for_alert_group") +@pytest.mark.django_db +def test_calculate_and_cache_user_was_notified_metric_task( + mocked_update_state_cache, + make_organization, + make_user_for_organization, + make_team, + make_alert_receive_channel, + make_alert_group, + make_alert, + make_user_notification_policy_log_record, +): + organization = make_organization() + team = make_team(organization) + + user_1 = make_user_for_organization(organization) + user_2 = make_user_for_organization(organization) + + alert_receive_channel_1 = make_alert_receive_channel(organization) + alert_receive_channel_2 = make_alert_receive_channel(organization, team=team) + + alert_group_1 = make_alert_group(alert_receive_channel_1) + alert_group_2 = make_alert_group(alert_receive_channel_2) + + for _ in range(2): + make_user_notification_policy_log_record( + type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED, + author=user_1, + alert_group=alert_group_1, + ) + + for user in [user_1, user_2]: + make_user_notification_policy_log_record( + type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED, + author=user, + alert_group=alert_group_2, + ) + + metric_user_was_notified_key = get_metric_user_was_notified_of_alert_groups_key(organization.id) + + expected_result_metric_user_was_notified = { + user_1.id: { + "org_id": organization.org_id, + "slug": organization.stack_slug, + "id": organization.stack_id, + "user_username": user_1.username, + "counter": 2, + }, + user_2.id: { + "org_id": organization.org_id, + "slug": organization.stack_slug, + "id": organization.stack_id, + "user_username": user_2.username, + "counter": 1, + }, + } + + with patch("apps.metrics_exporter.tasks.cache.set") as mock_cache_set: + calculate_and_cache_user_was_notified_metric(organization.id) + args = mock_cache_set.call_args_list + + # check user_was_notified_of_alert_groups metric cache + metric_user_was_notified_of_alert_groups_values = args[0].args + assert metric_user_was_notified_of_alert_groups_values[0] == metric_user_was_notified_key + assert metric_user_was_notified_of_alert_groups_values[1] == expected_result_metric_user_was_notified diff --git a/engine/apps/metrics_exporter/tests/test_metrics_collectors.py b/engine/apps/metrics_exporter/tests/test_metrics_collectors.py index 6f17d01576..4dbe7d0565 100644 --- a/engine/apps/metrics_exporter/tests/test_metrics_collectors.py +++ b/engine/apps/metrics_exporter/tests/test_metrics_collectors.py @@ -4,7 +4,11 @@ from prometheus_client import CollectorRegistry, generate_latest from apps.alerts.constants import AlertGroupState -from apps.metrics_exporter.constants import ALERT_GROUPS_RESPONSE_TIME, ALERT_GROUPS_TOTAL +from apps.metrics_exporter.constants import ( + ALERT_GROUPS_RESPONSE_TIME, + ALERT_GROUPS_TOTAL, + USER_WAS_NOTIFIED_OF_ALERT_GROUPS, +) from apps.metrics_exporter.metrics_collectors import ApplicationMetricsCollector @@ -25,6 +29,9 @@ def test_application_metrics_collector( elif metric.name == ALERT_GROUPS_RESPONSE_TIME: # integration with labels for each value in collector's bucket + _count and _sum histogram values assert len(metric.samples) == len(collector._buckets) + 2 + elif metric.name == USER_WAS_NOTIFIED_OF_ALERT_GROUPS: + # metric with labels for each notified user + assert len(metric.samples) == 1 result = generate_latest(test_metrics_registry).decode("utf-8") assert result is not None assert mocked_org_ids.called diff --git 
a/engine/apps/metrics_exporter/tests/test_update_metics_cache.py b/engine/apps/metrics_exporter/tests/test_update_metics_cache.py index d9ee2e0a3b..313f08b8df 100644 --- a/engine/apps/metrics_exporter/tests/test_update_metics_cache.py +++ b/engine/apps/metrics_exporter/tests/test_update_metics_cache.py @@ -4,9 +4,12 @@ from django.core.cache import cache from django.test import override_settings +from apps.alerts.tasks import notify_user_task +from apps.base.models import UserNotificationPolicy, UserNotificationPolicyLogRecord from apps.metrics_exporter.helpers import ( get_metric_alert_groups_response_time_key, get_metric_alert_groups_total_key, + get_metric_user_was_notified_of_alert_groups_key, metrics_bulk_update_team_label_cache, ) from apps.metrics_exporter.metrics_cache_manager import MetricsCacheManager @@ -15,6 +18,7 @@ METRICS_TEST_INSTANCE_SLUG, METRICS_TEST_INTEGRATION_NAME, METRICS_TEST_ORG_ID, + METRICS_TEST_USER_USERNAME, ) @@ -450,3 +454,84 @@ def get_called_arg_index_and_compare_results(): ]: expected_result[alert_receive_channel.id].update(expected_result_delete_team) get_called_arg_index_and_compare_results() + + +@patch("apps.alerts.tasks.notify_user.perform_notification") +@override_settings(CELERY_TASK_ALWAYS_EAGER=True) +@pytest.mark.django_db +def test_update_metrics_cache_on_user_notification( + mocked_perform_notification_task, + make_organization, + make_user_for_organization, + make_alert_receive_channel, + make_alert_group, + make_user_notification_policy, + make_user_notification_policy_log_record, + make_user_was_notified_metrics_cache_params, + monkeypatch, + mock_get_metrics_cache, +): + organization = make_organization( + org_id=METRICS_TEST_ORG_ID, + stack_slug=METRICS_TEST_INSTANCE_SLUG, + stack_id=METRICS_TEST_INSTANCE_ID, + ) + alert_receive_channel = make_alert_receive_channel( + organization, + verbal_name=METRICS_TEST_INTEGRATION_NAME, + ) + user = make_user_for_organization(organization, username=METRICS_TEST_USER_USERNAME) + + notification_policy_1 = make_user_notification_policy(user, step=UserNotificationPolicy.Step.NOTIFY) + make_user_notification_policy(user, step=UserNotificationPolicy.Step.NOTIFY) + + alert_group_1 = make_alert_group(alert_receive_channel) + alert_group_2 = make_alert_group(alert_receive_channel) + + make_user_notification_policy_log_record( + type=UserNotificationPolicyLogRecord.TYPE_PERSONAL_NOTIFICATION_TRIGGERED, + author=user, + alert_group=alert_group_1, + ) + + metrics_cache = make_user_was_notified_metrics_cache_params(user.id, organization.id) + monkeypatch.setattr(cache, "get", metrics_cache) + + metric_user_was_notified_key = get_metric_user_was_notified_of_alert_groups_key(organization.id) + + expected_result_metric_user_was_notified = { + user.id: { + "org_id": organization.org_id, + "slug": organization.stack_slug, + "id": organization.stack_id, + "user_username": METRICS_TEST_USER_USERNAME, + "counter": 1, + } + } + + def get_called_arg_index_and_compare_results(cache_was_updated=False): + """find index for the metric argument, that was set in cache""" + for idx, called_arg in enumerate(mock_cache_set_called_args): + if idx >= arg_idx and called_arg.args[0] == metric_user_was_notified_key: + assert called_arg.args[1] == expected_result_metric_user_was_notified + return idx + 1 + if cache_was_updated: + raise AssertionError + return arg_idx + + with patch("apps.metrics_exporter.tasks.cache.set") as mock_cache_set: + arg_idx = 0 + notify_user_task(user.id, alert_group_1.id) + + # check 
user_was_notified_of_alert_groups metric cache, get called args + mock_cache_set_called_args = mock_cache_set.call_args_list + arg_idx = get_called_arg_index_and_compare_results() + + # counter grows after the first notification of alert group + notify_user_task(user.id, alert_group_2.id) + expected_result_metric_user_was_notified[user.id]["counter"] += 1 + arg_idx = get_called_arg_index_and_compare_results(cache_was_updated=True) + + # counter doesn't grow after the second notification of alert group + notify_user_task(user.id, alert_group_2.id, previous_notification_policy_pk=notification_policy_1.id) + arg_idx = get_called_arg_index_and_compare_results() diff --git a/engine/settings/prod_without_db.py b/engine/settings/prod_without_db.py index 7c33383d91..355e81d2b8 100644 --- a/engine/settings/prod_without_db.py +++ b/engine/settings/prod_without_db.py @@ -58,6 +58,7 @@ def on_uwsgi_worker_exit(): "apps.heartbeat.tasks.process_heartbeat_task": {"queue": "default"}, "apps.heartbeat.tasks.restore_heartbeat_tasks": {"queue": "default"}, "apps.metrics_exporter.tasks.start_calculate_and_cache_metrics": {"queue": "default"}, + "apps.metrics_exporter.tasks.start_recalculation_for_new_metric": {"queue": "default"}, "apps.metrics_exporter.tasks.save_organizations_ids_in_cache": {"queue": "default"}, "apps.schedules.tasks.refresh_ical_files.refresh_ical_file": {"queue": "default"}, "apps.schedules.tasks.refresh_ical_files.start_refresh_ical_files": {"queue": "default"}, @@ -119,6 +120,7 @@ def on_uwsgi_worker_exit(): "apps.grafana_plugin.tasks.sync.start_sync_organizations": {"queue": "long"}, "apps.grafana_plugin.tasks.sync.sync_organization_async": {"queue": "long"}, "apps.metrics_exporter.tasks.calculate_and_cache_metrics": {"queue": "long"}, + "apps.metrics_exporter.tasks.calculate_and_cache_user_was_notified_metric": {"queue": "long"}, # SLACK "apps.integrations.tasks.notify_about_integration_ratelimit_in_slack": {"queue": "slack"}, "apps.slack.helpers.alert_group_representative.on_alert_group_action_triggered_async": {"queue": "slack"},
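
For reference, the custom-collector pattern that `metrics_collectors.py` uses to expose this counter can be reduced
to the following self-contained sketch; the class name, cache contents and label values below are illustrative
stand-ins, not code from this PR:

```python
# Minimal sketch of the prometheus_client custom-collector pattern used above.
# The in-memory `cached_metrics` dict stands in for the per-organization data
# that the real code reads from the Django cache; all values are illustrative.
from prometheus_client import CollectorRegistry, generate_latest
from prometheus_client.metrics_core import CounterMetricFamily


class UserWasNotifiedCollector:
    def __init__(self, cached_metrics):
        self.cached_metrics = cached_metrics

    def collect(self):
        counter = CounterMetricFamily(
            "oncall_user_was_notified_of_alert_groups",
            "Number of alert groups user was notified of",
            labels=["username", "org_id", "slug", "id"],
        )
        for user_data in self.cached_metrics.values():
            # Label values must follow the same order as the labels list above
            labels = [str(user_data[key]) for key in ("user_username", "org_id", "slug", "id")]
            counter.add_metric(labels, user_data["counter"])
        yield counter


registry = CollectorRegistry()
registry.register(
    UserWasNotifiedCollector(
        {1: {"user_username": "alex", "org_id": 1, "slug": "test_stack", "id": 1, "counter": 4}}
    )
)
# Prints the metric in Prometheus exposition format, e.g.
# oncall_user_was_notified_of_alert_groups_total{id="1",org_id="1",slug="test_stack",username="alex"} 4.0
print(generate_latest(registry).decode("utf-8"))
```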