Skip to content

Commit

Permalink
Fixed resources import (#5909)
Browse files Browse the repository at this point in the history
Fixed:
- wrong location of tmp file when importing job annotations
- ```Traceback (most recent call last):
File
"/home/maya/Documents/cvat/.env/lib/python3.8/site-packages/rq/worker.py",
line 795, in work
      self.execute_job(job, queue)
File "/home/maya/Documents/cvat/cvat/rqworker.py", line 37, in
execute_job
      return self.perform_job(*args, **kwargs)
File
"/home/maya/Documents/cvat/.env/lib/python3.8/site-packages/rq/worker.py",
line 1389, in perform_job
      self.handle_exception(job, *exc_info)
File
"/home/maya/Documents/cvat/.env/lib/python3.8/site-packages/rq/worker.py",
line 1438, in handle_exception
      fallthrough = handler(job, *exc_info)
File "/home/maya/Documents/cvat/cvat/apps/engine/views.py", line 2233,
in rq_exception_handler
      rq_job.exc_info = "".join(
  AttributeError: can't set attribute
  ```

Resolves #5773
Resolves #5563

- root causes of the issues: 
  - the annotation file was uploaded to the server by tus protocol and
rq job was created but no one next requests for checking status were not
made. (e.g. user closed the browser tab)
  - the annotation file was uploaded to the server by tus protocol but
rq job has not yet been created (e.g cvat instance restarted)
  - tasks/projects creation from backups with the same name at the
same time by different users

Co-authored-by: Roman Donchenko <roman@cvat.ai>
Co-authored-by: Maxim Zhiltsov <zhiltsov.max35@gmail.com>
  • Loading branch information
3 people authored Jun 2, 2023
1 parent 2584b96 commit f7fd06c
Show file tree
Hide file tree
Showing 33 changed files with 714 additions and 136 deletions.
23 changes: 23 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,28 @@
"env": {},
"console": "internalConsole"
},
{
"name": "server: RQ - cleaning",
"type": "python",
"request": "launch",
"stopOnEntry": false,
"justMyCode": false,
"python": "${command:python.interpreterPath}",
"program": "${workspaceRoot}/manage.py",
"args": [
"rqworker",
"cleaning",
"--worker-class",
"cvat.rqworker.SimpleWorker"
],
"django": true,
"cwd": "${workspaceFolder}",
"env": {
"DJANGO_LOG_SERVER_HOST": "localhost",
"DJANGO_LOG_SERVER_PORT": "8282"
},
"console": "internalConsole"
},
{
"name": "server: migrate",
"type": "python",
Expand Down Expand Up @@ -433,6 +455,7 @@
"server: RQ - annotation",
"server: RQ - webhooks",
"server: RQ - scheduler",
"server: RQ - cleaning",
"server: git",
]
}
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Support task creation with any type of data supported by the server by default from cloud storage
without use_cache option (<https://github.com/opencv/cvat/pull/6074>)
- Support task creation with cloud storage data and without use_cache option (<https://github.com/opencv/cvat/pull/6074>)
- Cleaning worker to check that the uploaded resource has been deleted or delete otherwise (<https://github.com/opencv/cvat/pull/5909>)

### Changed
- Resource links are opened from any organization/sandbox if available for user (<https://github.com/opencv/cvat/pull/5892>)
Expand All @@ -31,6 +32,12 @@ without use_cache option (<https://github.com/opencv/cvat/pull/6074>)
### Fixed
- Skeletons dumping on created tasks/projects (<https://github.com/opencv/cvat/pull/6157>)
- Fix saving annotations for skeleton tracks (<https://github.com/opencv/cvat/pull/6075>)
- Wrong location of tmp file when importing job annotations (<https://github.com/opencv/cvat/pull/5909>)
- Removing uploaded file with annotations/backups when rq job was created
but no next requests for checking status were not made (<https://github.com/opencv/cvat/pull/5909>)
- Removing uploaded file with annotations/backups after file was uploaded to the server by tus protocol
but rq job has not yet been created (<https://github.com/opencv/cvat/pull/5909>)
- Tasks/projects creation from backups with the same name at the same time by different users (<https://github.com/opencv/cvat/pull/5909>)

### Security
- TDB
Expand Down
18 changes: 12 additions & 6 deletions cvat-core/src/server-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -783,13 +783,14 @@ async function importDataset(
};

const url = `${backendAPI}/projects/${id}/dataset`;
let rqId: string;

async function wait() {
return new Promise<void>((resolve, reject) => {
async function requestStatus() {
try {
const response = await Axios.get(url, {
params: { ...params, action: 'import_status' },
params: { ...params, action: 'import_status', rq_id: rqId },
});
if (response.status === 202) {
if (response.data.message) {
Expand All @@ -812,10 +813,11 @@ async function importDataset(

if (isCloudStorage) {
try {
await Axios.post(url,
const response = await Axios.post(url,
new FormData(), {
params,
});
rqId = response.data.rq_id;
} catch (errorData) {
throw generateError(errorData);
}
Expand All @@ -837,11 +839,12 @@ async function importDataset(
headers: { 'Upload-Start': true },
});
await chunkUpload(file, uploadConfig);
await Axios.post(url,
const response = await Axios.post(url,
new FormData(), {
params,
headers: { 'Upload-Finish': true },
});
rqId = response.data.rq_id;
} catch (errorData) {
throw generateError(errorData);
}
Expand Down Expand Up @@ -1617,6 +1620,7 @@ async function uploadAnnotations(
filename: typeof file === 'string' ? file : file.name,
conv_mask_to_poly: options.convMaskToPoly,
};
let rqId: string;

const url = `${backendAPI}/${session}s/${id}/annotations`;
async function wait() {
Expand All @@ -1627,7 +1631,7 @@ async function uploadAnnotations(
url,
new FormData(),
{
params,
params: { ...params, rq_id: rqId },
},
);
if (response.status === 202) {
Expand All @@ -1646,10 +1650,11 @@ async function uploadAnnotations(

if (isCloudStorage) {
try {
await Axios.post(url,
const response = await Axios.post(url,
new FormData(), {
params,
});
rqId = response.data.rq_id;
} catch (errorData) {
throw generateError(errorData);
}
Expand All @@ -1667,11 +1672,12 @@ async function uploadAnnotations(
headers: { 'Upload-Start': true },
});
await chunkUpload(file, uploadConfig);
await Axios.post(url,
const response = await Axios.post(url,
new FormData(), {
params,
headers: { 'Upload-Finish': true },
});
rqId = response.data.rq_id;
} catch (errorData) {
throw generateError(errorData);
}
Expand Down
2 changes: 1 addition & 1 deletion cvat-sdk/cvat_sdk/core/proxies/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,6 @@ def create_from_backup(
)

task_id = json.loads(response.data)["id"]
self._client.logger.info(f"Task has been imported sucessfully. Task ID: {task_id}")
self._client.logger.info(f"Task has been imported successfully. Task ID: {task_id}")

return self.retrieve(task_id)
27 changes: 21 additions & 6 deletions cvat-sdk/cvat_sdk/core/uploading.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from __future__ import annotations

import json
import os
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Tuple
Expand Down Expand Up @@ -89,6 +90,7 @@ def create_url(self):
headers["upload-length"] = str(self.file_size)
headers["upload-metadata"] = ",".join(self.encode_metadata())
resp = self._api_client.rest_client.POST(self.client.url, headers=headers)
self.real_filename = resp.headers.get("Upload-Filename")
url = resp.headers.get("location")
if url is None:
msg = "Attempt to retrieve create file url with status {}".format(resp.status_code)
Expand Down Expand Up @@ -179,9 +181,10 @@ def upload_file(
assert meta["filename"]

self._tus_start_upload(url, query_params=query_params)
self._upload_file_data_with_tus(
real_filename = self._upload_file_data_with_tus(
url=url, filename=filename, meta=meta, pbar=pbar, logger=logger
)
query_params["filename"] = real_filename
return self._tus_finish_upload(url, query_params=query_params, fields=fields)

def _wait_for_completion(
Expand Down Expand Up @@ -216,7 +219,9 @@ def _make_tus_uploader(api_client: ApiClient, url: str, **kwargs):

return _MyTusUploader(client=client, api_client=api_client, **kwargs)

def _upload_file_data_with_tus(self, url, filename, *, meta=None, pbar=None, logger=None):
def _upload_file_data_with_tus(
self, url, filename, *, meta=None, pbar=None, logger=None
) -> str:
file_size = filename.stat().st_size
if pbar is None:
pbar = NullProgressReporter()
Expand All @@ -233,6 +238,7 @@ def _upload_file_data_with_tus(self, url, filename, *, meta=None, pbar=None, log
log_func=logger,
)
tus_uploader.upload()
return tus_uploader.real_filename

def _tus_start_upload(self, url, *, query_params=None):
response = self._client.api_client.rest_client.POST(
Expand Down Expand Up @@ -273,17 +279,21 @@ def upload_file_and_wait(
):
url = self._client.api_map.make_endpoint_url(endpoint.path, kwsub=url_params)
params = {"format": format_name, "filename": filename.name}
self.upload_file(
response = self.upload_file(
url, filename, pbar=pbar, query_params=params, meta={"filename": params["filename"]}
)

rq_id = json.loads(response.data).get("rq_id")
assert rq_id, "The rq_id was not found in the response"
params["rq_id"] = rq_id

self._wait_for_completion(
url,
success_status=201,
positive_statuses=[202],
status_check_period=status_check_period,
query_params=params,
method="POST",
method="PUT",
)


Expand All @@ -301,12 +311,17 @@ def upload_file_and_wait(
):
url = self._client.api_map.make_endpoint_url(upload_endpoint.path, kwsub=url_params)
params = {"format": format_name, "filename": filename.name}
self.upload_file(
response = self.upload_file(
url, filename, pbar=pbar, query_params=params, meta={"filename": params["filename"]}
)
rq_id = json.loads(response.data).get("rq_id")
assert rq_id, "The rq_id was not found in the response"

url = self._client.api_map.make_endpoint_url(retrieve_endpoint.path, kwsub=url_params)
params = {"action": "import_status"}
params = {
"action": "import_status",
"rq_id": rq_id,
}
self._wait_for_completion(
url,
success_status=201,
Expand Down
12 changes: 8 additions & 4 deletions cvat/apps/dataset_manager/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from tempfile import TemporaryDirectory
import rq
from typing import Any, Callable, List, Mapping, Tuple
from datumaro.components.errors import DatasetError, DatasetImportError, DatasetNotFoundError

from django.db import transaction

Expand All @@ -16,7 +17,7 @@
from cvat.apps.dataset_manager.task import TaskAnnotation

from .annotation import AnnotationIR
from .bindings import ProjectData, load_dataset_data
from .bindings import ProjectData, load_dataset_data, CvatImportError
from .formats.registry import make_exporter, make_importer

def export_project(project_id, dst_file, format_name,
Expand Down Expand Up @@ -160,7 +161,7 @@ def data(self) -> dict:
raise NotImplementedError()

@transaction.atomic
def import_dataset_as_project(project_id, dataset_file, format_name, conv_mask_to_poly):
def import_dataset_as_project(src_file, project_id, format_name, conv_mask_to_poly):
rq_job = rq.get_current_job()
rq_job.meta['status'] = 'Dataset import has been started...'
rq_job.meta['progress'] = 0.
Expand All @@ -170,5 +171,8 @@ def import_dataset_as_project(project_id, dataset_file, format_name, conv_mask_t
project.init_from_db()

importer = make_importer(format_name)
with open(dataset_file, 'rb') as f:
project.import_dataset(f, importer, conv_mask_to_poly=conv_mask_to_poly)
with open(src_file, 'rb') as f:
try:
project.import_dataset(f, importer, conv_mask_to_poly=conv_mask_to_poly)
except (DatasetError, DatasetImportError, DatasetNotFoundError) as ex:
raise CvatImportError(str(ex))
18 changes: 12 additions & 6 deletions cvat/apps/dataset_manager/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from copy import deepcopy
from enum import Enum
from tempfile import TemporaryDirectory
from datumaro.components.errors import DatasetError, DatasetImportError, DatasetNotFoundError

from django.db import transaction
from django.db.models.query import Prefetch
Expand All @@ -19,11 +20,10 @@
from cvat.apps.profiler import silk_profile

from .annotation import AnnotationIR, AnnotationManager
from .bindings import JobData, TaskData
from .bindings import JobData, TaskData, CvatImportError
from .formats.registry import make_exporter, make_importer
from .util import bulk_create


class dotdict(OrderedDict):
"""dot.notation access to dictionary attributes"""
__getattr__ = OrderedDict.get
Expand Down Expand Up @@ -853,19 +853,25 @@ def export_task(task_id, dst_file, format_name, server_url=None, save_images=Fal
task.export(f, exporter, host=server_url, save_images=save_images)

@transaction.atomic
def import_task_annotations(task_id, src_file, format_name, conv_mask_to_poly):
def import_task_annotations(src_file, task_id, format_name, conv_mask_to_poly):
task = TaskAnnotation(task_id)
task.init_from_db()

importer = make_importer(format_name)
with open(src_file, 'rb') as f:
task.import_annotations(f, importer, conv_mask_to_poly=conv_mask_to_poly)
try:
task.import_annotations(f, importer, conv_mask_to_poly=conv_mask_to_poly)
except (DatasetError, DatasetImportError, DatasetNotFoundError) as ex:
raise CvatImportError(str(ex))

@transaction.atomic
def import_job_annotations(job_id, src_file, format_name, conv_mask_to_poly):
def import_job_annotations(src_file, job_id, format_name, conv_mask_to_poly):
job = JobAnnotation(job_id)
job.init_from_db()

importer = make_importer(format_name)
with open(src_file, 'rb') as f:
job.import_annotations(f, importer, conv_mask_to_poly=conv_mask_to_poly)
try:
job.import_annotations(f, importer, conv_mask_to_poly=conv_mask_to_poly)
except (DatasetError, DatasetImportError, DatasetNotFoundError) as ex:
raise CvatImportError(str(ex))
5 changes: 2 additions & 3 deletions cvat/apps/dataset_manager/tests/test_formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -923,8 +923,7 @@ def _test_can_import_annotations(self, task, import_format):
expected_ann = TaskAnnotation(task["id"])
expected_ann.init_from_db()

dm.task.import_task_annotations(task["id"],
file_path, import_format, True)
dm.task.import_task_annotations(file_path, task["id"], import_format, True)
actual_ann = TaskAnnotation(task["id"])
actual_ann.init_from_db()

Expand Down Expand Up @@ -976,6 +975,6 @@ def test_can_import_mots_annotations_with_splited_masks(self):
task.update()
task = self._create_task(task, images)

dm.task.import_task_annotations(task['id'], dataset_path, format_name, True)
dm.task.import_task_annotations(dataset_path, task['id'], format_name, True)
self._test_can_import_annotations(task, format_name)

1 change: 0 additions & 1 deletion cvat/apps/dataset_manager/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ def get_export_cache_dir(db_instance):
PROJECT_CACHE_TTL = DEFAULT_CACHE_TTL / 3
JOB_CACHE_TTL = DEFAULT_CACHE_TTL


def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=None, save_images=False):
try:
if task_id is not None:
Expand Down
Loading

0 comments on commit f7fd06c

Please sign in to comment.