Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify error handling with the cloud storage #5389

Merged
merged 15 commits into from
Dec 8, 2022
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ from online detectors & interactors) (<https://github.com/opencv/cvat/pull/4543>
- Allowed trailing slashes in the SDK host address (<https://github.com/opencv/cvat/pull/5057>)
- Adjusted initial camera position, enabled 'Reset zoom' option for 3D canvas (<https://github.com/opencv/cvat/pull/5395>)
- Enabled authentication via email (<https://github.com/opencv/cvat/pull/5037>)
- Unified error handling with the cloud storage (<https://github.com/opencv/cvat/pull/5389>)

### Deprecated
- TBD
Expand Down
14 changes: 9 additions & 5 deletions cvat/apps/engine/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from rest_framework.parsers import JSONParser
from rest_framework.renderers import JSONRenderer
from rest_framework.response import Response
from rest_framework.exceptions import ValidationError, PermissionDenied, NotFound
from django_sendfile import sendfile
from distutils.util import strtobool

Expand All @@ -39,9 +40,7 @@
from cvat.apps.engine.task import _create_thread
from cvat.apps.dataset_manager.views import TASK_CACHE_TTL, PROJECT_CACHE_TTL, get_export_cache_dir, clear_export_cache, log_exception
from cvat.apps.dataset_manager.bindings import CvatImportError
from cvat.apps.engine.cloud_provider import (
db_storage_to_storage_instance, import_from_cloud_storage, export_to_cloud_storage
)
from cvat.apps.engine.cloud_provider import db_storage_to_storage_instance

from cvat.apps.engine.location import StorageType, get_location_configuration

Expand Down Expand Up @@ -798,7 +797,12 @@ def export(db_instance, request):
db_storage = get_object_or_404(CloudStorageModel, pk=storage_id)
storage = db_storage_to_storage_instance(db_storage)

export_to_cloud_storage(storage, file_path, filename)
try:
storage.upload_file(file_path, filename)
except (ValidationError, PermissionDenied, NotFound) as ex:
msg = str(ex) if not isinstance(ex, ValidationError) else \
'\n'.join([str(d) for d in ex.detail])
return Response(data=msg, status=ex.status_code)

Check warning

Code scanning / CodeQL

Information exposure through an exception

[Stack trace information](1) flows to this location and may be exposed to an external user.
return Response(status=status.HTTP_200_OK)
else:
raise NotImplementedError()
Expand Down Expand Up @@ -826,7 +830,7 @@ def export(db_instance, request):
def _download_file_from_bucket(db_storage, filename, key):
storage = db_storage_to_storage_instance(db_storage)

data = import_from_cloud_storage(storage, key)
data = storage.download_fileobj(key)
with open(filename, 'wb+') as f:
f.write(data.getbuffer())

Expand Down
46 changes: 15 additions & 31 deletions cvat/apps/engine/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
ImageDatasetManifestReader, VideoDatasetManifestReader)
from cvat.apps.engine.models import DataChoice, StorageChoice
from cvat.apps.engine.models import DimensionType
from cvat.apps.engine.cloud_provider import get_cloud_storage_instance, Credentials, Status
from cvat.apps.engine.cloud_provider import get_cloud_storage_instance, Credentials
from cvat.apps.engine.utils import md5_hash
class CacheInteraction:
def __init__(self, dimension=DimensionType.DIM_2D):
Expand Down Expand Up @@ -82,36 +82,20 @@ def prepare_chunk_buff(self, db_data, quality, chunk_number):
'credentials': credentials,
'specific_attributes': db_cloud_storage.get_specific_attributes()
}
try:
cloud_storage_instance = get_cloud_storage_instance(cloud_provider=db_cloud_storage.provider_type, **details)
for item in reader:
file_name = f"{item['name']}{item['extension']}"
with NamedTemporaryFile(mode='w+b', prefix='cvat', suffix=file_name.replace(os.path.sep, '#'), delete=False) as temp_file:
source_path = temp_file.name
buf = cloud_storage_instance.download_fileobj(file_name)
temp_file.write(buf.getvalue())
temp_file.flush()
checksum = item.get('checksum', None)
if not checksum:
slogger.cloud_storage[db_cloud_storage.id].warning('A manifest file does not contain checksum for image {}'.format(item.get('name')))
if checksum and not md5_hash(source_path) == checksum:
slogger.cloud_storage[db_cloud_storage.id].warning('Hash sums of files {} do not match'.format(file_name))
images.append((source_path, source_path, None))
except Exception as ex:
storage_status = cloud_storage_instance.get_status()
if storage_status == Status.FORBIDDEN:
msg = 'The resource {} is no longer available. Access forbidden.'.format(cloud_storage_instance.name)
elif storage_status == Status.NOT_FOUND:
msg = 'The resource {} not found. It may have been deleted.'.format(cloud_storage_instance.name)
else:
# check status of last file
file_status = cloud_storage_instance.get_file_status(file_name)
if file_status == Status.NOT_FOUND:
raise Exception("'{}' not found on the cloud storage '{}'".format(file_name, cloud_storage_instance.name))
elif file_status == Status.FORBIDDEN:
raise Exception("Access to the file '{}' on the '{}' cloud storage is denied".format(file_name, cloud_storage_instance.name))
msg = str(ex)
raise Exception(msg)
cloud_storage_instance = get_cloud_storage_instance(cloud_provider=db_cloud_storage.provider_type, **details)
for item in reader:
file_name = f"{item['name']}{item['extension']}"
with NamedTemporaryFile(mode='w+b', prefix='cvat', suffix=file_name.replace(os.path.sep, '#'), delete=False) as temp_file:
source_path = temp_file.name
buf = cloud_storage_instance.download_fileobj(file_name)
temp_file.write(buf.getvalue())
temp_file.flush()
checksum = item.get('checksum', None)
if not checksum:
slogger.cloud_storage[db_cloud_storage.id].warning('A manifest file does not contain checksum for image {}'.format(item.get('name')))
if checksum and not md5_hash(source_path) == checksum:
slogger.cloud_storage[db_cloud_storage.id].warning('Hash sums of files {} do not match'.format(file_name))
images.append((source_path, source_path, None))
else:
for item in reader:
source_path = os.path.join(upload_dir, f"{item['name']}{item['extension']}")
Expand Down
100 changes: 60 additions & 40 deletions cvat/apps/engine/cloud_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from abc import ABC, abstractmethod, abstractproperty
from enum import Enum
from io import BytesIO
from rest_framework import serializers
from rest_framework.exceptions import PermissionDenied, NotFound, ValidationError

from boto3.s3.transfer import TransferConfig
from botocore.exceptions import ClientError
Expand Down Expand Up @@ -46,6 +46,45 @@ class Permissions(str, Enum):
def all(cls):
return {i.value for i in cls}


def validate_bucket_status(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
try:
res = func(self, *args, **kwargs)
except Exception as ex:
# check that cloud storage exists
storage_status = self.get_status() if self is not None else None
if storage_status == Status.FORBIDDEN:
raise PermissionDenied('The resource {} is no longer available. Access forbidden.'.format(self.name))
elif storage_status == Status.NOT_FOUND:
raise NotFound('The resource {} not found. It may have been deleted.'.format(self.name))
elif storage_status == Status.AVAILABLE:
raise
raise ValidationError(str(ex))

Check warning

Code scanning / CodeQL

Information exposure through an exception

[Stack trace information](1) flows to this location and may be exposed to an external user.
return res
return wrapper

def validate_file_status(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
try:
res = func(self, *args, **kwargs)
except Exception as ex:
storage_status = self.get_status() if self is not None else None
if storage_status == Status.AVAILABLE:
key = args[0]
file_status = self.get_file_status(key)
if file_status == Status.NOT_FOUND:
raise NotFound("The file '{}' not found on the cloud storage '{}'".format(key, self.name))
elif file_status == Status.FORBIDDEN:
raise PermissionDenied("Access to the file '{}' on the '{}' cloud storage is denied".format(key, self.name))
raise ValidationError(str(ex))

Check warning

Code scanning / CodeQL

Information exposure through an exception

[Stack trace information](1) flows to this location and may be exposed to an external user.
else:
raise
return res
return wrapper

class _CloudStorage(ABC):

def __init__(self):
Expand Down Expand Up @@ -239,16 +278,20 @@ def get_file_status(self, key):
else:
return Status.NOT_FOUND

@validate_file_status
@validate_bucket_status
def get_file_last_modified(self, key):
return self._head_file(key).get('LastModified')

@validate_bucket_status
def upload_fileobj(self, file_obj, file_name):
self._bucket.upload_fileobj(
Fileobj=file_obj,
Key=file_name,
Config=TransferConfig(max_io_queue=self.transfer_config['max_io_queue'])
)

@validate_bucket_status
def upload_file(self, file_path, file_name=None):
if not file_name:
file_name = os.path.basename(file_path)
Expand All @@ -269,6 +312,8 @@ def initialize_content(self):
'name': item.key,
} for item in files]

@validate_file_status
@validate_bucket_status
def download_fileobj(self, key):
buf = BytesIO()
self.bucket.download_fileobj(
Expand Down Expand Up @@ -378,6 +423,8 @@ def _head_file(self, key):
blob_client = self.container.get_blob_client(key)
return blob_client.get_blob_properties()

@validate_file_status
@validate_bucket_status
def get_file_last_modified(self, key):
return self._head_file(key).last_modified

Expand All @@ -401,18 +448,15 @@ def get_file_status(self, key):
else:
return Status.NOT_FOUND

@validate_bucket_status
def upload_fileobj(self, file_obj, file_name):
self._container_client.upload_blob(name=file_name, data=file_obj)

def upload_file(self, file_path, file_name=None):
if not file_name:
file_name = os.path.basename(file_path)
try:
with open(file_path, 'r') as f:
self.upload_fileobj(f, file_name)
except Exception as ex:
slogger.glob.error(str(ex))
raise
with open(file_path, 'r') as f:
self.upload_fileobj(f, file_name)

# TODO:
# def multipart_upload(self, file_obj):
Expand All @@ -424,6 +468,8 @@ def initialize_content(self):
'name': item.name
} for item in files]

@validate_file_status
@validate_bucket_status
def download_fileobj(self, key):
buf = BytesIO()
storage_stream_downloader = self._container_client.download_blob(
Expand Down Expand Up @@ -509,24 +555,24 @@ def initialize_content(self):
)
]

@validate_file_status
@validate_bucket_status
def download_fileobj(self, key):
buf = BytesIO()
blob = self.bucket.blob(key)
self._storage_client.download_blob_to_file(blob, buf)
buf.seek(0)
return buf

@validate_bucket_status
def upload_fileobj(self, file_obj, file_name):
self.bucket.blob(file_name).upload_from_file(file_obj)

@validate_bucket_status
def upload_file(self, file_path, file_name=None):
if not file_name:
file_name = os.path.basename(file_path)
try:
self.bucket.blob(file_name).upload_from_filename(file_path)
except Exception as ex:
slogger.glob.info(str(ex))
raise
self.bucket.blob(file_name).upload_from_filename(file_path)

def create(self):
try:
Expand All @@ -545,6 +591,8 @@ def create(self):
slogger.glob.info(msg)
raise Exception(msg)

@validate_file_status
@validate_bucket_status
def get_file_last_modified(self, key):
blob = self.bucket.blob(key)
blob.reload()
Expand Down Expand Up @@ -616,26 +664,6 @@ def mapping_with_new_values(self, credentials):
def values(self):
return [self.key, self.secret_key, self.session_token, self.account_name, self.key_file_path]


def validate_bucket_status(func):
@functools.wraps(func)
def wrapper(storage, *args, **kwargs):
try:
res = func(storage, *args, **kwargs)
except Exception as ex:
# check that cloud storage exists
storage_status = storage.get_status() if storage is not None else None
if storage_status == Status.FORBIDDEN:
msg = 'The resource {} is no longer available. Access forbidden.'.format(storage.name)
elif storage_status == Status.NOT_FOUND:
msg = 'The resource {} not found. It may have been deleted.'.format(storage.name)
else:
msg = str(ex)
raise serializers.ValidationError(msg)
return res
return wrapper


def db_storage_to_storage_instance(db_storage):
credentials = Credentials()
credentials.convert_from_db({
Expand All @@ -648,11 +676,3 @@ def db_storage_to_storage_instance(db_storage):
'specific_attributes': db_storage.get_specific_attributes()
}
return get_cloud_storage_instance(cloud_provider=db_storage.provider_type, **details)

@validate_bucket_status
def import_from_cloud_storage(storage, file_name):
return storage.download_fileobj(file_name)

@validate_bucket_status
def export_to_cloud_storage(storage, file_path, file_name):
storage.upload_file(file_path, file_name)
17 changes: 15 additions & 2 deletions cvat/apps/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,30 @@ def parse_specific_attributes(specific_attributes):
} if parsed_specific_attributes else dict()


def parse_exception_message(msg):
parsed_msg = msg
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably need to handle the case where msg is empty for some reason. Please resolve the conflict and check that case.

try:
if 'ErrorDetail' in msg:
# msg like: 'rest_framework.exceptions.ValidationError:
# [ErrorDetail(string="...", code=\'invalid\')]\n'
parsed_msg = msg.split('string=')[1].split(', code=')[0].strip("\"")
elif msg.startswith('rest_framework.exceptions.'):
parsed_msg = msg.split(':')[1].strip()
except Exception: # nosec
pass
return parsed_msg

def process_failed_job(rq_job):
if rq_job.meta['tmp_file_descriptor']:
os.close(rq_job.meta['tmp_file_descriptor'])
if os.path.exists(rq_job.meta['tmp_file']):
os.remove(rq_job.meta['tmp_file'])
exc_info = str(rq_job.exc_info) or str(rq_job.dependency.exc_info)
exc_info = str(rq_job.exc_info or rq_job.dependency.exc_info)
if rq_job.dependency:
rq_job.dependency.delete()
rq_job.delete()

return exc_info
return parse_exception_message(exc_info)

def configure_dependent_job(queue, rq_id, rq_func, db_storage, filename, key):
rq_job_id_download_file = rq_id + f'?action=download_{filename}'
Expand Down
Loading