Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use periodic task for heartbeats #2723

Merged
merged 21 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 8 additions & 69 deletions engine/apps/heartbeat/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@

from django.conf import settings
from django.core.validators import MinLengthValidator
from django.db import models, transaction
from django.db import models
from django.utils import timezone

from apps.integrations.tasks import create_alert
from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -43,9 +42,16 @@ class IntegrationHeartBeat(models.Model):

created_at = models.DateTimeField(auto_now_add=True)
timeout_seconds = models.IntegerField(default=0)

last_heartbeat_time = models.DateTimeField(default=None, null=True)
"""
Stores the latest received heartbeat signal time
"""
# Deprecated. This field is not used. TODO: remove it
last_checkup_task_time = models.DateTimeField(default=None, null=True)
# Deprecated. Stored the latest scheduled `integration_heartbeat_checkup` task id. TODO: remove it
actual_check_up_task_id = models.CharField(max_length=100)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be safe to simply drop these two fields in this PR? Or at a minimum remove them from the model without dropping them from the db, so that a subsequent PR we can drop them? (Vadim docs have more info on this)

# Last status of the heartbeat. Determines if integration was alive on latest checkup
previous_alerted_state_was_life = models.BooleanField(default=True)
iskhakov marked this conversation as resolved.
Show resolved Hide resolved

