From f7fd06c658b40372fbe8586a71b7d412f564bfd7 Mon Sep 17 00:00:00 2001
From: Maria Khrustaleva
Date: Fri, 2 Jun 2023 09:54:07 +0100
Subject: [PATCH] Fixed resources import (#5909)

Fixed:
- wrong location of tmp file when importing job annotations
- ```Traceback (most recent call last):
  File "/home/maya/Documents/cvat/.env/lib/python3.8/site-packages/rq/worker.py", line 795, in work
    self.execute_job(job, queue)
  File "/home/maya/Documents/cvat/cvat/rqworker.py", line 37, in execute_job
    return self.perform_job(*args, **kwargs)
  File "/home/maya/Documents/cvat/.env/lib/python3.8/site-packages/rq/worker.py", line 1389, in perform_job
    self.handle_exception(job, *exc_info)
  File "/home/maya/Documents/cvat/.env/lib/python3.8/site-packages/rq/worker.py", line 1438, in handle_exception
    fallthrough = handler(job, *exc_info)
  File "/home/maya/Documents/cvat/cvat/apps/engine/views.py", line 2233, in rq_exception_handler
    rq_job.exc_info = "".join(
  AttributeError: can't set attribute
  ```

Resolves https://github.com/opencv/cvat/issues/5773
Resolves https://github.com/opencv/cvat/issues/5563

- root causes of the issues:
  - the annotation file was uploaded to the server via the tus protocol and an rq job
    was created, but no subsequent status-check requests were made
    (e.g. the user closed the browser tab)
  - the annotation file was uploaded to the server via the tus protocol,
    but the rq job was never created (e.g. the CVAT instance restarted)
  - creating tasks/projects from backups with the same name at the same time
    by different users

Co-authored-by: Roman Donchenko
Co-authored-by: Maxim Zhiltsov
---
 .vscode/launch.json | 23 ++
 CHANGELOG.md | 7 +
 cvat-core/src/server-proxy.ts | 18 +-
 cvat-sdk/cvat_sdk/core/proxies/tasks.py | 2 +-
 cvat-sdk/cvat_sdk/core/uploading.py | 27 ++-
 cvat/apps/dataset_manager/project.py | 12 +-
 cvat/apps/dataset_manager/task.py | 18 +-
 .../dataset_manager/tests/test_formats.py | 5 +-
 cvat/apps/dataset_manager/views.py | 1 -
 cvat/apps/engine/backup.py | 36 +--
 cvat/apps/engine/handlers.py | 22 ++
 cvat/apps/engine/location.py | 4 +-
 cvat/apps/engine/mixins.py | 48 +++-
 cvat/apps/engine/serializers.py | 7 +-
 cvat/apps/engine/tests/test_rest_api.py | 7 +-
 cvat/apps/engine/utils.py | 57 ++++-
 cvat/apps/engine/views.py | 229 ++++++++++++++----
 cvat/apps/events/export.py | 2 +-
 cvat/requirements/base.in | 2 +-
 cvat/requirements/base.txt | 4 +-
 cvat/schema.yml | 128 +++++++++-
 cvat/settings/base.py | 13 +-
 cvat/settings/testing_rest.py | 2 +
 docker-compose.yml | 1 +
 supervisord/utils.conf | 9 +
 tests/python/pytest.ini | 1 -
 tests/python/rest_api/test_projects.py | 4 +-
 tests/python/rest_api/test_tasks.py | 113 ++++++++-
 tests/python/sdk/test_tasks.py | 2 +-
 tests/python/shared/fixtures/init.py | 18 +-
 tests/python/shared/utils/config.py | 4 +
 .../shared/utils/resource_import_export.py | 20 +-
 tests/values.test.yaml | 4 +
 33 files changed, 714 insertions(+), 136 deletions(-)
 create mode 100644 cvat/apps/engine/handlers.py

diff --git a/.vscode/launch.json b/.vscode/launch.json
index 0b02a0110cad..884ed02c3b6f 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -270,6 +270,28 @@
             "env": {},
             "console": "internalConsole"
         },
+        {
+            "name": "server: RQ - cleaning",
+            "type": "python",
+            "request": "launch",
+            "stopOnEntry": false,
+            "justMyCode": false,
+            "python": "${command:python.interpreterPath}",
+            "program": "${workspaceRoot}/manage.py",
+            "args": [
+                "rqworker",
+                "cleaning",
+                "--worker-class",
+                "cvat.rqworker.SimpleWorker"
+            ],
+            "django": true,
+            "cwd": "${workspaceFolder}",
"env": { + "DJANGO_LOG_SERVER_HOST": "localhost", + "DJANGO_LOG_SERVER_PORT": "8282" + }, + "console": "internalConsole" + }, { "name": "server: migrate", "type": "python", @@ -433,6 +455,7 @@ "server: RQ - annotation", "server: RQ - webhooks", "server: RQ - scheduler", + "server: RQ - cleaning", "server: git", ] } diff --git a/CHANGELOG.md b/CHANGELOG.md index 26608ea2d1ae..363130ed983e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Support task creation with any type of data supported by the server by default from cloud storage without use_cache option () - Support task creation with cloud storage data and without use_cache option () +- Cleaning worker to check that the uploaded resource has been deleted or delete otherwise () ### Changed - Resource links are opened from any organization/sandbox if available for user () @@ -31,6 +32,12 @@ without use_cache option () ### Fixed - Skeletons dumping on created tasks/projects () - Fix saving annotations for skeleton tracks () +- Wrong location of tmp file when importing job annotations () +- Removing uploaded file with annotations/backups when rq job was created +but no next requests for checking status were not made () +- Removing uploaded file with annotations/backups after file was uploaded to the server by tus protocol +but rq job has not yet been created () +- Tasks/projects creation from backups with the same name at the same time by different users () ### Security - TDB diff --git a/cvat-core/src/server-proxy.ts b/cvat-core/src/server-proxy.ts index 1e7d4d205a23..35d15f2501f5 100644 --- a/cvat-core/src/server-proxy.ts +++ b/cvat-core/src/server-proxy.ts @@ -783,13 +783,14 @@ async function importDataset( }; const url = `${backendAPI}/projects/${id}/dataset`; + let rqId: string; async function wait() { return new Promise((resolve, reject) => { async function requestStatus() { try { const response = await Axios.get(url, { - params: { ...params, action: 'import_status' }, + params: { ...params, action: 'import_status', rq_id: rqId }, }); if (response.status === 202) { if (response.data.message) { @@ -812,10 +813,11 @@ async function importDataset( if (isCloudStorage) { try { - await Axios.post(url, + const response = await Axios.post(url, new FormData(), { params, }); + rqId = response.data.rq_id; } catch (errorData) { throw generateError(errorData); } @@ -837,11 +839,12 @@ async function importDataset( headers: { 'Upload-Start': true }, }); await chunkUpload(file, uploadConfig); - await Axios.post(url, + const response = await Axios.post(url, new FormData(), { params, headers: { 'Upload-Finish': true }, }); + rqId = response.data.rq_id; } catch (errorData) { throw generateError(errorData); } @@ -1617,6 +1620,7 @@ async function uploadAnnotations( filename: typeof file === 'string' ? 
file : file.name, conv_mask_to_poly: options.convMaskToPoly, }; + let rqId: string; const url = `${backendAPI}/${session}s/${id}/annotations`; async function wait() { @@ -1627,7 +1631,7 @@ async function uploadAnnotations( url, new FormData(), { - params, + params: { ...params, rq_id: rqId }, }, ); if (response.status === 202) { @@ -1646,10 +1650,11 @@ async function uploadAnnotations( if (isCloudStorage) { try { - await Axios.post(url, + const response = await Axios.post(url, new FormData(), { params, }); + rqId = response.data.rq_id; } catch (errorData) { throw generateError(errorData); } @@ -1667,11 +1672,12 @@ async function uploadAnnotations( headers: { 'Upload-Start': true }, }); await chunkUpload(file, uploadConfig); - await Axios.post(url, + const response = await Axios.post(url, new FormData(), { params, headers: { 'Upload-Finish': true }, }); + rqId = response.data.rq_id; } catch (errorData) { throw generateError(errorData); } diff --git a/cvat-sdk/cvat_sdk/core/proxies/tasks.py b/cvat-sdk/cvat_sdk/core/proxies/tasks.py index 74e2b5fbac2c..29e84dc3545d 100644 --- a/cvat-sdk/cvat_sdk/core/proxies/tasks.py +++ b/cvat-sdk/cvat_sdk/core/proxies/tasks.py @@ -450,6 +450,6 @@ def create_from_backup( ) task_id = json.loads(response.data)["id"] - self._client.logger.info(f"Task has been imported sucessfully. Task ID: {task_id}") + self._client.logger.info(f"Task has been imported successfully. Task ID: {task_id}") return self.retrieve(task_id) diff --git a/cvat-sdk/cvat_sdk/core/uploading.py b/cvat-sdk/cvat_sdk/core/uploading.py index ceacda782ce3..e4635f69d88e 100644 --- a/cvat-sdk/cvat_sdk/core/uploading.py +++ b/cvat-sdk/cvat_sdk/core/uploading.py @@ -4,6 +4,7 @@ from __future__ import annotations +import json import os from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Tuple @@ -89,6 +90,7 @@ def create_url(self): headers["upload-length"] = str(self.file_size) headers["upload-metadata"] = ",".join(self.encode_metadata()) resp = self._api_client.rest_client.POST(self.client.url, headers=headers) + self.real_filename = resp.headers.get("Upload-Filename") url = resp.headers.get("location") if url is None: msg = "Attempt to retrieve create file url with status {}".format(resp.status_code) @@ -179,9 +181,10 @@ def upload_file( assert meta["filename"] self._tus_start_upload(url, query_params=query_params) - self._upload_file_data_with_tus( + real_filename = self._upload_file_data_with_tus( url=url, filename=filename, meta=meta, pbar=pbar, logger=logger ) + query_params["filename"] = real_filename return self._tus_finish_upload(url, query_params=query_params, fields=fields) def _wait_for_completion( @@ -216,7 +219,9 @@ def _make_tus_uploader(api_client: ApiClient, url: str, **kwargs): return _MyTusUploader(client=client, api_client=api_client, **kwargs) - def _upload_file_data_with_tus(self, url, filename, *, meta=None, pbar=None, logger=None): + def _upload_file_data_with_tus( + self, url, filename, *, meta=None, pbar=None, logger=None + ) -> str: file_size = filename.stat().st_size if pbar is None: pbar = NullProgressReporter() @@ -233,6 +238,7 @@ def _upload_file_data_with_tus(self, url, filename, *, meta=None, pbar=None, log log_func=logger, ) tus_uploader.upload() + return tus_uploader.real_filename def _tus_start_upload(self, url, *, query_params=None): response = self._client.api_client.rest_client.POST( @@ -273,17 +279,21 @@ def upload_file_and_wait( ): url = self._client.api_map.make_endpoint_url(endpoint.path, kwsub=url_params) params = 
{"format": format_name, "filename": filename.name} - self.upload_file( + response = self.upload_file( url, filename, pbar=pbar, query_params=params, meta={"filename": params["filename"]} ) + rq_id = json.loads(response.data).get("rq_id") + assert rq_id, "The rq_id was not found in the response" + params["rq_id"] = rq_id + self._wait_for_completion( url, success_status=201, positive_statuses=[202], status_check_period=status_check_period, query_params=params, - method="POST", + method="PUT", ) @@ -301,12 +311,17 @@ def upload_file_and_wait( ): url = self._client.api_map.make_endpoint_url(upload_endpoint.path, kwsub=url_params) params = {"format": format_name, "filename": filename.name} - self.upload_file( + response = self.upload_file( url, filename, pbar=pbar, query_params=params, meta={"filename": params["filename"]} ) + rq_id = json.loads(response.data).get("rq_id") + assert rq_id, "The rq_id was not found in the response" url = self._client.api_map.make_endpoint_url(retrieve_endpoint.path, kwsub=url_params) - params = {"action": "import_status"} + params = { + "action": "import_status", + "rq_id": rq_id, + } self._wait_for_completion( url, success_status=201, diff --git a/cvat/apps/dataset_manager/project.py b/cvat/apps/dataset_manager/project.py index e52fb2ebab88..bf621fff7c9a 100644 --- a/cvat/apps/dataset_manager/project.py +++ b/cvat/apps/dataset_manager/project.py @@ -7,6 +7,7 @@ from tempfile import TemporaryDirectory import rq from typing import Any, Callable, List, Mapping, Tuple +from datumaro.components.errors import DatasetError, DatasetImportError, DatasetNotFoundError from django.db import transaction @@ -16,7 +17,7 @@ from cvat.apps.dataset_manager.task import TaskAnnotation from .annotation import AnnotationIR -from .bindings import ProjectData, load_dataset_data +from .bindings import ProjectData, load_dataset_data, CvatImportError from .formats.registry import make_exporter, make_importer def export_project(project_id, dst_file, format_name, @@ -160,7 +161,7 @@ def data(self) -> dict: raise NotImplementedError() @transaction.atomic -def import_dataset_as_project(project_id, dataset_file, format_name, conv_mask_to_poly): +def import_dataset_as_project(src_file, project_id, format_name, conv_mask_to_poly): rq_job = rq.get_current_job() rq_job.meta['status'] = 'Dataset import has been started...' rq_job.meta['progress'] = 0. 
@@ -170,5 +171,8 @@ def import_dataset_as_project(project_id, dataset_file, format_name, conv_mask_t project.init_from_db() importer = make_importer(format_name) - with open(dataset_file, 'rb') as f: - project.import_dataset(f, importer, conv_mask_to_poly=conv_mask_to_poly) + with open(src_file, 'rb') as f: + try: + project.import_dataset(f, importer, conv_mask_to_poly=conv_mask_to_poly) + except (DatasetError, DatasetImportError, DatasetNotFoundError) as ex: + raise CvatImportError(str(ex)) diff --git a/cvat/apps/dataset_manager/task.py b/cvat/apps/dataset_manager/task.py index c596db9920de..452a93505f90 100644 --- a/cvat/apps/dataset_manager/task.py +++ b/cvat/apps/dataset_manager/task.py @@ -8,6 +8,7 @@ from copy import deepcopy from enum import Enum from tempfile import TemporaryDirectory +from datumaro.components.errors import DatasetError, DatasetImportError, DatasetNotFoundError from django.db import transaction from django.db.models.query import Prefetch @@ -19,11 +20,10 @@ from cvat.apps.profiler import silk_profile from .annotation import AnnotationIR, AnnotationManager -from .bindings import JobData, TaskData +from .bindings import JobData, TaskData, CvatImportError from .formats.registry import make_exporter, make_importer from .util import bulk_create - class dotdict(OrderedDict): """dot.notation access to dictionary attributes""" __getattr__ = OrderedDict.get @@ -853,19 +853,25 @@ def export_task(task_id, dst_file, format_name, server_url=None, save_images=Fal task.export(f, exporter, host=server_url, save_images=save_images) @transaction.atomic -def import_task_annotations(task_id, src_file, format_name, conv_mask_to_poly): +def import_task_annotations(src_file, task_id, format_name, conv_mask_to_poly): task = TaskAnnotation(task_id) task.init_from_db() importer = make_importer(format_name) with open(src_file, 'rb') as f: - task.import_annotations(f, importer, conv_mask_to_poly=conv_mask_to_poly) + try: + task.import_annotations(f, importer, conv_mask_to_poly=conv_mask_to_poly) + except (DatasetError, DatasetImportError, DatasetNotFoundError) as ex: + raise CvatImportError(str(ex)) @transaction.atomic -def import_job_annotations(job_id, src_file, format_name, conv_mask_to_poly): +def import_job_annotations(src_file, job_id, format_name, conv_mask_to_poly): job = JobAnnotation(job_id) job.init_from_db() importer = make_importer(format_name) with open(src_file, 'rb') as f: - job.import_annotations(f, importer, conv_mask_to_poly=conv_mask_to_poly) + try: + job.import_annotations(f, importer, conv_mask_to_poly=conv_mask_to_poly) + except (DatasetError, DatasetImportError, DatasetNotFoundError) as ex: + raise CvatImportError(str(ex)) diff --git a/cvat/apps/dataset_manager/tests/test_formats.py b/cvat/apps/dataset_manager/tests/test_formats.py index 86ef91efc85b..e1fa84d874d3 100644 --- a/cvat/apps/dataset_manager/tests/test_formats.py +++ b/cvat/apps/dataset_manager/tests/test_formats.py @@ -923,8 +923,7 @@ def _test_can_import_annotations(self, task, import_format): expected_ann = TaskAnnotation(task["id"]) expected_ann.init_from_db() - dm.task.import_task_annotations(task["id"], - file_path, import_format, True) + dm.task.import_task_annotations(file_path, task["id"], import_format, True) actual_ann = TaskAnnotation(task["id"]) actual_ann.init_from_db() @@ -976,6 +975,6 @@ def test_can_import_mots_annotations_with_splited_masks(self): task.update() task = self._create_task(task, images) - dm.task.import_task_annotations(task['id'], dataset_path, format_name, True) + 
dm.task.import_task_annotations(dataset_path, task['id'], format_name, True) self._test_can_import_annotations(task, format_name) diff --git a/cvat/apps/dataset_manager/views.py b/cvat/apps/dataset_manager/views.py index ba133cc69953..5d652bf7285d 100644 --- a/cvat/apps/dataset_manager/views.py +++ b/cvat/apps/dataset_manager/views.py @@ -44,7 +44,6 @@ def get_export_cache_dir(db_instance): PROJECT_CACHE_TTL = DEFAULT_CACHE_TTL / 3 JOB_CACHE_TTL = DEFAULT_CACHE_TTL - def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=None, save_images=False): try: if task_id is not None: diff --git a/cvat/apps/engine/backup.py b/cvat/apps/engine/backup.py index 3f519edede39..18c28b6420d2 100644 --- a/cvat/apps/engine/backup.py +++ b/cvat/apps/engine/backup.py @@ -33,8 +33,10 @@ from cvat.apps.engine.log import slogger from cvat.apps.engine.serializers import (AttributeSerializer, DataSerializer, LabelSerializer, LabeledDataSerializer, SegmentSerializer, SimpleJobSerializer, TaskReadSerializer, - ProjectReadSerializer, ProjectFileSerializer, TaskFileSerializer) -from cvat.apps.engine.utils import av_scan_paths, process_failed_job, configure_dependent_job, get_rq_job_meta + ProjectReadSerializer, ProjectFileSerializer, TaskFileSerializer, RqIdSerializer) +from cvat.apps.engine.utils import ( + av_scan_paths, process_failed_job, configure_dependent_job, get_rq_job_meta, get_import_rq_id, import_resource_with_clean_up_after +) from cvat.apps.engine.models import ( StorageChoice, StorageMethodChoice, DataChoice, Task, Project, Location) from cvat.apps.engine.task import JobFileMapping, _create_thread @@ -47,7 +49,6 @@ class Version(Enum): V1 = '1.0' - def _get_label_mapping(db_labels): label_mapping = {db_label.id: db_label.name for db_label in db_labels} for db_label in db_labels: @@ -869,7 +870,7 @@ def export(db_instance, request, queue_name): if os.path.exists(file_path): return Response(status=status.HTTP_201_CREATED) elif rq_job.is_failed: - exc_info = str(rq_job.exc_info) + exc_info = rq_job.meta.get('formatted_exception', str(rq_job.exc_info)) rq_job.delete() return Response(exc_info, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @@ -896,6 +897,9 @@ def _download_file_from_bucket(db_storage, filename, key): def _import(importer, request, queue, rq_id, Serializer, file_field_name, location_conf, filename=None): rq_job = queue.fetch_job(rq_id) + if (user_id_from_meta := getattr(rq_job, 'meta', {}).get('user', {}).get('id')) and user_id_from_meta != request.user.id: + return Response(status=status.HTTP_403_FORBIDDEN) + if not rq_job: org_id = getattr(request.iam_context['organization'], 'id', None) dependent_job = None @@ -939,22 +943,25 @@ def _import(importer, request, queue, rq_id, Serializer, file_field_name, locati filename=filename, key=key, request=request, + result_ttl=settings.IMPORT_CACHE_SUCCESS_TTL.total_seconds(), + failure_ttl=settings.IMPORT_CACHE_FAILED_TTL.total_seconds() ) rq_job = queue.enqueue_call( - func=importer, - args=(filename, request.user.id, org_id), + func=import_resource_with_clean_up_after, + args=(importer, filename, request.user.id, org_id), job_id=rq_id, meta={ 'tmp_file': filename, **get_rq_job_meta(request=request, db_obj=None) }, - depends_on=dependent_job + depends_on=dependent_job, + result_ttl=settings.IMPORT_CACHE_SUCCESS_TTL.total_seconds(), + failure_ttl=settings.IMPORT_CACHE_FAILED_TTL.total_seconds() ) else: if rq_job.is_finished: project_id = rq_job.return_value - os.remove(rq_job.meta['tmp_file']) rq_job.delete() return 
Response({'id': project_id}, status=status.HTTP_201_CREATED)
         elif rq_job.is_failed or \
@@ -971,7 +978,10 @@ def _import(importer, request, queue, rq_id, Serializer, file_field_name, locati
             return Response(data=exc_info,
                 status=status.HTTP_500_INTERNAL_SERVER_ERROR)
 
-    return Response({'rq_id': rq_id}, status=status.HTTP_202_ACCEPTED)
+    serializer = RqIdSerializer(data={'rq_id': rq_id})
+    serializer.is_valid(raise_exception=True)
+
+    return Response(serializer.data, status=status.HTTP_202_ACCEPTED)
 
 def get_backup_dirname():
     return settings.TMP_FILES_ROOT
@@ -980,7 +990,7 @@ def import_project(request, queue_name, filename=None):
     if 'rq_id' in request.data:
         rq_id = request.data['rq_id']
     else:
-        rq_id = f"import:project.{uuid.uuid4()}-by-{request.user}"
+        rq_id = get_import_rq_id('project', uuid.uuid4(), 'backup', request.user)
 
     Serializer = ProjectFileSerializer
     file_field_name = 'project_file'
@@ -1003,10 +1013,8 @@ def import_project(request, queue_name, filename=None):
     )
 
 def import_task(request, queue_name, filename=None):
-    if 'rq_id' in request.data:
-        rq_id = request.data['rq_id']
-    else:
-        rq_id = f"import:task.{uuid.uuid4()}-by-{request.user}"
+    rq_id = request.data.get('rq_id', get_import_rq_id('task', uuid.uuid4(), 'backup', request.user))
+
     Serializer = TaskFileSerializer
     file_field_name = 'task_file'
diff --git a/cvat/apps/engine/handlers.py b/cvat/apps/engine/handlers.py
new file mode 100644
index 000000000000..3253957dd3e0
--- /dev/null
+++ b/cvat/apps/engine/handlers.py
@@ -0,0 +1,22 @@
+# Copyright (C) 2023 CVAT.ai Corporation
+#
+# SPDX-License-Identifier: MIT
+
+from pathlib import Path
+from time import time
+from django.conf import settings
+from cvat.apps.engine.log import slogger
+
+
+def clear_import_cache(path: Path, creation_time: float) -> None:
+    """
+    This function checks whether the import file has been removed by the rq import job and removes it if it has not.
+    This can happen when a file was uploaded to the CVAT server but, for some reason, the rq import job was never created.
+ + Args: + path (Path): path to file + creation_time (float): file creation time + """ + if path.is_file() and (time() - creation_time + 1) >= settings.IMPORT_CACHE_CLEAN_DELAY.total_seconds(): + path.unlink() + slogger.glob.warning(f"The file {str(path)} was removed from cleaning job.") diff --git a/cvat/apps/engine/location.py b/cvat/apps/engine/location.py index e463370bca6c..d2dc669f86ef 100644 --- a/cvat/apps/engine/location.py +++ b/cvat/apps/engine/location.py @@ -5,7 +5,7 @@ from enum import Enum from typing import Any, Dict -from cvat.apps.engine.models import Location +from cvat.apps.engine.models import Location, Job class StorageType(str, Enum): TARGET = 'target_storage' @@ -20,7 +20,7 @@ def get_location_configuration(obj, field_name: str, use_settings: bool = False) } if use_settings: - storage = getattr(obj, field_name) + storage = getattr(obj, field_name) if not isinstance(obj, Job) else getattr(obj.segment.task, field_name) if storage is None: location_conf['location'] = Location.LOCAL else: diff --git a/cvat/apps/engine/mixins.py b/cvat/apps/engine/mixins.py index 405dce1f5330..1906da6c9900 100644 --- a/cvat/apps/engine/mixins.py +++ b/cvat/apps/engine/mixins.py @@ -9,15 +9,21 @@ import uuid from dataclasses import asdict, dataclass from distutils.util import strtobool +from pathlib import Path +from tempfile import NamedTemporaryFile from unittest import mock +import django_rq from django.conf import settings from rest_framework import mixins, status from rest_framework.response import Response from cvat.apps.engine.location import StorageType, get_location_configuration +from cvat.apps.engine.log import slogger from cvat.apps.engine.models import Location from cvat.apps.engine.serializers import DataSerializer +from cvat.apps.engine.handlers import clear_import_cache +from cvat.apps.engine.utils import get_import_rq_id class TusFile: @@ -221,7 +227,27 @@ def init_tus_upload(self, request): if message_id: metadata["message_id"] = base64.b64decode(message_id) - file_exists = os.path.lexists(os.path.join(self.get_upload_dir(), filename)) + import_type = request.path.strip('/').split('/')[-1] + if import_type == 'backup': + # we need to create unique temp file here because + # users can try to import backups with the same name at the same time + with NamedTemporaryFile(prefix=f'cvat-backup-{filename}-by-{request.user}', suffix='.zip', dir=self.get_upload_dir()) as tmp_file: + filename = os.path.relpath(tmp_file.name, self.get_upload_dir()) + metadata['filename'] = filename + file_path = os.path.join(self.get_upload_dir(), filename) + file_exists = os.path.lexists(file_path) and import_type != 'backup' + + if file_exists: + # check whether the rq_job is in progress or has been finished/failed + object_class_name = self._object.__class__.__name__.lower() + template = get_import_rq_id(object_class_name, self._object.pk, import_type, request.user) + queue = django_rq.get_queue(settings.CVAT_QUEUES.IMPORT_DATA.value) + finished_job_ids = queue.finished_job_registry.get_job_ids() + failed_job_ids = queue.failed_job_registry.get_job_ids() + if template in finished_job_ids or template in failed_job_ids: + os.remove(file_path) + file_exists = False + if file_exists: return self._tus_response(status=status.HTTP_409_CONFLICT, data="File with same name already exists") @@ -231,11 +257,27 @@ def init_tus_upload(self, request): return self._tus_response(status=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, data="File size exceeds max limit of {} bytes".format(self._tus_max_file_size)) + 
tus_file = TusFile.create_file(metadata, file_size, self.get_upload_dir()) location = request.build_absolute_uri() if 'HTTP_X_FORWARDED_HOST' not in request.META: location = request.META.get('HTTP_ORIGIN') + request.META.get('PATH_INFO') + + if import_type in ('backup', 'annotations', 'datasets'): + scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.CLEANING.value) + path = Path(self.get_upload_dir()) / tus_file.filename + cleaning_job = scheduler.enqueue_in(time_delta=settings.IMPORT_CACHE_CLEAN_DELAY, + func=clear_import_cache, + path=path, + creation_time=Path(tus_file.file_path).stat().st_ctime + ) + slogger.glob.info( + f'The cleaning job {cleaning_job.id} is queued.' + f'The check that the file {path} is deleted will be carried out after ' + f'{settings.IMPORT_CACHE_CLEAN_DELAY}.' + ) + return self._tus_response( status=status.HTTP_201_CREATED, extra_headers={'Location': '{}{}'.format(location, tus_file.file_id), @@ -330,7 +372,7 @@ def export_annotations(self, request, db_obj, export_func, callback, get_data=No data = get_data(self._object.pk) return Response(data) - def import_annotations(self, request, db_obj, import_func, rq_func, rq_id): + def import_annotations(self, request, db_obj, import_func, rq_func, rq_id_template): is_tus_request = request.headers.get('Upload-Length', None) is not None or \ request.method == 'OPTIONS' if is_tus_request: @@ -352,7 +394,7 @@ def import_annotations(self, request, db_obj, import_func, rq_func, rq_id): return import_func( request=request, - rq_id=rq_id, + rq_id_template=rq_id_template, rq_func=rq_func, db_obj=self._object, format_name=format_name, diff --git a/cvat/apps/engine/serializers.py b/cvat/apps/engine/serializers.py index 3fefec17b434..1d85679960f0 100644 --- a/cvat/apps/engine/serializers.py +++ b/cvat/apps/engine/serializers.py @@ -22,12 +22,10 @@ from cvat.apps.engine import models from cvat.apps.engine.cloud_provider import get_cloud_storage_instance, Credentials, Status from cvat.apps.engine.log import slogger -from cvat.apps.engine.utils import parse_specific_attributes +from cvat.apps.engine.utils import parse_specific_attributes, build_field_filter_params, get_list_view_name, reverse from drf_spectacular.utils import OpenApiExample, extend_schema_field, extend_schema_serializer -from cvat.apps.engine.utils import build_field_filter_params, get_list_view_name, reverse - class WriteOnceMixin: """ @@ -667,6 +665,9 @@ class RqStatusSerializer(serializers.Serializer): message = serializers.CharField(allow_blank=True, default="") progress = serializers.FloatField(max_value=100, default=0) +class RqIdSerializer(serializers.Serializer): + rq_id = serializers.CharField() + class JobFiles(serializers.ListField): """ diff --git a/cvat/apps/engine/tests/test_rest_api.py b/cvat/apps/engine/tests/test_rest_api.py index ef91c34441dc..da49ba70d698 100644 --- a/cvat/apps/engine/tests/test_rest_api.py +++ b/cvat/apps/engine/tests/test_rest_api.py @@ -1836,9 +1836,9 @@ def _run_api_v2_projects_id_dataset_import(self, pid, user, data, f): response = self.client.post("/api/projects/{}/dataset?format={}".format(pid, f), data=data, format="multipart") return response - def _run_api_v2_projects_id_dataset_import_status(self, pid, user): + def _run_api_v2_projects_id_dataset_import_status(self, pid, user, rq_id): with ForceLogin(user, self.client): - response = self.client.get("/api/projects/{}/dataset?action=import_status".format(pid), format="json") + response = 
self.client.get("/api/projects/{}/dataset?action=import_status&rq_id={}".format(pid, rq_id), format="json") return response def test_api_v2_projects_id_export_import(self): @@ -1867,7 +1867,8 @@ def test_api_v2_projects_id_export_import(self): response = self._run_api_v2_projects_id_dataset_import(pid_import, self.owner, import_data, "CVAT 1.1") self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) - response = self._run_api_v2_projects_id_dataset_import_status(pid_import, self.owner) + rq_id = response.data.get('rq_id') + response = self._run_api_v2_projects_id_dataset_import_status(pid_import, self.owner, rq_id) self.assertEqual(response.status_code, status.HTTP_201_CREATED) def tearDown(self): diff --git a/cvat/apps/engine/utils.py b/cvat/apps/engine/utils.py index c1a5eedc84c2..af1ee77bc9e8 100644 --- a/cvat/apps/engine/utils.py +++ b/cvat/apps/engine/utils.py @@ -9,17 +9,21 @@ import importlib import sys import traceback -from typing import Any, Dict, Optional +from contextlib import suppress +from typing import Any, Dict, Optional, Callable, Union import subprocess import os import urllib.parse +import logging import platform + +from rq.job import Job +from django_rq.queues import DjangoRQ from pathlib import Path from django.http.request import HttpRequest from django.utils import timezone from django.utils.http import urlencode - from rest_framework.reverse import reverse as _reverse from av import VideoFrame @@ -133,17 +137,29 @@ def parse_exception_message(msg): pass return parsed_msg -def process_failed_job(rq_job): - if os.path.exists(rq_job.meta['tmp_file']): - os.remove(rq_job.meta['tmp_file']) - exc_info = str(rq_job.exc_info or rq_job.dependency.exc_info) +def process_failed_job(rq_job: Job): + exc_info = str(rq_job.exc_info or getattr(rq_job.dependency, 'exc_info', None) or '') if rq_job.dependency: rq_job.dependency.delete() rq_job.delete() - return parse_exception_message(exc_info) - -def configure_dependent_job(queue, rq_id, rq_func, db_storage, filename, key, request): + msg = parse_exception_message(exc_info) + log = logging.getLogger('cvat.server.engine') + log.error(msg) + return msg + + +def configure_dependent_job( + queue: DjangoRQ, + rq_id: str, + rq_func: Callable[[Any, str, str], None], + db_storage: Any, + filename: str, + key: str, + request: HttpRequest, + result_ttl: float, + failure_ttl: float +) -> Job: rq_job_id_download_file = rq_id + f'?action=download_{filename}' rq_job_download_file = queue.fetch_job(rq_job_id_download_file) if not rq_job_download_file: @@ -153,6 +169,8 @@ def configure_dependent_job(queue, rq_id, rq_func, db_storage, filename, key, re args=(db_storage, filename, key), job_id=rq_job_id_download_file, meta=get_rq_job_meta(request=request, db_obj=db_storage), + result_ttl=result_ttl, + failure_ttl=failure_ttl ) return rq_job_download_file @@ -218,6 +236,27 @@ def get_list_view_name(model): 'model_name': model._meta.object_name.lower() } +def get_import_rq_id( + resource_type: str, + resource_id: int, + subresource_type: str, + user: str, +) -> str: + # import:---by- + return f"import:{resource_type}-{resource_id}-{subresource_type}-by-{user}" + +def import_resource_with_clean_up_after( + func: Union[Callable[[str, int, int], int], Callable[[str, int, str, bool], None]], + filename: str, + *args, + **kwargs, +) -> Any: + try: + result = func(filename, *args, **kwargs) + finally: + with suppress(FileNotFoundError): + os.remove(filename) + return result def get_cpu_number() -> int: cpu_number = None diff --git 
a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index d8c356943652..527ebae4eb20 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -8,6 +8,7 @@ import os.path as osp import pytz import traceback +import textwrap from datetime import datetime from distutils.util import strtobool from tempfile import NamedTemporaryFile @@ -62,12 +63,12 @@ UserSerializer, PluginsSerializer, IssueReadSerializer, IssueWriteSerializer, CommentReadSerializer, CommentWriteSerializer, CloudStorageWriteSerializer, CloudStorageReadSerializer, DatasetFileSerializer, - ProjectFileSerializer, TaskFileSerializer, CloudStorageContentSerializer) + ProjectFileSerializer, TaskFileSerializer, RqIdSerializer, CloudStorageContentSerializer) from cvat.apps.engine.view_utils import get_cloud_storage_for_import_or_export from utils.dataset_manifest import ImageManifestManager from cvat.apps.engine.utils import ( - av_scan_paths, process_failed_job, configure_dependent_job, parse_exception_message, get_rq_job_meta + av_scan_paths, process_failed_job, configure_dependent_job, parse_exception_message, get_rq_job_meta, get_import_rq_id, import_resource_with_clean_up_after ) from cvat.apps.engine import backup from cvat.apps.engine.mixins import PartialUpdateModelMixin, UploadMixin, AnnotationMixin, SerializeMixin @@ -249,6 +250,7 @@ class ProjectViewSet(viewsets.GenericViewSet, mixins.ListModelMixin, ordering = "-id" lookup_fields = {'owner': 'owner__username', 'assignee': 'assignee__username'} iam_organization_field = 'organization' + IMPORT_RQ_ID_TEMPLATE = get_import_rq_id('project', {}, 'dataset', {}) def get_serializer_class(self): if self.request.method in SAFE_METHODS: @@ -271,6 +273,14 @@ def perform_create(self, serializer, **kwargs): ) @extend_schema(methods=['GET'], summary='Export project as a dataset in a specific format', + description=textwrap.dedent(""" + To check the status of the process of importing a project dataset from a file: + + After initiating the dataset upload, you will receive an rq_id parameter. + Make sure to include this parameter as a query parameter in your subsequent + GET /api/projects/id/dataset requests to track the status of the dataset import. + Also you should specify action parameter: action=import_status. 
+ """), parameters=[ OpenApiParameter('format', description='Desired output format name\n' 'You can get the list of supported formats at:\n/server/annotation/formats', @@ -287,6 +297,8 @@ def perform_create(self, serializer, **kwargs): OpenApiParameter('use_default_location', description='Use the location that was configured in project to import dataset', location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL, required=False, default=True), + OpenApiParameter('rq_id', description='rq id', + location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False), ], responses={ '200': OpenApiResponse(OpenApiTypes.BINARY, description='Download of file started'), @@ -294,7 +306,13 @@ def perform_create(self, serializer, **kwargs): '202': OpenApiResponse(description='Exporting has been started'), '405': OpenApiResponse(description='Format is not available'), }) - @extend_schema(methods=['POST'], summary='Import dataset in specific format as a project', + @extend_schema(methods=['POST'], + summary='Import dataset in specific format as a project or check status of dataset import process', + description=textwrap.dedent(""" + The request POST /api/projects/id/dataset will initiate file upload and will create + the rq job on the server in which the process of dataset import from a file + will be carried out. Please, use the GET /api/projects/id/dataset endpoint for checking status of the process. + """), parameters=[ OpenApiParameter('format', description='Desired dataset format name\n' 'You can get the list of supported formats at:\n/server/annotation/formats', @@ -315,7 +333,7 @@ def perform_create(self, serializer, **kwargs): resource_type_field_name=None ), responses={ - '202': OpenApiResponse(description='Importing has been started'), + '202': OpenApiResponse(RqIdSerializer, description='Importing has been started'), '400': OpenApiResponse(description='Failed to import dataset'), '405': OpenApiResponse(description='Format is not available'), }) @@ -323,7 +341,6 @@ def perform_create(self, serializer, **kwargs): url_path=r'dataset/?$', parser_classes=_UPLOAD_PARSER_CLASSES) def dataset(self, request, pk): self._object = self.get_object() # force call of check_object_permissions() - rq_id = f"import:dataset-for-project.id{pk}-by-{request.user}" if request.method in {'POST', 'OPTIONS'}: return self.import_annotations( @@ -331,17 +348,26 @@ def dataset(self, request, pk): db_obj=self._object, import_func=_import_project_dataset, rq_func=dm.project.import_dataset_as_project, - rq_id=rq_id, + rq_id_template=self.IMPORT_RQ_ID_TEMPLATE ) else: action = request.query_params.get("action", "").lower() if action in ("import_status",): queue = django_rq.get_queue(settings.CVAT_QUEUES.IMPORT_DATA.value) + rq_id = request.query_params.get('rq_id') + if not rq_id: + return Response('The rq_id param should be specified in the query parameters', status=status.HTTP_400_BAD_REQUEST) + + # check that the user has access to the current rq_job + # We should not return any status of job including "404 not found" for user that has no access for this rq_job + + if self.IMPORT_RQ_ID_TEMPLATE.format(pk, request.user) != rq_id: + return Response(status=status.HTTP_403_FORBIDDEN) + rq_job = queue.fetch_job(rq_id) if rq_job is None: return Response(status=status.HTTP_404_NOT_FOUND) elif rq_job.is_finished: - os.remove(rq_job.meta['tmp_file']) if rq_job.dependency: rq_job.dependency.delete() rq_job.delete() @@ -394,7 +420,7 @@ def upload_finished(self, request): return _import_project_dataset( request=request, 
filename=uploaded_file, - rq_id=f"import:dataset-for-project.id{self._object.pk}-by-{request.user}", + rq_id_template=self.IMPORT_RQ_ID_TEMPLATE, rq_func=dm.project.import_dataset_as_project, db_obj=self._object, format_name=format_name, @@ -483,7 +509,19 @@ def annotations(self, request, pk): def export_backup(self, request, pk=None): return self.serialize(request, backup.export) - @extend_schema(summary='Methods create a project from a backup', + @extend_schema(methods=['POST'], summary='Methods create a project from a backup', + description=textwrap.dedent(""" + The backup import process is as follows: + + The first request POST /api/projects/backup will initiate file upload and will create + the rq job on the server in which the process of a project creating from an uploaded backup + will be carried out. + + After initiating the backup upload, you will receive an rq_id parameter. + Make sure to include this parameter as a query parameter in your subsequent requests + to track the status of the project creation. + Once the project has been successfully created, the server will return the id of the newly created project. + """), parameters=[ *ORGANIZATION_OPEN_API_PARAMETERS, OpenApiParameter('location', description='Where to import the backup file from', @@ -493,14 +531,20 @@ def export_backup(self, request, pk=None): location=OpenApiParameter.QUERY, type=OpenApiTypes.NUMBER, required=False), OpenApiParameter('filename', description='Backup file name', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False), + OpenApiParameter('rq_id', description='rq id', + location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False), ], request=PolymorphicProxySerializer('BackupWrite', serializers=[ProjectFileSerializer, OpenApiTypes.NONE], resource_type_field_name=None ), + # TODO: for some reason the code generated by the openapi generator from schema with different serializers + # contains only one serializer, need to fix that. + # https://github.com/OpenAPITools/openapi-generator/issues/6126 responses={ - '201': OpenApiResponse(description='The project has been imported'), # or better specify {id: project_id} - '202': OpenApiResponse(description='Importing a backup file has been started'), + # 201: OpenApiResponse(inline_serializer("ImportedProjectIdSerializer", fields={"id": serializers.IntegerField(required=True)}) + '201': OpenApiResponse(description='The project has been imported'), + '202': OpenApiResponse(RqIdSerializer, description='Importing a backup file has been started'), }) @action(detail=False, methods=['OPTIONS', 'POST'], url_path=r'backup/?$', serializer_class=ProjectFileSerializer(required=False), @@ -699,6 +743,7 @@ class TaskViewSet(viewsets.GenericViewSet, mixins.ListModelMixin, ordering_fields = list(filter_fields) ordering = "-id" iam_organization_field = 'organization' + IMPORT_RQ_ID_TEMPLATE = get_import_rq_id('task', {}, 'annotations', {}) def get_serializer_class(self): if self.request.method in SAFE_METHODS: @@ -716,6 +761,18 @@ def get_queryset(self): return queryset @extend_schema(summary='Method recreates a task from an attached task backup file', + description=textwrap.dedent(""" + The backup import process is as follows: + + The first request POST /api/tasks/backup will initiate file upload and will create + the rq job on the server in which the process of a task creating from an uploaded backup + will be carried out. + + After initiating the backup upload, you will receive an rq_id parameter. 
+ Make sure to include this parameter as a query parameter in your subsequent requests + to track the status of the task creation. + Once the task has been successfully created, the server will return the id of the newly created task. + """), parameters=[ *ORGANIZATION_OPEN_API_PARAMETERS, OpenApiParameter('location', description='Where to import the backup file from', @@ -725,12 +782,19 @@ def get_queryset(self): location=OpenApiParameter.QUERY, type=OpenApiTypes.NUMBER, required=False), OpenApiParameter('filename', description='Backup file name', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False), + OpenApiParameter('rq_id', description='rq id', + location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False), ], request=TaskFileSerializer(required=False), + # TODO: for some reason the code generated by the openapi generator from schema with different serializers + # contains only one serializer, need to fix that. + # https://github.com/OpenAPITools/openapi-generator/issues/6126 responses={ - '201': OpenApiResponse(description='The task has been imported'), # or better specify {id: task_id} - '202': OpenApiResponse(description='Importing a backup file has been started'), + # 201: OpenApiResponse(inline_serializer("ImportedTaskIdSerializer", fields={"id": serializers.IntegerField(required=True)}) + '201': OpenApiResponse(description='The task has been imported'), + '202': OpenApiResponse(RqIdSerializer, description='Importing a backup file has been started'), }) + @action(detail=False, methods=['OPTIONS', 'POST'], url_path=r'backup/?$', serializer_class=TaskFileSerializer(required=False), parser_classes=_UPLOAD_PARSER_CLASSES) @@ -810,8 +874,7 @@ def upload_finished(self, request): return _import_annotations( request=request, filename=annotation_file, - rq_id=(f"import:annotations-for-task.id{self._object.pk}-" - f"in-{format_name.replace(' ', '_')}-by-{request.user}"), + rq_id_template=self.IMPORT_RQ_ID_TEMPLATE, rq_func=dm.task.import_task_annotations, db_obj=self._object, format_name=format_name, @@ -962,10 +1025,19 @@ def append_data_chunk(self, request, pk, file_id): '400': OpenApiResponse(description='Exporting without data is not allowed'), '405': OpenApiResponse(description='Format is not available'), }) - @extend_schema(methods=['PUT'], summary='Method allows to upload task annotations', + @extend_schema(methods=['PUT'], summary='Method allows to upload task annotations or edit existing annotations', + description=textwrap.dedent(""" + To check the status of the process of uploading a task annotations from a file: + + After initiating the annotations upload, you will receive an rq_id parameter. + Make sure to include this parameter as a query parameter in your subsequent + PUT /api/tasks/id/annotations requests to track the status of the annotations upload. 
+ """), parameters=[ OpenApiParameter('format', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False, description='Input format name\nYou can get the list of supported formats at:\n/server/annotation/formats'), + OpenApiParameter('rq_id', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False, + description='rq id'), ], request=PolymorphicProxySerializer('TaskAnnotationsUpdate', serializers=[LabeledDataSerializer, AnnotationFileSerializer, OpenApiTypes.NONE], @@ -977,7 +1049,12 @@ def append_data_chunk(self, request, pk, file_id): '405': OpenApiResponse(description='Format is not available'), }) @extend_schema(methods=['POST'], - summary="Method allows to upload task annotations from a local file or a cloud storage", + summary="Method allows to initialize the process of upload task annotations from a local or a cloud storage file", + description=textwrap.dedent(""" + The request POST /api/tasks/id/annotations will initiate file upload and will create + the rq job on the server in which the process of annotations uploading from file + will be carried out. Please, use the PUT /api/tasks/id/annotations endpoint for checking status of the process. + """), parameters=[ OpenApiParameter('format', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False, description='Input format name\nYou can get the list of supported formats at:\n/server/annotation/formats'), @@ -998,7 +1075,7 @@ def append_data_chunk(self, request, pk, file_id): ), responses={ '201': OpenApiResponse(description='Uploading has finished'), - '202': OpenApiResponse(description='Uploading has been started'), + '202': OpenApiResponse(RqIdSerializer, description='Uploading has been started'), '405': OpenApiResponse(description='Format is not available'), }) @extend_schema(methods=['PATCH'], summary='Method performs a partial update of annotations in a specific task', @@ -1031,17 +1108,19 @@ def annotations(self, request, pk): return Response(data="Exporting annotations from a task without data is not allowed", status=status.HTTP_400_BAD_REQUEST) elif request.method == 'POST' or request.method == 'OPTIONS': + # NOTE: initialization process of annotations import format_name = request.query_params.get('format', '') return self.import_annotations( request=request, db_obj=self._object, import_func=_import_annotations, rq_func=dm.task.import_task_annotations, - rq_id = f"import:annotations-for-task.id{pk}-in-{format_name.replace(' ', '_')}-by-{request.user}" + rq_id_template=self.IMPORT_RQ_ID_TEMPLATE ) elif request.method == 'PUT': format_name = request.query_params.get('format', '') if format_name: + # NOTE: continue process of import annotations use_settings = strtobool(str(request.query_params.get('use_default_location', True))) conv_mask_to_poly = strtobool(request.query_params.get('conv_mask_to_poly', 'True')) obj = self._object if use_settings else request.query_params @@ -1050,7 +1129,7 @@ def annotations(self, request, pk): ) return _import_annotations( request=request, - rq_id = f"import:annotations-for-task.id{pk}-in-{format_name.replace(' ', '_')}-by-{request.user}", + rq_id_template=self.IMPORT_RQ_ID_TEMPLATE, rq_func=dm.task.import_task_annotations, db_obj=self._object, format_name=format_name, @@ -1274,6 +1353,7 @@ class JobViewSet(viewsets.GenericViewSet, mixins.ListModelMixin, 'project_name': 'segment__task__project__name', 'assignee': 'assignee__username' } + IMPORT_RQ_ID_TEMPLATE = get_import_rq_id('job', {}, 'annotations', {}) def get_queryset(self): queryset = 
super().get_queryset() @@ -1292,24 +1372,21 @@ def get_serializer_class(self): # UploadMixin method def get_upload_dir(self): - task = self._object.segment.task - return task.get_tmp_dirname() + return self._object.get_tmp_dirname() # UploadMixin method def upload_finished(self, request): - task = self._object.segment.task if self.action == 'annotations': format_name = request.query_params.get("format", "") filename = request.query_params.get("filename", "") conv_mask_to_poly = strtobool(request.query_params.get('conv_mask_to_poly', 'True')) - tmp_dir = task.get_tmp_dirname() + tmp_dir = self.get_upload_dir() if os.path.isfile(os.path.join(tmp_dir, filename)): annotation_file = os.path.join(tmp_dir, filename) return _import_annotations( request=request, filename=annotation_file, - rq_id=(f"import:annotations-for-job.id{self._object.pk}-" - f"in-{format_name.replace(' ', '_')}-by-{request.user}"), + rq_id_template=self.IMPORT_RQ_ID_TEMPLATE, rq_func=dm.task.import_job_annotations, db_obj=self._object, format_name=format_name, @@ -1352,7 +1429,13 @@ def upload_finished(self, request): '202': OpenApiResponse(description='Exporting has been started'), '405': OpenApiResponse(description='Format is not available'), }) - @extend_schema(methods=['POST'], summary='Method allows to upload job annotations', + @extend_schema(methods=['POST'], + summary='Method allows to initialize the process of the job annotation upload from a local file or a cloud storage', + description=textwrap.dedent(""" + The request POST /api/jobs/id/annotations will initiate file upload and will create + the rq job on the server in which the process of annotations uploading from file + will be carried out. Please, use the PUT /api/jobs/id/annotations endpoint for checking status of the process. + """), parameters=[ OpenApiParameter('format', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False, description='Input format name\nYou can get the list of supported formats at:\n/server/annotation/formats'), @@ -1370,13 +1453,24 @@ def upload_finished(self, request): request=AnnotationFileSerializer, responses={ '201': OpenApiResponse(description='Uploading has finished'), - '202': OpenApiResponse(description='Uploading has been started'), + '202': OpenApiResponse(RqIdSerializer, description='Uploading has been started'), '405': OpenApiResponse(description='Format is not available'), }) - @extend_schema(methods=['PUT'], summary='Method performs an update of all annotations in a specific job', + @extend_schema(methods=['PUT'], + summary='Method performs an update of all annotations in a specific job ' + 'or used for uploading annotations from a file', + description=textwrap.dedent(""" + To check the status of the process of uploading a job annotations from a file: + + After initiating the annotations upload, you will receive an rq_id parameter. + Make sure to include this parameter as a query parameter in your subsequent + PUT /api/jobs/id/annotations requests to track the status of the annotations upload. 
+ """), parameters=[ OpenApiParameter('format', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False, description='Input format name\nYou can get the list of supported formats at:\n/server/annotation/formats'), + OpenApiParameter('rq_id', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False, + description='rq id'), ], request=PolymorphicProxySerializer( component_name='JobAnnotationsUpdate', @@ -1418,11 +1512,10 @@ def annotations(self, request, pk): format_name = request.query_params.get('format', '') return self.import_annotations( request=request, - db_obj=self._object.segment.task, + db_obj=self._object, import_func=_import_annotations, rq_func=dm.task.import_job_annotations, - rq_id=(f"import:annotations-for-job.id{self._object.pk}-" - f"in-{format_name.replace(' ', '_')}-by-{request.user}"), + rq_id_template=self.IMPORT_RQ_ID_TEMPLATE ) elif request.method == 'PUT': @@ -1436,8 +1529,7 @@ def annotations(self, request, pk): ) return _import_annotations( request=request, - rq_id=(f"import:annotations-for-job.id{pk}-" - f"in-{format_name.replace(' ', '_')}-by-{request.user}"), + rq_id_template=self.IMPORT_RQ_ID_TEMPLATE, rq_func=dm.task.import_job_annotations, db_obj=self._object, format_name=format_name, @@ -2302,9 +2394,9 @@ def actions(self, request, pk): return HttpResponseBadRequest(msg) def rq_exception_handler(rq_job, exc_type, exc_value, tb): - rq_job.exc_info = "".join( + rq_job.meta["formatted_exception"] = "".join( traceback.format_exception_only(exc_type, exc_value)) - rq_job.save() + rq_job.save_meta() return True @@ -2315,8 +2407,9 @@ def _download_file_from_bucket(db_storage, filename, key): with open(filename, 'wb+') as f: f.write(data.getbuffer()) -def _import_annotations(request, rq_id, rq_func, db_obj, format_name, +def _import_annotations(request, rq_id_template, rq_func, db_obj, format_name, filename=None, location_conf=None, conv_mask_to_poly=True): + format_desc = {f.DISPLAY_NAME: f for f in dm.views.get_import_formats()}.get(format_name) if format_desc is None: @@ -2325,9 +2418,25 @@ def _import_annotations(request, rq_id, rq_func, db_obj, format_name, elif not format_desc.ENABLED: return Response(status=status.HTTP_405_METHOD_NOT_ALLOWED) + rq_id = request.query_params.get('rq_id') + rq_id_should_be_checked = bool(rq_id) + if not rq_id: + rq_id = rq_id_template.format(db_obj.pk, request.user) + queue = django_rq.get_queue(settings.CVAT_QUEUES.IMPORT_DATA.value) rq_job = queue.fetch_job(rq_id) + if rq_id_should_be_checked and rq_id_template.format(db_obj.pk, request.user) != rq_id: + return Response(status=status.HTTP_403_FORBIDDEN) + + if rq_job and request.method == 'POST': + # If there is a previous job that has not been deleted + if rq_job.is_finished or rq_job.is_failed: + rq_job.delete() + rq_job = queue.fetch_job(rq_id) + else: + return Response(status=status.HTTP_409_CONFLICT, data='Import job already exists') + if not rq_job: # If filename is specified we consider that file was uploaded via TUS, so it exists in filesystem # Then we dont need to create temporary file @@ -2375,6 +2484,8 @@ def _import_annotations(request, rq_id, rq_func, db_obj, format_name, filename=filename, key=key, request=request, + result_ttl=settings.IMPORT_CACHE_SUCCESS_TTL.total_seconds(), + failure_ttl=settings.IMPORT_CACHE_FAILED_TTL.total_seconds() ) av_scan_paths(filename) @@ -2382,15 +2493,20 @@ def _import_annotations(request, rq_id, rq_func, db_obj, format_name, 'tmp_file': filename, } rq_job = queue.enqueue_call( - func=rq_func, - 
args=(db_obj.pk, filename, format_name, conv_mask_to_poly), + func=import_resource_with_clean_up_after, + args=(rq_func, filename, db_obj.pk, format_name, conv_mask_to_poly), job_id=rq_id, depends_on=dependent_job, - meta={**meta, **get_rq_job_meta(request=request, db_obj=db_obj)} + meta={**meta, **get_rq_job_meta(request=request, db_obj=db_obj)}, + result_ttl=settings.IMPORT_CACHE_SUCCESS_TTL.total_seconds(), + failure_ttl=settings.IMPORT_CACHE_FAILED_TTL.total_seconds() ) + serializer = RqIdSerializer(data={'rq_id': rq_id}) + serializer.is_valid(raise_exception=True) + + return Response(serializer.data, status=status.HTTP_202_ACCEPTED) else: if rq_job.is_finished: - os.remove(rq_job.meta['tmp_file']) rq_job.delete() return Response(status=status.HTTP_201_CREATED) elif rq_job.is_failed or \ @@ -2399,9 +2515,8 @@ def _import_annotations(request, rq_id, rq_func, db_obj, format_name, # RQ adds a prefix with exception class name import_error_prefix = '{}.{}'.format( CvatImportError.__module__, CvatImportError.__name__) - if exc_info.startswith(import_error_prefix): - exc_info = exc_info.replace(import_error_prefix + ': ', '') - return Response(data=exc_info, + if import_error_prefix in exc_info: + return Response(data="The annotations that were uploaded are not correct", status=status.HTTP_400_BAD_REQUEST) else: return Response(data=exc_info, @@ -2481,7 +2596,7 @@ def _export_annotations(db_instance, rq_id, request, format_name, action, callba if osp.exists(file_path): return Response(status=status.HTTP_201_CREATED) elif rq_job.is_failed: - exc_info = str(rq_job.exc_info) + exc_info = rq_job.meta.get('formatted_exception', str(rq_job.exc_info)) rq_job.delete() return Response(exc_info, status=status.HTTP_500_INTERNAL_SERVER_ERROR) @@ -2509,7 +2624,7 @@ def _export_annotations(db_instance, rq_id, request, format_name, action, callba result_ttl=ttl, failure_ttl=ttl) return Response(status=status.HTTP_202_ACCEPTED) -def _import_project_dataset(request, rq_id, rq_func, db_obj, format_name, filename=None, conv_mask_to_poly=True, location_conf=None): +def _import_project_dataset(request, rq_id_template, rq_func, db_obj, format_name, filename=None, conv_mask_to_poly=True, location_conf=None): format_desc = {f.DISPLAY_NAME: f for f in dm.views.get_import_formats()}.get(format_name) if format_desc is None: @@ -2518,10 +2633,17 @@ def _import_project_dataset(request, rq_id, rq_func, db_obj, format_name, filena elif not format_desc.ENABLED: return Response(status=status.HTTP_405_METHOD_NOT_ALLOWED) + rq_id = rq_id_template.format(db_obj.pk, request.user) + queue = django_rq.get_queue(settings.CVAT_QUEUES.IMPORT_DATA.value) rq_job = queue.fetch_job(rq_id) - if not rq_job: + if not rq_job or rq_job.is_finished or rq_job.is_failed: + if rq_job and (rq_job.is_finished or rq_job.is_failed): + # for some reason the previous job has not been deleted + # (e.g the user closed the browser tab when job has been created + # but no one requests for checking status were not made) + rq_job.delete() dependent_job = None location = location_conf.get('location') if location_conf else None if not filename and location != Location.CLOUD_STORAGE: @@ -2563,19 +2685,26 @@ def _import_project_dataset(request, rq_id, rq_func, db_obj, format_name, filena filename=filename, key=key, request=request, + result_ttl=settings.IMPORT_CACHE_SUCCESS_TTL.total_seconds(), + failure_ttl=settings.IMPORT_CACHE_FAILED_TTL.total_seconds() ) rq_job = queue.enqueue_call( - func=rq_func, - args=(db_obj.pk, filename, format_name, 
conv_mask_to_poly), + func=import_resource_with_clean_up_after, + args=(rq_func, filename, db_obj.pk, format_name, conv_mask_to_poly), job_id=rq_id, meta={ 'tmp_file': filename, **get_rq_job_meta(request=request, db_obj=db_obj), }, depends_on=dependent_job, + result_ttl=settings.IMPORT_CACHE_SUCCESS_TTL.total_seconds(), + failure_ttl=settings.IMPORT_CACHE_FAILED_TTL.total_seconds() ) else: return Response(status=status.HTTP_409_CONFLICT, data='Import job already exists') - return Response(status=status.HTTP_202_ACCEPTED) + serializer = RqIdSerializer(data={'rq_id': rq_id}) + serializer.is_valid(raise_exception=True) + + return Response(serializer.data, status=status.HTTP_202_ACCEPTED) diff --git a/cvat/apps/events/export.py b/cvat/apps/events/export.py index ef464b5b7da9..90aca64db8be 100644 --- a/cvat/apps/events/export.py +++ b/cvat/apps/events/export.py @@ -148,7 +148,7 @@ def export(request, filter_query, queue_name): if os.path.exists(file_path): return Response(status=status.HTTP_201_CREATED) elif rq_job.is_failed: - exc_info = str(rq_job.exc_info) + exc_info = rq_job.meta.get('formatted_exception', str(rq_job.exc_info)) rq_job.delete() return Response(exc_info, status=status.HTTP_500_INTERNAL_SERVER_ERROR) diff --git a/cvat/requirements/base.in b/cvat/requirements/base.in index 933548869798..7c773219336f 100644 --- a/cvat/requirements/base.in +++ b/cvat/requirements/base.in @@ -50,7 +50,7 @@ diskcache==5.4.0 boto3==1.17.61 azure-storage-blob==12.13.0 google-cloud-storage==1.42.0 -git+https://github.com/cvat-ai/datumaro.git@0817144ade1ddc514e182ca1835e322cb9af00a0 +git+https://github.com/cvat-ai/datumaro.git@ff83c00c2c1bc4b8fdfcc55067fcab0a9b5b6b11 urllib3>=1.26.5 # not directly required, pinned by Snyk to avoid a vulnerability natsort==8.0.0 mistune>=2.0.1 # not directly required, pinned by Snyk to avoid a vulnerability diff --git a/cvat/requirements/base.txt b/cvat/requirements/base.txt index d95148f72979..5df59babe6d1 100644 --- a/cvat/requirements/base.txt +++ b/cvat/requirements/base.txt @@ -1,4 +1,4 @@ -# SHA1:53feeaa402abed516aad4a640244c5fd1bff765a +# SHA1:d1435558d66ec49d0c691492b2f3798960ca3bba # # This file is autogenerated by pip-compile-multi # To update, run: @@ -66,7 +66,7 @@ cryptography==40.0.2 # pyjwt cycler==0.11.0 # via matplotlib -datumaro @ git+https://github.com/cvat-ai/datumaro.git@0817144ade1ddc514e182ca1835e322cb9af00a0 +datumaro @ git+https://github.com/cvat-ai/datumaro.git@ff83c00c2c1bc4b8fdfcc55067fcab0a9b5b6b11 # via -r cvat/requirements/base.in defusedxml==0.7.1 # via diff --git a/cvat/schema.yml b/cvat/schema.yml index b7d2997b2c58..97a1f1cc6666 100644 --- a/cvat/schema.yml +++ b/cvat/schema.yml @@ -1681,7 +1681,13 @@ paths: description: Format is not available post: operationId: jobs_create_annotations - summary: Method allows to upload job annotations + description: |2 + + The request POST /api/jobs/id/annotations will initiate file upload and will create + the rq job on the server in which the process of annotations uploading from file + will be carried out. Please, use the PUT /api/jobs/id/annotations endpoint for checking status of the process. 
+ summary: Method allows to initialize the process of the job annotation upload + from a local file or a cloud storage parameters: - in: query name: cloud_storage_id @@ -1742,12 +1748,24 @@ paths: '201': description: Uploading has finished '202': + content: + application/vnd.cvat+json: + schema: + $ref: '#/components/schemas/RqId' description: Uploading has been started '405': description: Format is not available put: operationId: jobs_update_annotations - summary: Method performs an update of all annotations in a specific job + description: |2 + + To check the status of the process of uploading a job annotations from a file: + + After initiating the annotations upload, you will receive an rq_id parameter. + Make sure to include this parameter as a query parameter in your subsequent + PUT /api/jobs/id/annotations requests to track the status of the annotations upload. + summary: Method performs an update of all annotations in a specific job or used + for uploading annotations from a file parameters: - in: query name: format @@ -1763,6 +1781,11 @@ paths: type: integer description: A unique integer value identifying this job. required: true + - in: query + name: rq_id + schema: + type: string + description: rq id tags: - jobs requestBody: @@ -3059,6 +3082,14 @@ paths: /api/projects/{id}/dataset/: get: operationId: projects_retrieve_dataset + description: |2 + + To check the status of the process of importing a project dataset from a file: + + After initiating the dataset upload, you will receive an rq_id parameter. + Make sure to include this parameter as a query parameter in your subsequent + GET /api/projects/id/dataset requests to track the status of the dataset import. + Also you should specify action parameter: action=import_status. summary: Export project as a dataset in a specific format parameters: - in: query @@ -3102,6 +3133,11 @@ paths: - cloud_storage - local description: Where need to save downloaded dataset + - in: query + name: rq_id + schema: + type: string + description: rq id - in: query name: use_default_location schema: @@ -3132,7 +3168,13 @@ paths: description: Format is not available post: operationId: projects_create_dataset - summary: Import dataset in specific format as a project + description: |2 + + The request POST /api/projects/id/dataset will initiate file upload and will create + the rq job on the server in which the process of dataset import from a file + will be carried out. Please, use the GET /api/projects/id/dataset endpoint for checking status of the process. + summary: Import dataset in specific format as a project or check status of dataset + import process parameters: - in: query name: cloud_storage_id @@ -3191,6 +3233,10 @@ paths: - basicAuth: [] responses: '202': + content: + application/vnd.cvat+json: + schema: + $ref: '#/components/schemas/RqId' description: Importing has been started '400': description: Failed to import dataset @@ -3223,6 +3269,18 @@ paths: /api/projects/backup/: post: operationId: projects_create_backup + description: |2 + + The backup import process is as follows: + + The first request POST /api/projects/backup will initiate file upload and will create + the rq job on the server in which the process of a project creating from an uploaded backup + will be carried out. + + After initiating the backup upload, you will receive an rq_id parameter. + Make sure to include this parameter as a query parameter in your subsequent requests + to track the status of the project creation. 
+ Once the project has been successfully created, the server will return the id of the newly created project. summary: Methods create a project from a backup parameters: - in: header @@ -3259,6 +3317,11 @@ paths: schema: type: integer description: Organization identifier + - in: query + name: rq_id + schema: + type: string + description: rq id tags: - projects requestBody: @@ -3279,6 +3342,10 @@ paths: '201': description: The project has been imported '202': + content: + application/vnd.cvat+json: + schema: + $ref: '#/components/schemas/RqId' description: Importing a backup file has been started /api/schema/: get: @@ -3835,8 +3902,13 @@ paths: description: Format is not available post: operationId: tasks_create_annotations - summary: Method allows to upload task annotations from a local file or a cloud - storage + description: |2 + + The request POST /api/tasks/id/annotations will initiate file upload and will create + the rq job on the server in which the process of annotations uploading from file + will be carried out. Please, use the PUT /api/tasks/id/annotations endpoint for checking status of the process. + summary: Method allows to initialize the process of upload task annotations + from a local or a cloud storage file parameters: - in: query name: cloud_storage_id @@ -3896,12 +3968,23 @@ paths: '201': description: Uploading has finished '202': + content: + application/vnd.cvat+json: + schema: + $ref: '#/components/schemas/RqId' description: Uploading has been started '405': description: Format is not available put: operationId: tasks_update_annotations - summary: Method allows to upload task annotations + description: |2 + + To check the status of the process of uploading a task annotations from a file: + + After initiating the annotations upload, you will receive an rq_id parameter. + Make sure to include this parameter as a query parameter in your subsequent + PUT /api/tasks/id/annotations requests to track the status of the annotations upload. + summary: Method allows to upload task annotations or edit existing annotations parameters: - in: query name: format @@ -3917,6 +4000,11 @@ paths: type: integer description: A unique integer value identifying this task. required: true + - in: query + name: rq_id + schema: + type: string + description: rq id tags: - tasks requestBody: @@ -4341,6 +4429,18 @@ paths: /api/tasks/backup/: post: operationId: tasks_create_backup + description: |2 + + The backup import process is as follows: + + The first request POST /api/tasks/backup will initiate file upload and will create + the rq job on the server in which the process of a task creating from an uploaded backup + will be carried out. + + After initiating the backup upload, you will receive an rq_id parameter. + Make sure to include this parameter as a query parameter in your subsequent requests + to track the status of the task creation. + Once the task has been successfully created, the server will return the id of the newly created task. 
summary: Method recreates a task from an attached task backup file parameters: - in: header @@ -4377,6 +4477,11 @@ paths: schema: type: integer description: Organization identifier + - in: query + name: rq_id + schema: + type: string + description: rq id tags: - tasks requestBody: @@ -4398,6 +4503,10 @@ paths: '201': description: The task has been imported '202': + content: + application/vnd.cvat+json: + schema: + $ref: '#/components/schemas/RqId' description: Importing a backup file has been started /api/users: get: @@ -7339,6 +7448,13 @@ components: * `supervisor` - Supervisor * `maintainer` - Maintainer * `owner` - Owner + RqId: + type: object + properties: + rq_id: + type: string + required: + - rq_id RqStatus: type: object properties: diff --git a/cvat/settings/base.py b/cvat/settings/base.py index 8c24978a9220..4b04ea8d79e0 100644 --- a/cvat/settings/base.py +++ b/cvat/settings/base.py @@ -21,6 +21,7 @@ import shutil import subprocess import sys +from datetime import timedelta from distutils.util import strtobool from enum import Enum @@ -294,6 +295,7 @@ class CVAT_QUEUES(Enum): AUTO_ANNOTATION = 'annotation' WEBHOOKS = 'webhooks' NOTIFICATIONS = 'notifications' + CLEANING = 'cleaning' RQ_QUEUES = { CVAT_QUEUES.IMPORT_DATA.value: { @@ -326,6 +328,12 @@ class CVAT_QUEUES(Enum): 'DB': 0, 'DEFAULT_TIMEOUT': '1h' }, + CVAT_QUEUES.CLEANING.value: { + 'HOST': 'localhost', + 'PORT': 6379, + 'DB': 0, + 'DEFAULT_TIMEOUT': '1h' + }, } NUCLIO = { @@ -346,7 +354,6 @@ class CVAT_QUEUES(Enum): 'cvat.apps.events.handlers.handle_rq_exception', ] - # JavaScript and CSS compression # https://django-compressor.readthedocs.io @@ -667,3 +674,7 @@ class CVAT_QUEUES(Enum): } BUCKET_CONTENT_MAX_PAGE_SIZE = 500 + +IMPORT_CACHE_FAILED_TTL = timedelta(days=90) +IMPORT_CACHE_SUCCESS_TTL = timedelta(hours=1) +IMPORT_CACHE_CLEAN_DELAY = timedelta(hours=2) diff --git a/cvat/settings/testing_rest.py b/cvat/settings/testing_rest.py index 36bf80dc0a2b..5fb329732f29 100644 --- a/cvat/settings/testing_rest.py +++ b/cvat/settings/testing_rest.py @@ -10,3 +10,5 @@ PASSWORD_HASHERS = [ "django.contrib.auth.hashers.MD5PasswordHasher", ] + +IMPORT_CACHE_CLEAN_DELAY = timedelta(seconds=30) diff --git a/docker-compose.yml b/docker-compose.yml index dc46e4a320e0..a91b002e7e4d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -77,6 +77,7 @@ services: DJANGO_LOG_SERVER_HOST: vector DJANGO_LOG_SERVER_PORT: 80 no_proxy: clickhouse,grafana,vector,nuclio,opa,${no_proxy:-} + NUMPROCS: 1 command: -c supervisord/utils.conf volumes: - cvat_data:/home/django/data diff --git a/supervisord/utils.conf b/supervisord/utils.conf index 2e53b6f1de33..925ada37324f 100644 --- a/supervisord/utils.conf +++ b/supervisord/utils.conf @@ -41,3 +41,12 @@ command=%(ENV_HOME)s/wait-for-it.sh %(ENV_CVAT_REDIS_HOST)s:6379 -t 0 -- bash -i " environment=VECTOR_EVENT_HANDLER="SynchronousLogstashHandler" numprocs=1 + +[program:rqworker_cleaning] +command=%(ENV_HOME)s/wait-for-it.sh %(ENV_CVAT_REDIS_HOST)s:6379 -t 0 -- bash -ic " \ + exec python3 %(ENV_HOME)s/manage.py rqworker -v 3 cleaning \ + --worker-class cvat.rqworker.DefaultWorker \ + " +environment=SSH_AUTH_SOCK="/tmp/ssh-agent.sock",VECTOR_EVENT_HANDLER="SynchronousLogstashHandler" +numprocs=%(ENV_NUMPROCS)s +process_name=rqworker_cleaning_%(process_num)s \ No newline at end of file diff --git a/tests/python/pytest.ini b/tests/python/pytest.ini index 653ae999256e..05cda52273da 100644 --- a/tests/python/pytest.ini +++ b/tests/python/pytest.ini @@ -8,4 +8,3 @@ timeout = 15 markers = 
with_external_services: The test requires services extrernal to the default CVAT deployment, e.g. a Git server etc. - diff --git a/tests/python/rest_api/test_projects.py b/tests/python/rest_api/test_projects.py index 9c335aa74513..40def2386124 100644 --- a/tests/python/rest_api/test_projects.py +++ b/tests/python/rest_api/test_projects.py @@ -473,11 +473,13 @@ def _test_import_project(self, username, project_id, format_name, data): _content_type="multipart/form-data", ) assert response.status == HTTPStatus.ACCEPTED + rq_id = json.loads(response.data).get("rq_id") + assert rq_id, "The rq_id was not found in the response" while True: # TODO: It's better be refactored to a separate endpoint to get request status (_, response) = api_client.projects_api.retrieve_dataset( - project_id, action="import_status" + project_id, action="import_status", rq_id=rq_id ) if response.status == HTTPStatus.CREATED: break diff --git a/tests/python/rest_api/test_tasks.py b/tests/python/rest_api/test_tasks.py index 77c1baaac04c..59e118a91a9b 100644 --- a/tests/python/rest_api/test_tasks.py +++ b/tests/python/rest_api/test_tasks.py @@ -11,8 +11,10 @@ from functools import partial from http import HTTPStatus from itertools import chain, product +from math import ceil from pathlib import Path -from tempfile import TemporaryDirectory +from tempfile import NamedTemporaryFile, TemporaryDirectory +from time import sleep, time from typing import List, Optional import pytest @@ -21,10 +23,12 @@ from cvat_sdk.api_client.api_client import ApiClient, Endpoint from cvat_sdk.core.helpers import get_paginated_collection from cvat_sdk.core.proxies.tasks import ResourceType, Task +from cvat_sdk.core.uploading import Uploader from deepdiff import DeepDiff from PIL import Image import shared.utils.s3 as s3 +from shared.fixtures.init import docker_exec_cvat, kube_exec_cvat from shared.utils.config import ( BASE_URL, USER_PASS, @@ -1726,3 +1730,110 @@ def test_can_report_correct_completed_jobs_count(tasks, jobs, admin_user): task, _ = api_client.tasks_api.retrieve(task["id"]) assert task.jobs.completed == 1 + + +class TestImportTaskAnnotations: + def _make_client(self) -> Client: + return Client(BASE_URL, config=Config(status_check_period=0.01)) + + @pytest.fixture(autouse=True) + def setup(self, restore_db_per_function, tmp_path: Path, admin_user: str): + self.tmp_dir = tmp_path + self.client = self._make_client() + self.user = admin_user + self.format = "COCO 1.0" + + with self.client: + self.client.login((self.user, USER_PASS)) + + def _check_annotations(self, task_id): + with make_api_client(self.user) as api_client: + (_, response) = api_client.tasks_api.retrieve_annotations(id=task_id) + assert response.status == HTTPStatus.OK + annotations = json.loads(response.data)["shapes"] + assert len(annotations) > 0 + + def _delete_annotations(self, task_id): + with make_api_client(self.user) as api_client: + (_, response) = api_client.tasks_api.destroy_annotations(id=task_id) + assert response.status == HTTPStatus.NO_CONTENT + + @pytest.mark.timeout(64) + @pytest.mark.parametrize("successful_upload", [True, False]) + def test_can_import_annotations_after_previous_unclear_import( + self, successful_upload: bool, tasks_with_shapes + ): + task_id = tasks_with_shapes[0]["id"] + self._check_annotations(task_id) + + with NamedTemporaryFile() as f: + filename = self.tmp_dir / f"task_{task_id}_{Path(f.name).name}_coco.zip" + + task = self.client.tasks.retrieve(task_id) + task.export_dataset(self.format, filename, include_images=False) + + 
self._delete_annotations(task_id) + + params = {"format": self.format, "filename": filename.name} + url = self.client.api_map.make_endpoint_url( + self.client.api_client.tasks_api.create_annotations_endpoint.path + ).format(id=task_id) + uploader = Uploader(self.client) + + if successful_upload: + # define time required to upload file with annotations + start_time = time() + task.import_annotations(self.format, filename) + required_time = ceil(time() - start_time) * 2 + self._delete_annotations(task_id) + + response = uploader.upload_file( + url, filename, meta=params, query_params=params, logger=self.client.logger.debug + ) + rq_id = json.loads(response.data)["rq_id"] + assert rq_id + else: + required_time = 54 + uploader._tus_start_upload(url, query_params=params) + uploader._upload_file_data_with_tus( + url, filename, meta=params, logger=self.client.logger.debug + ) + + sleep(required_time) + if successful_upload: + self._check_annotations(task_id) + self._delete_annotations(task_id) + task.import_annotations(self.format, filename) + self._check_annotations(task_id) + + @pytest.mark.timeout(64) + def test_check_import_cache_after_previous_interrupted_upload(self, tasks_with_shapes, request): + task_id = tasks_with_shapes[0]["id"] + with NamedTemporaryFile() as f: + filename = self.tmp_dir / f"task_{task_id}_{Path(f.name).name}_coco.zip" + task = self.client.tasks.retrieve(task_id) + task.export_dataset(self.format, filename, include_images=False) + + params = {"format": self.format, "filename": filename.name} + url = self.client.api_map.make_endpoint_url( + self.client.api_client.tasks_api.create_annotations_endpoint.path + ).format(id=task_id) + + uploader = Uploader(self.client) + uploader._tus_start_upload(url, query_params=params) + uploader._upload_file_data_with_tus( + url, filename, meta=params, logger=self.client.logger.debug + ) + number_of_files = 1 + sleep(30) # wait when the cleaning job from rq worker will be started + command = ["/bin/bash", "-c", f"ls data/tasks/{task_id}/tmp | wc -l"] + platform = request.config.getoption("--platform") + assert platform in ("kube", "local") + func = docker_exec_cvat if platform == "local" else kube_exec_cvat + for _ in range(12): + sleep(2) + result, _ = func(command) + number_of_files = int(result) + if not number_of_files: + break + assert not number_of_files diff --git a/tests/python/sdk/test_tasks.py b/tests/python/sdk/test_tasks.py index 3a8faeddef45..59069a4f80ce 100644 --- a/tests/python/sdk/test_tasks.py +++ b/tests/python/sdk/test_tasks.py @@ -367,7 +367,7 @@ def _test_can_create_from_backup(self, fxt_new_task: Task, fxt_backup_file: Path assert task.id assert task.id != fxt_new_task.id assert task.size == fxt_new_task.size - assert "imported sucessfully" in self.logger_stream.getvalue() + assert "imported successfully" in self.logger_stream.getvalue() assert "100%" in pbar_out.getvalue().strip("\r").split("\r")[-1] assert self.stdout.getvalue() == "" diff --git a/tests/python/shared/fixtures/init.py b/tests/python/shared/fixtures/init.py index c2fd87671521..792fe36825f2 100644 --- a/tests/python/shared/fixtures/init.py +++ b/tests/python/shared/fixtures/init.py @@ -8,6 +8,7 @@ from pathlib import Path from subprocess import PIPE, CalledProcessError, run from time import sleep +from typing import List, Union import pytest import requests @@ -23,7 +24,6 @@ CONTAINER_NAME_FILES = ["docker-compose.tests.yml"] - DC_FILES = [ "docker-compose.dev.yml", "tests/docker-compose.file_share.yml", @@ -85,7 +85,7 @@ def _run(command, 
capture_output=True): proc = run(_command, check=True) # nosec return stdout, stderr except CalledProcessError as exc: - stderr = exc.stderr.decode() if capture_output else "see above" + stderr = exc.stderr.decode() or exc.stdout.decode() if capture_output else "see above" pytest.exit( f"Command failed: {command}.\n" f"Error message: {stderr}.\n" @@ -120,13 +120,17 @@ def kube_cp(source, target): _run(f"kubectl cp {source} {target}") -def docker_exec_cvat(command): - _run(f"docker exec {PREFIX}_cvat_server_1 {command}") +def docker_exec_cvat(command: Union[List[str], str]): + base = f"docker exec {PREFIX}_cvat_server_1" + _command = f"{base} {command}" if isinstance(command, str) else base.split() + command + return _run(_command) -def kube_exec_cvat(command): +def kube_exec_cvat(command: Union[List[str], str]): pod_name = _kube_get_server_pod_name() - _run(f"kubectl exec {pod_name} -- {command}") + base = f"kubectl exec {pod_name} --" + _command = f"{base} {command}" if isinstance(command, str) else base.split() + command + return _run(_command) def docker_exec_cvat_db(command): @@ -211,7 +215,7 @@ def create_compose_files(container_name_files): for service_name, service_config in dc_config["services"].items(): service_config.pop("container_name", None) - if service_name == "cvat_server": + if service_name in ("cvat_server", "cvat_utils"): service_env = service_config["environment"] service_env["DJANGO_SETTINGS_MODULE"] = "cvat.settings.testing_rest" diff --git a/tests/python/shared/utils/config.py b/tests/python/shared/utils/config.py index f5a3206c5aff..0e9669fce5d0 100644 --- a/tests/python/shared/utils/config.py +++ b/tests/python/shared/utils/config.py @@ -58,6 +58,10 @@ def post_files_method(username, endpoint, data, files, **kwargs): ) +def put_method(username, endpoint, data, **kwargs): + return requests.put(get_api_url(endpoint, **kwargs), json=data, auth=(username, USER_PASS)) + + def server_get(username, endpoint, **kwargs): return requests.get(get_server_url(endpoint, **kwargs), auth=(username, USER_PASS)) diff --git a/tests/python/shared/utils/resource_import_export.py b/tests/python/shared/utils/resource_import_export.py index 5adf8aecf7c4..9ee8bdec26e7 100644 --- a/tests/python/shared/utils/resource_import_export.py +++ b/tests/python/shared/utils/resource_import_export.py @@ -9,7 +9,7 @@ T = TypeVar("T") -from shared.utils.config import get_method, post_method +from shared.utils.config import get_method, post_method, put_method FILENAME_TEMPLATE = "cvat/{}/{}.zip" EXPORT_FORMAT = "CVAT for images 1.1" @@ -117,9 +117,16 @@ def _import_annotations_from_cloud_storage( response = post_method(user, url, data=None, **kwargs) status = response.status_code + # Only the first POST request contains rq_id in response. + # Exclude cases with 403 expected status. + rq_id = None + if status == HTTPStatus.ACCEPTED: + rq_id = response.json().get("rq_id") + assert rq_id, "The rq_id was not found in the response" + while status != _expect_status: assert status == HTTPStatus.ACCEPTED - response = post_method(user, url, data=None, **kwargs) + response = put_method(user, url, data=None, rq_id=rq_id, **kwargs) status = response.status_code if _check_uploaded: @@ -154,9 +161,16 @@ def _import_dataset_from_cloud_storage( response = post_method(user, url, data=None, **kwargs) status = response.status_code + # Only the first POST request contains rq_id in response. + # Exclude cases with 403 expected status. 
+ rq_id = None + if status == HTTPStatus.ACCEPTED: + rq_id = response.json().get("rq_id") + assert rq_id, "The rq_id was not found in the response" + while status != _expect_status: assert status == HTTPStatus.ACCEPTED - response = get_method(user, url, action="import_status") + response = get_method(user, url, action="import_status", rq_id=rq_id) status = response.status_code def _import_resource(self, cloud_storage: Dict[str, Any], resource_type: str, *args, **kwargs): diff --git a/tests/values.test.yaml b/tests/values.test.yaml index a4d90fc12428..e281ecb32143 100644 --- a/tests/values.test.yaml +++ b/tests/values.test.yaml @@ -14,6 +14,10 @@ cvat: - mountPath: /home/django/share name: cvat-backend-data subPath: share + utils: + additionalEnv: + - name: DJANGO_SETTINGS_MODULE + value: cvat.settings.testing_rest # Images are already present in the node imagePullPolicy: Never frontend:
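
A minimal, illustrative sketch of the import workflow that the updated schema descriptions and tests above outline: the initial POST returns an `rq_id`, and later status checks must pass it back as a query parameter. This sketch is not part of the patch; the host, credentials, task id, file name, and the `annotation_file` form field are assumptions for illustration, and basic auth is used only for brevity.

```python
# Hypothetical client-side sketch of the new import flow (not part of the patch):
# the first POST creates the rq job and returns its id, and subsequent requests
# pass that rq_id back to track the same job.
import time

import requests

BASE_URL = "http://localhost:8080/api"   # assumed CVAT instance
AUTH = ("admin", "password")             # assumed credentials
TASK_ID = 1                              # assumed task id
params = {"format": "COCO 1.0", "filename": "annotations.zip"}

# 1. Initiate the import: the server enqueues an rq job and responds with
#    202 Accepted and a body like {"rq_id": "..."}.
with open("annotations.zip", "rb") as f:
    response = requests.post(
        f"{BASE_URL}/tasks/{TASK_ID}/annotations",
        params=params,
        files={"annotation_file": f},    # form field name is an assumption
        auth=AUTH,
    )
response.raise_for_status()
rq_id = response.json()["rq_id"]

# 2. Poll the same endpoint with the received rq_id:
#    202 means the rq job is still running, 201 means the import has finished.
while True:
    response = requests.put(
        f"{BASE_URL}/tasks/{TASK_ID}/annotations",
        params={**params, "rq_id": rq_id},
        auth=AUTH,
    )
    if response.status_code == 201:
        break
    assert response.status_code == 202, response.text
    time.sleep(1)
```

Project dataset imports follow the same pattern, except that the status is polled with `GET /api/projects/{id}/dataset?action=import_status&rq_id=...`, as the updated tests do.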