From 6027afd1a7f5ad15d99682de8f654562e200316a Mon Sep 17 00:00:00 2001 From: Ildar Iskhakov Date: Wed, 2 Aug 2023 15:28:58 +0800 Subject: [PATCH 01/16] Use periodic task for heartbeats --- engine/apps/heartbeat/models.py | 75 ++----------------- engine/apps/heartbeat/tasks.py | 119 +++++++++++++++++++----------- engine/apps/integrations/views.py | 11 ++- 3 files changed, 91 insertions(+), 114 deletions(-) diff --git a/engine/apps/heartbeat/models.py b/engine/apps/heartbeat/models.py index eb9b9cd627..4e52456d32 100644 --- a/engine/apps/heartbeat/models.py +++ b/engine/apps/heartbeat/models.py @@ -4,10 +4,9 @@ from django.conf import settings from django.core.validators import MinLengthValidator -from django.db import models, transaction +from django.db import models from django.utils import timezone -from apps.integrations.tasks import create_alert from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length logger = logging.getLogger(__name__) @@ -43,9 +42,14 @@ class IntegrationHeartBeat(models.Model): created_at = models.DateTimeField(auto_now_add=True) timeout_seconds = models.IntegerField(default=0) + + # Stores the latest received heartbeat signal time last_heartbeat_time = models.DateTimeField(default=None, null=True) + # Deprecated. This field is not used. TODO: remove it last_checkup_task_time = models.DateTimeField(default=None, null=True) + # Deprecated. Stored the latest scheduled `integration_heartbeat_checkup` task id. TODO: remove it actual_check_up_task_id = models.CharField(max_length=100) + # Last status of the heartbeat. Determines if integration was alive on latest checkup previous_alerted_state_was_life = models.BooleanField(default=True) public_primary_key = models.CharField( @@ -83,73 +87,6 @@ def status(self) -> bool: def link(self) -> str: return urljoin(self.alert_receive_channel.integration_url, "heartbeat/") - @classmethod - def perform_heartbeat_check(cls, heartbeat_id: int, task_request_id: str) -> None: - with transaction.atomic(): - heartbeats = cls.objects.filter(pk=heartbeat_id).select_for_update() - if len(heartbeats) == 0: - logger.info(f"Heartbeat {heartbeat_id} not found {task_request_id}") - return - heartbeat = heartbeats[0] - if task_request_id == heartbeat.actual_check_up_task_id: - heartbeat.check_heartbeat_state_and_save() - else: - logger.info(f"Heartbeat {heartbeat_id} is not actual {task_request_id}") - - def check_heartbeat_state_and_save(self) -> bool: - """ - Use this method if you want just check heartbeat status. - """ - state_changed = self.check_heartbeat_state() - if state_changed: - self.save(update_fields=["previous_alerted_state_was_life"]) - return state_changed - - def check_heartbeat_state(self) -> bool: - """ - Actually checking heartbeat. - Use this method if you want to do changes of heartbeat instance while checking its status. - ( See IntegrationHeartBeatAPIView.post() for example ) - """ - state_changed = False - if self.is_expired: - if self.previous_alerted_state_was_life: - self.on_heartbeat_expired() - self.previous_alerted_state_was_life = False - state_changed = True - else: - if not self.previous_alerted_state_was_life: - self.on_heartbeat_restored() - self.previous_alerted_state_was_life = True - state_changed = True - return state_changed - - def on_heartbeat_restored(self) -> None: - create_alert.apply_async( - kwargs={ - "title": self.alert_receive_channel.heartbeat_restored_title, - "message": self.alert_receive_channel.heartbeat_restored_message, - "image_url": None, - "link_to_upstream_details": None, - "alert_receive_channel_pk": self.alert_receive_channel.pk, - "integration_unique_data": {}, - "raw_request_data": self.alert_receive_channel.heartbeat_restored_payload, - }, - ) - - def on_heartbeat_expired(self) -> None: - create_alert.apply_async( - kwargs={ - "title": self.alert_receive_channel.heartbeat_expired_title, - "message": self.alert_receive_channel.heartbeat_expired_message, - "image_url": None, - "link_to_upstream_details": None, - "alert_receive_channel_pk": self.alert_receive_channel.pk, - "integration_unique_data": {}, - "raw_request_data": self.alert_receive_channel.heartbeat_expired_payload, - }, - ) - # Insight logs @property def insight_logs_type_verbal(self) -> str: diff --git a/engine/apps/heartbeat/tasks.py b/engine/apps/heartbeat/tasks.py index 071af2fbcc..5b0e9bf9f6 100644 --- a/engine/apps/heartbeat/tasks.py +++ b/engine/apps/heartbeat/tasks.py @@ -1,57 +1,92 @@ -from time import perf_counter - from celery.utils.log import get_task_logger from django.db import transaction +from django.db.models import DurationField, ExpressionWrapper, F +from django.db.models.functions import Now from django.utils import timezone +from apps.heartbeat.models import IntegrationHeartBeat +from apps.integrations.tasks import create_alert from common.custom_celery_tasks import shared_dedicated_queue_retry_task logger = get_task_logger(__name__) @shared_dedicated_queue_retry_task() -def integration_heartbeat_checkup(heartbeat_id: int) -> None: - from apps.heartbeat.models import IntegrationHeartBeat +def check_heartbeats() -> None: + """ + Periodic task to check heartbeats status change and create alerts (or auto-resolve alerts) if needed + """ + # Heartbeat is considered enabled if it + # * has timeout_seconds set to non-zero (non-default) value, + # * received at least one checkup (last_heartbeat_time set to non-null value) + enabled_heartbeats = ( + IntegrationHeartBeat.objects.filter(last_heartbeat_time__isnull=False).exclude(timeout_seconds=0) + # Convert integer `timeout_seconds`` to datetime.timedelta `timeout` + # microseconds = seconds * 10**6 + # TODO: consider migrate timeout_seconds from IntegerField to DurationField + .annotate(timeout=(ExpressionWrapper(F("timeout_seconds") * 10**6, output_field=DurationField()))) + ) + with transaction.atomic(): + # Heartbeat is considered expired if it + # * is enabled, + # * is not already expired, + # * has not received a checkup for timeout period + expired_heartbeats = enabled_heartbeats.filter(last_heartbeat_time__lte=(timezone.now() - F("timeout"))).filter( + previous_alerted_state_was_life=True + ) + # Schedule alert creation for each expired heartbeat after transaction commit + for heartbeat in expired_heartbeats: + transaction.on_commit( + lambda: create_alert.apply_async( + kwargs={ + "title": heartbeat.alert_receive_channel.heartbeat_expired_title, + "message": heartbeat.alert_receive_channel.heartbeat_expired_message, + "image_url": None, + "link_to_upstream_details": None, + "alert_receive_channel_pk": heartbeat.alert_receive_channel.pk, + "integration_unique_data": {}, + "raw_request_data": heartbeat.alert_receive_channel.heartbeat_expired_payload, + }, + ) + ) + # Update previous_alerted_state_was_life to False + expired_heartbeats.update(previous_alerted_state_was_life=False) + with transaction.atomic(): + # Heartbeat is considered restored if it + # * is enabled, expired, + # * has received a checkup in timeout period from now, + # * was is alerted state (previous_alerted_state_was_life is False) + restored_heartbeats = ( + enabled_heartbeats.select_for_update() + .filter(last_heartbeat_time__gte=(Now() - F("timeout"))) + .filter(previous_alerted_state_was_life=False) + ) + # Schedule auto-resolve alert creation for each expired heartbeat after transaction commit + for heartbeat in restored_heartbeats: + transaction.on_commit( + lambda: create_alert.apply_async( + kwargs={ + "title": heartbeat.alert_receive_channel.heartbeat_restored_title, + "message": heartbeat.alert_receive_channel.heartbeat_restored_message, + "image_url": None, + "link_to_upstream_details": None, + "alert_receive_channel_pk": heartbeat.alert_receive_channel.pk, + "integration_unique_data": {}, + "raw_request_data": heartbeat.alert_receive_channel.heartbeat_restored_payload, + }, + ) + ) + restored_heartbeats.update(previous_alerted_state_was_life=True) - IntegrationHeartBeat.perform_heartbeat_check(heartbeat_id, integration_heartbeat_checkup.request.id) + +@shared_dedicated_queue_retry_task() +def integration_heartbeat_checkup(heartbeat_id: int) -> None: + """Deprecated. TODO: Remove this task after this task cleared from queue""" + pass @shared_dedicated_queue_retry_task() def process_heartbeat_task(alert_receive_channel_pk): - start = perf_counter() - from apps.heartbeat.models import IntegrationHeartBeat - - with transaction.atomic(): - heartbeats = IntegrationHeartBeat.objects.filter( - alert_receive_channel__pk=alert_receive_channel_pk, - ).select_for_update() - if len(heartbeats) == 0: - logger.info(f"Integration Heartbeat for alert_receive_channel {alert_receive_channel_pk} was not found.") - return - else: - heartbeat = heartbeats[0] - heartbeat_selected = perf_counter() - logger.info( - f"IntegrationHeartBeat selected for alert_receive_channel {alert_receive_channel_pk} in {heartbeat_selected - start}" - ) - task = integration_heartbeat_checkup.apply_async( - (heartbeat.pk,), - countdown=heartbeat.timeout_seconds + 1, - ) - is_touched = heartbeat.last_heartbeat_time is not None - heartbeat.actual_check_up_task_id = task.id - heartbeat.last_heartbeat_time = timezone.now() - update_fields = ["actual_check_up_task_id", "last_heartbeat_time"] - task_started = perf_counter() - logger.info( - f"heartbeat_checkup task started for alert_receive_channel {alert_receive_channel_pk} in {task_started - start}" - ) - if is_touched: - state_changed = heartbeat.check_heartbeat_state() - state_checked = perf_counter() - logger.info( - f"state checked for alert_receive_channel {alert_receive_channel_pk} in {state_checked - start}" - ) - if state_changed: - update_fields.append("previous_alerted_state_was_life") - heartbeat.save(update_fields=update_fields) + IntegrationHeartBeat.objects.filter( + alert_receive_channel__pk=alert_receive_channel_pk, + ).first().update(last_heartbeat=timezone().now()) diff --git a/engine/apps/integrations/views.py b/engine/apps/integrations/views.py index 67b26883d5..5353ba0a54 100644 --- a/engine/apps/integrations/views.py +++ b/engine/apps/integrations/views.py @@ -3,6 +3,7 @@ from django.conf import settings from django.core.exceptions import PermissionDenied +from django.db import OperationalError from django.http import HttpResponseBadRequest, JsonResponse from django.utils.decorators import method_decorator from django.views.decorators.csrf import csrf_exempt @@ -267,9 +268,13 @@ def post(self, request): return Response(status=200) def _process_heartbeat_signal(self, request, alert_receive_channel): - process_heartbeat_task.apply_async( - (alert_receive_channel.pk,), - ) + try: + process_heartbeat_task(alert_receive_channel.pk) + # If database is not ready, fallback to celery task + except OperationalError: + process_heartbeat_task.apply_async( + (alert_receive_channel.pk,), + ) class AlertManagerV2View(BrowsableInstructionMixin, AlertChannelDefiningMixin, IntegrationRateLimitMixin, APIView): From 472ecebb60095760e52c649ae599fc6180e42ed4 Mon Sep 17 00:00:00 2001 From: Ildar Iskhakov Date: Wed, 2 Aug 2023 18:37:12 +0800 Subject: [PATCH 02/16] Update engine/apps/heartbeat/models.py Co-authored-by: Joey Orlando --- engine/apps/heartbeat/models.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/engine/apps/heartbeat/models.py b/engine/apps/heartbeat/models.py index 4e52456d32..9d39c98b54 100644 --- a/engine/apps/heartbeat/models.py +++ b/engine/apps/heartbeat/models.py @@ -43,8 +43,10 @@ class IntegrationHeartBeat(models.Model): created_at = models.DateTimeField(auto_now_add=True) timeout_seconds = models.IntegerField(default=0) - # Stores the latest received heartbeat signal time last_heartbeat_time = models.DateTimeField(default=None, null=True) + """ + Stores the latest received heartbeat signal time + """ # Deprecated. This field is not used. TODO: remove it last_checkup_task_time = models.DateTimeField(default=None, null=True) # Deprecated. Stored the latest scheduled `integration_heartbeat_checkup` task id. TODO: remove it From 2a5ea678424362503beb0d880150d1612373589c Mon Sep 17 00:00:00 2001 From: Michael Derynck Date: Wed, 2 Aug 2023 16:58:33 -0600 Subject: [PATCH 03/16] Small fixes to process_heartbeat_task, add celery beat schedule --- engine/apps/heartbeat/tasks.py | 2 +- engine/settings/base.py | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/engine/apps/heartbeat/tasks.py b/engine/apps/heartbeat/tasks.py index 5b0e9bf9f6..0e57b7ad08 100644 --- a/engine/apps/heartbeat/tasks.py +++ b/engine/apps/heartbeat/tasks.py @@ -89,4 +89,4 @@ def integration_heartbeat_checkup(heartbeat_id: int) -> None: def process_heartbeat_task(alert_receive_channel_pk): IntegrationHeartBeat.objects.filter( alert_receive_channel__pk=alert_receive_channel_pk, - ).first().update(last_heartbeat=timezone().now()) + ).update(last_heartbeat_time=timezone.now()) diff --git a/engine/settings/base.py b/engine/settings/base.py index bffb29926b..3de6546cde 100644 --- a/engine/settings/base.py +++ b/engine/settings/base.py @@ -488,6 +488,11 @@ class BrokerTypes: "schedule": 60 * 30, "args": (), }, + "check_heartbeats": { + "task": "apps.heartbeat.tasks.check_heartbeats", + "schedule": 5, + "args": (), + }, } if ESCALATION_AUDITOR_ENABLED: From 1cdb5b7315abbdfa9c168cfab0a9755059fbcd5c Mon Sep 17 00:00:00 2001 From: Ildar Iskhakov Date: Wed, 2 Aug 2023 18:40:51 +0800 Subject: [PATCH 04/16] Change comments formatting --- engine/apps/heartbeat/models.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/engine/apps/heartbeat/models.py b/engine/apps/heartbeat/models.py index 9d39c98b54..9d8018df38 100644 --- a/engine/apps/heartbeat/models.py +++ b/engine/apps/heartbeat/models.py @@ -47,12 +47,21 @@ class IntegrationHeartBeat(models.Model): """ Stores the latest received heartbeat signal time """ - # Deprecated. This field is not used. TODO: remove it + last_checkup_task_time = models.DateTimeField(default=None, null=True) - # Deprecated. Stored the latest scheduled `integration_heartbeat_checkup` task id. TODO: remove it + """ + Deprecated. This field is not used. TODO: remove it + """ + actual_check_up_task_id = models.CharField(max_length=100) - # Last status of the heartbeat. Determines if integration was alive on latest checkup + """ + Deprecated. Stored the latest scheduled `integration_heartbeat_checkup` task id. TODO: remove it + """ + previous_alerted_state_was_life = models.BooleanField(default=True) + """ + Last status of the heartbeat. Determines if integration was alive on latest checkup + """ public_primary_key = models.CharField( max_length=20, From 67456c571216ad1d5e9dc320dd5b15fe0e53a6bb Mon Sep 17 00:00:00 2001 From: Ildar Iskhakov Date: Thu, 3 Aug 2023 17:16:56 +0800 Subject: [PATCH 05/16] Cleanup and fix tests --- engine/apps/heartbeat/tasks.py | 14 +- .../tests/test_integration_heartbeat.py | 126 +++++++++--------- .../metadata/heartbeat/elastalert.py | 16 +-- .../metadata/heartbeat/formatted_webhook.py | 12 +- .../metadata/heartbeat/grafana.py | 12 +- .../integrations/metadata/heartbeat/prtg.py | 12 +- .../metadata/heartbeat/webhook.py | 14 +- .../integrations/metadata/heartbeat/zabbix.py | 12 +- engine/config_integrations/elastalert.py | 9 +- engine/config_integrations/webhook.py | 19 ++- engine/settings/base.py | 2 +- .../integration-tests/.auth/admin.json | 25 ++++ .../integration-tests/.auth/editor.json | 25 ++++ .../integration-tests/.auth/viewer.json | 25 ++++ 14 files changed, 202 insertions(+), 121 deletions(-) create mode 100644 grafana-plugin/integration-tests/.auth/admin.json create mode 100644 grafana-plugin/integration-tests/.auth/editor.json create mode 100644 grafana-plugin/integration-tests/.auth/viewer.json diff --git a/engine/apps/heartbeat/tasks.py b/engine/apps/heartbeat/tasks.py index 0e57b7ad08..1f0c4e9dbf 100644 --- a/engine/apps/heartbeat/tasks.py +++ b/engine/apps/heartbeat/tasks.py @@ -1,7 +1,6 @@ from celery.utils.log import get_task_logger from django.db import transaction from django.db.models import DurationField, ExpressionWrapper, F -from django.db.models.functions import Now from django.utils import timezone from apps.heartbeat.models import IntegrationHeartBeat @@ -31,8 +30,10 @@ def check_heartbeats() -> None: # * is enabled, # * is not already expired, # * has not received a checkup for timeout period - expired_heartbeats = enabled_heartbeats.filter(last_heartbeat_time__lte=(timezone.now() - F("timeout"))).filter( - previous_alerted_state_was_life=True + expired_heartbeats = ( + enabled_heartbeats.select_for_update() + .filter(last_heartbeat_time__lte=(timezone.now() - F("timeout"))) + .filter(previous_alerted_state_was_life=True) ) # Schedule alert creation for each expired heartbeat after transaction commit for heartbeat in expired_heartbeats: @@ -50,7 +51,7 @@ def check_heartbeats() -> None: ) ) # Update previous_alerted_state_was_life to False - expired_heartbeats.update(previous_alerted_state_was_life=False) + expired_count = expired_heartbeats.update(previous_alerted_state_was_life=False) with transaction.atomic(): # Heartbeat is considered restored if it # * is enabled, expired, @@ -58,7 +59,7 @@ def check_heartbeats() -> None: # * was is alerted state (previous_alerted_state_was_life is False) restored_heartbeats = ( enabled_heartbeats.select_for_update() - .filter(last_heartbeat_time__gte=(Now() - F("timeout"))) + .filter(last_heartbeat_time__gte=(timezone.now() - F("timeout"))) .filter(previous_alerted_state_was_life=False) ) # Schedule auto-resolve alert creation for each expired heartbeat after transaction commit @@ -76,7 +77,8 @@ def check_heartbeats() -> None: }, ) ) - restored_heartbeats.update(previous_alerted_state_was_life=True) + restored_count = restored_heartbeats.update(previous_alerted_state_was_life=True) + return f"Found {expired_count} expired and {restored_count} restored heartbeats" @shared_dedicated_queue_retry_task() diff --git a/engine/apps/heartbeat/tests/test_integration_heartbeat.py b/engine/apps/heartbeat/tests/test_integration_heartbeat.py index c3797dcfd3..e2cdb8c3f7 100644 --- a/engine/apps/heartbeat/tests/test_integration_heartbeat.py +++ b/engine/apps/heartbeat/tests/test_integration_heartbeat.py @@ -4,83 +4,77 @@ from django.utils import timezone from apps.alerts.models import AlertReceiveChannel +from apps.heartbeat.tasks import check_heartbeats +from apps.integrations.tasks import create_alert @pytest.mark.django_db -@patch("apps.heartbeat.models.IntegrationHeartBeat.on_heartbeat_expired", return_value=None) @pytest.mark.parametrize("integration", [AlertReceiveChannel.INTEGRATION_FORMATTED_WEBHOOK]) -def test_integration_heartbeat_expired( - mocked_handler, make_organization_and_user, make_alert_receive_channel, make_integration_heartbeat, integration +def test_check_heartbeats( + make_organization_and_user, + make_alert_receive_channel, + make_integration_heartbeat, + integration, + django_capture_on_commit_callbacks, ): - team, _ = make_organization_and_user() - # Some short timeout and last_heartbeat_time to make sure that heartbeat is expired - timeout = 1 - last_heartbeat_time = timezone.now() - timezone.timedelta(seconds=timeout * 10) - alert_receive_channel = make_alert_receive_channel(team, integration=integration) - integration_heartbeat = make_integration_heartbeat( - alert_receive_channel, timeout, last_heartbeat_time=last_heartbeat_time - ) - integration_heartbeat.check_heartbeat_state_and_save() - assert mocked_handler.called + # No heartbeats, nothing happens + with patch.object(create_alert, "apply_async") as mock_create_alert_apply_async: + with django_capture_on_commit_callbacks(execute=True): + result = check_heartbeats() + assert result == "Found 0 expired and 0 restored heartbeats" + assert mock_create_alert_apply_async.call_count == 0 - -@pytest.mark.django_db -@patch("apps.heartbeat.models.IntegrationHeartBeat.on_heartbeat_expired", return_value=None) -@pytest.mark.parametrize("integration", [AlertReceiveChannel.INTEGRATION_FORMATTED_WEBHOOK]) -def test_integration_heartbeat_already_expired( - mocked_handler, make_organization_and_user, make_alert_receive_channel, make_integration_heartbeat, integration -): + # Prepare heartbeat team, _ = make_organization_and_user() - # Some short timeout and last_heartbeat_time to make sure that heartbeat is expired - timeout = 1 - last_heartbeat_time = timezone.now() - timezone.timedelta(seconds=timeout * 10) + timeout = 60 + last_heartbeat_time = timezone.now() alert_receive_channel = make_alert_receive_channel(team, integration=integration) integration_heartbeat = make_integration_heartbeat( - alert_receive_channel, - timeout, - last_heartbeat_time=last_heartbeat_time, - previous_alerted_state_was_life=False, + alert_receive_channel, timeout, last_heartbeat_time=last_heartbeat_time, previous_alerted_state_was_life=True ) - integration_heartbeat.check_heartbeat_state_and_save() - assert mocked_handler.called is False + # Heartbeat is alive, nothing happens + with patch.object(create_alert, "apply_async") as mock_create_alert_apply_async: + with django_capture_on_commit_callbacks(execute=True): + result = check_heartbeats() + assert result == "Found 0 expired and 0 restored heartbeats" + assert mock_create_alert_apply_async.call_count == 0 -@pytest.mark.django_db -@patch("apps.heartbeat.models.IntegrationHeartBeat.on_heartbeat_restored", return_value=None) -@pytest.mark.parametrize("integration", [AlertReceiveChannel.INTEGRATION_FORMATTED_WEBHOOK]) -def test_integration_heartbeat_restored( - mocked_handler, make_organization_and_user, make_alert_receive_channel, make_integration_heartbeat, integration -): - team, _ = make_organization_and_user() - # Some long timeout and last_heartbeat_time to make sure that heartbeat is not expired - timeout = 1000 - last_heartbeat_time = timezone.now() - alert_receive_channel = make_alert_receive_channel(team, integration=integration) - integration_heartbeat = make_integration_heartbeat( - alert_receive_channel, - timeout, - last_heartbeat_time=last_heartbeat_time, - previous_alerted_state_was_life=False, - ) - integration_heartbeat.check_heartbeat_state_and_save() - assert mocked_handler.called + # Hearbeat expires, send an alert + integration_heartbeat.refresh_from_db() + integration_heartbeat.last_heartbeat_time = timezone.now() - timezone.timedelta(seconds=timeout * 10) + integration_heartbeat.save() + with patch.object(create_alert, "apply_async") as mock_create_alert_apply_async: + with django_capture_on_commit_callbacks(execute=True): + result = check_heartbeats() + assert result == "Found 1 expired and 0 restored heartbeats" + assert mock_create_alert_apply_async.call_count == 1 + # Heartbeat is still expired, nothing happens + integration_heartbeat.refresh_from_db() + with patch.object(create_alert, "apply_async") as mock_create_alert_apply_async: + with django_capture_on_commit_callbacks(execute=True): + result = check_heartbeats() + assert result == "Found 0 expired and 0 restored heartbeats" + assert mock_create_alert_apply_async.call_count == 0 -@pytest.mark.django_db -@patch("apps.heartbeat.models.IntegrationHeartBeat.on_heartbeat_restored", return_value=None) -@pytest.mark.parametrize("integration", [AlertReceiveChannel.INTEGRATION_FORMATTED_WEBHOOK]) -def test_integration_heartbeat_restored_and_alert_was_not_sent( - mocked_handler, make_organization_and_user, make_alert_receive_channel, make_integration_heartbeat, integration -): - team, _ = make_organization_and_user() - # Some long timeout and last_heartbeat_time to make sure that heartbeat is not expired - timeout = 1000 - last_heartbeat_time = timezone.now() - alert_receive_channel = make_alert_receive_channel(team, integration=integration) - integration_heartbeat = make_integration_heartbeat( - alert_receive_channel, - timeout, - last_heartbeat_time=last_heartbeat_time, - ) - integration_heartbeat.check_heartbeat_state_and_save() - assert mocked_handler.called is False + # Hearbeat restored, send an auto-resolve alert + integration_heartbeat.refresh_from_db() + integration_heartbeat.last_heartbeat_time = timezone.now() + integration_heartbeat.save() + with patch.object(create_alert, "apply_async") as mock_create_alert_apply_async: + with django_capture_on_commit_callbacks(execute=True): + result = check_heartbeats() + assert result == "Found 0 expired and 1 restored heartbeats" + assert mock_create_alert_apply_async.call_count == 1 + + # Heartbeat is alive, nothing happens + integration_heartbeat.refresh_from_db() + integration_heartbeat.last_heartbeat_time = timezone.now() + integration_heartbeat.save() + integration_heartbeat.refresh_from_db() + with patch.object(create_alert, "apply_async") as mock_create_alert_apply_async: + with django_capture_on_commit_callbacks(execute=True): + result = check_heartbeats() + assert result == "Found 0 expired and 0 restored heartbeats" + assert mock_create_alert_apply_async.call_count == 0 diff --git a/engine/apps/integrations/metadata/heartbeat/elastalert.py b/engine/apps/integrations/metadata/heartbeat/elastalert.py index 04a05d673b..8394577e53 100644 --- a/engine/apps/integrations/metadata/heartbeat/elastalert.py +++ b/engine/apps/integrations/metadata/heartbeat/elastalert.py @@ -12,12 +12,12 @@ heartbeat_expired_payload = { "alert_uid": "0eaf37c8-e1eb-4714-b79e-7c648b6a96fa", "title": heartbeat_expired_title, - "image_url": None, "state": "alerting", - "link_to_upstream_details": None, "message": heartbeat_expired_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": False, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": False, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": False, # Keep for backwards compatibility } heartbeat_restored_title = heartbeat_text.heartbeat_restored_title @@ -26,10 +26,10 @@ heartbeat_restored_payload = { "alert_uid": "0eaf37c8-e1eb-4714-b79e-7c648b6a96fa", "title": heartbeat_restored_title, - "image_url": None, "state": "ok", - "link_to_upstream_details": None, "message": heartbeat_restored_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": True, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": True, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": True, # Keep for backwards compatibility } diff --git a/engine/apps/integrations/metadata/heartbeat/formatted_webhook.py b/engine/apps/integrations/metadata/heartbeat/formatted_webhook.py index 3e44b57e81..72278b15e8 100644 --- a/engine/apps/integrations/metadata/heartbeat/formatted_webhook.py +++ b/engine/apps/integrations/metadata/heartbeat/formatted_webhook.py @@ -17,8 +17,10 @@ "state": "alerting", "link_to_upstream_details": None, "message": heartbeat_expired_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": False, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": False, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": False, # Keep for backwards compatibility } heartbeat_restored_title = heartbeat_text.heartbeat_restored_title @@ -31,6 +33,8 @@ "state": "ok", "link_to_upstream_details": None, "message": heartbeat_restored_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": True, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": True, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": True, # Keep for backwards compatibility } diff --git a/engine/apps/integrations/metadata/heartbeat/grafana.py b/engine/apps/integrations/metadata/heartbeat/grafana.py index a71011edd9..67954ab9c5 100644 --- a/engine/apps/integrations/metadata/heartbeat/grafana.py +++ b/engine/apps/integrations/metadata/heartbeat/grafana.py @@ -14,8 +14,10 @@ "state": "alerting", "title": heartbeat_expired_title, "message": heartbeat_expired_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": False, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": False, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": False, # Keep for backwards compatibility } heartbeat_restored_title = f"[OK] {heartbeat_text.heartbeat_restored_title}" @@ -25,6 +27,8 @@ "state": "ok", "title": heartbeat_restored_title, "message": heartbeat_restored_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": True, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": True, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": True, # Keep for backwards compatibility } diff --git a/engine/apps/integrations/metadata/heartbeat/prtg.py b/engine/apps/integrations/metadata/heartbeat/prtg.py index ddf163335c..42c6965c29 100644 --- a/engine/apps/integrations/metadata/heartbeat/prtg.py +++ b/engine/apps/integrations/metadata/heartbeat/prtg.py @@ -17,8 +17,10 @@ "state": "alerting", "link_to_upstream_details": None, "message": heartbeat_expired_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": False, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": False, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": False, # Keep for backwards compatibility } heartbeat_restored_title = heartbeat_text.heartbeat_restored_title @@ -31,6 +33,8 @@ "state": "ok", "link_to_upstream_details": None, "message": heartbeat_restored_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": True, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": True, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": True, # Keep for backwards compatibility } diff --git a/engine/apps/integrations/metadata/heartbeat/webhook.py b/engine/apps/integrations/metadata/heartbeat/webhook.py index e6283e3600..b3bf350485 100644 --- a/engine/apps/integrations/metadata/heartbeat/webhook.py +++ b/engine/apps/integrations/metadata/heartbeat/webhook.py @@ -13,12 +13,12 @@ heartbeat_expired_payload = { "alert_uid": "7973c835-ff3f-46e4-9444-06df127b6f8e", "title": heartbeat_expired_title, - "image_url": None, "state": "alerting", - "link_to_upstream_details": None, "message": heartbeat_expired_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": False, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": False, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": False, # Keep for backwards compatibility } heartbeat_restored_title = heartbeat_text.heartbeat_restored_title @@ -31,6 +31,8 @@ "state": "ok", "link_to_upstream_details": None, "message": heartbeat_restored_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": True, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": True, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": True, # Keep for backwards compatibility } diff --git a/engine/apps/integrations/metadata/heartbeat/zabbix.py b/engine/apps/integrations/metadata/heartbeat/zabbix.py index b336b75b4b..d961bd836b 100644 --- a/engine/apps/integrations/metadata/heartbeat/zabbix.py +++ b/engine/apps/integrations/metadata/heartbeat/zabbix.py @@ -16,8 +16,10 @@ "state": "alerting", "link_to_upstream_details": None, "message": heartbeat_expired_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": False, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": False, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": False, # Keep for backwards compatibility } heartbeat_restored_title = heartbeat_text.heartbeat_restored_title @@ -30,6 +32,8 @@ "state": "ok", "link_to_upstream_details": None, "message": heartbeat_restored_message, - "is_amixr_heartbeat": True, - "is_amixr_heartbeat_restored": True, + "is_oncall_heartbeat": True, + "is_oncall_heartbeat_restored": True, + "is_amixr_heartbeat": True, # Keep for backwards compatibility + "is_amixr_heartbeat_restored": True, # Keep for backwards compatibility } diff --git a/engine/config_integrations/elastalert.py b/engine/config_integrations/elastalert.py index 2cd00b1aa7..a2e13b30f8 100644 --- a/engine/config_integrations/elastalert.py +++ b/engine/config_integrations/elastalert.py @@ -46,14 +46,7 @@ grouping_id = '{{ payload.get("alert_uid", "")}}' -resolve_condition = """\ -{%- if "is_amixr_heartbeat_restored" in payload -%} -{# We don't know the payload format from your integration. #} -{# The heartbeat alerts will go here so we check for our own key #} -{{ payload["is_amixr_heartbeat_restored"] }} -{%- else -%} -{{ payload.get("state", "").upper() == "OK" }} -{%- endif %}""" +resolve_condition = """{{ payload.get("state", "").upper() == "OK" }}""" acknowledge_condition = None diff --git a/engine/config_integrations/webhook.py b/engine/config_integrations/webhook.py index 945cbb5fec..a492ef72c9 100644 --- a/engine/config_integrations/webhook.py +++ b/engine/config_integrations/webhook.py @@ -45,16 +45,15 @@ source_link = "{{ payload.url }}" -grouping_id = "{{ payload }}" - -resolve_condition = """\ -{%- if "is_amixr_heartbeat_restored" in payload -%} -{# We don't know the payload format from your integration. #} -{# The heartbeat alerts will go here so we check for our own key #} -{{ payload["is_amixr_heartbeat_restored"] }} -{%- else -%} -{{ payload.get("state", "").upper() == "OK" }} -{%- endif %}""" +grouping_id = """\ +{% if "is_oncall_heartbeat" in payload %} +{# Case for heartbeat alerts generated by Grafana OnCall #} +{{- payload.alert_uid }} +{% else %} +{{- payload }} +{% endif %}""" + +resolve_condition = """{{ payload.get("state", "").upper() == "OK" }}""" acknowledge_condition = None example_payload = {"message": "This alert was sent by user for demonstration purposes"} diff --git a/engine/settings/base.py b/engine/settings/base.py index 3de6546cde..b9ab140aa1 100644 --- a/engine/settings/base.py +++ b/engine/settings/base.py @@ -490,7 +490,7 @@ class BrokerTypes: }, "check_heartbeats": { "task": "apps.heartbeat.tasks.check_heartbeats", - "schedule": 5, + "schedule": crontab(minute="*/2"), # every 2 minutes "args": (), }, } diff --git a/grafana-plugin/integration-tests/.auth/admin.json b/grafana-plugin/integration-tests/.auth/admin.json new file mode 100644 index 0000000000..11ab596714 --- /dev/null +++ b/grafana-plugin/integration-tests/.auth/admin.json @@ -0,0 +1,25 @@ +{ + "cookies": [ + { + "name": "grafana_session", + "value": "0270fbb31b7d5f070a4eb1762c3563c6", + "domain": "localhost", + "path": "/", + "expires": 1692959957.132, + "httpOnly": true, + "secure": false, + "sameSite": "Lax" + }, + { + "name": "grafana_session_expiry", + "value": "1690368552", + "domain": "localhost", + "path": "/", + "expires": 1692959957.132, + "httpOnly": false, + "secure": false, + "sameSite": "Lax" + } + ], + "origins": [] +} \ No newline at end of file diff --git a/grafana-plugin/integration-tests/.auth/editor.json b/grafana-plugin/integration-tests/.auth/editor.json new file mode 100644 index 0000000000..c9333c113f --- /dev/null +++ b/grafana-plugin/integration-tests/.auth/editor.json @@ -0,0 +1,25 @@ +{ + "cookies": [ + { + "name": "grafana_session", + "value": "4f21a52a1bfb274c6495859af78b653e", + "domain": "localhost", + "path": "/", + "expires": 1692959957.777, + "httpOnly": true, + "secure": false, + "sameSite": "Lax" + }, + { + "name": "grafana_session_expiry", + "value": "1690368552", + "domain": "localhost", + "path": "/", + "expires": 1692959957.777, + "httpOnly": false, + "secure": false, + "sameSite": "Lax" + } + ], + "origins": [] +} \ No newline at end of file diff --git a/grafana-plugin/integration-tests/.auth/viewer.json b/grafana-plugin/integration-tests/.auth/viewer.json new file mode 100644 index 0000000000..cca56e2c04 --- /dev/null +++ b/grafana-plugin/integration-tests/.auth/viewer.json @@ -0,0 +1,25 @@ +{ + "cookies": [ + { + "name": "grafana_session", + "value": "8076889fdfbf962a8a54c35889c8b139", + "domain": "localhost", + "path": "/", + "expires": 1692959958.156, + "httpOnly": true, + "secure": false, + "sameSite": "Lax" + }, + { + "name": "grafana_session_expiry", + "value": "1690368553", + "domain": "localhost", + "path": "/", + "expires": 1692959958.156, + "httpOnly": false, + "secure": false, + "sameSite": "Lax" + } + ], + "origins": [] +} \ No newline at end of file From 07385b067a7a514cd2d6abaccb1db17d4c1b97d9 Mon Sep 17 00:00:00 2001 From: Ildar Iskhakov Date: Thu, 3 Aug 2023 17:43:24 +0800 Subject: [PATCH 06/16] Edit tests --- engine/apps/integrations/tests/test_ratelimit.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/engine/apps/integrations/tests/test_ratelimit.py b/engine/apps/integrations/tests/test_ratelimit.py index 97b5693772..a35c5f73ef 100644 --- a/engine/apps/integrations/tests/test_ratelimit.py +++ b/engine/apps/integrations/tests/test_ratelimit.py @@ -74,11 +74,8 @@ def test_ratelimit_alerts_per_team( @mock.patch("ratelimit.utils._split_rate", return_value=(1, 60)) -@mock.patch("apps.heartbeat.tasks.process_heartbeat_task.apply_async", return_value=None) @pytest.mark.django_db def test_ratelimit_integration_heartbeats( - mocked_task, - mocked_rate, make_organization, make_alert_receive_channel, ): @@ -96,5 +93,3 @@ def test_ratelimit_integration_heartbeats( response = c.get(url) assert response.status_code == 429 - - assert mocked_task.call_count == 1 From 84174008374b33c3172a3b6b561f96b07ce94043 Mon Sep 17 00:00:00 2001 From: Ildar Iskhakov Date: Thu, 3 Aug 2023 17:46:17 +0800 Subject: [PATCH 07/16] Edit tests --- engine/apps/integrations/tests/test_ratelimit.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/engine/apps/integrations/tests/test_ratelimit.py b/engine/apps/integrations/tests/test_ratelimit.py index a35c5f73ef..5598c4f786 100644 --- a/engine/apps/integrations/tests/test_ratelimit.py +++ b/engine/apps/integrations/tests/test_ratelimit.py @@ -74,8 +74,11 @@ def test_ratelimit_alerts_per_team( @mock.patch("ratelimit.utils._split_rate", return_value=(1, 60)) +@mock.patch("apps.heartbeat.tasks.process_heartbeat_task.apply_async", return_value=None) @pytest.mark.django_db def test_ratelimit_integration_heartbeats( + mocked_task, + mocked_rate, make_organization, make_alert_receive_channel, ): From 4c50497b389477991c05afbf57f5ac392c3025f6 Mon Sep 17 00:00:00 2001 From: Ildar Iskhakov Date: Thu, 3 Aug 2023 18:33:04 +0800 Subject: [PATCH 08/16] Fix tests --- engine/apps/heartbeat/tasks.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/engine/apps/heartbeat/tasks.py b/engine/apps/heartbeat/tasks.py index 1f0c4e9dbf..63e3c62c8d 100644 --- a/engine/apps/heartbeat/tasks.py +++ b/engine/apps/heartbeat/tasks.py @@ -1,6 +1,7 @@ from celery.utils.log import get_task_logger from django.db import transaction from django.db.models import DurationField, ExpressionWrapper, F +from django.db.models.functions import Now from django.utils import timezone from apps.heartbeat.models import IntegrationHeartBeat @@ -32,7 +33,7 @@ def check_heartbeats() -> None: # * has not received a checkup for timeout period expired_heartbeats = ( enabled_heartbeats.select_for_update() - .filter(last_heartbeat_time__lte=(timezone.now() - F("timeout"))) + .filter(last_heartbeat_time__lte=(Now() - F("timeout"))) .filter(previous_alerted_state_was_life=True) ) # Schedule alert creation for each expired heartbeat after transaction commit @@ -59,7 +60,7 @@ def check_heartbeats() -> None: # * was is alerted state (previous_alerted_state_was_life is False) restored_heartbeats = ( enabled_heartbeats.select_for_update() - .filter(last_heartbeat_time__gte=(timezone.now() - F("timeout"))) + .filter(last_heartbeat_time__gte=(Now() - F("timeout"))) .filter(previous_alerted_state_was_life=False) ) # Schedule auto-resolve alert creation for each expired heartbeat after transaction commit From 9ec678e2a6f590bf658092ba8fc60a4cef4acec5 Mon Sep 17 00:00:00 2001 From: Ildar Iskhakov Date: Thu, 3 Aug 2023 18:44:35 +0800 Subject: [PATCH 09/16] Fix tests --- engine/apps/heartbeat/tasks.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/engine/apps/heartbeat/tasks.py b/engine/apps/heartbeat/tasks.py index 63e3c62c8d..fbcb00d16b 100644 --- a/engine/apps/heartbeat/tasks.py +++ b/engine/apps/heartbeat/tasks.py @@ -1,7 +1,7 @@ from celery.utils.log import get_task_logger from django.db import transaction -from django.db.models import DurationField, ExpressionWrapper, F -from django.db.models.functions import Now +from django.db.models import DateTimeField, DurationField, ExpressionWrapper, F +from django.db.models.functions import Cast from django.utils import timezone from apps.heartbeat.models import IntegrationHeartBeat @@ -33,7 +33,7 @@ def check_heartbeats() -> None: # * has not received a checkup for timeout period expired_heartbeats = ( enabled_heartbeats.select_for_update() - .filter(last_heartbeat_time__lte=(Now() - F("timeout"))) + .filter(last_heartbeat_time__lte=(Cast(timezone.now(), DateTimeField()) - F("timeout"))) .filter(previous_alerted_state_was_life=True) ) # Schedule alert creation for each expired heartbeat after transaction commit @@ -60,7 +60,7 @@ def check_heartbeats() -> None: # * was is alerted state (previous_alerted_state_was_life is False) restored_heartbeats = ( enabled_heartbeats.select_for_update() - .filter(last_heartbeat_time__gte=(Now() - F("timeout"))) + .filter(last_heartbeat_time__gte=(Cast(timezone.now(), DateTimeField()) - F("timeout"))) .filter(previous_alerted_state_was_life=False) ) # Schedule auto-resolve alert creation for each expired heartbeat after transaction commit From 5ba1aa7acb9ad598b5fc1ff36c7232b86388d407 Mon Sep 17 00:00:00 2001 From: Ildar Iskhakov Date: Thu, 3 Aug 2023 19:06:51 +0800 Subject: [PATCH 10/16] Fix tests --- engine/apps/heartbeat/tasks.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/engine/apps/heartbeat/tasks.py b/engine/apps/heartbeat/tasks.py index fbcb00d16b..18e70ccf70 100644 --- a/engine/apps/heartbeat/tasks.py +++ b/engine/apps/heartbeat/tasks.py @@ -33,7 +33,9 @@ def check_heartbeats() -> None: # * has not received a checkup for timeout period expired_heartbeats = ( enabled_heartbeats.select_for_update() - .filter(last_heartbeat_time__lte=(Cast(timezone.now(), DateTimeField()) - F("timeout"))) + .filter( + last_heartbeat_time__lte=(Cast(timezone.now(), DateTimeField()) - Cast(F("timeout"), DurationField())) + ) .filter(previous_alerted_state_was_life=True) ) # Schedule alert creation for each expired heartbeat after transaction commit @@ -60,7 +62,9 @@ def check_heartbeats() -> None: # * was is alerted state (previous_alerted_state_was_life is False) restored_heartbeats = ( enabled_heartbeats.select_for_update() - .filter(last_heartbeat_time__gte=(Cast(timezone.now(), DateTimeField()) - F("timeout"))) + .filter( + last_heartbeat_time__gte=(Cast(timezone.now(), DateTimeField()) - Cast(F("timeout"), DurationField())) + ) .filter(previous_alerted_state_was_life=False) ) # Schedule auto-resolve alert creation for each expired heartbeat after transaction commit From b668e7e5f6874e5c834fa042c7913838fee308cc Mon Sep 17 00:00:00 2001 From: Ildar Iskhakov Date: Fri, 4 Aug 2023 11:42:40 +0800 Subject: [PATCH 11/16] Add a special case for sqlite --- engine/apps/heartbeat/tasks.py | 50 +++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/engine/apps/heartbeat/tasks.py b/engine/apps/heartbeat/tasks.py index 18e70ccf70..f8b2c4753c 100644 --- a/engine/apps/heartbeat/tasks.py +++ b/engine/apps/heartbeat/tasks.py @@ -1,4 +1,7 @@ +import datetime + from celery.utils.log import get_task_logger +from django.conf import settings from django.db import transaction from django.db.models import DateTimeField, DurationField, ExpressionWrapper, F from django.db.models.functions import Cast @@ -7,6 +10,7 @@ from apps.heartbeat.models import IntegrationHeartBeat from apps.integrations.tasks import create_alert from common.custom_celery_tasks import shared_dedicated_queue_retry_task +from settings.base import DatabaseTypes logger = get_task_logger(__name__) @@ -18,25 +22,31 @@ def check_heartbeats() -> None: """ # Heartbeat is considered enabled if it # * has timeout_seconds set to non-zero (non-default) value, - # * received at least one checkup (last_heartbeat_time set to non-null value) - enabled_heartbeats = ( - IntegrationHeartBeat.objects.filter(last_heartbeat_time__isnull=False).exclude(timeout_seconds=0) - # Convert integer `timeout_seconds`` to datetime.timedelta `timeout` - # microseconds = seconds * 10**6 + # * received at least one checkup (last_heartbeat_time set to non-null value)\ + + def _get_timeout_expression() -> ExpressionWrapper: # TODO: consider migrate timeout_seconds from IntegerField to DurationField - .annotate(timeout=(ExpressionWrapper(F("timeout_seconds") * 10**6, output_field=DurationField()))) + if settings.DATABASES["default"]["ENGINE"] == f"django.db.backends.{DatabaseTypes.SQLITE3}": + # Current Django version (3.2) does not support DurationField multiplying on SQLite + # https://github.com/django/django/commit/54e94640ace261b14cf8cdb1fae3dc6f068a5f87 + # Convert integer `timeout_seconds` to datetime.timedelta `timeout` + # microseconds = seconds * 10**6 + return ExpressionWrapper(F("timeout_seconds") * 10**6, output_field=DurationField()) + else: + return ExpressionWrapper(datetime.timedelta(seconds=1) * F("timeout_seconds"), output_field=DurationField()) + + enabled_heartbeats = ( + IntegrationHeartBeat.objects.filter(last_heartbeat_time__isnull=False) + .exclude(timeout_seconds=0) + .annotate(period_start=(Cast(timezone.now() - _get_timeout_expression(), DateTimeField()))) ) with transaction.atomic(): # Heartbeat is considered expired if it # * is enabled, # * is not already expired, - # * has not received a checkup for timeout period - expired_heartbeats = ( - enabled_heartbeats.select_for_update() - .filter( - last_heartbeat_time__lte=(Cast(timezone.now(), DateTimeField()) - Cast(F("timeout"), DurationField())) - ) - .filter(previous_alerted_state_was_life=True) + # * last check in was before the timeout period start + expired_heartbeats = enabled_heartbeats.select_for_update().filter( + last_heartbeat_time__lte=F("period_start"), previous_alerted_state_was_life=True ) # Schedule alert creation for each expired heartbeat after transaction commit for heartbeat in expired_heartbeats: @@ -57,15 +67,11 @@ def check_heartbeats() -> None: expired_count = expired_heartbeats.update(previous_alerted_state_was_life=False) with transaction.atomic(): # Heartbeat is considered restored if it - # * is enabled, expired, - # * has received a checkup in timeout period from now, - # * was is alerted state (previous_alerted_state_was_life is False) - restored_heartbeats = ( - enabled_heartbeats.select_for_update() - .filter( - last_heartbeat_time__gte=(Cast(timezone.now(), DateTimeField()) - Cast(F("timeout"), DurationField())) - ) - .filter(previous_alerted_state_was_life=False) + # * is enabled, + # * last check in was after the timeout period start, + # * was is alerted state (previous_alerted_state_was_life is False), i.e. was expired + restored_heartbeats = enabled_heartbeats.select_for_update().filter( + last_heartbeat_time__gte=F("period_start"), previous_alerted_state_was_life=False ) # Schedule auto-resolve alert creation for each expired heartbeat after transaction commit for heartbeat in restored_heartbeats: From 0a887cd0cc0568d5c40085f5d71acc3b6d29082f Mon Sep 17 00:00:00 2001 From: Ildar Iskhakov Date: Fri, 4 Aug 2023 11:45:24 +0800 Subject: [PATCH 12/16] Fix --- .../integration-tests/.auth/admin.json | 25 ------------------- .../integration-tests/.auth/editor.json | 25 ------------------- .../integration-tests/.auth/viewer.json | 25 ------------------- 3 files changed, 75 deletions(-) delete mode 100644 grafana-plugin/integration-tests/.auth/admin.json delete mode 100644 grafana-plugin/integration-tests/.auth/editor.json delete mode 100644 grafana-plugin/integration-tests/.auth/viewer.json diff --git a/grafana-plugin/integration-tests/.auth/admin.json b/grafana-plugin/integration-tests/.auth/admin.json deleted file mode 100644 index 11ab596714..0000000000 --- a/grafana-plugin/integration-tests/.auth/admin.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "cookies": [ - { - "name": "grafana_session", - "value": "0270fbb31b7d5f070a4eb1762c3563c6", - "domain": "localhost", - "path": "/", - "expires": 1692959957.132, - "httpOnly": true, - "secure": false, - "sameSite": "Lax" - }, - { - "name": "grafana_session_expiry", - "value": "1690368552", - "domain": "localhost", - "path": "/", - "expires": 1692959957.132, - "httpOnly": false, - "secure": false, - "sameSite": "Lax" - } - ], - "origins": [] -} \ No newline at end of file diff --git a/grafana-plugin/integration-tests/.auth/editor.json b/grafana-plugin/integration-tests/.auth/editor.json deleted file mode 100644 index c9333c113f..0000000000 --- a/grafana-plugin/integration-tests/.auth/editor.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "cookies": [ - { - "name": "grafana_session", - "value": "4f21a52a1bfb274c6495859af78b653e", - "domain": "localhost", - "path": "/", - "expires": 1692959957.777, - "httpOnly": true, - "secure": false, - "sameSite": "Lax" - }, - { - "name": "grafana_session_expiry", - "value": "1690368552", - "domain": "localhost", - "path": "/", - "expires": 1692959957.777, - "httpOnly": false, - "secure": false, - "sameSite": "Lax" - } - ], - "origins": [] -} \ No newline at end of file diff --git a/grafana-plugin/integration-tests/.auth/viewer.json b/grafana-plugin/integration-tests/.auth/viewer.json deleted file mode 100644 index cca56e2c04..0000000000 --- a/grafana-plugin/integration-tests/.auth/viewer.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "cookies": [ - { - "name": "grafana_session", - "value": "8076889fdfbf962a8a54c35889c8b139", - "domain": "localhost", - "path": "/", - "expires": 1692959958.156, - "httpOnly": true, - "secure": false, - "sameSite": "Lax" - }, - { - "name": "grafana_session_expiry", - "value": "1690368553", - "domain": "localhost", - "path": "/", - "expires": 1692959958.156, - "httpOnly": false, - "secure": false, - "sameSite": "Lax" - } - ], - "origins": [] -} \ No newline at end of file From f9e91031f8c2c4ae8e61350d02b65ad81027ee6e Mon Sep 17 00:00:00 2001 From: Ildar Iskhakov Date: Mon, 7 Aug 2023 19:31:46 +0800 Subject: [PATCH 13/16] Fix for DurationField --- engine/apps/heartbeat/tasks.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/engine/apps/heartbeat/tasks.py b/engine/apps/heartbeat/tasks.py index f8b2c4753c..9f744ac8a1 100644 --- a/engine/apps/heartbeat/tasks.py +++ b/engine/apps/heartbeat/tasks.py @@ -25,15 +25,15 @@ def check_heartbeats() -> None: # * received at least one checkup (last_heartbeat_time set to non-null value)\ def _get_timeout_expression() -> ExpressionWrapper: - # TODO: consider migrate timeout_seconds from IntegerField to DurationField - if settings.DATABASES["default"]["ENGINE"] == f"django.db.backends.{DatabaseTypes.SQLITE3}": - # Current Django version (3.2) does not support DurationField multiplying on SQLite - # https://github.com/django/django/commit/54e94640ace261b14cf8cdb1fae3dc6f068a5f87 - # Convert integer `timeout_seconds` to datetime.timedelta `timeout` + if settings.DATABASES["default"]["ENGINE"] == f"django.db.backends.{DatabaseTypes.POSTGRESQL}": + # DurationField: When used on PostgreSQL, the data type used is an interval + # https://docs.djangoproject.com/en/3.2/ref/models/fields/#durationfield + return ExpressionWrapper(datetime.timedelta(seconds=1) * F("timeout_seconds"), output_field=DurationField()) + else: + # DurationField: ...Otherwise a bigint of microseconds is used... # microseconds = seconds * 10**6 + # https://docs.djangoproject.com/en/3.2/ref/models/fields/#durationfield return ExpressionWrapper(F("timeout_seconds") * 10**6, output_field=DurationField()) - else: - return ExpressionWrapper(datetime.timedelta(seconds=1) * F("timeout_seconds"), output_field=DurationField()) enabled_heartbeats = ( IntegrationHeartBeat.objects.filter(last_heartbeat_time__isnull=False) From f2705d0feec9341e245fa237b7f42591d626c26f Mon Sep 17 00:00:00 2001 From: Ildar Iskhakov Date: Mon, 7 Aug 2023 19:44:00 +0800 Subject: [PATCH 14/16] Update views.py --- engine/apps/integrations/views.py | 43 ------------------------------- 1 file changed, 43 deletions(-) diff --git a/engine/apps/integrations/views.py b/engine/apps/integrations/views.py index c5f1403c5d..c694d87e88 100644 --- a/engine/apps/integrations/views.py +++ b/engine/apps/integrations/views.py @@ -332,46 +332,3 @@ def _process_heartbeat_signal(self, request, alert_receive_channel): process_heartbeat_task.apply_async( (alert_receive_channel.pk,), ) - - -class AlertManagerV2View(BrowsableInstructionMixin, AlertChannelDefiningMixin, IntegrationRateLimitMixin, APIView): - """ - AlertManagerV2View consumes alerts from AlertManager. It expects data to be in format of AM webhook receiver. - """ - - def post(self, request, *args, **kwargs): - alert_receive_channel = self.request.alert_receive_channel - if not alert_receive_channel.integration == AlertReceiveChannel.INTEGRATION_ALERTMANAGER_V2: - return HttpResponseBadRequest( - f"This url is for integration with {alert_receive_channel.config.title}." - f"Key is for {alert_receive_channel.get_integration_display()}" - ) - alerts = request.data.get("alerts", []) - - data = request.data - if "numFiring" not in request.data: - num_firing = 0 - num_resolved = 0 - for a in alerts: - if a["status"] == "firing": - num_firing += 1 - elif a["status"] == "resolved": - num_resolved += 1 - # Count firing and resolved alerts manually if not present in payload - data = {**request.data, "numFiring": num_firing, "numResolved": num_resolved} - else: - data = request.data - - create_alert.apply_async( - [], - { - "title": None, - "message": None, - "image_url": None, - "link_to_upstream_details": None, - "alert_receive_channel_pk": alert_receive_channel.pk, - "integration_unique_data": None, - "raw_request_data": data, - }, - ) - return Response("Ok.") From e1c120b4cf0efd727ae205528278d8fc9ed92956 Mon Sep 17 00:00:00 2001 From: Ildar Iskhakov Date: Mon, 7 Aug 2023 19:45:35 +0800 Subject: [PATCH 15/16] Update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 02dd268382..fd1032e331 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add stack slug to organization options for direct paging Slash command by @vadimkerr ([#2743](https://github.com/grafana/oncall/pull/2743)) - Avoid creating (or notifying about) potential event splits resulting from untaken swap requests ([#2748](https://github.com/grafana/oncall/pull/2748)) +- Refactor heartbeats into a periodic task ([2723](https://github.com/grafana/oncall/pull/2723)) ### Fixed From 6612d98493b489e5e691592b377257af20cc3815 Mon Sep 17 00:00:00 2001 From: Ildar Iskhakov Date: Thu, 10 Aug 2023 10:18:35 +0800 Subject: [PATCH 16/16] Small fix --- engine/apps/heartbeat/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/apps/heartbeat/tasks.py b/engine/apps/heartbeat/tasks.py index 9f744ac8a1..d02fac9b0f 100644 --- a/engine/apps/heartbeat/tasks.py +++ b/engine/apps/heartbeat/tasks.py @@ -16,7 +16,7 @@ @shared_dedicated_queue_retry_task() -def check_heartbeats() -> None: +def check_heartbeats() -> str: """ Periodic task to check heartbeats status change and create alerts (or auto-resolve alerts) if needed """