public_primary_key = models.CharField(
Expand Down Expand Up @@ -83,73 +89,6 @@ def status(self) -> bool:
def link(self) -> str:
return urljoin(self.alert_receive_channel.integration_url, "heartbeat/")

@classmethod
def perform_heartbeat_check(cls, heartbeat_id: int, task_request_id: str) -> None:
with transaction.atomic():
heartbeats = cls.objects.filter(pk=heartbeat_id).select_for_update()
if len(heartbeats) == 0:
logger.info(f"Heartbeat {heartbeat_id} not found {task_request_id}")
return
heartbeat = heartbeats[0]
if task_request_id == heartbeat.actual_check_up_task_id:
heartbeat.check_heartbeat_state_and_save()
else:
logger.info(f"Heartbeat {heartbeat_id} is not actual {task_request_id}")

def check_heartbeat_state_and_save(self) -> bool:
"""
Use this method if you want just check heartbeat status.
"""
state_changed = self.check_heartbeat_state()
if state_changed:
self.save(update_fields=["previous_alerted_state_was_life"])
return state_changed

def check_heartbeat_state(self) -> bool:
"""
Actually checking heartbeat.
Use this method if you want to do changes of heartbeat instance while checking its status.
( See IntegrationHeartBeatAPIView.post() for example )
"""
state_changed = False
if self.is_expired:
if self.previous_alerted_state_was_life:
self.on_heartbeat_expired()
self.previous_alerted_state_was_life = False
state_changed = True
else:
if not self.previous_alerted_state_was_life:
self.on_heartbeat_restored()
self.previous_alerted_state_was_life = True
state_changed = True
return state_changed

def on_heartbeat_restored(self) -> None:
create_alert.apply_async(
kwargs={
"title": self.alert_receive_channel.heartbeat_restored_title,
"message": self.alert_receive_channel.heartbeat_restored_message,
"image_url": None,
"link_to_upstream_details": None,
"alert_receive_channel_pk": self.alert_receive_channel.pk,
"integration_unique_data": {},
"raw_request_data": self.alert_receive_channel.heartbeat_restored_payload,
},
)

def on_heartbeat_expired(self) -> None:
create_alert.apply_async(
kwargs={
"title": self.alert_receive_channel.heartbeat_expired_title,
"message": self.alert_receive_channel.heartbeat_expired_message,
"image_url": None,
"link_to_upstream_details": None,
"alert_receive_channel_pk": self.alert_receive_channel.pk,
"integration_unique_data": {},
"raw_request_data": self.alert_receive_channel.heartbeat_expired_payload,
},
)

# Insight logs
@property
def insight_logs_type_verbal(self) -> str:
Expand Down
119 changes: 77 additions & 42 deletions engine/apps/heartbeat/tasks.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,92 @@
from time import perf_counter

from celery.utils.log import get_task_logger
from django.db import transaction
from django.db.models import DurationField, ExpressionWrapper, F
from django.db.models.functions import Now
from django.utils import timezone

from apps.heartbeat.models import IntegrationHeartBeat
from apps.integrations.tasks import create_alert
from common.custom_celery_tasks import shared_dedicated_queue_retry_task

logger = get_task_logger(__name__)


@shared_dedicated_queue_retry_task()
def integration_heartbeat_checkup(heartbeat_id: int) -> None:
from apps.heartbeat.models import IntegrationHeartBeat
def check_heartbeats() -> None:
"""
Periodic task to check heartbeats status change and create alerts (or auto-resolve alerts) if needed
"""
# Heartbeat is considered enabled if it
# * has timeout_seconds set to non-zero (non-default) value,
# * received at least one checkup (last_heartbeat_time set to non-null value)
enabled_heartbeats = (
IntegrationHeartBeat.objects.filter(last_heartbeat_time__isnull=False).exclude(timeout_seconds=0)
# Convert integer `timeout_seconds`` to datetime.timedelta `timeout`
# microseconds = seconds * 10**6
# TODO: consider migrate timeout_seconds from IntegerField to DurationField
.annotate(timeout=(ExpressionWrapper(F("timeout_seconds") * 10**6, output_field=DurationField())))
)
with transaction.atomic():
# Heartbeat is considered expired if it
# * is enabled,
# * is not already expired,
# * has not received a checkup for timeout period
expired_heartbeats = enabled_heartbeats.filter(last_heartbeat_time__lte=(timezone.now() - F("timeout"))).filter(
previous_alerted_state_was_life=True
)
# Schedule alert creation for each expired heartbeat after transaction commit
for heartbeat in expired_heartbeats:
transaction.on_commit(
lambda: create_alert.apply_async(
kwargs={
"title": heartbeat.alert_receive_channel.heartbeat_expired_title,
"message": heartbeat.alert_receive_channel.heartbeat_expired_message,
"image_url": None,
"link_to_upstream_details": None,
"alert_receive_channel_pk": heartbeat.alert_receive_channel.pk,
"integration_unique_data": {},
"raw_request_data": heartbeat.alert_receive_channel.heartbeat_expired_payload,
},
)
)
# Update previous_alerted_state_was_life to False
expired_heartbeats.update(previous_alerted_state_was_life=False)
with transaction.atomic():
# Heartbeat is considered restored if it
# * is enabled, expired,
# * has received a checkup in timeout period from now,
# * was is alerted state (previous_alerted_state_was_life is False)
restored_heartbeats = (
enabled_heartbeats.select_for_update()
.filter(last_heartbeat_time__gte=(Now() - F("timeout")))
.filter(previous_alerted_state_was_life=False)
)
# Schedule auto-resolve alert creation for each expired heartbeat after transaction commit
for heartbeat in restored_heartbeats:
transaction.on_commit(
lambda: create_alert.apply_async(
kwargs={
"title": heartbeat.alert_receive_channel.heartbeat_restored_title,
"message": heartbeat.alert_receive_channel.heartbeat_restored_message,
"image_url": None,
"link_to_upstream_details": None,
"alert_receive_channel_pk": heartbeat.alert_receive_channel.pk,
"integration_unique_data": {},
"raw_request_data": heartbeat.alert_receive_channel.heartbeat_restored_payload,
},
)
)
restored_heartbeats.update(previous_alerted_state_was_life=True)
iskhakov marked this conversation as resolved.
Show resolved Hide resolved

IntegrationHeartBeat.perform_heartbeat_check(heartbeat_id, integration_heartbeat_checkup.request.id)

@shared_dedicated_queue_retry_task()
def integration_heartbeat_checkup(heartbeat_id: int) -> None:
"""Deprecated. TODO: Remove this task after this task cleared from queue"""
pass


@shared_dedicated_queue_retry_task()
def process_heartbeat_task(alert_receive_channel_pk):
start = perf_counter()
from apps.heartbeat.models import IntegrationHeartBeat

with transaction.atomic():
heartbeats = IntegrationHeartBeat.objects.filter(
alert_receive_channel__pk=alert_receive_channel_pk,
).select_for_update()
if len(heartbeats) == 0:
logger.info(f"Integration Heartbeat for alert_receive_channel {alert_receive_channel_pk} was not found.")
return
else:
heartbeat = heartbeats[0]
heartbeat_selected = perf_counter()
logger.info(
f"IntegrationHeartBeat selected for alert_receive_channel {alert_receive_channel_pk} in {heartbeat_selected - start}"
)
task = integration_heartbeat_checkup.apply_async(
(heartbeat.pk,),
countdown=heartbeat.timeout_seconds + 1,
)
is_touched = heartbeat.last_heartbeat_time is not None
heartbeat.actual_check_up_task_id = task.id
heartbeat.last_heartbeat_time = timezone.now()
update_fields = ["actual_check_up_task_id", "last_heartbeat_time"]
task_started = perf_counter()
logger.info(
f"heartbeat_checkup task started for alert_receive_channel {alert_receive_channel_pk} in {task_started - start}"
)
if is_touched:
state_changed = heartbeat.check_heartbeat_state()
state_checked = perf_counter()
logger.info(
f"state checked for alert_receive_channel {alert_receive_channel_pk} in {state_checked - start}"
)
if state_changed:
update_fields.append("previous_alerted_state_was_life")
heartbeat.save(update_fields=update_fields)
IntegrationHeartBeat.objects.filter(
alert_receive_channel__pk=alert_receive_channel_pk,
).first().update(last_heartbeat=timezone().now())
52 changes: 50 additions & 2 deletions engine/apps/integrations/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from django.conf import settings
from django.core.exceptions import PermissionDenied
from django.db import OperationalError
from django.http import HttpResponseBadRequest, JsonResponse
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
Expand Down Expand Up @@ -324,6 +325,53 @@ def post(self, request):
return Response(status=200)

def _process_heartbeat_signal(self, request, alert_receive_channel):
process_heartbeat_task.apply_async(
(alert_receive_channel.pk,),
try:
process_heartbeat_task(alert_receive_channel.pk)
# If database is not ready, fallback to celery task
except OperationalError:
process_heartbeat_task.apply_async(
(alert_receive_channel.pk,),
)


class AlertManagerV2View(BrowsableInstructionMixin, AlertChannelDefiningMixin, IntegrationRateLimitMixin, APIView):
"""
AlertManagerV2View consumes alerts from AlertManager. It expects data to be in format of AM webhook receiver.
"""

def post(self, request, *args, **kwargs):
alert_receive_channel = self.request.alert_receive_channel
if not alert_receive_channel.integration == AlertReceiveChannel.INTEGRATION_ALERTMANAGER_V2:
return HttpResponseBadRequest(
f"This url is for integration with {alert_receive_channel.config.title}."
f"Key is for {alert_receive_channel.get_integration_display()}"
)
alerts = request.data.get("alerts", [])

data = request.data
if "numFiring" not in request.data:
num_firing = 0
num_resolved = 0
for a in alerts:
if a["status"] == "firing":
num_firing += 1
elif a["status"] == "resolved":
num_resolved += 1
# Count firing and resolved alerts manually if not present in payload
data = {**request.data, "numFiring": num_firing, "numResolved": num_resolved}
else:
data = request.data

create_alert.apply_async(
[],
{
"title": None,
"message": None,
"image_url": None,
"link_to_upstream_details": None,
"alert_receive_channel_pk": alert_receive_channel.pk,
"integration_unique_data": None,
"raw_request_data": data,
},
)
return Response("Ok.")