Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use periodic task for heartbeats #2723

Merged
merged 21 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Add stack slug to organization options for direct paging Slash command by @vadimkerr ([#2743](https://github.com/grafana/oncall/pull/2743))
- Avoid creating (or notifying about) potential event splits resulting from untaken swap requests ([#2748](https://github.com/grafana/oncall/pull/2748))
- Refactor heartbeats into a periodic task ([2723](https://github.com/grafana/oncall/pull/2723))

### Fixed

Expand Down
86 changes: 17 additions & 69 deletions engine/apps/heartbeat/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@

from django.conf import settings
from django.core.validators import MinLengthValidator
from django.db import models, transaction
from django.db import models
from django.utils import timezone

from apps.integrations.tasks import create_alert
from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -43,10 +42,26 @@ class IntegrationHeartBeat(models.Model):

created_at = models.DateTimeField(auto_now_add=True)
timeout_seconds = models.IntegerField(default=0)

last_heartbeat_time = models.DateTimeField(default=None, null=True)
"""
Stores the latest received heartbeat signal time
"""

last_checkup_task_time = models.DateTimeField(default=None, null=True)
"""
Deprecated. This field is not used. TODO: remove it
"""

actual_check_up_task_id = models.CharField(max_length=100)
"""
Deprecated. Stored the latest scheduled `integration_heartbeat_checkup` task id. TODO: remove it
"""

previous_alerted_state_was_life = models.BooleanField(default=True)
"""
Last status of the heartbeat. Determines if integration was alive on latest checkup
"""

public_primary_key = models.CharField(
max_length=20,
Expand Down Expand Up @@ -83,73 +98,6 @@ def status(self) -> bool:
def link(self) -> str:
return urljoin(self.alert_receive_channel.integration_url, "heartbeat/")

@classmethod
def perform_heartbeat_check(cls, heartbeat_id: int, task_request_id: str) -> None:
with transaction.atomic():
heartbeats = cls.objects.filter(pk=heartbeat_id).select_for_update()
if len(heartbeats) == 0:
logger.info(f"Heartbeat {heartbeat_id} not found {task_request_id}")
return
heartbeat = heartbeats[0]
if task_request_id == heartbeat.actual_check_up_task_id:
heartbeat.check_heartbeat_state_and_save()
else:
logger.info(f"Heartbeat {heartbeat_id} is not actual {task_request_id}")

def check_heartbeat_state_and_save(self) -> bool:
"""
Use this method if you want just check heartbeat status.
"""
state_changed = self.check_heartbeat_state()
if state_changed:
self.save(update_fields=["previous_alerted_state_was_life"])
return state_changed

def check_heartbeat_state(self) -> bool:
"""
Actually checking heartbeat.
Use this method if you want to do changes of heartbeat instance while checking its status.
( See IntegrationHeartBeatAPIView.post() for example )
"""
state_changed = False
if self.is_expired:
if self.previous_alerted_state_was_life:
self.on_heartbeat_expired()
self.previous_alerted_state_was_life = False
state_changed = True
else:
if not self.previous_alerted_state_was_life:
self.on_heartbeat_restored()
self.previous_alerted_state_was_life = True
state_changed = True
return state_changed

def on_heartbeat_restored(self) -> None:
create_alert.apply_async(
kwargs={
"title": self.alert_receive_channel.heartbeat_restored_title,
"message": self.alert_receive_channel.heartbeat_restored_message,
"image_url": None,
"link_to_upstream_details": None,
"alert_receive_channel_pk": self.alert_receive_channel.pk,
"integration_unique_data": {},
"raw_request_data": self.alert_receive_channel.heartbeat_restored_payload,
},
)

def on_heartbeat_expired(self) -> None:
create_alert.apply_async(
kwargs={
"title": self.alert_receive_channel.heartbeat_expired_title,
"message": self.alert_receive_channel.heartbeat_expired_message,
"image_url": None,
"link_to_upstream_details": None,
"alert_receive_channel_pk": self.alert_receive_channel.pk,
"integration_unique_data": {},
"raw_request_data": self.alert_receive_channel.heartbeat_expired_payload,
},
)

# Insight logs
@property
def insight_logs_type_verbal(self) -> str:
Expand Down
128 changes: 88 additions & 40 deletions engine/apps/heartbeat/tasks.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,105 @@
from time import perf_counter
import datetime

from celery.utils.log import get_task_logger
from django.conf import settings
from django.db import transaction
from django.db.models import DateTimeField, DurationField, ExpressionWrapper, F
from django.db.models.functions import Cast
from django.utils import timezone

from apps.heartbeat.models import IntegrationHeartBeat
from apps.integrations.tasks import create_alert
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
from settings.base import DatabaseTypes

logger = get_task_logger(__name__)


@shared_dedicated_queue_retry_task()
def integration_heartbeat_checkup(heartbeat_id: int) -> None:
from apps.heartbeat.models import IntegrationHeartBeat

IntegrationHeartBeat.perform_heartbeat_check(heartbeat_id, integration_heartbeat_checkup.request.id)

def check_heartbeats() -> str:
"""
Periodic task to check heartbeats status change and create alerts (or auto-resolve alerts) if needed
"""
# Heartbeat is considered enabled if it
# * has timeout_seconds set to non-zero (non-default) value,
# * received at least one checkup (last_heartbeat_time set to non-null value)\

@shared_dedicated_queue_retry_task()
def process_heartbeat_task(alert_receive_channel_pk):
start = perf_counter()
from apps.heartbeat.models import IntegrationHeartBeat
def _get_timeout_expression() -> ExpressionWrapper:
if settings.DATABASES["default"]["ENGINE"] == f"django.db.backends.{DatabaseTypes.POSTGRESQL}":
# DurationField: When used on PostgreSQL, the data type used is an interval
# https://docs.djangoproject.com/en/3.2/ref/models/fields/#durationfield
return ExpressionWrapper(datetime.timedelta(seconds=1) * F("timeout_seconds"), output_field=DurationField())
else:
# DurationField: ...Otherwise a bigint of microseconds is used...
# microseconds = seconds * 10**6
# https://docs.djangoproject.com/en/3.2/ref/models/fields/#durationfield
return ExpressionWrapper(F("timeout_seconds") * 10**6, output_field=DurationField())

enabled_heartbeats = (
IntegrationHeartBeat.objects.filter(last_heartbeat_time__isnull=False)
.exclude(timeout_seconds=0)
.annotate(period_start=(Cast(timezone.now() - _get_timeout_expression(), DateTimeField())))
)
with transaction.atomic():
heartbeats = IntegrationHeartBeat.objects.filter(
alert_receive_channel__pk=alert_receive_channel_pk,
).select_for_update()
if len(heartbeats) == 0:
logger.info(f"Integration Heartbeat for alert_receive_channel {alert_receive_channel_pk} was not found.")
return
else:
heartbeat = heartbeats[0]
heartbeat_selected = perf_counter()
logger.info(
f"IntegrationHeartBeat selected for alert_receive_channel {alert_receive_channel_pk} in {heartbeat_selected - start}"
# Heartbeat is considered expired if it
# * is enabled,
# * is not already expired,
# * last check in was before the timeout period start
expired_heartbeats = enabled_heartbeats.select_for_update().filter(
last_heartbeat_time__lte=F("period_start"), previous_alerted_state_was_life=True
)
task = integration_heartbeat_checkup.apply_async(
(heartbeat.pk,),
countdown=heartbeat.timeout_seconds + 1,
)
is_touched = heartbeat.last_heartbeat_time is not None
heartbeat.actual_check_up_task_id = task.id
heartbeat.last_heartbeat_time = timezone.now()
update_fields = ["actual_check_up_task_id", "last_heartbeat_time"]
task_started = perf_counter()
logger.info(
f"heartbeat_checkup task started for alert_receive_channel {alert_receive_channel_pk} in {task_started - start}"
# Schedule alert creation for each expired heartbeat after transaction commit
for heartbeat in expired_heartbeats:
transaction.on_commit(
lambda: create_alert.apply_async(
kwargs={
"title": heartbeat.alert_receive_channel.heartbeat_expired_title,
"message": heartbeat.alert_receive_channel.heartbeat_expired_message,
"image_url": None,
"link_to_upstream_details": None,
"alert_receive_channel_pk": heartbeat.alert_receive_channel.pk,
"integration_unique_data": {},
"raw_request_data": heartbeat.alert_receive_channel.heartbeat_expired_payload,
},
)
)
# Update previous_alerted_state_was_life to False
expired_count = expired_heartbeats.update(previous_alerted_state_was_life=False)
with transaction.atomic():
# Heartbeat is considered restored if it
# * is enabled,
# * last check in was after the timeout period start,
# * was is alerted state (previous_alerted_state_was_life is False), i.e. was expired
restored_heartbeats = enabled_heartbeats.select_for_update().filter(
last_heartbeat_time__gte=F("period_start"), previous_alerted_state_was_life=False
)
if is_touched:
state_changed = heartbeat.check_heartbeat_state()
state_checked = perf_counter()
logger.info(
f"state checked for alert_receive_channel {alert_receive_channel_pk} in {state_checked - start}"
# Schedule auto-resolve alert creation for each expired heartbeat after transaction commit
for heartbeat in restored_heartbeats:
transaction.on_commit(
lambda: create_alert.apply_async(
kwargs={
"title": heartbeat.alert_receive_channel.heartbeat_restored_title,
"message": heartbeat.alert_receive_channel.heartbeat_restored_message,
"image_url": None,
"link_to_upstream_details": None,
"alert_receive_channel_pk": heartbeat.alert_receive_channel.pk,
"integration_unique_data": {},
"raw_request_data": heartbeat.alert_receive_channel.heartbeat_restored_payload,
},
)
)
if state_changed:
update_fields.append("previous_alerted_state_was_life")
heartbeat.save(update_fields=update_fields)
restored_count = restored_heartbeats.update(previous_alerted_state_was_life=True)
return f"Found {expired_count} expired and {restored_count} restored heartbeats"


@shared_dedicated_queue_retry_task()
def integration_heartbeat_checkup(heartbeat_id: int) -> None:
"""Deprecated. TODO: Remove this task after this task cleared from queue"""
pass


@shared_dedicated_queue_retry_task()
def process_heartbeat_task(alert_receive_channel_pk):
IntegrationHeartBeat.objects.filter(
alert_receive_channel__pk=alert_receive_channel_pk,
).update(last_heartbeat_time=timezone.now())
Loading