diff --git a/dataquality/integrations/cv/torch/semantic_segmentation.py b/dataquality/integrations/cv/torch/semantic_segmentation.py
index b3432e6b5..84f0fedd9 100644
--- a/dataquality/integrations/cv/torch/semantic_segmentation.py
+++ b/dataquality/integrations/cv/torch/semantic_segmentation.py
@@ -1,4 +1,6 @@
+import json
 import os
+from tempfile import NamedTemporaryFile
 from typing import Any, Callable, Dict, List, Optional, Tuple, Union
 
 import numpy as np
@@ -8,9 +10,11 @@
 from torch.utils.data import DataLoader
 
 import dataquality as dq
+from dataquality import config
 from dataquality.analytics import Analytics
 from dataquality.clients.api import ApiClient
 from dataquality.clients.objectstore import ObjectStore
+from dataquality.core._config import GALILEO_DEFAULT_RESULT_BUCKET_NAME
 from dataquality.exceptions import GalileoException
 from dataquality.integrations.torch import TorchLogger, unwatch
 from dataquality.loggers.model_logger.semantic_segmentation import (
@@ -27,6 +31,7 @@
     fill_confident_counts,
 )
 from dataquality.utils.semantic_segmentation.utils import mask_to_boundary
+from dataquality.utils.thread_pool import ThreadPoolManager, lock
 from dataquality.utils.torch import ModelHookManager, store_batch_indices
 
 a = Analytics(ApiClient, dq.config)  # type: ignore
@@ -37,7 +42,7 @@
 # Heuristic used to calculate Likely Mislabeled for Semantic Segmentation
 # A larger queue size corresponds to a more accurate estimate of LM.
 # We keep a queue size to overcome memory issues with large SemSeg datasets.
-LIKELY_MISLABELED_QUEUE_SIZE = 100
+LIKELY_MISLABELED_QUEUE_SIZE = 1000
 
 
 class SemanticTorchLogger(TorchLogger):
@@ -372,6 +377,35 @@ def _on_step_end(self) -> None:
             )
             logger.log()
 
+    def upload_contours_split(self, split: str) -> None:
+        """Uploads all contours for a given split to minio
+
+        Args:
+            split (str): split name
+        """
+        model_logger = dq.get_model_logger()
+        project_path = f"{model_logger.LOG_FILE_DIR}/{config.current_project_id}"
+        local_contour_path = f"{project_path}/{config.current_run_id}/{split}/contours"
+
+        files = os.listdir(local_contour_path)
+        all_contours = {}
+        for file in files:
+            with open(f"{local_contour_path}/{file}") as f:
+                contours = json.load(f)
+                # uuid is the key for each contour from the polygon schema
+                all_contours[file.replace(".json", "")] = contours
+        with NamedTemporaryFile(mode="w+", delete=False) as temp_file:
+            json.dump(all_contours, temp_file)
+
+        obj_name = f"{model_logger.proj_run}/{split}/contours/contours.json"
+        object_store.create_object(
+            object_name=obj_name,
+            file_path=temp_file.name,
+            content_type="application/json",
+            progress=False,
+            bucket_name=GALILEO_DEFAULT_RESULT_BUCKET_NAME,
+        )
+
     def finish(self) -> None:
         # call to eval to make sure we are not in train mode for batch norm
         # in batch norm with 1 example can get an error if we are in train mode
@@ -384,6 +418,11 @@ def finish(self) -> None:
             dq.set_epoch_and_split(0, Split[split])
             with torch.no_grad():
                 self.run_one_epoch(dataloader, device)
+            split = self.logger_config.cur_split.lower()  # type: ignore
+            # Ensure all contours are written to disk before starting upload
+            ThreadPoolManager.wait_for_threads()
+            with lock:
+                self.upload_contours_split(split)
         self.model.train()
 
     def run_one_epoch(self, dataloader: DataLoader, device: torch.device) -> None:
@@ -501,6 +540,9 @@ def watch(
         assert key in Split.__members__, GalileoException(
             f"Dataloader key {key} is not a valid split"
         )
+        current_split = Split[key].value
+        logger_config = dq.get_model_logger().logger_config
+        setattr(logger_config, f"{current_split}_logged", True)
         assert isinstance(dataloader, DataLoader), GalileoException(
            "Invalid dataloader. Must be a pytorch dataloader"
            "from torch.utils.data import DataLoader..."
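Note: the diff above changes contour handling from one object-store upload per polygon to a single merged upload per split. Per-polygon JSON files are buffered on local disk during the epoch, then `upload_contours_split` merges them (keyed by polygon uuid) and pushes one `contours.json` object. The snippet below is a minimal standalone sketch of that merge step; `merge_contour_files` is a hypothetical helper for illustration, not part of the dataquality API.

```python
import json
import os
from typing import Any, Dict


def merge_contour_files(contour_dir: str) -> Dict[str, Any]:
    """Merge every '<uuid>.json' file in contour_dir into one dict.

    Mirrors the aggregation loop in upload_contours_split: the file stem
    (the polygon uuid) becomes the key, the parsed JSON becomes the value.
    """
    merged: Dict[str, Any] = {}
    for file_name in os.listdir(contour_dir):
        if not file_name.endswith(".json"):
            continue
        with open(os.path.join(contour_dir, file_name)) as f:
            merged[file_name[: -len(".json")]] = json.load(f)
    return merged
```

Batching this way trades one network round trip per polygon for a single round trip per split, which appears to be the motivation for the `NamedTemporaryFile` staging in `upload_contours_split`.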
diff --git a/dataquality/loggers/model_logger/semantic_segmentation.py b/dataquality/loggers/model_logger/semantic_segmentation.py
index 9cb408f35..0d398e1e9 100644
--- a/dataquality/loggers/model_logger/semantic_segmentation.py
+++ b/dataquality/loggers/model_logger/semantic_segmentation.py
@@ -4,6 +4,8 @@
 import torch
 
 import dataquality as dq
+from dataquality import config
+from dataquality.clients.objectstore import ObjectStore
 from dataquality.loggers.logger_config.semantic_segmentation import (
     SemanticSegmentationLoggerConfig,
     semantic_segmentation_logger_config,
@@ -26,9 +28,11 @@
 )
 from dataquality.utils.semantic_segmentation.polygons import (
     find_polygons_batch,
-    upload_polygon_contours,
+    write_polygon_contours_to_disk,
 )
 
+object_store = ObjectStore()
+
 
 class SemanticSegmentationModelLogger(BaseGalileoModelLogger):
     __logger_name__ = "semantic_segmentation"
@@ -106,8 +110,14 @@ def dep_path(self) -> str:
         return f"{self.proj_run}/{self.split_name_path}/dep"
 
     @property
-    def contours_path(self) -> str:
-        return f"{self.proj_run}/{self.split_name_path}/contours"
+    def local_proj_run_path(self) -> str:
+        return (
+            f"{self.LOG_FILE_DIR}/{config.current_project_id}/{config.current_run_id}"
+        )
+
+    @property
+    def local_contours_path(self) -> str:
+        return f"{self.local_proj_run_path}/{self.split_name_path}/contours"
 
     def get_polygon_data(
         self,
@@ -160,9 +170,8 @@ def get_polygon_data(
                 mislabeled_class_pcts.append(
                     polygon.cls_error_data.mislabeled_class_pct
                 )
-                upload_polygon_contours(polygon, self.contours_path)
+                write_polygon_contours_to_disk(polygon, self.local_contours_path)
                 polygon_ids.append(polygon.uuid)
-
         polygon_data = {
             "polygon_uuid": polygon_ids,
             "image_id": image_ids,
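Note: the old `contours_path` property was an object-store prefix; the two new properties build a local filesystem layout under `LOG_FILE_DIR` instead. Below is a self-contained sketch of how they compose; all values are illustrative placeholders, not real project or run ids.

```python
from dataclasses import dataclass


@dataclass
class PathDemo:
    """Toy stand-in for SemanticSegmentationModelLogger's path properties."""

    LOG_FILE_DIR: str = "/tmp/.galileo"
    project_id: str = "proj-id"
    run_id: str = "run-id"
    split_name_path: str = "training"

    @property
    def local_proj_run_path(self) -> str:
        return f"{self.LOG_FILE_DIR}/{self.project_id}/{self.run_id}"

    @property
    def local_contours_path(self) -> str:
        return f"{self.local_proj_run_path}/{self.split_name_path}/contours"


# prints: /tmp/.galileo/proj-id/run-id/training/contours
print(PathDemo().local_contours_path)
```

Each polygon then lands at `{local_contours_path}/{uuid}.json`, which is the same layout `upload_contours_split` walks in the first file.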
diff --git a/dataquality/utils/semantic_segmentation/polygons.py b/dataquality/utils/semantic_segmentation/polygons.py
index 9f77a2cc8..2d72e42ba 100644
--- a/dataquality/utils/semantic_segmentation/polygons.py
+++ b/dataquality/utils/semantic_segmentation/polygons.py
@@ -1,6 +1,6 @@
 import json
+import os
 from collections import defaultdict
-from tempfile import NamedTemporaryFile
 from typing import List, Tuple
 from uuid import uuid4
 
@@ -9,7 +9,6 @@
 import torch
 
 from dataquality.clients.objectstore import ObjectStore
-from dataquality.core._config import GALILEO_DEFAULT_RESULT_BUCKET_NAME
 from dataquality.schemas.ml import ClassType
 from dataquality.schemas.semantic_segmentation import Contour, Pixel, Polygon
 
@@ -146,26 +145,21 @@ def draw_polygon(polygon: Polygon, shape: Tuple[int, ...]) -> np.ndarray:
     )
 
 
-def upload_polygon_contours(
+def write_polygon_contours_to_disk(
     polygon: Polygon,
     prefix: str,
 ) -> None:
-    """Uploads a Polygon's contours to the cloud
+    """Writes polygons to disk in json format
 
     Args:
         polygon(Polygon): A Polygon object
         prefix(str): prefix of the object name in storage
-            - /proj-id/run-id/training/contours/1.json
+            "{proj_run_path}/{split_name_path}/contours"
     """
-    obj_name = f"{prefix}/{polygon.uuid}.json"
-    with NamedTemporaryFile(mode="w+", delete=False) as f:
-        json.dump(polygon.contours_json, f)
+    os.makedirs(prefix, exist_ok=True)
+    local_path = f"{prefix}/{polygon.uuid}.json"
 
-    object_store.create_object(
-        object_name=obj_name,
-        file_path=f.name,
-        content_type="application/json",
-        progress=False,
-        bucket_name=GALILEO_DEFAULT_RESULT_BUCKET_NAME,
-    )
+    with open(local_path, "w") as f:
+        json.dump(polygon.contours_json, f)
+        f.close()
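Note: `write_polygon_contours_to_disk` is called from model loggers that run on `ThreadPoolManager` threads, so per-polygon files can still be in flight when `finish()` runs; that is why the upload in the first file is gated on `ThreadPoolManager.wait_for_threads()` ("Ensure all contours are written to disk before starting upload"). Below is a rough standalone illustration of that write-then-merge ordering, using `concurrent.futures` in place of dataquality's thread pool.

```python
import json
import os
from concurrent.futures import ThreadPoolExecutor
from tempfile import TemporaryDirectory

with TemporaryDirectory() as contour_dir:

    def write_one(uuid: str) -> None:
        # stands in for write_polygon_contours_to_disk
        with open(os.path.join(contour_dir, f"{uuid}.json"), "w") as f:
            json.dump({"contours": []}, f)

    with ThreadPoolExecutor() as pool:  # writers run concurrently
        pool.map(write_one, ["a", "b", "c"])
    # leaving the "with" block waits for every writer, mirroring
    # ThreadPoolManager.wait_for_threads(); only now is the listing complete
    print(sorted(os.listdir(contour_dir)))  # ['a.json', 'b.json', 'c.json']
```

The `with lock:` guard around `upload_contours_split` in `finish()` presumably plays a similar role, serializing the upload against logger code that still holds dataquality's shared lock.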