Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Defer most signal handling code until after the end of the current transaction #7460

Merged
merged 1 commit into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
### Fixed

- Side effects of data changes, such as the sending of webhooks,
are no longer triggered until after the changes have been committed
to the database
(<https://github.com/opencv/cvat/pull/7460>)
18 changes: 14 additions & 4 deletions cvat/apps/analytics_report/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#
# SPDX-License-Identifier: MIT

from django.db import transaction
from django.db.models.signals import post_save
from django.dispatch import receiver

Expand All @@ -19,12 +20,21 @@
)
def __save_job__update_analytics_report(instance, created, **kwargs):
if isinstance(instance, Project):
AnalyticsReportUpdateManager().schedule_analytics_report_autoupdate_job(project=instance)
kwargs = {"project": instance}
elif isinstance(instance, Task):
AnalyticsReportUpdateManager().schedule_analytics_report_autoupdate_job(task=instance)
kwargs = {"task": instance}
elif isinstance(instance, Job):
AnalyticsReportUpdateManager().schedule_analytics_report_autoupdate_job(job=instance)
kwargs = {"job": instance}
elif isinstance(instance, Annotation):
AnalyticsReportUpdateManager().schedule_analytics_report_autoupdate_job(job=instance.job)
kwargs = {"job": instance.job}
else:
assert False

def schedule_autoupdate_job():
if any(v.id is None for v in kwargs.values()):
# The object may have been deleted after the on_commit call.
return

AnalyticsReportUpdateManager().schedule_analytics_report_autoupdate_job(**kwargs)

transaction.on_commit(schedule_autoupdate_job, robust=True)
21 changes: 15 additions & 6 deletions cvat/apps/engine/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
# Copyright (C) 2023 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT
import functools
import shutil

from django.contrib.auth.models import User
from django.db import transaction
from django.db.models.signals import post_delete, post_save
from django.dispatch import receiver

Expand Down Expand Up @@ -45,17 +47,21 @@ def __save_user_handler(instance, **kwargs):
@receiver(post_delete, sender=Project,
dispatch_uid=__name__ + ".delete_project_handler")
def __delete_project_handler(instance, **kwargs):
shutil.rmtree(instance.get_dirname(), ignore_errors=True)
transaction.on_commit(
functools.partial(shutil.rmtree, instance.get_dirname(), ignore_errors=True))

@receiver(post_delete, sender=Asset,
dispatch_uid=__name__ + ".__delete_asset_handler")
def __delete_asset_handler(instance, **kwargs):
shutil.rmtree(instance.get_asset_dir(), ignore_errors=True)
transaction.on_commit(
functools.partial(shutil.rmtree, instance.get_asset_dir(), ignore_errors=True))

@receiver(post_delete, sender=Task,
dispatch_uid=__name__ + ".delete_task_handler")
def __delete_task_handler(instance, **kwargs):
shutil.rmtree(instance.get_dirname(), ignore_errors=True)
transaction.on_commit(
functools.partial(shutil.rmtree, instance.get_dirname(), ignore_errors=True))

if instance.data and not instance.data.tasks.exists():
instance.data.delete()

Expand All @@ -69,14 +75,17 @@ def __delete_task_handler(instance, **kwargs):
@receiver(post_delete, sender=Job,
dispatch_uid=__name__ + ".delete_job_handler")
def __delete_job_handler(instance, **kwargs):
shutil.rmtree(instance.get_dirname(), ignore_errors=True)
transaction.on_commit(
functools.partial(shutil.rmtree, instance.get_dirname(), ignore_errors=True))

@receiver(post_delete, sender=Data,
dispatch_uid=__name__ + ".delete_data_handler")
def __delete_data_handler(instance, **kwargs):
shutil.rmtree(instance.get_data_dirname(), ignore_errors=True)
transaction.on_commit(
functools.partial(shutil.rmtree, instance.get_data_dirname(), ignore_errors=True))

@receiver(post_delete, sender=CloudStorage,
dispatch_uid=__name__ + ".delete_cloudstorage_handler")
def __delete_cloudstorage_handler(instance, **kwargs):
shutil.rmtree(instance.get_storage_dirname(), ignore_errors=True)
transaction.on_commit(
functools.partial(shutil.rmtree, instance.get_storage_dirname(), ignore_errors=True))
8 changes: 6 additions & 2 deletions cvat/apps/engine/tests/test_rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,8 @@ def test_api_v2_projects_delete_project_data_after_delete_project(self):
task_dir = task.get_dirname()
self.assertTrue(os.path.exists(task_dir))

self._check_api_v2_projects_id(self.admin)
with self.captureOnCommitCallbacks(execute=True):
self._check_api_v2_projects_id(self.admin)

for project in self.projects:
project_dir = project.get_dirname()
Expand Down Expand Up @@ -2019,7 +2020,10 @@ def test_api_v2_tasks_delete_task_data_after_delete_task(self):
for task in self.tasks:
task_dir = task.get_dirname()
self.assertTrue(os.path.exists(task_dir))
self._check_api_v2_tasks_id(self.admin)

with self.captureOnCommitCallbacks(execute=True):
self._check_api_v2_tasks_id(self.admin)

for task in self.tasks:
task_dir = task.get_dirname()
self.assertFalse(os.path.exists(task_dir))
Expand Down
12 changes: 10 additions & 2 deletions cvat/apps/quality_control/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#
# SPDX-License-Identifier: MIT

from django.db import transaction
from django.db.models.signals import post_save
from django.dispatch import receiver

Expand Down Expand Up @@ -37,8 +38,15 @@ def __save_job__update_quality_metrics(instance, created, **kwargs):
else:
assert False

for task in tasks:
qc.QualityReportUpdateManager().schedule_quality_autoupdate_job(task)
def schedule_autoupdate_jobs():
for task in tasks:
if task.id is None:
# The task may have been deleted after the on_commit call.
continue
Marishka17 marked this conversation as resolved.
Show resolved Hide resolved

qc.QualityReportUpdateManager().schedule_quality_autoupdate_job(task)

transaction.on_commit(schedule_autoupdate_jobs, robust=True)


@receiver(post_save, sender=Task, dispatch_uid=__name__ + ".save_task-initialize_quality_settings")
Expand Down
13 changes: 10 additions & 3 deletions cvat/apps/webhooks/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import requests
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db import transaction
from django.db.models.signals import (post_delete, post_save, pre_delete,
pre_save)
from django.dispatch import Signal, receiver
Expand Down Expand Up @@ -186,7 +187,10 @@ def post_save_resource_event(sender, instance, created, **kwargs):
else:
return

batch_add_to_queue(filtered_webhooks, data)
transaction.on_commit(
lambda: batch_add_to_queue(filtered_webhooks, data),
robust=True,
)


@receiver(pre_delete, sender=Project, dispatch_uid=__name__ + ":project:pre_delete")
Expand Down Expand Up @@ -232,9 +236,12 @@ def post_delete_resource_event(sender, instance, **kwargs):
"sender": get_sender(instance),
}

batch_add_to_queue(filtered_webhooks, data)
related_webhooks = [webhook for webhook in getattr(instance, "_related_webhooks", []) if webhook.id not in map(lambda a: a.id, filtered_webhooks)]
batch_add_to_queue(related_webhooks, data)

transaction.on_commit(
lambda: batch_add_to_queue(filtered_webhooks + related_webhooks, data),
robust=True,
)


@receiver(signal_redelivery)
Expand Down
Loading