From 496de269705fdb5a5c904e97540c30fb34703fd7 Mon Sep 17 00:00:00 2001 From: Maria Khrustaleva Date: Thu, 4 Jan 2024 12:32:50 +0100 Subject: [PATCH 1/9] Fix export to cloud storage && fix enqueuing dependents after canceling rq job --- cvat-core/src/server-proxy.ts | 15 +-- cvat/apps/engine/backup.py | 108 +++++++++++--------- cvat/apps/engine/cloud_provider.py | 22 +++- cvat/apps/engine/mixins.py | 13 ++- cvat/apps/engine/utils.py | 27 +++++ cvat/apps/engine/views.py | 159 +++++++++++++++++------------ tests/python/rest_api/utils.py | 14 +-- 7 files changed, 227 insertions(+), 131 deletions(-) diff --git a/cvat-core/src/server-proxy.ts b/cvat-core/src/server-proxy.ts index bb7b5cf77a70..41c087b04782 100644 --- a/cvat-core/src/server-proxy.ts +++ b/cvat-core/src/server-proxy.ts @@ -795,10 +795,11 @@ function exportDataset(instanceType: 'projects' | 'jobs' | 'tasks') { .then((response) => { const isCloudStorage = targetStorage.location === StorageLocation.CLOUD_STORAGE; const { status } = response; - if (status === 201) params.action = 'download'; - if (status === 202 || (isCloudStorage && status === 201)) { + + if (status === 202) { setTimeout(request, 3000); } else if (status === 201) { + params.action = 'download'; resolve(`${baseURL}?${new URLSearchParams(params).toString()}`); } else if (isCloudStorage && status === 200) { resolve(); @@ -925,10 +926,11 @@ async function backupTask(id: number, targetStorage: Storage, useDefaultSettings }); const isCloudStorage = targetStorage.location === StorageLocation.CLOUD_STORAGE; const { status } = response; - if (status === 201) params.action = 'download'; - if (status === 202 || (isCloudStorage && status === 201)) { + + if (status === 202) { setTimeout(request, 3000); } else if (status === 201) { + params.action = 'download'; resolve(`${url}?${new URLSearchParams(params).toString()}`); } else if (isCloudStorage && status === 200) { resolve(); @@ -1030,10 +1032,11 @@ async function backupProject( }); const isCloudStorage = targetStorage.location === StorageLocation.CLOUD_STORAGE; const { status } = response; - if (status === 201) params.action = 'download'; - if (status === 202 || (isCloudStorage && status === 201)) { + + if (status === 202) { setTimeout(request, 3000); } else if (status === 201) { + params.action = 'download'; resolve(`${url}?${new URLSearchParams(params).toString()}`); } else if (isCloudStorage && status === 200) { resolve(); diff --git a/cvat/apps/engine/backup.py b/cvat/apps/engine/backup.py index 2d7f8a2cf19e..d3c322352b30 100644 --- a/cvat/apps/engine/backup.py +++ b/cvat/apps/engine/backup.py @@ -25,7 +25,7 @@ from rest_framework.parsers import JSONParser from rest_framework.renderers import JSONRenderer from rest_framework.response import Response -from rest_framework.exceptions import ValidationError, PermissionDenied, NotFound +from rest_framework.exceptions import ValidationError from distutils.util import strtobool import cvat.apps.dataset_manager as dm @@ -38,12 +38,12 @@ from cvat.apps.engine.utils import ( av_scan_paths, process_failed_job, configure_dependent_job_to_download_from_cs, get_rq_job_meta, get_import_rq_id, import_resource_with_clean_up_after, - sendfile, define_dependent_job, get_rq_lock_by_user + sendfile, define_dependent_job, get_rq_lock_by_user, build_backup_file_name, ) from cvat.apps.engine.models import ( StorageChoice, StorageMethodChoice, DataChoice, Task, Project, Location) from cvat.apps.engine.task import JobFileMapping, _create_thread -from cvat.apps.engine.cloud_provider import 
db_storage_to_storage_instance +from cvat.apps.engine.cloud_provider import download_file_from_bucket, export_resource_to_cloud_storage from cvat.apps.engine.location import StorageType, get_location_configuration from cvat.apps.engine.view_utils import get_cloud_storage_for_import_or_export from cvat.apps.dataset_manager.views import TASK_CACHE_TTL, PROJECT_CACHE_TTL, get_export_cache_dir, clear_export_cache, log_exception @@ -955,54 +955,49 @@ def export(db_instance, request, queue_name): queue = django_rq.get_queue(queue_name) rq_id = f"export:{obj_type}.id{db_instance.pk}-by-{request.user}" rq_job = queue.fetch_job(rq_id) + + last_instance_update_time = timezone.localtime(db_instance.updated_date) + timestamp = datetime.strftime(last_instance_update_time, "%Y_%m_%d_%H_%M_%S") + location = location_conf.get('location') + if rq_job: - last_project_update_time = timezone.localtime(db_instance.updated_date) rq_request = rq_job.meta.get('request', None) request_time = rq_request.get("timestamp", None) if rq_request else None - if request_time is None or request_time < last_project_update_time: - rq_job.cancel() + if request_time is None or request_time < last_instance_update_time: + # in case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER + # we have to enqueue dependent jobs after canceling one + rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER) rq_job.delete() else: if rq_job.is_finished: - file_path = rq_job.return_value() - if action == "download" and os.path.exists(file_path): - rq_job.delete() + if location == Location.LOCAL: + file_path = rq_job.return_value() + + if not file_path: + return Response('A result for exporting job was not found for finished RQ job', status=status.HTTP_500_INTERNAL_SERVER_ERROR) - timestamp = datetime.strftime(last_project_update_time, - "%Y_%m_%d_%H_%M_%S") - filename = filename or "{}_{}_backup_{}{}".format( - obj_type, db_instance.name, timestamp, - os.path.splitext(file_path)[1]).lower() + elif not os.path.exists(file_path): + return Response('Result file does not exists in export cache', status=status.HTTP_500_INTERNAL_SERVER_ERROR) - location = location_conf.get('location') - if location == Location.LOCAL: + filename = filename or build_backup_file_name( + class_name=obj_type, + identifier=db_instance.name, + timestamp=timestamp, + extension=os.path.splitext(file_path)[1] + ) + + if action == "download": + rq_job.delete() return sendfile(request, file_path, attachment=True, attachment_filename=filename) - elif location == Location.CLOUD_STORAGE: - try: - storage_id = location_conf['storage_id'] - except KeyError: - raise serializers.ValidationError( - 'Cloud storage location was selected as the destination,' - ' but cloud storage id was not specified') - - db_storage = get_cloud_storage_for_import_or_export( - storage_id=storage_id, request=request, - is_default=location_conf['is_default']) - storage = db_storage_to_storage_instance(db_storage) - - try: - storage.upload_file(file_path, filename) - except (ValidationError, PermissionDenied, NotFound) as ex: - msg = str(ex) if not isinstance(ex, ValidationError) else \ - '\n'.join([str(d) for d in ex.detail]) - return Response(data=msg, status=ex.status_code) - return Response(status=status.HTTP_200_OK) - else: - raise NotImplementedError() + + return Response(status=status.HTTP_201_CREATED) + + elif location == Location.CLOUD_STORAGE: + rq_job.delete() + return Response(status=status.HTTP_200_OK) else: - if os.path.exists(file_path): - return 
Response(status=status.HTTP_201_CREATED) + raise NotImplementedError() elif rq_job.is_failed: exc_info = rq_job.meta.get('formatted_exception', str(rq_job.exc_info)) rq_job.delete() @@ -1014,10 +1009,31 @@ def export(db_instance, request, queue_name): ttl = dm.views.PROJECT_CACHE_TTL.total_seconds() user_id = request.user.id + func = _create_backup if location == Location.LOCAL else export_resource_to_cloud_storage + func_args = (db_instance, Exporter, '{}_backup.zip'.format(obj_type), logger, cache_ttl) + + if location == Location.CLOUD_STORAGE: + try: + storage_id = location_conf['storage_id'] + except KeyError: + raise serializers.ValidationError( + 'Cloud storage location was selected as the destination,' + ' but cloud storage id was not specified') + + db_storage = get_cloud_storage_for_import_or_export( + storage_id=storage_id, request=request, + is_default=location_conf['is_default']) + filename_pattern = build_backup_file_name( + class_name=obj_type, + identifier=db_instance.name, + timestamp=timestamp, + ) + func_args = (db_storage, filename, filename_pattern, _create_backup) + func_args + with get_rq_lock_by_user(queue, user_id): queue.enqueue_call( - func=_create_backup, - args=(db_instance, Exporter, '{}_backup.zip'.format(obj_type), logger, cache_ttl), + func=func, + args=func_args, job_id=rq_id, meta=get_rq_job_meta(request=request, db_obj=db_instance), depends_on=define_dependent_job(queue, user_id, rq_id=rq_id), @@ -1027,12 +1043,6 @@ def export(db_instance, request, queue_name): return Response(status=status.HTTP_202_ACCEPTED) -def _download_file_from_bucket(db_storage, filename, key): - storage = db_storage_to_storage_instance(db_storage) - - with storage.download_fileobj(key) as data, open(filename, 'wb+') as f: - f.write(data.getbuffer()) - def _import(importer, request, queue, rq_id, Serializer, file_field_name, location_conf, filename=None): rq_job = queue.fetch_job(rq_id) @@ -1077,7 +1087,7 @@ def _import(importer, request, queue, rq_id, Serializer, file_field_name, locati dependent_job = configure_dependent_job_to_download_from_cs( queue=queue, rq_id=rq_id, - rq_func=_download_file_from_bucket, + rq_func=download_file_from_bucket, db_storage=db_storage, filename=filename, key=key, diff --git a/cvat/apps/engine/cloud_provider.py b/cvat/apps/engine/cloud_provider.py index 8a7e3bd3a8ab..316722e6550d 100644 --- a/cvat/apps/engine/cloud_provider.py +++ b/cvat/apps/engine/cloud_provider.py @@ -10,7 +10,7 @@ from enum import Enum from io import BytesIO from multiprocessing.pool import ThreadPool -from typing import Dict, List, Optional, Any +from typing import Dict, List, Optional, Any, Callable import boto3 from azure.core.exceptions import HttpResponseError, ResourceExistsError @@ -962,3 +962,23 @@ def db_storage_to_storage_instance(db_storage): 'specific_attributes': db_storage.get_specific_attributes() } return get_cloud_storage_instance(cloud_provider=db_storage.provider_type, **details) + +def download_file_from_bucket(db_storage: Any, filename: str, key: str) -> None: + storage = db_storage_to_storage_instance(db_storage) + + with storage.download_fileobj(key) as data, open(filename, 'wb+') as f: + f.write(data.getbuffer()) + +def export_resource_to_cloud_storage( + db_storage: Any, + key: str, + key_pattern: str, + func: Callable[[int, Optional[str], Optional[str]], str], + *args, + **kwargs, +) -> str: + file_path = func(*args, **kwargs) + storage = db_storage_to_storage_instance(db_storage) + storage.upload_file(file_path, key if key else 
key_pattern.format(os.path.splitext(file_path)[1].lower()))
+
+    return file_path
\ No newline at end of file
diff --git a/cvat/apps/engine/mixins.py b/cvat/apps/engine/mixins.py
index 81ee63ca5425..b0546cb830ef 100644
--- a/cvat/apps/engine/mixins.py
+++ b/cvat/apps/engine/mixins.py
@@ -13,6 +13,7 @@
 from pathlib import Path
 from tempfile import NamedTemporaryFile
 from unittest import mock
+from typing import Optional, Callable, Dict, Any
 
 import django_rq
 from django.conf import settings
@@ -384,7 +385,15 @@ def upload_finished(self, request):
         raise NotImplementedError('Must be implemented in the derived class')
 
 class AnnotationMixin:
-    def export_annotations(self, request, db_obj, export_func, callback, get_data=None):
+    def export_annotations(
+        self,
+        request,
+        db_obj,
+        export_func,
+        callback: Callable[[int, Optional[str], Optional[str]], str],
+        *,
+        get_data: Optional[Callable[[int], Dict[str, Any]]] = None,
+    ):
         format_name = request.query_params.get("format", "")
         action = request.query_params.get("action", "").lower()
         filename = request.query_params.get("filename", "")
@@ -399,7 +408,7 @@ def export_annotations(self, request, db_obj, export_func, callback, get_data=No
         )
 
         object_name = self._object.__class__.__name__.lower()
-        rq_id = f"export:annotations-for-{object_name}.id{self._object.pk}-in-{format_name.replace(' ', '_')}-format"
+        rq_id = f"export:{request.path.strip('/').split('/')[-1]}-for-{object_name}.id{self._object.pk}-in-{format_name.replace(' ', '_')}-format"
 
         if format_name:
             return export_func(db_instance=self._object,
diff --git a/cvat/apps/engine/utils.py b/cvat/apps/engine/utils.py
index c4fa1a34957a..12beb2d5762f 100644
--- a/cvat/apps/engine/utils.py
+++ b/cvat/apps/engine/utils.py
@@ -409,3 +409,30 @@ def preload_image(image: tuple[str, str, str])-> tuple[Image.Image, str, str]:
 
 def preload_images(images: Iterable[tuple[str, str, str]]) -> list[tuple[Image.Image, str, str]]:
     return list(map(preload_image, images))
+
+def build_backup_file_name(
+    *,
+    class_name: str,
+    identifier: str | int,
+    timestamp: str,
+    extension: str = "{}",
+) -> str:
+    # "{class_name}_{identifier}_backup_{timestamp}{extension}"
+    return "{}_{}_backup_{}{}".format(
+        class_name, identifier, timestamp, extension,
+    ).lower()
+
+def build_annotations_file_name(
+    *,
+    class_name: str,
+    identifier: str | int,
+    timestamp: str,
+    format_name: str,
+    is_annotation_file: bool = True,
+    extension: str = "{}",
+) -> str:
+    # "{class_name}_{identifier}_{annotations|dataset}_{timestamp}_{format_name}{extension}"
+    return "{}_{}_{}_{}_{}{}".format(
+        class_name, identifier, 'annotations' if is_annotation_file else 'dataset',
+        timestamp, format_name, extension,
+    ).lower()
\ No newline at end of file
diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py
index d85bfdf964f1..841d126377a0 100644
--- a/cvat/apps/engine/views.py
+++ b/cvat/apps/engine/views.py
@@ -7,7 +7,7 @@
 import os.path as osp
 from PIL import Image
 from types import SimpleNamespace
-from typing import Optional, Any, Dict, List, cast
+from typing import Optional, Any, Dict, List, cast, Callable
 import traceback
 import textwrap
 from copy import copy
@@ -20,9 +20,9 @@
 from django.conf import settings
 from django.contrib.auth.models import User
 from django.db import IntegrityError, transaction
-from django.db.models import Count
+from django.db.models import Count, Model
 from django.db.models.query import Prefetch
-from django.http import HttpResponse, HttpResponseNotFound, HttpResponseBadRequest
+from django.http import HttpResponse, HttpRequest, HttpResponseNotFound, HttpResponseBadRequest
 from django.utils import timezone
 from 
drf_spectacular.types import OpenApiTypes @@ -42,7 +42,7 @@ import cvat.apps.dataset_manager as dm import cvat.apps.dataset_manager.views # pylint: disable=unused-import -from cvat.apps.engine.cloud_provider import db_storage_to_storage_instance +from cvat.apps.engine.cloud_provider import db_storage_to_storage_instance, download_file_from_bucket, export_resource_to_cloud_storage from cvat.apps.dataset_manager.bindings import CvatImportError from cvat.apps.dataset_manager.serializers import DatasetFormatsSerializer from cvat.apps.engine.frame_provider import FrameProvider @@ -71,7 +71,8 @@ from cvat.apps.engine.utils import ( av_scan_paths, process_failed_job, configure_dependent_job_to_download_from_cs, parse_exception_message, get_rq_job_meta, get_import_rq_id, - import_resource_with_clean_up_after, sendfile, define_dependent_job, get_rq_lock_by_user + import_resource_with_clean_up_after, sendfile, define_dependent_job, get_rq_lock_by_user, + build_annotations_file_name, ) from cvat.apps.engine import backup from cvat.apps.engine.mixins import PartialUpdateModelMixin, UploadMixin, AnnotationMixin, SerializeMixin @@ -292,7 +293,7 @@ def perform_create(self, serializer, **kwargs): location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False), OpenApiParameter('filename', description='Desired output file name', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False), - OpenApiParameter('action', description='Used to start downloading process after annotation file had been created', + OpenApiParameter('action', description='Used to start downloading process locally after annotation file has been created', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False, enum=['download', 'import_status']), OpenApiParameter('location', description='Where need to save downloaded dataset', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False, @@ -456,7 +457,7 @@ def upload_finished(self, request): location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=True), OpenApiParameter('filename', description='Desired output file name', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False), - OpenApiParameter('action', description='Used to start downloading process after annotation file had been created', + OpenApiParameter('action', description='Used to start downloading process locally after annotation file has been created', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False, enum=['download']), OpenApiParameter('location', description='Where need to save downloaded dataset', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False, @@ -1233,7 +1234,7 @@ def append_data_chunk(self, request, pk, file_id): OpenApiParameter('filename', description='Desired output file name', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False), OpenApiParameter('action', location=OpenApiParameter.QUERY, - description='Used to start downloading process after annotation file had been created', + description='Used to start downloading process locally after annotation file has been created', type=OpenApiTypes.STR, required=False, enum=['download']), OpenApiParameter('location', description='Where need to save downloaded dataset', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False, @@ -1484,7 +1485,7 @@ def metadata(self, request, pk): OpenApiParameter('filename', description='Desired output file name', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False), 
OpenApiParameter('action', location=OpenApiParameter.QUERY, - description='Used to start downloading process after annotation file had been created', + description='Used to start downloading process locally after annotation file has been created', type=OpenApiTypes.STR, required=False, enum=['download']), OpenApiParameter('use_default_location', description='Use the location that was configured in task to export annotations', location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL, required=False, @@ -1668,7 +1669,7 @@ def upload_finished(self, request): OpenApiParameter('filename', description='Desired output file name', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False), OpenApiParameter('action', location=OpenApiParameter.QUERY, - description='Used to start downloading process after annotation file had been created', + description='Used to start downloading process locally after annotation file has been created', type=OpenApiTypes.STR, required=False, enum=['download']), OpenApiParameter('location', description='Where need to save downloaded annotation', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False, @@ -1845,7 +1846,7 @@ def append_annotations_chunk(self, request, pk, file_id): OpenApiParameter('filename', description='Desired output file name', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR, required=False), OpenApiParameter('action', location=OpenApiParameter.QUERY, - description='Used to start downloading process after annotation file had been created', + description='Used to start downloading process locally after annotation file has been created', type=OpenApiTypes.STR, required=False, enum=['download']), OpenApiParameter('use_default_location', description='Use the location that was configured in the task to export dataset', location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL, required=False, @@ -2801,12 +2802,6 @@ def rq_exception_handler(rq_job, exc_type, exc_value, tb): return True -def _download_file_from_bucket(db_storage, filename, key): - storage = db_storage_to_storage_instance(db_storage) - - with storage.download_fileobj(key) as data, open(filename, 'wb+') as f: - f.write(data.getbuffer()) - def _import_annotations(request, rq_id_template, rq_func, db_obj, format_name, filename=None, location_conf=None, conv_mask_to_poly=True): @@ -2879,7 +2874,7 @@ def _import_annotations(request, rq_id_template, rq_func, db_obj, format_name, dependent_job = configure_dependent_job_to_download_from_cs( queue=queue, rq_id=rq_id, - rq_func=_download_file_from_bucket, + rq_func=download_file_from_bucket, db_storage=db_storage, filename=filename, key=key, @@ -2928,8 +2923,16 @@ def _import_annotations(request, rq_id_template, rq_func, db_obj, format_name, return Response(status=status.HTTP_202_ACCEPTED) -def _export_annotations(db_instance, rq_id, request, format_name, action, callback, - filename, location_conf): +def _export_annotations( + db_instance: Model, + rq_id: str, + request: HttpRequest, + format_name: str, + action: str, + callback: Callable[[int, Optional[str], Optional[str]], str], + filename: Optional[str], + location_conf: Dict[str, Any] +): if action not in {"", "download"}: raise serializers.ValidationError( "Unexpected action specified for the request") @@ -2945,60 +2948,59 @@ def _export_annotations(db_instance, rq_id, request, format_name, action, callba queue = django_rq.get_queue(settings.CVAT_QUEUES.EXPORT_DATA.value) rq_job = queue.fetch_job(rq_id) + location = location_conf.get('location') + if location not 
in Location.list(): + raise serializers.ValidationError( + f"Unexpected location {location} specified for the request" + ) + + last_instance_update_time = timezone.localtime(db_instance.updated_date) + if isinstance(db_instance, Project): + tasks_update = list(map(lambda db_task: timezone.localtime(db_task.updated_date), db_instance.tasks.all())) + last_instance_update_time = max(tasks_update + [last_instance_update_time]) + + timestamp = datetime.strftime(last_instance_update_time, "%Y_%m_%d_%H_%M_%S") + are_annotations_exported = 'annotations' in rq_id + if rq_job: - last_instance_update_time = timezone.localtime(db_instance.updated_date) - if isinstance(db_instance, Project): - tasks_update = list(map(lambda db_task: timezone.localtime(db_task.updated_date), db_instance.tasks.all())) - last_instance_update_time = max(tasks_update + [last_instance_update_time]) rq_request = rq_job.meta.get('request', None) request_time = rq_request.get('timestamp', None) if rq_request else None if request_time is None or request_time < last_instance_update_time: - rq_job.cancel() + # in case the server is configured with ONE_RUNNING_JOB_IN_QUEUE_PER_USER + # we have to enqueue dependent jobs after canceling one + rq_job.cancel(enqueue_dependents=settings.ONE_RUNNING_JOB_IN_QUEUE_PER_USER) rq_job.delete() else: if rq_job.is_finished: - file_path = rq_job.return_value() - if action == "download" and osp.exists(file_path): + if location == Location.CLOUD_STORAGE: rq_job.delete() + return Response(status=status.HTTP_200_OK) + + elif location == Location.LOCAL: + file_path = rq_job.return_value() + + if not file_path: + return Response('A result for exporting job was not found for finished RQ job', status=status.HTTP_500_INTERNAL_SERVER_ERROR) + elif not osp.exists(file_path): + return Response('Result file does not exists in export cache', status=status.HTTP_500_INTERNAL_SERVER_ERROR) + + if action == "download": + filename = filename or \ + build_annotations_file_name( + class_name=db_instance.__class__.__name__, + identifier=db_instance.name if isinstance(db_instance, (Task, Project)) else db_instance.id, + timestamp=timestamp, + format_name=format_name, + is_annotation_file=are_annotations_exported, + extension=osp.splitext(file_path)[1] + ) + + rq_job.delete() + return sendfile(request, file_path, attachment=True, attachment_filename=filename) - timestamp = datetime.strftime(last_instance_update_time, - "%Y_%m_%d_%H_%M_%S") - filename = filename or \ - "{}_{}-{}-{}{}".format( - db_instance.__class__.__name__.lower(), - db_instance.name if isinstance(db_instance, (Task, Project)) else db_instance.id, - timestamp, format_name, osp.splitext(file_path)[1] - ).lower() - - # save annotation to specified location - location = location_conf.get('location') - if location == Location.LOCAL: - return sendfile(request, file_path, attachment=True, - attachment_filename=filename) - elif location == Location.CLOUD_STORAGE: - try: - storage_id = location_conf['storage_id'] - except KeyError: - return HttpResponseBadRequest( - 'Cloud storage location was selected as the destination,' - ' but cloud storage id was not specified') - db_storage = get_cloud_storage_for_import_or_export( - storage_id=storage_id, request=request, - is_default=location_conf['is_default']) - storage = db_storage_to_storage_instance(db_storage) - - try: - storage.upload_file(file_path, filename) - except (ValidationError, PermissionDenied, NotFound) as ex: - msg = str(ex) if not isinstance(ex, ValidationError) else \ - '\n'.join([str(d) for d in 
ex.detail]) - return Response(data=msg, status=ex.status_code) - return Response(status=status.HTTP_200_OK) - else: - raise NotImplementedError() + return Response(status=status.HTTP_201_CREATED) else: - if osp.exists(file_path): - return Response(status=status.HTTP_201_CREATED) + raise NotImplementedError(f"Export to {location} location is not implemented yet") elif rq_job.is_failed: exc_info = rq_job.meta.get('formatted_exception', str(rq_job.exc_info)) rq_job.delete() @@ -3022,10 +3024,33 @@ def _export_annotations(db_instance, rq_id, request, format_name, action, callba ttl = TTL_CONSTS[db_instance.__class__.__name__.lower()].total_seconds() user_id = request.user.id + func = callback if location == Location.LOCAL else export_resource_to_cloud_storage + func_args = (db_instance.id, format_name, server_address) + + if location == Location.CLOUD_STORAGE: + try: + storage_id = location_conf['storage_id'] + except KeyError: + raise serializers.ValidationError( + 'Cloud storage location was selected as the destination,' + ' but cloud storage id was not specified') + + db_storage = get_cloud_storage_for_import_or_export( + storage_id=storage_id, request=request, + is_default=location_conf['is_default']) + filename_pattern = build_annotations_file_name( + class_name=db_instance.__class__.__name__, + identifier=db_instance.name if isinstance(db_instance, (Task, Project)) else db_instance.id, + timestamp=timestamp, + format_name=format_name, + is_annotation_file=are_annotations_exported, + ) + func_args = (db_storage, filename, filename_pattern, callback) + func_args + with get_rq_lock_by_user(queue, user_id): queue.enqueue_call( - func=callback, - args=(db_instance.id, format_name, server_address), + func=func, + args=func_args, job_id=rq_id, meta=get_rq_job_meta(request=request, db_obj=db_instance), depends_on=define_dependent_job(queue, user_id, rq_id=rq_id), @@ -3090,7 +3115,7 @@ def _import_project_dataset(request, rq_id_template, rq_func, db_obj, format_nam dependent_job = configure_dependent_job_to_download_from_cs( queue=queue, rq_id=rq_id, - rq_func=_download_file_from_bucket, + rq_func=download_file_from_bucket, db_storage=db_storage, filename=filename, key=key, diff --git a/tests/python/rest_api/utils.py b/tests/python/rest_api/utils.py index e49c40496429..a8a897b7bb0f 100644 --- a/tests/python/rest_api/utils.py +++ b/tests/python/rest_api/utils.py @@ -27,16 +27,18 @@ def export_dataset( ) -> HTTPResponse: for _ in range(max_retries): (_, response) = endpoint.call_with_http_info(**kwargs, format=format, _parse_response=False) - if response.status == HTTPStatus.CREATED: + if response.status in (HTTPStatus.CREATED, HTTPStatus.OK): break assert response.status == HTTPStatus.ACCEPTED sleep(interval) - assert response.status == HTTPStatus.CREATED + else: + assert False, f"Export process was not finished within allowed time ({interval * max_retries}, sec)" - (_, response) = endpoint.call_with_http_info( - **kwargs, format=format, action="download", _parse_response=False - ) - assert response.status == HTTPStatus.OK + if response.status == HTTPStatus.CREATED: + (_, response) = endpoint.call_with_http_info( + **kwargs, format=format, action="download", _parse_response=False + ) + assert response.status == HTTPStatus.OK return response From afd743493f9aab157a2ddd053d5401e5987089fa Mon Sep 17 00:00:00 2001 From: Maria Khrustaleva Date: Fri, 5 Jan 2024 13:11:02 +0100 Subject: [PATCH 2/9] Update schema && fix black/pylint issues --- cvat/apps/engine/cloud_provider.py | 2 +- 
cvat/apps/engine/utils.py | 2 +- cvat/schema.yml | 24 ++++++++++++------------ tests/python/rest_api/utils.py | 4 +++- 4 files changed, 17 insertions(+), 15 deletions(-) diff --git a/cvat/apps/engine/cloud_provider.py b/cvat/apps/engine/cloud_provider.py index 316722e6550d..1ddf88c95bdb 100644 --- a/cvat/apps/engine/cloud_provider.py +++ b/cvat/apps/engine/cloud_provider.py @@ -981,4 +981,4 @@ def export_resource_to_cloud_storage( storage = db_storage_to_storage_instance(db_storage) storage.upload_file(file_path, key if key else key_pattern.format(os.path.splitext(file_path)[1].lower())) - return file_path \ No newline at end of file + return file_path diff --git a/cvat/apps/engine/utils.py b/cvat/apps/engine/utils.py index 12beb2d5762f..ee11a65bec94 100644 --- a/cvat/apps/engine/utils.py +++ b/cvat/apps/engine/utils.py @@ -435,4 +435,4 @@ def build_annotations_file_name( return "{}_{}_{}_{}_{}{}".format( class_name, identifier, 'annotations' if is_annotation_file else 'dataset', timestamp, format_name, extension, - ).lower() \ No newline at end of file + ).lower() diff --git a/cvat/schema.yml b/cvat/schema.yml index 82d361623f17..c3a74c3cb93e 100644 --- a/cvat/schema.yml +++ b/cvat/schema.yml @@ -2021,8 +2021,8 @@ paths: type: string enum: - download - description: Used to start downloading process after annotation file had been - created + description: Used to start downloading process locally after annotation file + has been created - in: query name: cloud_storage_id schema: @@ -2412,8 +2412,8 @@ paths: type: string enum: - download - description: Used to start downloading process after annotation file had been - created + description: Used to start downloading process locally after annotation file + has been created - in: query name: cloud_storage_id schema: @@ -3460,8 +3460,8 @@ paths: type: string enum: - download - description: Used to start downloading process after annotation file had been - created + description: Used to start downloading process locally after annotation file + has been created - in: query name: cloud_storage_id schema: @@ -3602,8 +3602,8 @@ paths: enum: - download - import_status - description: Used to start downloading process after annotation file had been - created + description: Used to start downloading process locally after annotation file + has been created - in: query name: cloud_storage_id schema: @@ -4787,8 +4787,8 @@ paths: type: string enum: - download - description: Used to start downloading process after annotation file had been - created + description: Used to start downloading process locally after annotation file + has been created - in: query name: cloud_storage_id schema: @@ -5311,8 +5311,8 @@ paths: type: string enum: - download - description: Used to start downloading process after annotation file had been - created + description: Used to start downloading process locally after annotation file + has been created - in: query name: cloud_storage_id schema: diff --git a/tests/python/rest_api/utils.py b/tests/python/rest_api/utils.py index a8a897b7bb0f..64d48cdd932b 100644 --- a/tests/python/rest_api/utils.py +++ b/tests/python/rest_api/utils.py @@ -32,7 +32,9 @@ def export_dataset( assert response.status == HTTPStatus.ACCEPTED sleep(interval) else: - assert False, f"Export process was not finished within allowed time ({interval * max_retries}, sec)" + assert ( + False + ), f"Export process was not finished within allowed time ({interval * max_retries}, sec)" if response.status == HTTPStatus.CREATED: (_, response) = endpoint.call_with_http_info( 
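The core of the two patches above is that the cloud upload now happens inside the RQ worker rather than in the HTTP handler: instead of enqueuing the export callback directly, the view enqueues export_resource_to_cloud_storage with the original callback and its arguments appended, so the request can return 202 immediately instead of blocking on the upload (the source of the 504 timeouts). A minimal, self-contained sketch of that composition pattern; FakeStorage and create_backup_stub are illustrative stand-ins, not CVAT APIs:

import os
from typing import Callable, Optional

class FakeStorage:
    # Illustrative stand-in for the client returned by
    # db_storage_to_storage_instance(); it only records uploads.
    def __init__(self) -> None:
        self.uploaded: dict = {}

    def upload_file(self, file_path: str, key: str) -> None:
        self.uploaded[key] = file_path

def export_resource_to_cloud_storage(
    storage: FakeStorage,
    key: Optional[str],
    key_pattern: str,
    func: Callable[..., str],
    *args,
    **kwargs,
) -> str:
    # Same shape as the helper added to cvat/apps/engine/cloud_provider.py:
    # run the export callback first, then upload the file it produced.
    # Both steps execute in the background worker, not the request handler.
    file_path = func(*args, **kwargs)
    extension = os.path.splitext(file_path)[1].lower()
    storage.upload_file(file_path, key if key else key_pattern.format(extension))
    return file_path

def create_backup_stub(instance_id: int) -> str:
    # Illustrative stand-in for the _create_backup callback.
    return f"/tmp/task_{instance_id}_backup.zip"

storage = FakeStorage()
export_resource_to_cloud_storage(
    storage, None, "task_42_backup_2024_01_04_12_32_50{}", create_backup_stub, 42,
)
assert storage.uploaded == {"task_42_backup_2024_01_04_12_32_50.zip": "/tmp/task_42_backup.zip"}

This mirrors the argument order used in the views, func_args = (db_storage, filename, filename_pattern, callback) + func_args: the storage, an optional explicit key (the user-supplied filename), a key pattern used when no filename was given, and finally the original export callback with its own arguments.
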
From f8953b5693e7bc025799cb4d1060ddf90df331f3 Mon Sep 17 00:00:00 2001 From: Maria Khrustaleva Date: Mon, 8 Jan 2024 13:33:18 +0100 Subject: [PATCH 3/9] Update cypress tests --- tests/cypress/support/commands_projects.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/cypress/support/commands_projects.js b/tests/cypress/support/commands_projects.js index ce8648b2ad61..8cab25a80587 100644 --- a/tests/cypress/support/commands_projects.js +++ b/tests/cypress/support/commands_projects.js @@ -254,7 +254,7 @@ Cypress.Commands.add('getDownloadFileName', () => { }); Cypress.Commands.add('waitForFileUploadToCloudStorage', () => { - cy.intercept('GET', '**=download').as('download'); + cy.intercept('GET', /.*\/(annotations|dataset|backup)/).as('download'); cy.wait('@download', { requestTimeout: 7000 }).then((interseption) => { expect(interseption.response.statusCode).to.be.equal(200); }); From eb6db31474831b12593ab63175996e10feb22dd4 Mon Sep 17 00:00:00 2001 From: Maria Khrustaleva Date: Mon, 8 Jan 2024 23:14:52 +0100 Subject: [PATCH 4/9] Fix one more cypress test --- .../case_103_project_export.js | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/cypress/e2e/actions_projects_models/case_103_project_export.js b/tests/cypress/e2e/actions_projects_models/case_103_project_export.js index bbaecb505305..9300706cdb84 100644 --- a/tests/cypress/e2e/actions_projects_models/case_103_project_export.js +++ b/tests/cypress/e2e/actions_projects_models/case_103_project_export.js @@ -95,11 +95,7 @@ context('Export project dataset.', { browser: '!firefox' }, () => { dumpType: 'CVAT for images', }; cy.exportProject(exportAnnotation); - cy.getDownloadFileName().then((file) => { - datasetArchiveName = file; - cy.verifyDownload(datasetArchiveName); - }); - cy.verifyNotification(); + cy.waitForDownload(); }); it('Export project dataset. Dataset.', () => { @@ -111,10 +107,14 @@ context('Export project dataset.', { browser: '!firefox' }, () => { dumpType: 'CVAT for images', }; cy.exportProject(exportDataset); - cy.waitForDownload(); + cy.getDownloadFileName().then((file) => { + datasetArchiveName = file; + cy.verifyDownload(datasetArchiveName); + }); + cy.verifyNotification(); }); - it('Export project dataset. Annotation. Rename a archive.', () => { + it('Export project dataset. Annotation. 
Rename an archive.', () => { cy.goToProjectsList(); const exportAnnotationsRenameArchive = { projectName, From 694d3fc5d675d4a36a567539461e2f9773e3cbd9 Mon Sep 17 00:00:00 2001 From: Maria Khrustaleva Date: Mon, 8 Jan 2024 23:24:42 +0100 Subject: [PATCH 5/9] Add changelog --- .../20240108_231721_maria_fix_export_to_cloud_storage.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 changelog.d/20240108_231721_maria_fix_export_to_cloud_storage.md diff --git a/changelog.d/20240108_231721_maria_fix_export_to_cloud_storage.md b/changelog.d/20240108_231721_maria_fix_export_to_cloud_storage.md new file mode 100644 index 000000000000..13554c9e98fc --- /dev/null +++ b/changelog.d/20240108_231721_maria_fix_export_to_cloud_storage.md @@ -0,0 +1,6 @@ +### Fixed + +- 504 Timeout error when exporting resources to cloud storage + () +- Enqueuing deferred jobs when their dependencies have been started -> cancelled -> restarted -> finished + () From b92cc8e3e1caf89f0c1fac3e219b2bceee489e6e Mon Sep 17 00:00:00 2001 From: Maria Khrustaleva Date: Mon, 8 Jan 2024 23:26:31 +0100 Subject: [PATCH 6/9] v14.0.1 --- cvat-core/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cvat-core/package.json b/cvat-core/package.json index 459687b21e38..d04d89c46d1d 100644 --- a/cvat-core/package.json +++ b/cvat-core/package.json @@ -1,6 +1,6 @@ { "name": "cvat-core", - "version": "14.0.0", + "version": "14.0.1", "type": "module", "description": "Part of Computer Vision Tool which presents an interface for client-side integration", "main": "src/api.ts", From e6f85bb88d441b197cafe077c5bab65a0d551802 Mon Sep 17 00:00:00 2001 From: Maria Khrustaleva Date: Mon, 8 Jan 2024 23:28:16 +0100 Subject: [PATCH 7/9] v14.0.4 --- cvat-core/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cvat-core/package.json b/cvat-core/package.json index 2f53bfef2d6a..c48a10aa4b92 100644 --- a/cvat-core/package.json +++ b/cvat-core/package.json @@ -1,6 +1,6 @@ { "name": "cvat-core", - "version": "14.0.3", + "version": "14.0.4", "type": "module", "description": "Part of Computer Vision Tool which presents an interface for client-side integration", "main": "src/api.ts", From 51e34590b5a7771b85dedd7b3b7cda4c0a1937f1 Mon Sep 17 00:00:00 2001 From: Maria Khrustaleva Date: Tue, 9 Jan 2024 13:11:17 +0100 Subject: [PATCH 8/9] Remove unused imports --- cvat/apps/engine/backup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cvat/apps/engine/backup.py b/cvat/apps/engine/backup.py index a41c2a301dd1..388a8320007c 100644 --- a/cvat/apps/engine/backup.py +++ b/cvat/apps/engine/backup.py @@ -26,7 +26,7 @@ from rest_framework.parsers import JSONParser from rest_framework.renderers import JSONRenderer from rest_framework.response import Response -from rest_framework.exceptions import ValidationError, PermissionDenied, NotFound +from rest_framework.exceptions import ValidationError import cvat.apps.dataset_manager as dm from cvat.apps.engine import models From 5e616f63aa9704964b22d44205e8339cfa920b47 Mon Sep 17 00:00:00 2001 From: Maria Khrustaleva Date: Thu, 11 Jan 2024 16:51:50 +0100 Subject: [PATCH 9/9] Small fixes --- cvat/apps/engine/backup.py | 2 +- cvat/apps/engine/views.py | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cvat/apps/engine/backup.py b/cvat/apps/engine/backup.py index 388a8320007c..10dead22404d 100644 --- a/cvat/apps/engine/backup.py +++ b/cvat/apps/engine/backup.py @@ -977,7 +977,7 @@ def export(db_instance, request, queue_name): 
return Response('A result for exporting job was not found for finished RQ job', status=status.HTTP_500_INTERNAL_SERVER_ERROR) elif not os.path.exists(file_path): - return Response('Result file does not exists in export cache', status=status.HTTP_500_INTERNAL_SERVER_ERROR) + return Response('The result file does not exist in export cache', status=status.HTTP_500_INTERNAL_SERVER_ERROR) filename = filename or build_backup_file_name( class_name=obj_type, diff --git a/cvat/apps/engine/views.py b/cvat/apps/engine/views.py index c415939393f8..5d33c0430cc6 100644 --- a/cvat/apps/engine/views.py +++ b/cvat/apps/engine/views.py @@ -20,7 +20,7 @@ from django.conf import settings from django.contrib.auth.models import User from django.db import IntegrityError, transaction -from django.db.models import Count, Model +from django.db.models import Count from django.db.models.query import Prefetch from django.http import HttpResponse, HttpRequest, HttpResponseNotFound, HttpResponseBadRequest from django.utils import timezone @@ -2924,7 +2924,7 @@ def _import_annotations(request, rq_id_template, rq_func, db_obj, format_name, return Response(status=status.HTTP_202_ACCEPTED) def _export_annotations( - db_instance: Model, + db_instance: models.Project | models.Task | models.Job, rq_id: str, request: HttpRequest, format_name: str, @@ -2960,7 +2960,7 @@ def _export_annotations( last_instance_update_time = max(tasks_update + [last_instance_update_time]) timestamp = datetime.strftime(last_instance_update_time, "%Y_%m_%d_%H_%M_%S") - are_annotations_exported = 'annotations' in rq_id + is_annotation_file = rq_id.startswith('export:annotations') if rq_job: rq_request = rq_job.meta.get('request', None) @@ -2982,7 +2982,7 @@ def _export_annotations( if not file_path: return Response('A result for exporting job was not found for finished RQ job', status=status.HTTP_500_INTERNAL_SERVER_ERROR) elif not osp.exists(file_path): - return Response('Result file does not exists in export cache', status=status.HTTP_500_INTERNAL_SERVER_ERROR) + return Response('The result file does not exist in export cache', status=status.HTTP_500_INTERNAL_SERVER_ERROR) if action == "download": filename = filename or \ @@ -2991,7 +2991,7 @@ def _export_annotations( identifier=db_instance.name if isinstance(db_instance, (Task, Project)) else db_instance.id, timestamp=timestamp, format_name=format_name, - is_annotation_file=are_annotations_exported, + is_annotation_file=is_annotation_file, extension=osp.splitext(file_path)[1] ) @@ -3043,7 +3043,7 @@ def _export_annotations( identifier=db_instance.name if isinstance(db_instance, (Task, Project)) else db_instance.id, timestamp=timestamp, format_name=format_name, - is_annotation_file=are_annotations_exported, + is_annotation_file=is_annotation_file, ) func_args = (db_storage, filename, filename_pattern, callback) + func_args
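
End-to-end, the series leaves export endpoints with a three-state polling contract that both cvat-core (server-proxy.ts) and the Python test helper above rely on: 202 Accepted while the RQ job is still running, 201 Created when a local export file is ready (a follow-up request with action=download streams it), and 200 OK when the result has already been uploaded to cloud storage. A minimal client-side sketch of that contract; call_export is an illustrative callable, not the CVAT SDK:

import time
from http import HTTPStatus
from typing import Callable, Optional

def poll_export(
    call_export: Callable[[Optional[str]], int],
    max_retries: int = 30,
    interval: float = 3.0,
) -> None:
    # call_export(action) issues the export request and returns the HTTP
    # status code; an illustrative signature, not the generated API client.
    for _ in range(max_retries):
        status = call_export(None)
        if status in (HTTPStatus.CREATED, HTTPStatus.OK):
            break
        assert status == HTTPStatus.ACCEPTED, status  # job is still running
        time.sleep(interval)
    else:
        raise TimeoutError(f"export was not finished within {interval * max_retries} sec")

    if status == HTTPStatus.CREATED:
        # Local destination: a second request with action=download streams the file.
        assert call_export("download") == HTTPStatus.OK
    # status == HTTPStatus.OK: the result went straight to cloud storage,
    # so there is nothing to download.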