diff --git a/changelog.d/20241113_130658_dmitrii.lavrukhin_no_queryset_cache.md b/changelog.d/20241113_130658_dmitrii.lavrukhin_no_queryset_cache.md
new file mode 100644
index 000000000000..8efcd99d7bf8
--- /dev/null
+++ b/changelog.d/20241113_130658_dmitrii.lavrukhin_no_queryset_cache.md
@@ -0,0 +1,5 @@
+### Fixed
+
+- Optimized memory consumption and reduced the number of database queries
+  when importing annotations to a task with a lot of jobs and images
+  ()
diff --git a/cvat/apps/dataset_manager/bindings.py b/cvat/apps/dataset_manager/bindings.py
index 9b01dced2a94..3b2ccd782a88 100644
--- a/cvat/apps/dataset_manager/bindings.py
+++ b/cvat/apps/dataset_manager/bindings.py
@@ -22,7 +22,7 @@
 import rq
 from attr import attrib, attrs
 from datumaro.components.format_detection import RejectionReason
-from django.db.models import QuerySet
+from django.db.models import Prefetch, QuerySet
 from django.utils import timezone
 from django.conf import settings

@@ -859,7 +859,9 @@ def __init__(self, annotation_ir: AnnotationIR, db_task: Task, **kwargs):

     @staticmethod
     def meta_for_task(db_task, host, label_mapping=None):
-        db_segments = db_task.segment_set.all().prefetch_related('job_set')
+        db_segments = db_task.segment_set.all().prefetch_related(
+            Prefetch('job_set', models.Job.objects.order_by("pk"))
+        )

         meta = OrderedDict([
             ("id", str(db_task.id)),
diff --git a/cvat/apps/dataset_manager/task.py b/cvat/apps/dataset_manager/task.py
index 5b72f92a1ebc..45f1eaff4e8e 100644
--- a/cvat/apps/dataset_manager/task.py
+++ b/cvat/apps/dataset_manager/task.py
@@ -13,7 +13,7 @@

 from datumaro.components.errors import DatasetError, DatasetImportError, DatasetNotFoundError
 from django.db import transaction
-from django.db.models.query import Prefetch
+from django.db.models.query import Prefetch, QuerySet
 from django.conf import settings

 from rest_framework.exceptions import ValidationError
@@ -81,9 +81,10 @@ def merge_table_rows(rows, keys_for_merge, field_id):

     return list(merged_rows.values())

+
 class JobAnnotation:
     @classmethod
-    def add_prefetch_info(cls, queryset):
+    def add_prefetch_info(cls, queryset: QuerySet, prefetch_images: bool = True):
         assert issubclass(queryset.model, models.Job)

         label_qs = add_prefetch_fields(models.Label.objects.all(), [
@@ -93,6 +94,12 @@ def add_prefetch_info(cls, queryset):
         ])
         label_qs = JobData.add_prefetch_info(label_qs)

+        task_data_queryset = models.Data.objects.all()
+        if prefetch_images:
+            task_data_queryset = task_data_queryset.select_related('video').prefetch_related(
+                Prefetch('images', queryset=models.Image.objects.order_by('frame'))
+            )
+
         return queryset.select_related(
             'segment',
             'segment__task',
@@ -100,28 +107,35 @@ def add_prefetch_info(cls, queryset):
             'segment__task__project',
             'segment__task__owner',
             'segment__task__assignee',
-            'segment__task__project__owner',
-            'segment__task__project__assignee',

-            Prefetch('segment__task__data',
-                queryset=models.Data.objects.select_related('video').prefetch_related(
-                    Prefetch('images', queryset=models.Image.objects.order_by('frame'))
-                )),
+            Prefetch('segment__task__data', queryset=task_data_queryset),

             Prefetch('segment__task__label_set', queryset=label_qs),
             Prefetch('segment__task__project__label_set', queryset=label_qs),
         )

-    def __init__(self, pk, *, is_prefetched=False, queryset=None):
-        if queryset is None:
-            queryset = self.add_prefetch_info(models.Job.objects)
+    def __init__(
+        self,
+        pk,
+        *,
+        lock_job_in_db: bool = False,
+        queryset: QuerySet | None = None,
+        prefetch_images: bool = False,
+        db_job: models.Job | None = None
+    ):
+        assert db_job is None or lock_job_in_db is False
+        assert (db_job is None and queryset is None) or prefetch_images is False
+        assert db_job is None or queryset is None
+        if db_job is None:
+            if queryset is None:
+                queryset = self.add_prefetch_info(models.Job.objects, prefetch_images=prefetch_images)
+
+            if lock_job_in_db:
+                queryset = queryset.select_for_update()

-        if is_prefetched:
-            self.db_job: models.Job = queryset.select_related(
-                'segment__task'
-            ).select_for_update().get(id=pk)
-        else:
             self.db_job: models.Job = get_cached(queryset, pk=int(pk))
+        else:
+            self.db_job: models.Job = db_job

         db_segment = self.db_job.segment
         self.start_frame = db_segment.start_frame
@@ -786,6 +800,7 @@ def import_annotations(self, src_file, importer, **options):

         self.create(job_data.data.slice(self.start_frame, self.stop_frame).serialize())

+
 class TaskAnnotation:
     def __init__(self, pk):
         self.db_task = models.Task.objects.prefetch_related(
@@ -797,8 +812,7 @@ def __init__(self, pk):
             requested_job_types.append(models.JobType.GROUND_TRUTH)

         self.db_jobs = (
-            models.Job.objects
-            .select_related("segment")
+            JobAnnotation.add_prefetch_info(models.Job.objects, prefetch_images=False)
             .filter(segment__task_id=pk, type__in=requested_job_types)
         )

@@ -821,14 +835,14 @@ def _patch_data(self, data: Union[AnnotationIR, dict], action: Optional[PatchAct
             start = db_job.segment.start_frame
             stop = db_job.segment.stop_frame
             jobs[jid] = { "start": start, "stop": stop }
-            splitted_data[jid] = data.slice(start, stop)
+            splitted_data[jid] = (data.slice(start, stop), db_job)

-        for jid, job_data in splitted_data.items():
+        for jid, (job_data, db_job) in splitted_data.items():
             data = AnnotationIR(self.db_task.dimension)
             if action is None:
-                data.data = put_job_data(jid, job_data)
+                data.data = put_job_data(jid, job_data, db_job=db_job)
             else:
-                data.data = patch_job_data(jid, job_data, action)
+                data.data = patch_job_data(jid, job_data, action, db_job=db_job)

             if data.version > self.ir_data.version:
                 self.ir_data.version = data.version
@@ -936,18 +950,18 @@ def delete(self, data=None):
             self._patch_data(data, PatchAction.DELETE)
         else:
             for db_job in self.db_jobs:
-                delete_job_data(db_job.id)
+                delete_job_data(db_job.id, db_job=db_job)

     def init_from_db(self):
         self.reset()

-        for db_job in self.db_jobs:
+        for db_job in self.db_jobs.select_for_update():
             if db_job.type == models.JobType.GROUND_TRUTH and not (
                 self.db_task.data.validation_mode == models.ValidationMode.GT_POOL
             ):
                 continue

-            gt_annotation = JobAnnotation(db_job.id, is_prefetched=True)
+            gt_annotation = JobAnnotation(db_job.id, db_job=db_job)
             gt_annotation.init_from_db()
             if gt_annotation.ir_data.version > self.ir_data.version:
                 self.ir_data.version = gt_annotation.ir_data.version
@@ -1006,19 +1020,21 @@ def get_job_data(pk):

     return annotation.data

+
 @silk_profile(name="POST job data")
 @transaction.atomic
-def put_job_data(pk, data):
-    annotation = JobAnnotation(pk)
+def put_job_data(pk, data: AnnotationIR | dict, *, db_job: models.Job | None = None):
+    annotation = JobAnnotation(pk, db_job=db_job)
     annotation.put(data)

     return annotation.data

+
 @silk_profile(name="UPDATE job data")
 @plugin_decorator
 @transaction.atomic
-def patch_job_data(pk, data, action):
-    annotation = JobAnnotation(pk)
+def patch_job_data(pk, data: AnnotationIR | dict, action: PatchAction, *, db_job: models.Job | None = None):
+    annotation = JobAnnotation(pk, db_job=db_job)
     if action == PatchAction.CREATE:
         annotation.create(data)
     elif action == PatchAction.UPDATE:
@@ -1028,12 +1044,14 @@ def patch_job_data(pk, data, action):

     return annotation.data

+
 @silk_profile(name="DELETE job data")
 @transaction.atomic
-def delete_job_data(pk):
-    annotation = JobAnnotation(pk)
+def delete_job_data(pk, *, db_job: models.Job | None = None):
+    annotation = JobAnnotation(pk, db_job=db_job)
     annotation.delete()

+
 def export_job(job_id, dst_file, format_name, server_url=None, save_images=False):
     # For big tasks dump function may run for a long time and
     # we dont need to acquire lock after the task has been initialized from DB.
@@ -1041,13 +1059,14 @@ def export_job(job_id, dst_file, format_name, server_url=None, save_images=False
     # more dump request received at the same time:
     # https://github.com/cvat-ai/cvat/issues/217
     with transaction.atomic():
-        job = JobAnnotation(job_id)
+        job = JobAnnotation(job_id, prefetch_images=True, lock_job_in_db=True)
         job.init_from_db()

     exporter = make_exporter(format_name)
     with open(dst_file, 'wb') as f:
         job.export(f, exporter, host=server_url, save_images=save_images)

+
 @silk_profile(name="GET task data")
 @transaction.atomic
 def get_task_data(pk):
@@ -1056,6 +1075,7 @@ def get_task_data(pk):

     return annotation.data

+
 @silk_profile(name="POST task data")
 @transaction.atomic
 def put_task_data(pk, data):
@@ -1064,6 +1084,7 @@ def put_task_data(pk, data):

     return annotation.data

+
 @silk_profile(name="UPDATE task data")
 @transaction.atomic
 def patch_task_data(pk, data, action):
@@ -1077,12 +1098,14 @@ def patch_task_data(pk, data, action):

     return annotation.data

+
 @silk_profile(name="DELETE task data")
 @transaction.atomic
 def delete_task_data(pk):
     annotation = TaskAnnotation(pk)
     annotation.delete()

+
 def export_task(task_id, dst_file, format_name, server_url=None, save_images=False):
     # For big tasks dump function may run for a long time and
     # we dont need to acquire lock after the task has been initialized from DB.
@@ -1097,6 +1120,7 @@ def export_task(task_id, dst_file, format_name, server_url=None, save_images=Fal
     with open(dst_file, 'wb') as f:
         task.export(f, exporter, host=server_url, save_images=save_images)

+
 @transaction.atomic
 def import_task_annotations(src_file, task_id, format_name, conv_mask_to_poly):
     task = TaskAnnotation(task_id)
@@ -1108,9 +1132,10 @@ def import_task_annotations(src_file, task_id, format_name, conv_mask_to_poly):
     except (DatasetError, DatasetImportError, DatasetNotFoundError) as ex:
         raise CvatImportError(str(ex))

+
 @transaction.atomic
 def import_job_annotations(src_file, job_id, format_name, conv_mask_to_poly):
-    job = JobAnnotation(job_id)
+    job = JobAnnotation(job_id, prefetch_images=True)

     importer = make_importer(format_name)
     with open(src_file, 'rb') as f:
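
For reviewers, a minimal usage sketch of the reworked JobAnnotation API, not part of the patch itself. `some_job_id`, `some_task_id`, and `job_ir` are placeholders, and the `cvat.apps.engine.models` import path is assumed; the keyword arguments and call shapes are taken from the hunks above (export_job, TaskAnnotation, put_job_data).

from django.db import transaction

from cvat.apps.dataset_manager.task import JobAnnotation, put_job_data
from cvat.apps.engine import models  # assumed import path for the Job model

# Export path (mirrors export_job above): prefetch image metadata and lock the
# job row for the duration of the dump, inside a single transaction.
with transaction.atomic():
    job = JobAnnotation(some_job_id, prefetch_images=True, lock_job_in_db=True)
    job.init_from_db()

# Task import path (mirrors TaskAnnotation above): jobs are loaded once without
# image prefetching, and the already-fetched row is passed down via db_job, so
# put_job_data does not query the same job again.
db_jobs = JobAnnotation.add_prefetch_info(
    models.Job.objects, prefetch_images=False
).filter(segment__task_id=some_task_id)
for db_job in db_jobs:
    # job_ir stands in for the per-job AnnotationIR slice prepared by the caller
    put_job_data(db_job.id, job_ir, db_job=db_job)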