From fd19dd422a5d1a6a4924cb2a17ddd19cbaa09b91 Mon Sep 17 00:00:00 2001 From: Ildar Iskhakov Date: Thu, 10 Aug 2023 10:25:00 +0800 Subject: [PATCH] Use periodic task for heartbeats (#2723) # What this PR does ## Which issue(s) this PR fixes ## Checklist - [ ] Unit, integration, and e2e (if applicable) tests updated - [ ] Documentation added (or `pr:no public docs` PR label added if not required) - [ ] `CHANGELOG.md` updated (or `pr:no changelog` PR label added if not required) --------- Co-authored-by: Joey Orlando Co-authored-by: Michael Derynck --- CHANGELOG.md | 1 + engine/apps/heartbeat/models.py | 86 +++--------- engine/apps/heartbeat/tasks.py | 128 ++++++++++++------ .../tests/test_integration_heartbeat.py | 126 ++++++++--------- .../metadata/heartbeat/elastalert.py | 16 +-- .../metadata/heartbeat/formatted_webhook.py | 12 +- .../metadata/heartbeat/grafana.py | 12 +- .../integrations/metadata/heartbeat/prtg.py | 12 +- .../metadata/heartbeat/webhook.py | 14 +- .../integrations/metadata/heartbeat/zabbix.py | 12 +- .../apps/integrations/tests/test_ratelimit.py | 2 - engine/apps/integrations/views.py | 11 +- engine/config_integrations/elastalert.py | 9 +- engine/config_integrations/webhook.py | 19 ++- engine/settings/base.py | 5 + 15 files changed, 237 insertions(+), 228 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b380082843..a5fde585f5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add stack slug to organization options for direct paging Slash command by @vadimkerr ([#2743](https://github.com/grafana/oncall/pull/2743)) - Avoid creating (or notifying about) potential event splits resulting from untaken swap requests ([#2748](https://github.com/grafana/oncall/pull/2748)) +- Refactor heartbeats into a periodic task ([2723](https://github.com/grafana/oncall/pull/2723)) ### Fixed diff --git a/engine/apps/heartbeat/models.py b/engine/apps/heartbeat/models.py index eb9b9cd627..9d8018df38 100644 --- a/engine/apps/heartbeat/models.py +++ b/engine/apps/heartbeat/models.py @@ -4,10 +4,9 @@ from django.conf import settings from django.core.validators import MinLengthValidator -from django.db import models, transaction +from django.db import models from django.utils import timezone -from apps.integrations.tasks import create_alert from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length logger = logging.getLogger(__name__) @@ -43,10 +42,26 @@ class IntegrationHeartBeat(models.Model): created_at = models.DateTimeField(auto_now_add=True) timeout_seconds = models.IntegerField(default=0) + last_heartbeat_time = models.DateTimeField(default=None, null=True) + """ + Stores the latest received heartbeat signal time + """ + last_checkup_task_time = models.DateTimeField(default=None, null=True) + """ + Deprecated. This field is not used. TODO: remove it + """ + actual_check_up_task_id = models.CharField(max_length=100) + """ + Deprecated. Stored the latest scheduled `integration_heartbeat_checkup` task id. TODO: remove it + """ + previous_alerted_state_was_life = models.BooleanField(default=True) + """ + Last status of the heartbeat. Determines if integration was alive on latest checkup + """ public_primary_key = models.CharField( max_length=20, @@ -83,73 +98,6 @@ def status(self) -> bool: def link(self) -> str: return urljoin(self.alert_receive_channel.integration_url, "heartbeat/") - @classmethod - def perform_heartbeat_check(cls, heartbeat_id: int, task_request_id: str) -> None: - with transaction.atomic(): - heartbeats = cls.objects.filter(pk=heartbeat_id).select_for_update() - if len(heartbeats) == 0: - logger.info(f"Heartbeat {heartbeat_id} not found {task_request_id}") - return - heartbeat = heartbeats[0] - if task_request_id == heartbeat.actual_check_up_task_id: - heartbeat.check_heartbeat_state_and_save() - else: - logger.info(f"Heartbeat {heartbeat_id} is not actual {task_request_id}") - - def check_heartbeat_state_and_save(self) -> bool: - """ - Use this method if you want just check heartbeat status. - """ - state_changed = self.check_heartbeat_state() - if state_changed: - self.save(update_fields=["previous_alerted_state_was_life"]) - return state_changed - - def check_heartbeat_state(self) -> bool: - """ - Actually checking heartbeat. - Use this method if you want to do changes of heartbeat instance while checking its status. - ( See IntegrationHeartBeatAPIView.post() for example ) - """ - state_changed = False - if self.is_expired: - if self.previous_alerted_state_was_life: - self.on_heartbeat_expired() - self.previous_alerted_state_was_life = False - state_changed = True - else: - if not self.previous_alerted_state_was_life: - self.on_heartbeat_restored() - self.previous_alerted_state_was_life = True - state_changed = True - return state_changed - - def on_heartbeat_restored(self) -> None: - create_alert.apply_async( - kwargs={ - "title": self.alert_receive_channel.heartbeat_restored_title, - "message": self.alert_receive_channel.heartbeat_restored_message, - "image_url": None, - "link_to_upstream_details": None, - "alert_receive_channel_pk": self.alert_receive_channel.pk, - "integration_unique_data": {}, - "raw_request_data": self.alert_receive_channel.heartbeat_restored_payload, - }, - ) - - def on_heartbeat_expired(self) -> None: - create_alert.apply_async( - kwargs={ - "title": self.alert_receive_channel.heartbeat_expired_title, - "message": self.alert_receive_channel.heartbeat_expired_message, - "image_url": None, - "link_to_upstream_details": None, - "alert_receive_channel_pk": self.alert_receive_channel.pk, - "integration_unique_data": {}, - "raw_request_data": self.alert_receive_channel.heartbeat_expired_payload, - }, - ) - # Insight logs @property def insight_logs_type_verbal(self) -> str: diff --git a/engine/apps/heartbeat/tasks.py b/engine/apps/heartbeat/tasks.py index 071af2fbcc..d02fac9b0f 100644 --- a/engine/apps/heartbeat/tasks.py +++ b/engine/apps/heartbeat/tasks.py @@ -1,57 +1,105 @@ -from time import perf_counter +import datetime from celery.utils.log import get_task_logger +from django.conf import settings from django.db import transaction +from django.db.models import DateTimeField, DurationField, ExpressionWrapper, F +from django.db.models.functions import Cast from django.utils import timezone +from apps.heartbeat.models import IntegrationHeartBeat +from apps.integrations.tasks import create_alert from common.custom_celery_tasks import shared_dedicated_queue_retry_task +from settings.base import DatabaseTypes logger = get_task_logger(__name__) @shared_dedicated_queue_retry_task() -def integration_heartbeat_checkup(heartbeat_id: int) -> None: - from apps.heartbeat.models import IntegrationHeartBeat - - IntegrationHeartBeat.perform_heartbeat_check(heartbeat_id, integration_heartbeat_checkup.request.id) - +def check_heartbeats() -> str: + """ + Periodic task to check heartbeats status change and create alerts (or auto-resolve alerts) if needed + """ + # Heartbeat is considered enabled if it + # * has timeout_seconds set to non-zero (non-default) value, + # * received at least one checkup (last_heartbeat_time set to non-null value)\ -@shared_dedicated_queue_retry_task() -def process_heartbeat_task(alert_receive_channel_pk): - start = perf_counter() - from apps.heartbeat.models import IntegrationHeartBeat + def _get_timeout_expression() -> ExpressionWrapper: + if settings.DATABASES["default"]["ENGINE"] == f"django.db.backends.{DatabaseTypes.POSTGRESQL}": + # DurationField: When used on PostgreSQL, the data type used is an interval + # https://docs.djangoproject.com/en/3.2/ref/models/fields/#durationfield + return ExpressionWrapper(datetime.timedelta(seconds=1) * F("timeout_seconds"), output_field=DurationField()) + else: + # DurationField: ...Otherwise a bigint of microseconds is used... + # microseconds = seconds * 10**6 + # https://docs.djangoproject.com/en/3.2/ref/models/fields/#durationfield + return ExpressionWrapper(F("timeout_seconds") * 10**6, output_field=DurationField()) + enabled_heartbeats = ( + IntegrationHeartBeat.objects.filter(last_heartbeat_time__isnull=False) + .exclude(timeout_seconds=0) + .annotate(period_start=(Cast(timezone.now() - _get_timeout_expression(), DateTimeField()))) + ) with transaction.atomic(): - heartbeats = IntegrationHeartBeat.objects.filter( - alert_receive_channel__pk=alert_receive_channel_pk, - ).select_for_update() - if len(heartbeats) == 0: - logger.info(f"Integration Heartbeat for alert_receive_channel {alert_receive_channel_pk} was not found.") - return - else: - heartbeat = heartbeats[0] - heartbeat_selected = perf_counter() - logger.info( - f"IntegrationHeartBeat selected for alert_receive_channel {alert_receive_channel_pk} in {heartbeat_selected - start}" + # Heartbeat is considered expired if it + # * is enabled, + # * is not already expired, + # * last check in was before the timeout period start + expired_heartbeats = enabled_heartbeats.select_for_update().filter( + last_heartbeat_time__lte=F("period_start"), previous_alerted_state_was_life=True ) - task = integration_heartbeat_checkup.apply_async( - (heartbeat.pk,), - countdown=heartbeat.timeout_seconds + 1, - ) - is_touched = heartbeat.last_heartbeat_time is not None - heartbeat.actual_check_up_task_id = task.id - heartbeat.last_heartbeat_time = timezone.now() - update_fields = ["actual_check_up_task_id", "last_heartbeat_time"] - task_started = perf_counter() - logger.info( - f"heartbeat_checkup task started for alert_receive_channel {alert_receive_channel_pk} in {task_started - start}" + # Schedule alert creation for each expired heartbeat after transaction commit + for heartbeat in expired_heartbeats: + transaction.on_commit( + lambda: create_alert.apply_async( + kwargs={ + "title": heartbeat.alert_receive_channel.heartbeat_expired_title, + "message": heartbeat.alert_receive_channel.heartbeat_expired_message, + "image_url": None, + "link_to_upstream_details": None, + "alert_receive_channel_pk": heartbeat.alert_receive_channel.pk, + "integration_unique_data": {}, + "raw_request_data": heartbeat.alert_receive_channel.heartbeat_expired_payload, + }, + ) + ) + # Update previous_alerted_state_was_life to False + expired_count = expired_heartbeats.update(previous_alerted_state_was_life=False) + with transaction.atomic(): + # Heartbeat is considered restored if it + # * is enabled, + # * last check in was after the timeout period start, + # * was is alerted state (previous_alerted_state_was_life is False), i.e. was expired + restored_heartbeats = enabled_heartbeats.select_for_update().filter( + last_heartbeat_time__gte=F("period_start"), previous_alerted_state_was_life=False ) - if is_touched: - state_changed = heartbeat.check_heartbeat_state() - state_checked = perf_counter() - logger.info( - f"state checked for alert_receive_channel {alert_receive_channel_pk} in {state_checked - start}" + # Schedule auto-resolve alert creation for each expired heartbeat after transaction commit + for heartbeat in restored_heartbeats: + transaction.on_commit( + lambda: create_alert.apply_async( + kwargs={ + "title": heartbeat.alert_receive_channel.heartbeat_restored_title, + "message": heartbeat.alert_receive_channel.heartbeat_restored_message, + "image_url": None, + "link_to_upstream_details": None, + "alert_receive_channel_pk": heartbeat.alert_receive_channel.pk, + "integration_unique_data": {}, + "raw_request_data": heartbeat.alert_receive_channel.heartbeat_restored_payload, + }, + ) ) - if state_changed: - update_fields.append("previous_alerted_state_was_life") - heartbeat.save(update_fields=update_fields) + restored_count = restored_heartbeats.update(previous_alerted_state_was_life=True) + return f"Found {expired_count} expired and {restored_count} restored heartbeats" + + +@shared_dedicated_queue_retry_task() +def integration_heartbeat_checkup(heartbeat_id: int) -> None: + """Deprecated. TODO: Remove this task after this task cleared from queue""" + pass + + +@shared_dedicated_queue_retry_task() +def process_heartbeat_task(alert_receive_channel_pk): + IntegrationHeartBeat.objects.filter( + alert_receive_channel__pk=alert_receive_channel_pk, + ).update(last_heartbeat_time=timezone.now()) diff --git a/engine/apps/heartbeat/tests/test_integration_heartbeat.py b/engine/apps/heartbeat/tests/test_integration_heartbeat.py index c3797dcfd3..e2cdb8c3f7 100644 --- a/engine/apps/heartbeat/tests/test_integration_heartbeat.py +++ b/engine/apps/heartbeat/tests/test_integration_heartbeat.py @@ -4,83 +4,77 @@ from django.utils import timezone from apps.alerts.models import AlertReceiveChannel +from apps.heartbeat.tasks import check_heartbeats +from apps.integrations.tasks import create_alert @pytest.mark.django_db -@patch("apps.heartbeat.models.IntegrationHeartBeat.on_heartbeat_expired", return_value=None) @pytest.mark.parametrize("integration", [AlertReceiveChannel.INTEGRATION_FORMATTED_WEBHOOK]) -def test_integration_heartbeat_expired( - mocked_handler, make_organization_and_user, make_alert_receive_channel, make_integration_heartbeat, integration +def test_check_heartbeats( + make_organization_and_user, + make_alert_receive_channel, + make_integration_heartbeat, + integration, + django_capture_on_commit_callbacks, ): - team, _ = make_organization_and_user() - # Some short timeout and last_heartbeat_time to make sure that heartbeat is expired - timeout = 1 - last_heartbeat_time = timezone.now() - timezone.timedelta(seconds=timeout * 10) - alert_receive_channel = make_alert_receive_channel(team, integration=integration) - integration_heartbeat = make_integration_heartbeat( - alert_receive_channel, timeout, last_heartbeat_time=last_heartbeat_time - ) - integration_heartbeat.check_heartbeat_state_and_save() - assert mocked_handler.called + # No heartbeats, nothing happens + with patch.object(create_alert, "apply_async") as mock_create_alert_apply_async: + with django_capture_on_commit_callbacks(execute=True): + result = check_heartbeats() + assert result == "Found 0 expired and 0 restored heartbeats" + assert mock_create_alert_apply_async.call_count == 0 - -@pytest.mark.django_db -@patch("apps.heartbeat.models.IntegrationHeartBeat.on_heartbeat_expired", return_value=None) -@pytest.mark.parametrize("integration", [AlertReceiveChannel.INTEGRATION_FORMATTED_WEBHOOK]) -def test_integration_heartbeat_already_expired( - mocked_handler, make_organization_and_user, make_alert_receive_channel, make_integration_heartbeat, integration -): + # Prepare heartbeat team, _ = make_organization_and_user() - # Some short timeout and last_heartbeat_time to make sure that heartbeat is expired - timeout = 1 - last_heartbeat_time = timezone.now() - timezone.timedelta(seconds=timeout * 10) + timeout = 60 + last_heartbeat_time = timezone.now() alert_receive_channel = make_alert_receive_channel(team, integration=integration) integration_heartbeat = make_integration_heartbeat( - alert_receive_channel, - timeout, - last_heartbeat_time=last_heartbeat_time, - previous_alerted_state_was_life=False, + alert_receive_channel, timeout, last_heartbeat_time=last_heartbeat_time, previous_alerted_state_was_life=True ) - integration_heartbeat.check_heartbeat_state_and_save() - assert mocked_handler.called is False + # Heartbeat is alive, nothing happens + with patch.object(create_alert, "apply_async") as mock_create_alert_apply_async: + with django_capture_on_commit_callbacks(execute=True): + result = check_heartbeats() + assert result == "Found 0 expired and 0 restored heartbeats" + assert mock_create_alert_apply_async.call_count == 0 -@pytest.mark.django_db -@patch("apps.heartbeat.models.IntegrationHeartBeat.on_heartbeat_restored", return_value=None) -@pytest.mark.parametrize("integration", [AlertReceiveChannel.INTEGRATION_FORMATTED_WEBHOOK]) -def test_integration_heartbeat_restored( - mocked_handler, make_organization_and_user, make_alert_receive_channel, make_integration_heartbeat, integration -): - team, _ = make_organization_and_user() - # Some long timeout and last_heartbeat_time to make sure that heartbeat is not expired - timeout = 1000 - last_heartbeat_time = timezone.now() - alert_receive_channel = make_alert_receive_channel(team, integration=integration) - integration_heartbeat = make_integration_heartbeat( - alert_receive_channel, - timeout, - last_heartbeat_time=last_heartbeat_time, - previous_alerted_state_was_life=False, - ) - integration_heartbeat.check_heartbeat_state_and_save() - assert mocked_handler.called + # Hearbeat expires, send an alert + integration_heartbeat.refresh_from_db() + integration_heartbeat.last_heartbeat_time = timezone.now() - timezone.timedelta(seconds=timeout * 10) + integration_heartbeat.save() + with patch.object(create_alert, "apply_async") as mock_create_alert_apply_async: + with django_capture_on_commit_callbacks(execute=True): + result = check_heartbeats() + assert result == "Found 1 expired and 0 restored heartbeats" + assert mock_create_alert_apply_async.call_count == 1 + # Heartbeat is still expired, nothing happens + integration_heartbeat.refresh_from_db() + with patch.object(create_alert, "apply_async") as mock_create_alert_apply_async: + with django_capture_on_commit_callbacks(execute=True): + result = check_heartbeats() + assert result == "Found 0 expired and 0 restored heartbeats" + assert mock_create_alert_apply_async.call_count == 0 -@pytest.mark.django_db -@patch("apps.heartbeat.models.IntegrationHeartBeat.on_heartbeat_restored", return_value=None) -@pytest.mark.parametrize("integration", [AlertReceiveChannel.INTEGRATION_FORMATTED_WEBHOOK]) -def test_integration_heartbeat_restored_and_alert_was_not_sent( - mocked_handler, make_organization_and_user, make_alert_receive_channel, make_integration_heartbeat, integration -): - team, _ = make_organization_and_user() - # Some long timeout and last_heartbeat_time to make sure that heartbeat is not expired - timeout = 1000 - last_heartbeat_time = timezone.now() - alert_receive_channel = make_alert_receive_channel(team, integration=integration) - integration_heartbeat = make_integration_heartbeat( - alert_receive_channel, - timeout, - last_heartbeat_time=last_heartbeat_time, - ) - integration_heartbeat.check_heartbeat_state_and_save() - assert mocked_handler.called is False + # Hearbeat restored, send an auto-resolve alert + integration_heartbeat.refresh_from_db() + integration_heartbeat.last_heartbeat_time = timezone.now() + integration_heartbeat.save() + with patch.object(create_alert, "apply_async") as mock_create_alert_apply_async: + with django_capture_on_commit_callbacks(execute=True): + result = check_heartbeats() + assert result == "Found 0 expired and 1 restored heartbeats" + assert mock_create_alert_apply_async.call_count == 1 + + # Heartbeat is alive, nothing happens + integration_heartbeat.refresh_from_db() + integration_heartbeat.last_heartbeat_time = timezone.now() + integration_heartbeat.save() + integration_heartbeat.refresh_from_db() + with patch.object(create_alert, "apply_async") as mock_create_alert_apply_async: + with django_capture_on_commit_callbacks(execute=True): + result = check_heartbeats() + assert result == "Found 0 expired and 0 restored heartbeats" + assert mock_create_alert_apply_async.call_count == 0 diff --git a/engine/apps/integrations/metadata/heartbeat/elastalert.py b/engine/apps/integrations/metadata/heartbeat/elastalert.py index 04a05d673b..8394577e53 100644 --- a/engine/apps/integrations/metadata/heartbeat/elastalert.py +++ b/engine/apps/integrations/metadata/heartbeat/elastalert.py @@ -12,12 +12,12 @@ heartbeat_expired_payload = { "alert_uid": "0eaf37c8-e1eb-4714-b79e-7c648b6a96fa", "title": heartbeat_expired_title, - "image_url": None, "state": "alerting", - "link_to_upstream_details": None, "message": heartbeat_expired_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": False, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": False, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": False, # Keep for backwards compatibility } heartbeat_restored_title = heartbeat_text.heartbeat_restored_title @@ -26,10 +26,10 @@ heartbeat_restored_payload = { "alert_uid": "0eaf37c8-e1eb-4714-b79e-7c648b6a96fa", "title": heartbeat_restored_title, - "image_url": None, "state": "ok", - "link_to_upstream_details": None, "message": heartbeat_restored_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": True, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": True, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": True, # Keep for backwards compatibility } diff --git a/engine/apps/integrations/metadata/heartbeat/formatted_webhook.py b/engine/apps/integrations/metadata/heartbeat/formatted_webhook.py index 3e44b57e81..72278b15e8 100644 --- a/engine/apps/integrations/metadata/heartbeat/formatted_webhook.py +++ b/engine/apps/integrations/metadata/heartbeat/formatted_webhook.py @@ -17,8 +17,10 @@ "state": "alerting", "link_to_upstream_details": None, "message": heartbeat_expired_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": False, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": False, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": False, # Keep for backwards compatibility } heartbeat_restored_title = heartbeat_text.heartbeat_restored_title @@ -31,6 +33,8 @@ "state": "ok", "link_to_upstream_details": None, "message": heartbeat_restored_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": True, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": True, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": True, # Keep for backwards compatibility } diff --git a/engine/apps/integrations/metadata/heartbeat/grafana.py b/engine/apps/integrations/metadata/heartbeat/grafana.py index a71011edd9..67954ab9c5 100644 --- a/engine/apps/integrations/metadata/heartbeat/grafana.py +++ b/engine/apps/integrations/metadata/heartbeat/grafana.py @@ -14,8 +14,10 @@ "state": "alerting", "title": heartbeat_expired_title, "message": heartbeat_expired_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": False, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": False, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": False, # Keep for backwards compatibility } heartbeat_restored_title = f"[OK] {heartbeat_text.heartbeat_restored_title}" @@ -25,6 +27,8 @@ "state": "ok", "title": heartbeat_restored_title, "message": heartbeat_restored_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": True, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": True, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": True, # Keep for backwards compatibility } diff --git a/engine/apps/integrations/metadata/heartbeat/prtg.py b/engine/apps/integrations/metadata/heartbeat/prtg.py index ddf163335c..42c6965c29 100644 --- a/engine/apps/integrations/metadata/heartbeat/prtg.py +++ b/engine/apps/integrations/metadata/heartbeat/prtg.py @@ -17,8 +17,10 @@ "state": "alerting", "link_to_upstream_details": None, "message": heartbeat_expired_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": False, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": False, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": False, # Keep for backwards compatibility } heartbeat_restored_title = heartbeat_text.heartbeat_restored_title @@ -31,6 +33,8 @@ "state": "ok", "link_to_upstream_details": None, "message": heartbeat_restored_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": True, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": True, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": True, # Keep for backwards compatibility } diff --git a/engine/apps/integrations/metadata/heartbeat/webhook.py b/engine/apps/integrations/metadata/heartbeat/webhook.py index e6283e3600..b3bf350485 100644 --- a/engine/apps/integrations/metadata/heartbeat/webhook.py +++ b/engine/apps/integrations/metadata/heartbeat/webhook.py @@ -13,12 +13,12 @@ heartbeat_expired_payload = { "alert_uid": "7973c835-ff3f-46e4-9444-06df127b6f8e", "title": heartbeat_expired_title, - "image_url": None, "state": "alerting", - "link_to_upstream_details": None, "message": heartbeat_expired_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": False, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": False, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": False, # Keep for backwards compatibility } heartbeat_restored_title = heartbeat_text.heartbeat_restored_title @@ -31,6 +31,8 @@ "state": "ok", "link_to_upstream_details": None, "message": heartbeat_restored_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": True, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": True, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": True, # Keep for backwards compatibility } diff --git a/engine/apps/integrations/metadata/heartbeat/zabbix.py b/engine/apps/integrations/metadata/heartbeat/zabbix.py index b336b75b4b..d961bd836b 100644 --- a/engine/apps/integrations/metadata/heartbeat/zabbix.py +++ b/engine/apps/integrations/metadata/heartbeat/zabbix.py @@ -16,8 +16,10 @@ "state": "alerting", "link_to_upstream_details": None, "message": heartbeat_expired_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": False, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": False, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": False, # Keep for backwards compatibility } heartbeat_restored_title = heartbeat_text.heartbeat_restored_title @@ -30,6 +32,8 @@ "state": "ok", "link_to_upstream_details": None, "message": heartbeat_restored_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": True, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": True, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": True, # Keep for backwards compatibility } diff --git a/engine/apps/integrations/tests/test_ratelimit.py b/engine/apps/integrations/tests/test_ratelimit.py index 97b5693772..5598c4f786 100644 --- a/engine/apps/integrations/tests/test_ratelimit.py +++ b/engine/apps/integrations/tests/test_ratelimit.py @@ -96,5 +96,3 @@ def test_ratelimit_integration_heartbeats( response = c.get(url) assert response.status_code == 429 - - assert mocked_task.call_count == 1 diff --git a/engine/apps/integrations/views.py b/engine/apps/integrations/views.py index 638df50743..c694d87e88 100644 --- a/engine/apps/integrations/views.py +++ b/engine/apps/integrations/views.py @@ -3,6 +3,7 @@ from django.conf import settings from django.core.exceptions import PermissionDenied +from django.db import OperationalError from django.http import HttpResponseBadRequest, JsonResponse from django.utils.decorators import method_decorator from django.views.decorators.csrf import csrf_exempt @@ -324,6 +325,10 @@ def post(self, request): return Response(status=200) def _process_heartbeat_signal(self, request, alert_receive_channel): - process_heartbeat_task.apply_async( - (alert_receive_channel.pk,), - ) + try: + process_heartbeat_task(alert_receive_channel.pk) + # If database is not ready, fallback to celery task + except OperationalError: + process_heartbeat_task.apply_async( + (alert_receive_channel.pk,), + ) diff --git a/engine/config_integrations/elastalert.py b/engine/config_integrations/elastalert.py index 2cd00b1aa7..a2e13b30f8 100644 --- a/engine/config_integrations/elastalert.py +++ b/engine/config_integrations/elastalert.py @@ -46,14 +46,7 @@ grouping_id = '{{ payload.get("alert_uid", "")}}' -resolve_condition = """\ -{%- if "is_amixr_heartbeat_restored" in payload -%} -{# We don't know the payload format from your integration. #} -{# The heartbeat alerts will go here so we check for our own key #} -{{ payload["is_amixr_heartbeat_restored"] }} -{%- else -%} -{{ payload.get("state", "").upper() == "OK" }} -{%- endif %}""" +resolve_condition = """{{ payload.get("state", "").upper() == "OK" }}""" acknowledge_condition = None diff --git a/engine/config_integrations/webhook.py b/engine/config_integrations/webhook.py index 945cbb5fec..a492ef72c9 100644 --- a/engine/config_integrations/webhook.py +++ b/engine/config_integrations/webhook.py @@ -45,16 +45,15 @@ source_link = "{{ payload.url }}" -grouping_id = "{{ payload }}" - -resolve_condition = """\ -{%- if "is_amixr_heartbeat_restored" in payload -%} -{# We don't know the payload format from your integration. #} -{# The heartbeat alerts will go here so we check for our own key #} -{{ payload["is_amixr_heartbeat_restored"] }} -{%- else -%} -{{ payload.get("state", "").upper() == "OK" }} -{%- endif %}""" +grouping_id = """\ +{% if "is_oncall_heartbeat" in payload %} +{# Case for heartbeat alerts generated by Grafana OnCall #} +{{- payload.alert_uid }} +{% else %} +{{- payload }} +{% endif %}""" + +resolve_condition = """{{ payload.get("state", "").upper() == "OK" }}""" acknowledge_condition = None example_payload = {"message": "This alert was sent by user for demonstration purposes"} diff --git a/engine/settings/base.py b/engine/settings/base.py index 0d2bc46a1a..e598bd175e 100644 --- a/engine/settings/base.py +++ b/engine/settings/base.py @@ -488,6 +488,11 @@ class BrokerTypes: "schedule": 60 * 30, "args": (), }, + "check_heartbeats": { + "task": "apps.heartbeat.tasks.check_heartbeats", + "schedule": crontab(minute="*/2"), # every 2 minutes + "args": (), + }, } if ESCALATION_AUDITOR_ENABLED: