fix(count-requests): hotfix 3.18.13 lock instance and await task before update
Hicham committed Feb 27, 2024
1 parent b9d3f6e commit e8e7ef0
Showing 7 changed files with 67 additions and 25 deletions.
2 changes: 1 addition & 1 deletion admin_cohort/__init__.py
@@ -1,5 +1,5 @@
__title__ = 'Portail/Cohort360 API'
-__version__ = '3.18.12'
+__version__ = '3.18.13'
__author__ = 'Assistance Publique - Hopitaux de Paris, Département I&D'


7 changes: 4 additions & 3 deletions cohort/services/cohort_result.py
@@ -85,9 +85,10 @@ def process_cohort_creation(request, cohort_uuid: str):
    if cohort.dated_measure_global:
        dm_global = cohort.dated_measure_global
        try:
-            get_count_task.delay(auth_headers,
-                                 cohort.request_query_snapshot.serialized_query,
-                                 dm_global.uuid)
+            get_count_task.s(auth_headers=auth_headers,
+                             json_query=cohort.request_query_snapshot.serialized_query,
+                             dm_uuid=dm_global.uuid)\
+                          .apply_async()
        except Exception as e:
            dm_global.request_job_fail_msg = f"ERROR: Could not launch cohort global count: {e}"
            dm_global.request_job_status = JobStatus.failed
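Note on the change above: delay(...) is shorthand for s(...).apply_async(), so the task is queued exactly as before. What matters is that the arguments are now passed by keyword: the new locked_instance_task decorator (added in cohort/services/decorators.py below) looks the instance up via kwargs.get("dm_uuid"), and keyword arguments are the only way to make dm_uuid visible there. A minimal, self-contained sketch of the difference, using a stand-in task rather than repo code:

# Sketch (not part of the commit): delay() vs s().apply_async(), and why keyword
# arguments matter to a decorator that inspects kwargs. Runs eagerly, no broker needed.
from celery import Celery

app = Celery("sketch")
app.conf.task_always_eager = True  # execute tasks in-process for this demo

@app.task
def count(auth_headers=None, json_query=None, dm_uuid=None):
    return dm_uuid

# Positional call: the values travel in the task's args, kwargs stays empty.
count.delay({}, "{}", "dm-123")

# Keyword signature: dm_uuid is carried in kwargs, visible to any wrapping decorator.
count.s(auth_headers={}, json_query="{}", dm_uuid="dm-123").apply_async()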
7 changes: 4 additions & 3 deletions cohort/services/dated_measure.py
@@ -25,9 +25,10 @@ def process_dated_measure(self, dm_uuid: str, request):
        cancel_previously_running_dm_jobs.delay(dm_uuid)
        try:
            auth_headers = get_authorization_header(request)
-            get_count_task.delay(auth_headers,
-                                 dm.request_query_snapshot.serialized_query,
-                                 dm_uuid)
+            get_count_task.s(auth_headers=auth_headers,
+                             json_query=dm.request_query_snapshot.serialized_query,
+                             dm_uuid=dm_uuid)\
+                          .apply_async()
        except Exception as e:
            dm.delete()
            raise ServerError("INTERNAL ERROR: Could not launch count request") from e
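This is the same substitution for the standard (non-global) count path: process_dated_measure now builds the task signature with auth_headers, json_query and dm_uuid as keyword arguments before calling apply_async(), so the lock decorator can find dm_uuid here as well (see the sketch after the previous file).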
38 changes: 38 additions & 0 deletions cohort/services/decorators.py
@@ -0,0 +1,38 @@
import functools

from django.core.cache import cache


def locked_instance_task(task):
    def acquire_lock(lock_id: str):
        return cache.add(lock_id, lock_id, 5*60)

    def release_lock(lock_id: str):
        cache.delete(lock_id)

    @functools.wraps(task)
    def wrapper(*args, **kwargs):
        instance_id = kwargs.get("dm_uuid")
        locked = acquire_lock(instance_id)
        if locked:
            try:
                task(*args, **kwargs)
            finally:
                release_lock(instance_id)
    return wrapper


def await_celery_task(func):

    def retrieve_lock(lock_id: str):
        return cache.get(lock_id)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        dm_id = kwargs.get("uuid")
        lock = retrieve_lock(lock_id=dm_id)
        while lock is not None:
            lock = retrieve_lock(lock_id=dm_id)
        else:
            return func(*args, **kwargs)
    return wrapper
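The two decorators cooperate through the Django cache. locked_instance_task holds a key named after the DatedMeasure uuid for as long as get_count_task runs (cache.add is atomic, and the 5*60-second timeout keeps a crashed worker from holding the key forever), while await_celery_task spins until that key disappears before letting the wrapped view run. A self-contained sketch of the handshake, using the local-memory cache backend as a stand-in for the project's cache (not repo code):

# Sketch (not part of the commit): the lock handshake between the count task and the PATCH view.
from django.conf import settings

# Stand-alone cache config so the sketch runs outside the Django project.
settings.configure(CACHES={"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}})

from django.core.cache import cache

LOCK_TTL = 5 * 60  # mirrors the 5*60 timeout passed to cache.add() above

def run_count(dm_uuid):
    # acquire: cache.add() returns False if the key already exists, so only one holder at a time
    if cache.add(dm_uuid, dm_uuid, LOCK_TTL):
        try:
            print(f"task updating DatedMeasure {dm_uuid}")
        finally:
            cache.delete(dm_uuid)  # release

def patch_view(dm_uuid):
    # the view waits for the task to release the lock before touching the same row
    while cache.get(dm_uuid) is not None:
        pass
    print(f"view patching DatedMeasure {dm_uuid}")

run_count("dm-123")
patch_view("dm-123")

Two properties of the committed code follow directly from this shape: the wait loop in await_celery_task polls without sleeping, so the request thread stays busy until the task finishes or the 5-minute lock expires, and a second get_count_task invoked for the same dm_uuid while the lock is held does nothing, since the wrapper only runs the task body when cache.add succeeds.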
34 changes: 17 additions & 17 deletions cohort/tasks.py
@@ -2,7 +2,6 @@
from smtplib import SMTPException

from celery import shared_task, current_task
-from django.db import transaction

import cohort.services.conf_cohort_job_api as cohort_job_api
from admin_cohort import celery_app
@@ -12,6 +11,7 @@
from cohort.models.dated_measure import GLOBAL_DM_MODE
from cohort.services.emails import send_email_notif_feasibility_report_requested, send_email_notif_error_feasibility_report
from cohort.services.misc import log_count_task, log_create_task, log_feasibility_study_task
+from cohort.services.decorators import locked_instance_task

_logger = logging.getLogger('django.request')

@@ -36,23 +36,23 @@ def create_cohort_task(auth_headers: dict, json_query: str, cohort_uuid: str):


@shared_task
-def get_count_task(auth_headers: dict, json_query: str, dm_uuid: str):
+@locked_instance_task
+def get_count_task(auth_headers=None, json_query=None, dm_uuid=None):
    dm = DatedMeasure.objects.get(uuid=dm_uuid)
-    with transaction.atomic():
-        dm.count_task_id = current_task.request.id or ""
-        dm.request_job_status = JobStatus.pending
-        dm.save()
-        resp = cohort_job_api.post_count_cohort(dm_uuid=dm_uuid,
-                                                json_query=json_query,
-                                                auth_headers=auth_headers,
-                                                global_estimate=dm.mode == GLOBAL_DM_MODE)
-        if resp.success:
-            dm.request_job_id = resp.fhir_job_id
-        else:
-            dm.request_job_status = JobStatus.failed
-            dm.request_job_fail_msg = resp.err_msg
-        dm.save()
-        log_count_task(dm_uuid, resp.success and "DatedMeasure updated" or resp.err_msg)
+    dm.count_task_id = current_task.request.id or ""
+    dm.request_job_status = JobStatus.pending
+    dm.save()
+    resp = cohort_job_api.post_count_cohort(dm_uuid=dm_uuid,
+                                            json_query=json_query,
+                                            auth_headers=auth_headers,
+                                            global_estimate=dm.mode == GLOBAL_DM_MODE)
+    if resp.success:
+        dm.request_job_id = resp.fhir_job_id
+    else:
+        dm.request_job_status = JobStatus.failed
+        dm.request_job_fail_msg = resp.err_msg
+    dm.save()
+    log_count_task(dm_uuid, resp.success and "DatedMeasure updated" or resp.err_msg)


@shared_task
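Three things change in get_count_task above: the parameters gain None defaults and are passed as keywords, so locked_instance_task can read dm_uuid out of kwargs; the transaction.atomic() block is removed and its body dedented unchanged, the cache lock now standing in for the transaction as the guard against the concurrent PATCH; and the new decorator sits under @shared_task, so the registered task is shared_task(locked_instance_task(get_count_task)) and the lock is taken inside the worker for exactly the duration of the task body, not when the task is queued. A small stand-in sketch of that decorator order (not repo code):

# Sketch (not part of the commit): the decorator closest to the function wraps the task body,
# so the lock is acquired and released inside the worker around the actual work.
import functools

from celery import shared_task

def locked_instance_task(task):
    @functools.wraps(task)
    def wrapper(*args, **kwargs):
        print("lock held around:", kwargs.get("dm_uuid"))
        return task(*args, **kwargs)
    return wrapper

@shared_task
@locked_instance_task
def get_count(auth_headers=None, json_query=None, dm_uuid=None):
    return dm_uuid

# Calling the task (directly here, synchronously) enters the wrapper with the keyword arguments intact.
get_count(auth_headers={}, json_query="{}", dm_uuid="dm-123")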
2 changes: 1 addition & 1 deletion cohort/tests/tests_view_dated_measure.py
@@ -169,7 +169,7 @@ def setUp(self):

    @mock.patch('cohort.services.dated_measure.get_authorization_header')
    @mock.patch('cohort.services.dated_measure.cancel_previously_running_dm_jobs.delay')
-    @mock.patch('cohort.services.dated_measure.get_count_task.delay')
+    @mock.patch('cohort.services.dated_measure.get_count_task.apply_async')
    def check_create_case_with_mock(self, case: DMCreateCase, mock_count_task: MagicMock, mock_cancel_task: MagicMock, mock_header: MagicMock,
                                    other_view: any, view_kwargs: dict):
        mock_header.return_value = None
2 changes: 2 additions & 0 deletions cohort/views/dated_measure.py
@@ -15,6 +15,7 @@
from cohort.serializers import DatedMeasureSerializer
from cohort.services.dated_measure import dated_measure_service, JOB_STATUS, MINIMUM, MAXIMUM, COUNT
from cohort.services.misc import is_sjs_user
+from cohort.services.decorators import await_celery_task
from cohort.views.shared import UserObjectsRestrictedViewSet

_logger = logging.getLogger('info')
@@ -81,6 +82,7 @@ def create(self, request, *args, **kwargs):
                                                         required=[JOB_STATUS, MINIMUM, MAXIMUM, COUNT]),
                         responses={'200': openapi.Response("DatedMeasure updated successfully", DatedMeasureSerializer()),
                                    '400': openapi.Response("Bad Request")})
+    @await_celery_task
    def partial_update(self, request, *args, **kwargs):
        try:
            dated_measure_service.process_patch_data(dm=self.get_object(), data=request.data)
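partial_update is the update the commit message refers to: the PATCH that reports the count result back for a DatedMeasure (its schema requires JOB_STATUS, MINIMUM, MAXIMUM and COUNT). With await_celery_task applied, that PATCH now waits until get_count_task has released the lock for the same instance, so it can no longer land while the task is still writing count_task_id and request_job_status. The decorator reads the instance id from kwargs.get("uuid"), which is presumably the viewset's URL lookup kwarg for the DatedMeasure uuid.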
