From f883838764c1365590ac078026bc9e9ac8959bb6 Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Tue, 28 May 2024 19:03:58 +0300 Subject: [PATCH] Fix dataset downloading (#7864) ### Motivation and context This PR addresses several problems: - when requesting a dataset download, it's possible to get the 500 error with the message "The result file does not exist in export cache", which isn't expected for this request - when downloading the dataset the same error can be obtained if the file is requested right before the cache expiration - there are several [TOCTOU](https://en.wikipedia.org/wiki/Time-of-check_to_time-of-use) bugs related to dataset cache file existence checks - under some conditions, it's possible that the export job is never started - the finished RQ jobs were removed automatically on result reporting (after the client requested the result). This made it hard to debug problems for admins, as the jobs were often removed already This PR fixes the problems by the following: - introduced dataset cache file locking (via redis) during reading, writing, and removal - the 500 error is changed to an automatic reexporting attempt on export status request - the 500 error is changed to 404 when the file is not available for downloading - the exported files now have different names for each instance update time - the lifetime of the exported files is now automatically prolonged on each export request for the file (given the export is still valid) - the deferred export jobs are now checked for ghost dependencies. If any are found, the requested job is restarted - added several environment variables for configuration - finished RQ export jobs are not removed automatically on result retrieval. Now, they just use the export cache lifetime instead (should be continued in another PR) ### How has this been tested? 
### Checklist - [ ] I submit my changes into the `develop` branch - [ ] I have created a changelog fragment - [ ] I have updated the documentation accordingly - [ ] I have added tests to cover my changes - [ ] I have linked related issues (see [GitHub docs]( https://help.github.com/en/github/managing-your-work-on-github/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword)) - [ ] I have increased versions of npm packages if it is necessary ([cvat-canvas](https://github.com/cvat-ai/cvat/tree/develop/cvat-canvas#versioning), [cvat-core](https://github.com/cvat-ai/cvat/tree/develop/cvat-core#versioning), [cvat-data](https://github.com/cvat-ai/cvat/tree/develop/cvat-data#versioning) and [cvat-ui](https://github.com/cvat-ai/cvat/tree/develop/cvat-ui#versioning)) ### License - [ ] I submit _my code changes_ under the same [MIT License]( https://github.com/cvat-ai/cvat/blob/develop/LICENSE) that covers the project. Feel free to contact the maintainers if that's a concern. ## Summary by CodeRabbit - **New Features** - Improved reliability of file handling during export and cleanup processes. - Introduced new functionality for managing export cache locks and directories. - **Bug Fixes** - Addressed race conditions in concurrent export and cleanup operations. 
- **Dependencies** - Updated multiple packages to their latest versions for enhanced security and performance: - `cryptography` to `42.0.7` - `django` to `4.2.13` - `django-health-check` to `3.18.2` - `freezegun` to `1.5.1` - `jinja2` to `3.1.4` - `limits` to `3.12.0` - `lxml` to `5.2.2` - `orjson` to `3.10.3` - Added `pottery` version `3.0.0` - Updated `tqdm` to `4.66.4` --- ...144202_mzhiltso_fix_dataset_downloading.md | 5 + cvat/apps/dataset_manager/apps.py | 18 + cvat/apps/dataset_manager/default_settings.py | 14 + .../tests/test_rest_api_formats.py | 809 +++++++++++++++++- cvat/apps/dataset_manager/util.py | 159 +++- cvat/apps/dataset_manager/views.py | 185 ++-- cvat/apps/engine/tests/test_rest_api_3D.py | 28 +- cvat/apps/engine/tests/utils.py | 53 +- cvat/apps/engine/views.py | 110 ++- cvat/requirements/base.in | 1 + cvat/requirements/base.txt | 27 +- cvat/requirements/development.txt | 5 +- cvat/requirements/testing.in | 2 +- cvat/requirements/testing.txt | 7 +- .../rest_api/test_resource_import_export.py | 6 +- utils/dataset_manifest/requirements.txt | 4 +- 16 files changed, 1264 insertions(+), 169 deletions(-) create mode 100644 changelog.d/20240508_144202_mzhiltso_fix_dataset_downloading.md create mode 100644 cvat/apps/dataset_manager/apps.py create mode 100644 cvat/apps/dataset_manager/default_settings.py diff --git a/changelog.d/20240508_144202_mzhiltso_fix_dataset_downloading.md b/changelog.d/20240508_144202_mzhiltso_fix_dataset_downloading.md new file mode 100644 index 000000000000..a33c56df635d --- /dev/null +++ b/changelog.d/20240508_144202_mzhiltso_fix_dataset_downloading.md @@ -0,0 +1,5 @@ +### Fixed + +- The 500 / "The result file does not exist in export cache" error + on dataset export request + () diff --git a/cvat/apps/dataset_manager/apps.py b/cvat/apps/dataset_manager/apps.py new file mode 100644 index 000000000000..3e62d078171c --- /dev/null +++ b/cvat/apps/dataset_manager/apps.py @@ -0,0 +1,18 @@ +# Copyright (C) 2024 CVAT.ai Corporation 
+# +# SPDX-License-Identifier: MIT + +from django.apps import AppConfig + + +class DatasetManagerConfig(AppConfig): + name = "cvat.apps.dataset_manager" + + def ready(self) -> None: + from django.conf import settings + + from . import default_settings + + for key in dir(default_settings): + if key.isupper() and not hasattr(settings, key): + setattr(settings, key, getattr(default_settings, key)) diff --git a/cvat/apps/dataset_manager/default_settings.py b/cvat/apps/dataset_manager/default_settings.py new file mode 100644 index 000000000000..1499bd2857cc --- /dev/null +++ b/cvat/apps/dataset_manager/default_settings.py @@ -0,0 +1,14 @@ +# Copyright (C) 2024 CVAT.ai Corporation +# +# SPDX-License-Identifier: MIT + +import os + +DATASET_CACHE_TTL = int(os.getenv("CVAT_DATASET_CACHE_TTL", 60 * 60 * 10)) +"Base lifetime for cached exported datasets, in seconds" + +DATASET_CACHE_LOCK_TIMEOUT = int(os.getenv("CVAT_DATASET_CACHE_LOCK_TIMEOUT", 10)) +"Timeout for cache lock acquiring, in seconds" + +DATASET_EXPORT_LOCKED_RETRY_INTERVAL = int(os.getenv("CVAT_DATASET_EXPORT_LOCKED_RETRY_INTERVAL", 60)) +"Retry interval for cases the export cache lock was unavailable, in seconds" diff --git a/cvat/apps/dataset_manager/tests/test_rest_api_formats.py b/cvat/apps/dataset_manager/tests/test_rest_api_formats.py index 7c30344d2bab..b8be48b173b6 100644 --- a/cvat/apps/dataset_manager/tests/test_rest_api_formats.py +++ b/cvat/apps/dataset_manager/tests/test_rest_api_formats.py @@ -1,32 +1,42 @@ # Copyright (C) 2021-2022 Intel Corporation -# Copyright (C) 2022-2023 CVAT.ai Corporation +# Copyright (C) 2022-2024 CVAT.ai Corporation # # SPDX-License-Identifier: MIT import copy +import itertools import json import os.path as osp import os +import multiprocessing import av import numpy as np import random import xml.etree.ElementTree as ET import zipfile +from contextlib import ExitStack, contextmanager +from datetime import timedelta +from functools import partial from io import BytesIO 
-import itertools +from tempfile import TemporaryDirectory +from time import sleep +from typing import Any, Callable, ClassVar, Optional, overload +from unittest.mock import MagicMock, patch, DEFAULT as MOCK_DEFAULT +from attr import define, field from datumaro.components.dataset import Dataset from datumaro.util.test_utils import compare_datasets, TestDir from django.contrib.auth.models import Group, User from PIL import Image from rest_framework import status -from rest_framework.test import APIClient, APITestCase import cvat.apps.dataset_manager as dm from cvat.apps.dataset_manager.bindings import CvatTaskOrJobDataExtractor, TaskData from cvat.apps.dataset_manager.task import TaskAnnotation +from cvat.apps.dataset_manager.util import get_export_cache_lock +from cvat.apps.dataset_manager.views import clear_export_cache, export, parse_export_file_path from cvat.apps.engine.models import Task -from cvat.apps.engine.tests.utils import get_paginated_collection +from cvat.apps.engine.tests.utils import get_paginated_collection, ApiTestBase, ForceLogin projects_path = osp.join(osp.dirname(__file__), 'assets', 'projects.json') with open(projects_path) as file: @@ -86,26 +96,7 @@ def generate_video_file(filename, width=1280, height=720, duration=1, fps=25, co return [(width, height)] * total_frames, f -class ForceLogin: - def __init__(self, user, client): - self.user = user - self.client = client - - def __enter__(self): - if self.user: - self.client.force_login(self.user, - backend='django.contrib.auth.backends.ModelBackend') - - return self - - def __exit__(self, exception_type, exception_value, traceback): - if self.user: - self.client.logout() - -class _DbTestBase(APITestCase): - def setUp(self): - self.client = APIClient() - +class _DbTestBase(ApiTestBase): @classmethod def setUpTestData(cls): cls.create_db_users() @@ -427,6 +418,8 @@ def test_api_v2_dump_and_upload_annotations_with_objects_type_is_shape(self): url = 
self._generate_url_dump_tasks_annotations(task_id) for user, edata in list(expected.items()): + self._clear_rq_jobs() # clean up from previous tests and iterations + user_name = edata['name'] file_zip_name = osp.join(test_dir, f'{test_name}_{user_name}_{dump_format_name}.zip') data = { @@ -532,6 +525,8 @@ def test_api_v2_dump_annotations_with_objects_type_is_track(self): url = self._generate_url_dump_tasks_annotations(task_id) for user, edata in list(expected.items()): + self._clear_rq_jobs() # clean up from previous tests and iterations + user_name = edata['name'] file_zip_name = osp.join(test_dir, f'{test_name}_{user_name}_{dump_format_name}.zip') data = { @@ -617,6 +612,8 @@ def test_api_v2_dump_tag_annotations(self): for user, edata in list(expected.items()): with self.subTest(format=f"{edata['name']}"): with TestDir() as test_dir: + self._clear_rq_jobs() # clean up from previous tests and iterations + user_name = edata['name'] url = self._generate_url_dump_tasks_annotations(task_id) @@ -865,6 +862,8 @@ def test_api_v2_export_dataset(self): # dump annotations url = self._generate_url_dump_task_dataset(task_id) for user, edata in list(expected.items()): + self._clear_rq_jobs() # clean up from previous tests and iterations + user_name = edata['name'] file_zip_name = osp.join(test_dir, f'{test_name}_{user_name}_{dump_format_name}.zip') data = { @@ -1265,6 +1264,764 @@ def test_api_v2_check_attribute_import_in_tracks(self): data_from_task_after_upload = self._get_data_from_task(task_id, include_images) compare_datasets(self, data_from_task_before_upload, data_from_task_after_upload) +class ExportBehaviorTest(_DbTestBase): + @define + class SharedBase: + condition: multiprocessing.Condition = field(factory=multiprocessing.Condition, init=False) + + @define + class SharedBool(SharedBase): + value: multiprocessing.Value = field( + factory=partial(multiprocessing.Value, 'i', 0), init=False + ) + + def set(self, value: bool = True): + self.value.value = int(value) + + 
def get(self) -> bool: + return bool(self.value.value) + + @define + class SharedString(SharedBase): + MAX_LEN: ClassVar[int] = 2048 + + value: multiprocessing.Value = field( + factory=partial(multiprocessing.Array, 'c', MAX_LEN), init=False + ) + + def set(self, value: str): + self.value.get_obj().value = value.encode()[ : self.MAX_LEN - 1] + + def get(self) -> str: + return self.value.get_obj().value.decode() + + class _LockTimeoutError(Exception): + pass + + @overload + @classmethod + def set_condition(cls, var: SharedBool, value: bool = True): ... + + @overload + @classmethod + def set_condition(cls, var: SharedBase, value: Any): ... + + _not_set = object() + + @classmethod + def set_condition(cls, var: SharedBase, value: Any = _not_set): + if isinstance(var, cls.SharedBool) and value is cls._not_set: + value = True + + with var.condition: + var.set(value) + var.condition.notify() + + @classmethod + def wait_condition(cls, var: SharedBase, timeout: Optional[int] = 5): + with var.condition: + if not var.condition.wait(timeout): + raise cls._LockTimeoutError + + @staticmethod + def side_effect(f: Callable, *args, **kwargs) -> Callable: + """ + Wraps the passed function to be executed with the given parameters + and return the regular mock output + """ + + def wrapped(*_, **__): + f(*args, **kwargs) + return MOCK_DEFAULT + + return wrapped + + @staticmethod + def chain_side_effects(*calls: Callable) -> Callable: + """ + Makes a callable that calls all the passed functions sequentially, + and returns the last call result + """ + + def wrapped(*args, **kwargs): + result = MOCK_DEFAULT + + for f in calls: + new_result = f(*args, **kwargs) + if new_result is not MOCK_DEFAULT: + result = new_result + + return result + + return wrapped + + @staticmethod + @contextmanager + def process_closing(process: multiprocessing.Process, *, timeout: Optional[int] = 10): + try: + yield process + finally: + if process.is_alive(): + process.terminate() + + 
process.join(timeout=timeout) + process.close() + + def test_concurrent_export_and_cleanup(self): + side_effect = self.side_effect + chain_side_effects = self.chain_side_effects + set_condition = self.set_condition + wait_condition = self.wait_condition + _LockTimeoutError = self._LockTimeoutError + process_closing = self.process_closing + + format_name = "CVAT for images 1.1" + + export_cache_lock = multiprocessing.Lock() + + export_checked_the_file = self.SharedBool() + export_created_the_file = self.SharedBool() + export_file_path = self.SharedString() + clear_removed_the_file = self.SharedBool() + + @contextmanager + def patched_get_export_cache_lock(export_path, *, ttl, block=True, acquire_timeout=None): + # fakeredis lock acquired in a subprocess won't be visible to other processes + # just implement the lock here + from cvat.apps.dataset_manager.util import LockNotAvailableError + + if isinstance(acquire_timeout, timedelta): + acquire_timeout = acquire_timeout.total_seconds() + if acquire_timeout is None: + acquire_timeout = -1 + + acquired = export_cache_lock.acquire( + block=block, + timeout=acquire_timeout if acquire_timeout > -1 else None + ) + + if not acquired: + raise LockNotAvailableError + + try: + yield + finally: + export_cache_lock.release() + + def _export(*_, task_id: int): + from os.path import exists as original_exists + from os import replace as original_replace + from cvat.apps.dataset_manager.views import log_exception as original_log_exception + import sys + + def os_replace_dst_recorder(_: str, dst: str): + set_condition(export_file_path, dst) + return MOCK_DEFAULT + + def patched_log_exception(logger=None, exc_info=True): + cur_exc_info = sys.exc_info() if exc_info is True else exc_info + if cur_exc_info and cur_exc_info[1] and isinstance(cur_exc_info[1], _LockTimeoutError): + return # don't spam in logs with expected errors + + original_log_exception(logger, exc_info) + + with ( + 
patch('cvat.apps.dataset_manager.views.EXPORT_CACHE_LOCK_TIMEOUT', new=5), + patch( + 'cvat.apps.dataset_manager.views.get_export_cache_lock', + new=patched_get_export_cache_lock + ), + patch('cvat.apps.dataset_manager.views.osp.exists') as mock_osp_exists, + patch('cvat.apps.dataset_manager.views.os.replace') as mock_os_replace, + patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, + patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + patch('cvat.apps.dataset_manager.views.log_exception', new=patched_log_exception), + ): + mock_osp_exists.side_effect = chain_side_effects( + original_exists, + side_effect(set_condition, export_checked_the_file), + ) + + mock_os_replace.side_effect = chain_side_effects( + original_replace, + os_replace_dst_recorder, + side_effect(set_condition, export_created_the_file), + side_effect(wait_condition, clear_removed_the_file), + ) + + mock_rq_get_current_job.return_value = MagicMock(timeout=5) + + exited_by_timeout = False + try: + export(dst_format=format_name, task_id=task_id) + except _LockTimeoutError: + # should come from waiting for clear_removed_the_file + exited_by_timeout = True + + assert exited_by_timeout + mock_os_replace.assert_called_once() + + + def _clear(*_, file_path: str, file_ctime: str): + from os import remove as original_remove + from cvat.apps.dataset_manager.util import LockNotAvailableError + + with ( + patch('cvat.apps.dataset_manager.views.EXPORT_CACHE_LOCK_TIMEOUT', new=5), + patch( + 'cvat.apps.dataset_manager.views.get_export_cache_lock', + new=patched_get_export_cache_lock + ), + patch('cvat.apps.dataset_manager.views.os.remove') as mock_os_remove, + patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, + patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + patch('cvat.apps.dataset_manager.views.TTL_CONSTS', new={'task': timedelta(seconds=0)}), + ): + mock_os_remove.side_effect = chain_side_effects( + 
side_effect(wait_condition, export_created_the_file), + original_remove, + side_effect(set_condition, clear_removed_the_file), + ) + + mock_rq_get_current_job.return_value = MagicMock(timeout=5) + + exited_by_timeout = False + try: + clear_export_cache( + file_path=file_path, file_ctime=file_ctime, logger=MagicMock() + ) + except LockNotAvailableError: + # should come from waiting for get_export_cache_lock + exited_by_timeout = True + + assert exited_by_timeout + + + # The problem checked is TOCTOU / race condition for file existence check and + # further file creation / removal. There are several possible variants of the problem. + # An example: + # 1. export checks the file exists, but outdated + # 2. clear checks the file exists, and matches the creation timestamp + # 3. export creates the new export file + # 4. remove removes the new export file (instead of the one that it checked) + # Thus, we have no exported file after the successful export. + # + # Other variants can be variations on the intermediate calls, such as getmtime: + # - export: exists() + # - clear: remove() + # - export: getmtime() -> an exception + # etc. 
+ + images = self._generate_task_images(3) + task = self._create_task(tasks["main"], images) + self._create_annotations(task, f'{format_name} many jobs', "default") + task_id = task["id"] + + with ( + patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, + patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + ): + mock_rq_job = MagicMock(timeout=5) + mock_rq_get_current_job.return_value = mock_rq_job + + first_export_path = export(dst_format=format_name, task_id=task_id) + + export_instance_timestamp = parse_export_file_path(first_export_path).instance_timestamp + + self._create_annotations(task, f'{format_name} many jobs', "default") + + processes_finished_correctly = False + with ExitStack() as es: + # Run both operations concurrently + # Threads could be faster, but they can't be terminated + export_process = es.enter_context(process_closing(multiprocessing.Process( + target=_export, + args=( + export_cache_lock, + export_checked_the_file, export_created_the_file, + export_file_path, clear_removed_the_file, + ), + kwargs=dict(task_id=task_id), + ))) + clear_process = es.enter_context(process_closing(multiprocessing.Process( + target=_clear, + args=( + export_cache_lock, + export_checked_the_file, export_created_the_file, + export_file_path, clear_removed_the_file, + ), + kwargs=dict(file_path=first_export_path, file_ctime=export_instance_timestamp), + ))) + + export_process.start() + + wait_condition(export_checked_the_file) # ensure the expected execution order + clear_process.start() + + # A deadlock (interrupted by a timeout error) is the positive outcome in this test, + # if the problem is fixed. + # clear() must wait for the export cache lock release (acquired by export()). 
+ # It must be finished by a timeout, as export() holds it, waiting + clear_process.join(timeout=10) + + # export() must wait for the clear() file existence check and fail because of timeout + export_process.join(timeout=10) + + self.assertFalse(export_process.is_alive()) + self.assertFalse(clear_process.is_alive()) + + # All the expected exceptions should be handled in the process callbacks. + # This is to avoid passing the test with unexpected errors + self.assertEqual(export_process.exitcode, 0) + self.assertEqual(clear_process.exitcode, 0) + + processes_finished_correctly = True + + self.assertTrue(processes_finished_correctly) + + # terminate() may break the locks, don't try to acquire + # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process.terminate + self.assertTrue(export_checked_the_file.get()) + self.assertTrue(export_created_the_file.get()) + + self.assertFalse(clear_removed_the_file.get()) + + new_export_path = export_file_path.get() + self.assertGreater(len(new_export_path), 0) + self.assertTrue(osp.isfile(new_export_path)) + + def test_concurrent_download_and_cleanup(self): + side_effect = self.side_effect + chain_side_effects = self.chain_side_effects + set_condition = self.set_condition + wait_condition = self.wait_condition + process_closing = self.process_closing + + format_name = "CVAT for images 1.1" + + export_cache_lock = multiprocessing.Lock() + + download_checked_the_file = self.SharedBool() + clear_removed_the_file = self.SharedBool() + + images = self._generate_task_images(3) + task = self._create_task(tasks["main"], images) + self._create_annotations(task, f'{format_name} many jobs', "default") + task_id = task["id"] + + download_url = self._generate_url_dump_tasks_annotations(task_id) + download_params = { + "format": format_name, + } + + @contextmanager + def patched_get_export_cache_lock(export_path, *, ttl, block=True, acquire_timeout=None): + # fakeredis lock acquired in a subprocess won't be visible to 
other processes + # just implement the lock here + from cvat.apps.dataset_manager.util import LockNotAvailableError + + if isinstance(acquire_timeout, timedelta): + acquire_timeout = acquire_timeout.total_seconds() + if acquire_timeout is None: + acquire_timeout = -1 + + acquired = export_cache_lock.acquire( + block=block, + timeout=acquire_timeout if acquire_timeout > -1 else None + ) + + if not acquired: + raise LockNotAvailableError + + try: + yield + finally: + export_cache_lock.release() + + def _download(*_, task_id: int, export_path: str): + from os.path import exists as original_exists + + def patched_osp_exists(path: str): + result = original_exists(path) + + if path == export_path: + set_condition(download_checked_the_file) + wait_condition( + clear_removed_the_file, timeout=20 + ) # wait more than the process timeout + + return result + + with ( + patch( + 'cvat.apps.engine.views.dm.util.get_export_cache_lock', + new=patched_get_export_cache_lock + ), + patch('cvat.apps.dataset_manager.views.osp.exists') as mock_osp_exists, + TemporaryDirectory() as temp_dir, + ): + mock_osp_exists.side_effect = patched_osp_exists + + response = self._get_request_with_data(download_url, download_params, self.admin) + self.assertEqual(response.status_code, status.HTTP_200_OK) + + content = BytesIO(b"".join(response.streaming_content)) + with open(osp.join(temp_dir, "export.zip"), "wb") as f: + f.write(content.getvalue()) + + mock_osp_exists.assert_called() + + def _clear(*_, file_path: str, file_ctime: str): + from os import remove as original_remove + from cvat.apps.dataset_manager.util import LockNotAvailableError + + with ( + patch('cvat.apps.dataset_manager.views.EXPORT_CACHE_LOCK_TIMEOUT', new=5), + patch( + 'cvat.apps.dataset_manager.views.get_export_cache_lock', + new=patched_get_export_cache_lock + ), + patch('cvat.apps.dataset_manager.views.os.remove') as mock_os_remove, + patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, + 
patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + patch('cvat.apps.dataset_manager.views.TTL_CONSTS', new={'task': timedelta(seconds=0)}), + ): + mock_os_remove.side_effect = chain_side_effects( + original_remove, + side_effect(set_condition, clear_removed_the_file), + ) + + mock_rq_get_current_job.return_value = MagicMock(timeout=5) + + exited_by_timeout = False + try: + clear_export_cache( + file_path=file_path, file_ctime=file_ctime, logger=MagicMock() + ) + except LockNotAvailableError: + # should come from waiting for get_export_cache_lock + exited_by_timeout = True + + assert exited_by_timeout + + + # The problem checked is TOCTOU / race condition for file existence check and + # further file reading / removal. There are several possible variants of the problem. + # An example: + # 1. download exports the file + # 2. download checks the export is still relevant + # 3. clear checks the file exists + # 4. clear removes the export file + # 5. download checks if the file exists -> an exception + # + # There can be variations on the intermediate calls, such as: + # - download: exists() + # - clear: remove() + # - download: open() -> an exception + # etc. 
+ + export_path = None + + def patched_export(*args, **kwargs): + nonlocal export_path + + result = export(*args, **kwargs) + export_path = result + + return result + + with patch('cvat.apps.dataset_manager.views.export', new=patched_export): + response = self._get_request_with_data(download_url, download_params, self.admin) + self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) + + response = self._get_request_with_data(download_url, download_params, self.admin) + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + + export_instance_time = parse_export_file_path(export_path).instance_timestamp + + download_params["action"] = "download" + + processes_finished_correctly = False + with ExitStack() as es: + # Run both operations concurrently + # Threads could be faster, but they can't be terminated + download_process = es.enter_context(process_closing(multiprocessing.Process( + target=_download, + args=(download_checked_the_file, clear_removed_the_file, export_cache_lock), + kwargs=dict(task_id=task_id, export_path=export_path), + ))) + clear_process = es.enter_context(process_closing(multiprocessing.Process( + target=_clear, + args=(download_checked_the_file, clear_removed_the_file, export_cache_lock), + kwargs=dict(file_path=export_path, file_ctime=export_instance_time), + ))) + + download_process.start() + + wait_condition(download_checked_the_file) # ensure the expected execution order + clear_process.start() + + # A deadlock (interrupted by a timeout error) is the positive outcome in this test, + # if the problem is fixed. + # clear() must wait for the export cache lock release (acquired by download()). 
+ # It must be finished by a timeout, as download() holds it, waiting + clear_process.join(timeout=5) + + # download() must wait for the clear() file existence check and fail because of timeout + download_process.join(timeout=5) + + self.assertTrue(download_process.is_alive()) + self.assertFalse(clear_process.is_alive()) + + download_process.terminate() + download_process.join(timeout=5) + + # All the expected exceptions should be handled in the process callbacks. + # This is to avoid passing the test with unexpected errors + self.assertEqual(download_process.exitcode, -15) # sigterm + self.assertEqual(clear_process.exitcode, 0) + + processes_finished_correctly = True + + self.assertTrue(processes_finished_correctly) + + # terminate() may break the locks, don't try to acquire + # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process.terminate + self.assertTrue(download_checked_the_file.get()) + + self.assertFalse(clear_removed_the_file.get()) + + def test_export_can_create_file_and_cleanup_job(self): + format_name = "CVAT for images 1.1" + images = self._generate_task_images(3) + task = self._create_task(tasks["main"], images) + self._create_annotations(task, f'{format_name} many jobs', "default") + task_id = task["id"] + + with ( + patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, + patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler') as mock_rq_get_scheduler, + patch('cvat.apps.dataset_manager.views.TTL_CONSTS', new={'task': timedelta(seconds=0)}), + ): + mock_rq_job = MagicMock(timeout=5) + mock_rq_get_current_job.return_value = mock_rq_job + + mock_rq_scheduler = MagicMock() + mock_rq_get_scheduler.return_value = mock_rq_scheduler + + export_path = export(dst_format=format_name, task_id=task_id) + + self.assertTrue(osp.isfile(export_path)) + mock_rq_scheduler.enqueue_in.assert_called_once() + + def test_export_cache_lock_can_raise_on_releasing_expired_lock(self): + from pottery import 
ReleaseUnlockedLock + + with self.assertRaises(ReleaseUnlockedLock): + lock_time = 2 + with get_export_cache_lock('test_export_path', ttl=lock_time, acquire_timeout=5): + sleep(lock_time + 1) + + def test_export_can_request_retry_on_locking_failure(self): + format_name = "CVAT for images 1.1" + images = self._generate_task_images(3) + task = self._create_task(tasks["main"], images) + self._create_annotations(task, f'{format_name} many jobs', "default") + task_id = task["id"] + + from cvat.apps.dataset_manager.util import LockNotAvailableError + with ( + patch( + 'cvat.apps.dataset_manager.views.get_export_cache_lock', + side_effect=LockNotAvailableError + ) as mock_get_export_cache_lock, + patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, + patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + self.assertRaises(LockNotAvailableError), + ): + mock_rq_job = MagicMock(timeout=5) + mock_rq_get_current_job.return_value = mock_rq_job + + export(dst_format=format_name, task_id=task_id) + + mock_get_export_cache_lock.assert_called() + self.assertEqual(mock_rq_job.retries_left, 1) + self.assertEqual(len(mock_rq_job.retry_intervals), 1) + + def test_export_can_reuse_older_file_if_still_relevant(self): + format_name = "CVAT for images 1.1" + images = self._generate_task_images(3) + task = self._create_task(tasks["main"], images) + self._create_annotations(task, f'{format_name} many jobs', "default") + task_id = task["id"] + + with ( + patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, + patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + ): + mock_rq_get_current_job.return_value = MagicMock(timeout=5) + + first_export_path = export(dst_format=format_name, task_id=task_id) + + from os.path import exists as original_exists + with ( + patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, + 
patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + patch('cvat.apps.dataset_manager.views.osp.exists', side_effect=original_exists) as mock_osp_exists, + patch('cvat.apps.dataset_manager.views.os.replace') as mock_os_replace, + ): + mock_rq_get_current_job.return_value = MagicMock(timeout=5) + + second_export_path = export(dst_format=format_name, task_id=task_id) + + self.assertEqual(first_export_path, second_export_path) + mock_osp_exists.assert_called_with(first_export_path) + mock_os_replace.assert_not_called() + + def test_cleanup_can_remove_file(self): + format_name = "CVAT for images 1.1" + images = self._generate_task_images(3) + task = self._create_task(tasks["main"], images) + self._create_annotations(task, f'{format_name} many jobs', "default") + task_id = task["id"] + + with ( + patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, + patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + ): + mock_rq_get_current_job.return_value = MagicMock(timeout=5) + + export_path = export(dst_format=format_name, task_id=task_id) + + with ( + patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, + patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + patch('cvat.apps.dataset_manager.views.TTL_CONSTS', new={'task': timedelta(seconds=0)}), + ): + mock_rq_get_current_job.return_value = MagicMock(timeout=5) + + export_path = export(dst_format=format_name, task_id=task_id) + file_ctime = parse_export_file_path(export_path).instance_timestamp + clear_export_cache(file_path=export_path, file_ctime=file_ctime, logger=MagicMock()) + + self.assertFalse(osp.isfile(export_path)) + + def test_cleanup_can_request_retry_on_locking_failure(self): + format_name = "CVAT for images 1.1" + images = self._generate_task_images(3) + task = self._create_task(tasks["main"], images) + self._create_annotations(task, f'{format_name} many jobs', "default") + task_id = task["id"] + + 
from cvat.apps.dataset_manager.util import LockNotAvailableError + with ( + patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, + patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + ): + mock_rq_get_current_job.return_value = MagicMock(timeout=5) + + export_path = export(dst_format=format_name, task_id=task_id) + + with ( + patch( + 'cvat.apps.dataset_manager.views.get_export_cache_lock', + side_effect=LockNotAvailableError + ) as mock_get_export_cache_lock, + patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, + patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + self.assertRaises(LockNotAvailableError), + ): + mock_rq_job = MagicMock(timeout=5) + mock_rq_get_current_job.return_value = mock_rq_job + + file_ctime = parse_export_file_path(export_path).instance_timestamp + clear_export_cache(file_path=export_path, file_ctime=file_ctime, logger=MagicMock()) + + mock_get_export_cache_lock.assert_called() + self.assertEqual(mock_rq_job.retries_left, 1) + self.assertEqual(len(mock_rq_job.retry_intervals), 1) + self.assertTrue(osp.isfile(export_path)) + + def test_cleanup_can_fail_if_no_file(self): + with ( + patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, + patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + self.assertRaises(FileNotFoundError), + ): + mock_rq_job = MagicMock(timeout=5) + mock_rq_get_current_job.return_value = mock_rq_job + + clear_export_cache(file_path="non existent file path", file_ctime=0, logger=MagicMock()) + + def test_cleanup_can_defer_removal_if_file_is_used_recently(self): + format_name = "CVAT for images 1.1" + images = self._generate_task_images(3) + task = self._create_task(tasks["main"], images) + self._create_annotations(task, f'{format_name} many jobs', "default") + task_id = task["id"] + + with ( + patch('cvat.apps.dataset_manager.views.rq.get_current_job') as 
mock_rq_get_current_job, + patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + ): + mock_rq_get_current_job.return_value = MagicMock(timeout=5) + + export_path = export(dst_format=format_name, task_id=task_id) + + from cvat.apps.dataset_manager.views import FileIsBeingUsedError + with ( + patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, + patch('cvat.apps.dataset_manager.views.TTL_CONSTS', new={'task': timedelta(hours=1)}), + self.assertRaises(FileIsBeingUsedError), + ): + mock_rq_job = MagicMock(timeout=5) + mock_rq_get_current_job.return_value = mock_rq_job + + export_path = export(dst_format=format_name, task_id=task_id) + file_ctime = parse_export_file_path(export_path).instance_timestamp + clear_export_cache(file_path=export_path, file_ctime=file_ctime, logger=MagicMock()) + + self.assertEqual(mock_rq_job.retries_left, 1) + self.assertEqual(len(mock_rq_job.retry_intervals), 1) + self.assertTrue(osp.isfile(export_path)) + + def test_cleanup_can_be_called_with_old_signature(self): + # Test RQ jobs for backward compatibility of API prior to the PR + # https://github.com/cvat-ai/cvat/pull/7864 + # Jobs referring to the old API can exist in the redis queues after the server is updated + + format_name = "CVAT for images 1.1" + images = self._generate_task_images(3) + task = self._create_task(tasks["main"], images) + self._create_annotations(task, f'{format_name} many jobs', "default") + task_id = task["id"] + + with ( + patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, + patch('cvat.apps.dataset_manager.views.django_rq.get_scheduler'), + ): + mock_rq_get_current_job.return_value = MagicMock(timeout=5) + + export_path = export(dst_format=format_name, task_id=task_id) + + file_ctime = parse_export_file_path(export_path).instance_timestamp + old_kwargs = { + 'file_path': export_path, + 'file_ctime': file_ctime, + 'logger': MagicMock(), + } + + with ( + 
patch('cvat.apps.dataset_manager.views.rq.get_current_job') as mock_rq_get_current_job, + patch('cvat.apps.dataset_manager.views.TTL_CONSTS', new={'task': timedelta(seconds=0)}), + ): + mock_rq_get_current_job.return_value = MagicMock(timeout=5) + + clear_export_cache(**old_kwargs) + + self.assertFalse(osp.isfile(export_path)) + + class ProjectDumpUpload(_DbTestBase): def test_api_v2_export_import_dataset(self): test_name = self._testMethodName @@ -1311,6 +2068,8 @@ def test_api_v2_export_import_dataset(self): self._create_annotations(task, dump_format_name, "random") for user, edata in list(expected.items()): + self._clear_rq_jobs() # clean up from previous tests and iterations + user_name = edata['name'] file_zip_name = osp.join(test_dir, f'{test_name}_{user_name}_{dump_format_name}.zip') data = { @@ -1391,6 +2150,8 @@ def test_api_v2_export_annotations(self): url = self._generate_url_dump_project_annotations(project['id'], dump_format_name) for user, edata in list(expected.items()): + self._clear_rq_jobs() # clean up from previous tests and iterations + user_name = edata['name'] file_zip_name = osp.join(test_dir, f'{test_name}_{user_name}_{dump_format_name}.zip') data = { diff --git a/cvat/apps/dataset_manager/util.py b/cvat/apps/dataset_manager/util.py index 387b74d21777..064303f84f84 100644 --- a/cvat/apps/dataset_manager/util.py +++ b/cvat/apps/dataset_manager/util.py @@ -1,16 +1,28 @@ - # Copyright (C) 2019-2022 Intel Corporation +# Copyright (C) 2023-2024 CVAT.ai Corporation # # SPDX-License-Identifier: MIT -from copy import deepcopy -from typing import Sequence import inspect -import os, os.path as osp +import os +import os.path as osp +import re import zipfile - +from contextlib import contextmanager +from copy import deepcopy +from datetime import timedelta +from threading import Lock +from typing import Any, Generator, Optional, Sequence + +import attrs +import django_rq +from datumaro.util import to_snake_case +from datumaro.util.os_util import 
make_file_name from django.conf import settings from django.db import models +from pottery import Redlock + +from cvat.apps.engine.models import Job, Project, Task def current_function_name(depth=1): @@ -80,3 +92,140 @@ def deepcopy_simple(v): return v else: return deepcopy(v) + + +class LockNotAvailableError(Exception): + pass + + +def make_export_cache_lock_key(filename: os.PathLike[str]) -> str: + return f"export_lock:{os.fspath(filename)}" + + +@contextmanager +def get_export_cache_lock( + export_path: os.PathLike[str], + *, + ttl: int | timedelta, + block: bool = True, + acquire_timeout: Optional[int | timedelta] = None, +) -> Generator[Lock, Any, Any]: + if isinstance(acquire_timeout, timedelta): + acquire_timeout = acquire_timeout.total_seconds() + if acquire_timeout is not None and acquire_timeout < 0: + raise ValueError("acquire_timeout must be a non-negative number") + elif acquire_timeout is None: + acquire_timeout = -1 + + if isinstance(ttl, timedelta): + ttl = ttl.total_seconds() + if not ttl or ttl < 0: + raise ValueError("ttl must be a non-negative number") + + # https://redis.io/docs/latest/develop/use/patterns/distributed-locks/ + # The lock is exclusive, so it may potentially reduce performance in some cases, + # where parallel access is potentially possible and valid, + # e.g. dataset downloading could use a shared lock instead. 
+ lock = Redlock( + key=make_export_cache_lock_key(export_path), + masters={django_rq.get_connection(settings.CVAT_QUEUES.EXPORT_DATA.value)}, + auto_release_time=ttl, + ) + acquired = lock.acquire(blocking=block, timeout=acquire_timeout) + try: + if acquired: + yield lock + else: + raise LockNotAvailableError + + finally: + if acquired: + lock.release() + + +EXPORT_CACHE_DIR_NAME = 'export_cache' + + +def get_export_cache_dir(db_instance: Project | Task | Job) -> str: + base_dir = osp.abspath(db_instance.get_dirname()) + + if osp.isdir(base_dir): + return osp.join(base_dir, EXPORT_CACHE_DIR_NAME) + else: + raise FileNotFoundError( + '{} dir {} does not exist'.format(db_instance.__class__.__name__, base_dir) + ) + + +def make_export_filename( + dst_dir: str, + save_images: bool, + instance_timestamp: float, + format_name: str, +) -> str: + from .formats.registry import EXPORT_FORMATS + file_ext = EXPORT_FORMATS[format_name].EXT + + filename = '%s-instance%f-%s.%s' % ( + 'dataset' if save_images else 'annotations', + # store the instance timestamp in the file name to reliably get this information + # ctime / mtime do not return file creation time on linux + # mtime is used for file usage checks + instance_timestamp, + make_file_name(to_snake_case(format_name)), + file_ext, + ) + return osp.join(dst_dir, filename) + + +@attrs.define +class ParsedExportFilename: + instance_type: str + has_images: bool + instance_timestamp: Optional[float] + format_repr: str + file_ext: str + + +def parse_export_file_path(file_path: os.PathLike[str]) -> ParsedExportFilename: + file_path = osp.normpath(file_path) + dirname, basename = osp.split(file_path) + + basename_match = re.fullmatch( + ( + r'(?Pdataset|annotations)' + r'(?:-instance(?P\d+\.\d+))?' 
# optional for backward compatibility + r'-(?P.+)' + r'\.(?P.+)' + ), + basename + ) + if not basename_match: + raise ValueError(f"Couldn't parse filename components in '{basename}'") + + dirname_match = re.search(rf'/(jobs|tasks|projects)/\d+/{EXPORT_CACHE_DIR_NAME}$', dirname) + if not dirname_match: + raise ValueError(f"Couldn't parse instance type in '{dirname}'") + + match dirname_match.group(1): + case 'jobs': + instance_type_name = 'job' + case 'tasks': + instance_type_name = 'task' + case 'projects': + instance_type_name = 'project' + case _: + assert False + + if 'instance_timestamp' in basename_match.groupdict(): + instance_timestamp = float(basename_match.group('instance_timestamp')) + else: + instance_timestamp = None + + return ParsedExportFilename( + instance_type=instance_type_name, + has_images=basename_match.group('export_mode') == 'dataset', + instance_timestamp=instance_timestamp, + format_repr=basename_match.group('format_tag'), + file_ext=basename_match.group('file_ext'), + ) diff --git a/cvat/apps/dataset_manager/views.py b/cvat/apps/dataset_manager/views.py index 41bc6ae1b671..53cbdd5c03b1 100644 --- a/cvat/apps/dataset_manager/views.py +++ b/cvat/apps/dataset_manager/views.py @@ -1,26 +1,32 @@ # Copyright (C) 2019-2022 Intel Corporation -# Copyright (C) 2023 CVAT.ai Corporation +# Copyright (C) 2023-2024 CVAT.ai Corporation # # SPDX-License-Identifier: MIT +import logging import os import os.path as osp import tempfile from datetime import timedelta import django_rq -from datumaro.util.os_util import make_file_name -from datumaro.util import to_snake_case -from django.utils import timezone +import rq from django.conf import settings +from django.utils import timezone -import cvat.apps.dataset_manager.task as task import cvat.apps.dataset_manager.project as project +import cvat.apps.dataset_manager.task as task from cvat.apps.engine.log import ServerLogManager -from cvat.apps.engine.models import Project, Task, Job +from 
cvat.apps.engine.models import Job, Project, Task from .formats.registry import EXPORT_FORMATS, IMPORT_FORMATS -from .util import current_function_name +from .util import ( + LockNotAvailableError, + current_function_name, get_export_cache_lock, + get_export_cache_dir, make_export_filename, + parse_export_file_path +) +from .util import EXPORT_CACHE_DIR_NAME # pylint: disable=unused-import slogger = ServerLogManager(__name__) @@ -32,78 +38,103 @@ def log_exception(logger=None, exc_info=True): (_MODULE_NAME, current_function_name(2)), exc_info=exc_info) +DEFAULT_CACHE_TTL = timedelta(seconds=settings.DATASET_CACHE_TTL) +PROJECT_CACHE_TTL = DEFAULT_CACHE_TTL / 3 +TASK_CACHE_TTL = DEFAULT_CACHE_TTL +JOB_CACHE_TTL = DEFAULT_CACHE_TTL +TTL_CONSTS = { + 'project': PROJECT_CACHE_TTL, + 'task': TASK_CACHE_TTL, + 'job': JOB_CACHE_TTL, +} -def get_export_cache_dir(db_instance): - base_dir = osp.abspath(db_instance.get_dirname()) +EXPORT_CACHE_LOCK_TIMEOUT = timedelta(seconds=settings.DATASET_CACHE_LOCK_TIMEOUT) +EXPORT_LOCKED_RETRY_INTERVAL = timedelta(seconds=settings.DATASET_EXPORT_LOCKED_RETRY_INTERVAL) - if osp.isdir(base_dir): - return osp.join(base_dir, 'export_cache') - else: - raise FileNotFoundError('{} dir {} does not exist'.format(db_instance.__class__.__name__, base_dir)) -DEFAULT_CACHE_TTL = timedelta(hours=10) -TASK_CACHE_TTL = DEFAULT_CACHE_TTL -PROJECT_CACHE_TTL = DEFAULT_CACHE_TTL / 3 -JOB_CACHE_TTL = DEFAULT_CACHE_TTL +def get_export_cache_ttl(db_instance: str | Project | Task | Job) -> timedelta: + if isinstance(db_instance, (Project, Task, Job)): + db_instance = db_instance.__class__.__name__ + + return TTL_CONSTS[db_instance.lower()] + def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=None, save_images=False): try: if task_id is not None: logger = slogger.task[task_id] - cache_ttl = TASK_CACHE_TTL export_fn = task.export_task db_instance = Task.objects.get(pk=task_id) elif project_id is not None: logger = 
slogger.project[project_id] - cache_ttl = PROJECT_CACHE_TTL export_fn = project.export_project db_instance = Project.objects.get(pk=project_id) else: logger = slogger.job[job_id] - cache_ttl = JOB_CACHE_TTL export_fn = task.export_job db_instance = Job.objects.get(pk=job_id) - cache_dir = get_export_cache_dir(db_instance) + cache_ttl = get_export_cache_ttl(db_instance) - exporter = EXPORT_FORMATS[dst_format] - output_base = '%s_%s' % ('dataset' if save_images else 'annotations', - make_file_name(to_snake_case(dst_format))) - output_path = '%s.%s' % (output_base, exporter.EXT) - output_path = osp.join(cache_dir, output_path) + cache_dir = get_export_cache_dir(db_instance) - instance_time = timezone.localtime(db_instance.updated_date).timestamp() + # As we're not locking the db object here, it can be updated by the time of actual export. + # The file will be saved with the older timestamp. + # When it's time to download the file, it will be handled - the export will be restarted. + # The situation is considered rare, so no locking is used. 
+ instance_update_time = timezone.localtime(db_instance.updated_date) if isinstance(db_instance, Project): - tasks_update = list(map(lambda db_task: timezone.localtime( - db_task.updated_date).timestamp(), db_instance.tasks.all())) - instance_time = max(tasks_update + [instance_time]) - if not (osp.exists(output_path) and \ - instance_time <= osp.getmtime(output_path)): - os.makedirs(cache_dir, exist_ok=True) - with tempfile.TemporaryDirectory(dir=cache_dir) as temp_dir: - temp_file = osp.join(temp_dir, 'result') - export_fn(db_instance.id, temp_file, dst_format, - server_url=server_url, save_images=save_images) - os.replace(temp_file, output_path) - - archive_ctime = osp.getctime(output_path) - scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.EXPORT_DATA.value) - cleaning_job = scheduler.enqueue_in(time_delta=cache_ttl, - func=clear_export_cache, - file_path=output_path, - file_ctime=archive_ctime, - logger=logger) - logger.info( - "The {} '{}' is exported as '{}' at '{}' " - "and available for downloading for the next {}. 
" - "Export cache cleaning job is enqueued, id '{}'".format( - db_instance.__class__.__name__.lower(), - db_instance.name if isinstance(db_instance, (Project, Task)) else db_instance.id, - dst_format, output_path, cache_ttl, - cleaning_job.id - )) + tasks_update = list(map( + lambda db_task: timezone.localtime(db_task.updated_date), + db_instance.tasks.all() + )) + instance_update_time = max(tasks_update + [instance_update_time]) + + output_path = make_export_filename( + cache_dir, save_images, instance_update_time.timestamp(), dst_format + ) + + os.makedirs(cache_dir, exist_ok=True) + + with get_export_cache_lock( + output_path, + block=True, + acquire_timeout=EXPORT_CACHE_LOCK_TIMEOUT, + ttl=rq.get_current_job().timeout, + ): + if not osp.exists(output_path): + with tempfile.TemporaryDirectory(dir=cache_dir) as temp_dir: + temp_file = osp.join(temp_dir, 'result') + export_fn(db_instance.id, temp_file, dst_format, + server_url=server_url, save_images=save_images) + os.replace(temp_file, output_path) + + scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.EXPORT_DATA.value) + cleaning_job = scheduler.enqueue_in( + time_delta=cache_ttl, + func=clear_export_cache, + file_path=output_path, + file_ctime=instance_update_time.timestamp(), + logger=logger + ) + logger.info( + "The {} '{}' is exported as '{}' at '{}' " + "and available for downloading for the next {}. 
" + "Export cache cleaning job is enqueued, id '{}'".format( + db_instance.__class__.__name__.lower(), + db_instance.name if isinstance(db_instance, (Project, Task)) else db_instance.id, + dst_format, output_path, cache_ttl, + cleaning_job.id + ) + ) return output_path + except LockNotAvailableError: + # Need to retry later if the lock was not available + rq_job = rq.get_current_job() # the worker references the same object + rq_job.retries_left = 1 + rq_job.retry_intervals = [EXPORT_LOCKED_RETRY_INTERVAL.total_seconds()] + raise # should be handled by the worker except Exception: log_exception(logger) raise @@ -127,14 +158,46 @@ def export_project_annotations(project_id, dst_format=None, server_url=None): return export(dst_format, project_id=project_id, server_url=server_url, save_images=False) -def clear_export_cache(file_path, file_ctime, logger): +class FileIsBeingUsedError(Exception): + pass + +def clear_export_cache(file_path: str, file_ctime: float, logger: logging.Logger) -> None: + # file_ctime is for backward compatibility with older RQ jobs, not needed now + try: - if osp.exists(file_path) and osp.getctime(file_path) == file_ctime: + with get_export_cache_lock( + file_path, + block=True, + acquire_timeout=EXPORT_CACHE_LOCK_TIMEOUT, + ttl=rq.get_current_job().timeout, + ): + if not osp.exists(file_path): + raise FileNotFoundError("Export cache file '{}' doesn't exist".format(file_path)) + + parsed_filename = parse_export_file_path(file_path) + cache_ttl = get_export_cache_ttl(parsed_filename.instance_type) + + if timezone.now().timestamp() <= osp.getmtime(file_path) + cache_ttl.total_seconds(): + # Need to retry later, the export is in use + rq_job = rq.get_current_job() # the worker references the same object + rq_job.retries_left = 1 + rq_job.retry_intervals = [cache_ttl.total_seconds()] + logger.info( + "Export cache file '{}' is recently accessed, will retry in {}".format( + file_path, cache_ttl + ) + ) + raise FileIsBeingUsedError # should be handled 
by the worker + + # TODO: maybe remove all outdated exports os.remove(file_path) - - logger.info( - "Export cache file '{}' successfully removed" \ - .format(file_path)) + logger.info("Export cache file '{}' successfully removed".format(file_path)) + except LockNotAvailableError: + # Need to retry later if the lock was not available + rq_job = rq.get_current_job() # the worker references the same object + rq_job.retries_left = 1 + rq_job.retry_intervals = [EXPORT_LOCKED_RETRY_INTERVAL.total_seconds()] + raise # should be handled by the worker except Exception: log_exception(logger) raise diff --git a/cvat/apps/engine/tests/test_rest_api_3D.py b/cvat/apps/engine/tests/test_rest_api_3D.py index 84247b84d484..fc8f8e95c26e 100644 --- a/cvat/apps/engine/tests/test_rest_api_3D.py +++ b/cvat/apps/engine/tests/test_rest_api_3D.py @@ -19,39 +19,19 @@ from django.contrib.auth.models import Group, User from rest_framework import status -from rest_framework.test import APIClient, APITestCase from cvat.apps.engine.media_extractors import ValidateDimension from cvat.apps.dataset_manager.task import TaskAnnotation from datumaro.util.test_utils import TestDir -from cvat.apps.engine.tests.utils import get_paginated_collection +from cvat.apps.engine.tests.utils import get_paginated_collection, ApiTestBase, ForceLogin CREATE_ACTION = "create" UPDATE_ACTION = "update" DELETE_ACTION = "delete" -class ForceLogin: - def __init__(self, user, client): - self.user = user - self.client = client - - def __enter__(self): - if self.user: - self.client.force_login(self.user, - backend='django.contrib.auth.backends.ModelBackend') - - return self - - def __exit__(self, exception_type, exception_value, traceback): - if self.user: - self.client.logout() - -class _DbTestBase(APITestCase): - def setUp(self): - self.client = APIClient() - +class _DbTestBase(ApiTestBase): @classmethod def setUpTestData(cls): cls.create_db_users() @@ -547,6 +527,8 @@ def test_api_v2_dump_and_upload_annotation(self): for 
user, edata in list(self.expected_dump_upload.items()): with self.subTest(format=f"{format_name}_{edata['name']}_dump"): + self._clear_rq_jobs() # clean up from previous tests and iterations + url = self._generate_url_dump_tasks_annotations(task_id) file_name = osp.join(test_dir, f"{format_name}_{edata['name']}.zip") @@ -740,6 +722,8 @@ def test_api_v2_export_dataset(self): for user, edata in list(self.expected_dump_upload.items()): with self.subTest(format=f"{format_name}_{edata['name']}_export"): + self._clear_rq_jobs() # clean up from previous tests and iterations + url = self._generate_url_dump_dataset(task_id) file_name = osp.join(test_dir, f"{format_name}_{edata['name']}.zip") diff --git a/cvat/apps/engine/tests/utils.py b/cvat/apps/engine/tests/utils.py index f0b71463a202..b884b3e9b4c4 100644 --- a/cvat/apps/engine/tests/utils.py +++ b/cvat/apps/engine/tests/utils.py @@ -1,4 +1,4 @@ -# Copyright (C) 2023 CVAT.ai Corporation +# Copyright (C) 2023-2024 CVAT.ai Corporation # # SPDX-License-Identifier: MIT @@ -9,11 +9,13 @@ import logging import os +from django.conf import settings from django.core.cache import caches from django.http.response import HttpResponse from PIL import Image from rest_framework.test import APIClient, APITestCase import av +import django_rq import numpy as np T = TypeVar('T') @@ -46,7 +48,53 @@ def __exit__(self, exception_type, exception_value, traceback): self.client.logout() +def clear_rq_jobs(): + for queue_name in settings.RQ_QUEUES: + queue = django_rq.get_queue(queue_name) + + # Remove actual jobs + queue.empty() + + # Clean up the registries + for registry in [ + queue.failed_job_registry, + queue.finished_job_registry, + queue.started_job_registry, + queue.scheduled_job_registry, + ]: + for job_id in registry.get_job_ids(): + registry.remove(job_id) + + # Remove orphaned jobs that can't be normally reported by DjangoRQ + # https://github.com/rq/django-rq/issues/73 + for key in queue.connection.keys('rq:job:*'): + job_id 
= key.decode().split('rq:job:', maxsplit=1)[1] + job = queue.fetch_job(job_id) + if not job: + # The job can belong to a different queue, using the same connection + continue + + job.delete() + + # Clean up the scheduler, if any + try: + scheduler = django_rq.get_scheduler(queue_name, queue) + except ImportError: + # If the scheduler is not enabled, an exception is thrown + continue + + try: + scheduler.acquire_lock() + for job in scheduler.get_jobs(): + scheduler.cancel(job) + finally: + scheduler.remove_lock() + + class ApiTestBase(APITestCase): + def _clear_rq_jobs(self): + clear_rq_jobs() + def setUp(self): super().setUp() self.client = APIClient() @@ -61,6 +109,9 @@ def tearDown(self): for cache in caches.all(initialized_only=True): cache.clear() + # Clear any remaining RQ jobs produced by the tests executed + self._clear_rq_jobs() + return super().tearDown() diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index 28bac7295fde..2ef73e29c6bb 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -1,5 +1,5 @@ # Copyright (C) 2018-2022 Intel Corporation -# Copyright (C) 2022-2023 CVAT.ai Corporation +# Copyright (C) 2022-2024 CVAT.ai Corporation # # SPDX-License-Identifier: MIT @@ -2960,20 +2960,24 @@ def _export_annotations( f"Unexpected location {location} specified for the request" ) - last_instance_update_time = timezone.localtime(db_instance.updated_date) + cache_ttl = dm.views.get_export_cache_ttl(db_instance) + instance_update_time = timezone.localtime(db_instance.updated_date) if isinstance(db_instance, Project): tasks_update = list(map(lambda db_task: timezone.localtime(db_task.updated_date), db_instance.tasks.all())) - last_instance_update_time = max(tasks_update + [last_instance_update_time]) + instance_update_time = max(tasks_update + [instance_update_time]) - timestamp = datetime.strftime(last_instance_update_time, "%Y_%m_%d_%H_%M_%S") + instance_timestamp = datetime.strftime(instance_update_time, 
"%Y_%m_%d_%H_%M_%S") is_annotation_file = rq_id.startswith('export:annotations') if rq_job: rq_request = rq_job.meta.get('request', None) request_time = rq_request.get('timestamp', None) if rq_request else None - if request_time is None or request_time < last_instance_update_time: - # in case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER - # we have to enqueue dependent jobs after canceling one + if request_time is None or request_time < instance_update_time: + # The result is outdated, need to restart the export. + # Cancel the current job. + # The new attempt will be made after the last existing job. + # In the case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER + # we have to enqueue dependent jobs after canceling one. rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER) rq_job.delete() else: @@ -2986,35 +2990,71 @@ def _export_annotations( file_path = rq_job.return_value() if not file_path: - return Response('A result for exporting job was not found for finished RQ job', status=status.HTTP_500_INTERNAL_SERVER_ERROR) - elif not osp.exists(file_path): - return Response('The result file does not exist in export cache', status=status.HTTP_500_INTERNAL_SERVER_ERROR) - - if action == "download": - filename = filename or \ - build_annotations_file_name( - class_name=db_instance.__class__.__name__, - identifier=db_instance.name if isinstance(db_instance, (Task, Project)) else db_instance.id, - timestamp=timestamp, - format_name=format_name, - is_annotation_file=is_annotation_file, - extension=osp.splitext(file_path)[1] - ) - - rq_job.delete() - return sendfile(request, file_path, attachment=True, attachment_filename=filename) - - return Response(status=status.HTTP_201_CREATED) + return Response( + 'A result for exporting job was not found for finished RQ job', + status=status.HTTP_500_INTERNAL_SERVER_ERROR + ) + + with dm.util.get_export_cache_lock( + file_path, ttl=60, # request timeout + ): + if action == 
"download": + if not osp.exists(file_path): + return Response( + "The exported file has expired, please retry exporting", + status=status.HTTP_404_NOT_FOUND + ) + + filename = filename or \ + build_annotations_file_name( + class_name=db_instance.__class__.__name__, + identifier=db_instance.name if isinstance(db_instance, (Task, Project)) else db_instance.id, + timestamp=instance_timestamp, + format_name=format_name, + is_annotation_file=is_annotation_file, + extension=osp.splitext(file_path)[1] + ) + + rq_job.delete() + return sendfile(request, file_path, attachment=True, attachment_filename=filename) + else: + if osp.exists(file_path): + # Update last update time to prolong the export lifetime + # as the last access time is not available on every filesystem + os.utime(file_path, None) + + return Response(status=status.HTTP_201_CREATED) + else: + # Cancel and reenqueue the job. + # The new attempt will be made after the last existing job. + # In the case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER + # we have to enqueue dependent jobs after canceling one. + rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER) + rq_job.delete() else: raise NotImplementedError(f"Export to {location} location is not implemented yet") elif rq_job.is_failed: exc_info = rq_job.meta.get('formatted_exception', str(rq_job.exc_info)) rq_job.delete() - return Response(exc_info, - status=status.HTTP_500_INTERNAL_SERVER_ERROR) + return Response(exc_info, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + elif rq_job.is_deferred and rq_id not in queue.deferred_job_registry.get_job_ids(): + # Sometimes jobs can depend on outdated jobs in the deferred jobs registry. + # They can be fetched by their specific ids, but are not listed by get_job_ids(). + # Supposedly, this can happen because of the server restarts + # (potentially, because the redis used for the queue is inmemory). + # Another potential reason is canceling without enqueueing dependents. 
+ # Such dependencies are never removed or finished, + # as there is no TTL for deferred jobs, + # so the current job can be blocked indefinitely. + + # Cancel the current job and then reenqueue it, considering the current situation. + # The new attempt will be made after the last existing job. + # In the case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER + # we have to enqueue dependent jobs after canceling one. + rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER) + rq_job.delete() else: return Response(status=status.HTTP_202_ACCEPTED) - try: if request.scheme: server_address = request.scheme + '://' @@ -3022,12 +3062,6 @@ def _export_annotations( except Exception: server_address = None - TTL_CONSTS = { - 'project': dm.views.PROJECT_CACHE_TTL, - 'task': dm.views.TASK_CACHE_TTL, - 'job': dm.views.JOB_CACHE_TTL, - } - ttl = TTL_CONSTS[db_instance.__class__.__name__.lower()].total_seconds() user_id = request.user.id func = callback if location == Location.LOCAL else export_resource_to_cloud_storage @@ -3047,7 +3081,7 @@ def _export_annotations( filename_pattern = build_annotations_file_name( class_name=db_instance.__class__.__name__, identifier=db_instance.name if isinstance(db_instance, (Task, Project)) else db_instance.id, - timestamp=timestamp, + timestamp=instance_timestamp, format_name=format_name, is_annotation_file=is_annotation_file, ) @@ -3062,8 +3096,8 @@ def _export_annotations( job_id=rq_id, meta=get_rq_job_meta(request=request, db_obj=db_instance), depends_on=define_dependent_job(queue, user_id, rq_id=rq_id), - result_ttl=ttl, - failure_ttl=ttl, + result_ttl=cache_ttl.total_seconds(), + failure_ttl=cache_ttl.total_seconds(), ) handle_dataset_export(db_instance, diff --git a/cvat/requirements/base.in b/cvat/requirements/base.in index 8df954a13b25..8c652d6f6048 100644 --- a/cvat/requirements/base.in +++ b/cvat/requirements/base.in @@ -38,6 +38,7 @@ patool==1.12 pdf2image==1.14.0 Pillow>=10.3.0 +pottery~=3.0 
psutil==5.9.4 psycopg2-binary==2.9.5 python-ldap==3.4.3 diff --git a/cvat/requirements/base.txt b/cvat/requirements/base.txt index 9aaa38230250..fbfbd4dd3a31 100644 --- a/cvat/requirements/base.txt +++ b/cvat/requirements/base.txt @@ -1,4 +1,4 @@ -# SHA1:dd0cf6b5e35e6d492749b679dccd43a87410571e +# SHA1:72d30d3a78182797ba879af4b7b50f17ecaf2e8f # # This file is autogenerated by pip-compile-multi # To update, run: @@ -6,6 +6,7 @@ # pip-compile-multi # -r ../../utils/dataset_manifest/requirements.txt + asgiref==3.8.1 # via django async-timeout==4.0.3 @@ -50,7 +51,7 @@ coreschema==0.0.4 # via coreapi crontab==1.0.1 # via rq-scheduler -cryptography==42.0.5 +cryptography==42.0.7 # via # azure-storage-blob # pyjwt @@ -68,7 +69,7 @@ dj-pagination==2.5.0 # via -r cvat/requirements/base.in dj-rest-auth[with_social]==5.0.2 # via -r cvat/requirements/base.in -django==4.2.11 +django==4.2.13 # via # -r cvat/requirements/base.in # dj-rest-auth @@ -99,7 +100,7 @@ django-crum==0.7.9 # via -r cvat/requirements/base.in django-filter==2.4.0 # via -r cvat/requirements/base.in -django-health-check==3.18.1 +django-health-check==3.18.2 # via -r cvat/requirements/base.in django-rq==2.8.1 # via -r cvat/requirements/base.in @@ -118,7 +119,7 @@ entrypoint2==1.1 # via pyunpack fonttools==4.51.0 # via matplotlib -freezegun==1.5.0 +freezegun==1.5.1 # via rq-scheduler furl==2.1.0 # via -r cvat/requirements/base.in @@ -157,7 +158,7 @@ isodate==0.6.1 # python3-saml itypes==1.2.0 # via coreapi -jinja2==3.1.3 +jinja2==3.1.4 # via coreschema jmespath==0.10.0 # via @@ -167,9 +168,9 @@ jsonschema==4.17.3 # via drf-spectacular kiwisolver==1.4.5 # via matplotlib -limits==3.11.0 +limits==3.12.0 # via python-logstash-async -lxml==5.2.1 +lxml==5.2.2 # via # -r cvat/requirements/base.in # datumaro @@ -183,6 +184,8 @@ matplotlib==3.8.4 # via # datumaro # pycocotools +mmh3==4.1.0 + # via pottery msrest==0.7.1 # via azure-storage-blob networkx==3.3 @@ -193,7 +196,7 @@ oauthlib==3.2.2 # via requests-oauthlib 
orderedmultidict==1.0.1 # via furl -orjson==3.10.2 +orjson==3.10.3 # via datumaro packaging==24.0 # via @@ -207,6 +210,8 @@ patool==1.12 # via -r cvat/requirements/base.in pdf2image==1.14.0 # via -r cvat/requirements/base.in +pottery==3.0.0 + # via -r cvat/requirements/base.in proto-plus==1.23.0 # via google-api-core protobuf==4.25.3 @@ -274,6 +279,7 @@ redis==4.5.4 # via # -r cvat/requirements/base.in # django-rq + # pottery # rq requests==2.31.0 # via @@ -331,6 +337,7 @@ typing-extensions==4.11.0 # azure-core # datumaro # limits + # pottery tzdata==2024.1 # via pandas uritemplate==4.1.1 @@ -348,7 +355,7 @@ xmlsec==1.3.14 # via # -r cvat/requirements/base.in # python3-saml -zipp==3.18.1 +zipp==3.18.2 # via importlib-metadata zstandard==0.22.0 # via clickhouse-connect diff --git a/cvat/requirements/development.txt b/cvat/requirements/development.txt index 459f8ccf154f..52a87e9fb598 100644 --- a/cvat/requirements/development.txt +++ b/cvat/requirements/development.txt @@ -6,6 +6,7 @@ # pip-compile-multi # -r base.txt + astroid==2.11.7 # via pylint autopep8==2.1.0 @@ -30,7 +31,7 @@ mypy-extensions==1.0.0 # via black pathspec==0.12.1 # via black -platformdirs==4.2.1 +platformdirs==4.2.2 # via # black # pylint @@ -56,7 +57,7 @@ tomli==2.0.1 # autopep8 # black # pylint -tomlkit==0.12.4 +tomlkit==0.12.5 # via pylint tornado==6.4 # via snakeviz diff --git a/cvat/requirements/testing.in b/cvat/requirements/testing.in index 52aef23f3a99..4398c9efedaf 100644 --- a/cvat/requirements/testing.in +++ b/cvat/requirements/testing.in @@ -1,4 +1,4 @@ -r development.in coverage==7.2.3 -fakeredis==2.10.3 +fakeredis[lua]==2.10.3 diff --git a/cvat/requirements/testing.txt b/cvat/requirements/testing.txt index f746e59ede46..cf7f07c185de 100644 --- a/cvat/requirements/testing.txt +++ b/cvat/requirements/testing.txt @@ -1,4 +1,4 @@ -# SHA1:429cfd9ce2f6b66fbb7c898a5c6279d9d8a61335 +# SHA1:6ed6047b6ebd6295b732838ffe76f05505bfc34a # # This file is autogenerated by pip-compile-multi # To 
update, run: @@ -6,10 +6,13 @@ # pip-compile-multi # -r development.txt + coverage==7.2.3 # via -r cvat/requirements/testing.in -fakeredis==2.10.3 +fakeredis[lua]==2.10.3 # via -r cvat/requirements/testing.in +lupa==1.14.1 + # via fakeredis sortedcontainers==2.4.0 # via fakeredis diff --git a/tests/python/rest_api/test_resource_import_export.py b/tests/python/rest_api/test_resource_import_export.py index 78b1d0cf1b26..833661fcfab8 100644 --- a/tests/python/rest_api/test_resource_import_export.py +++ b/tests/python/rest_api/test_resource_import_export.py @@ -1,5 +1,5 @@ # Copyright (C) 2021-2022 Intel Corporation -# Copyright (C) 2022-2023 CVAT.ai Corporation +# Copyright (C) 2022-2024 CVAT.ai Corporation # # SPDX-License-Identifier: MIT @@ -29,6 +29,7 @@ def _make_client(): @pytest.mark.usefixtures("restore_db_per_class") class TestExportResourceToS3(_S3ResourceTest): + @pytest.mark.usefixtures("restore_redis_inmem_per_function") @pytest.mark.parametrize("cloud_storage_id", [3]) @pytest.mark.parametrize( "obj_id, obj, resource", @@ -53,6 +54,7 @@ def test_save_resource_to_cloud_storage_with_specific_location( self._export_resource(cloud_storage, obj_id, obj, resource, **kwargs) + @pytest.mark.usefixtures("restore_redis_inmem_per_function") @pytest.mark.parametrize("user_type", ["admin", "assigned_supervisor_org_member"]) @pytest.mark.parametrize( "obj_id, obj, resource", @@ -177,6 +179,7 @@ def test_user_cannot_export_to_cloud_storage_with_specific_location_without_acce @pytest.mark.usefixtures("restore_db_per_function") @pytest.mark.usefixtures("restore_cvat_data") class TestImportResourceFromS3(_S3ResourceTest): + @pytest.mark.usefixtures("restore_redis_inmem_per_function") @pytest.mark.parametrize("cloud_storage_id", [3]) @pytest.mark.parametrize( "obj_id, obj, resource", @@ -201,6 +204,7 @@ def test_import_resource_from_cloud_storage_with_specific_location( self._export_resource(cloud_storage, obj_id, obj, resource, **export_kwargs) 
self._import_resource(cloud_storage, resource, obj_id, obj, **kwargs) + @pytest.mark.usefixtures("restore_redis_inmem_per_function") @pytest.mark.parametrize( "user_type", ["admin", "assigned_supervisor_org_member"], diff --git a/utils/dataset_manifest/requirements.txt b/utils/dataset_manifest/requirements.txt index 0edc970086de..c69cb3045079 100644 --- a/utils/dataset_manifest/requirements.txt +++ b/utils/dataset_manifest/requirements.txt @@ -1,4 +1,4 @@ -# SHA1:c60d1ed19f53b618c5528cd4b4fc708bc7ba404f +# SHA1:3671835f743ca6c6c8d49b36eda2bb7e0763fa0b # # This file is autogenerated by pip-compile-multi # To update, run: @@ -15,5 +15,5 @@ opencv-python-headless==4.9.0.80 # via -r utils/dataset_manifest/requirements.in pillow==10.3.0 # via -r utils/dataset_manifest/requirements.in -tqdm==4.66.2 +tqdm==4.66.4 # via -r utils/dataset_manifest/requirements.in