diff --git a/otx/cli/manager/config_manager.py b/otx/cli/manager/config_manager.py index d0c0e17f12d..86d4b83b3f9 100644 --- a/otx/cli/manager/config_manager.py +++ b/otx/cli/manager/config_manager.py @@ -406,6 +406,11 @@ def get_dataset_config(self, subsets: List[str], hyper_parameters: Optional[Conf if learning_parameters: num_workers = getattr(learning_parameters, "num_workers", 0) dataset_config["cache_config"]["num_workers"] = num_workers + if str(self.task_type).upper() == "SEGMENTATION" and str(self.train_type).upper() == "SELFSUPERVISED": + # FIXME: manually set a path to save pseudo masks in workspace + train_type_rel_path = TASK_TYPE_TO_SUB_DIR_NAME[self.train_type] + train_type_dir = self.workspace_root / train_type_rel_path + dataset_config["pseudo_mask_dir"] = train_type_dir / "detcon_mask" return dataset_config def update_data_config(self, data_yaml: dict) -> None: diff --git a/otx/core/data/adapter/base_dataset_adapter.py b/otx/core/data/adapter/base_dataset_adapter.py index 7355e7c7708..378611980b6 100644 --- a/otx/core/data/adapter/base_dataset_adapter.py +++ b/otx/core/data/adapter/base_dataset_adapter.py @@ -82,6 +82,7 @@ def __init__( unlabeled_data_roots: Optional[str] = None, unlabeled_file_list: Optional[str] = None, cache_config: Optional[Dict[str, Any]] = None, + **kwargs, ): self.task_type = task_type self.domain = task_type.domain @@ -97,6 +98,7 @@ def __init__( test_ann_files=test_ann_files, unlabeled_data_roots=unlabeled_data_roots, unlabeled_file_list=unlabeled_file_list, + **kwargs, ) cache_config = cache_config if cache_config is not None else {} diff --git a/otx/core/data/adapter/segmentation_dataset_adapter.py b/otx/core/data/adapter/segmentation_dataset_adapter.py index 555e6fb3e09..e09342e6ebb 100644 --- a/otx/core/data/adapter/segmentation_dataset_adapter.py +++ b/otx/core/data/adapter/segmentation_dataset_adapter.py @@ -6,6 +6,7 @@ import json import os +from pathlib import Path from typing import Any, Dict, List, Optional 
import cv2 @@ -53,6 +54,7 @@ def __init__( unlabeled_data_roots: Optional[str] = None, unlabeled_file_list: Optional[str] = None, cache_config: Optional[Dict[str, Any]] = None, + **kwargs, ): super().__init__( task_type, @@ -65,6 +67,7 @@ def __init__( unlabeled_data_roots, unlabeled_file_list, cache_config, + **kwargs, ) self.updated_label_id: Dict[int, int] = {} @@ -166,7 +169,7 @@ def _import_dataset( test_ann_files: Optional[str] = None, unlabeled_data_roots: Optional[str] = None, unlabeled_file_list: Optional[str] = None, - pseudo_mask_dir: str = "detcon_mask", + pseudo_mask_dir: Optional[Path] = None, ) -> Dict[Subset, DatumDataset]: """Import custom Self-SL dataset for using DetCon. @@ -183,11 +186,13 @@ def _import_dataset( test_ann_files (Optional[str]): Path for test annotation file unlabeled_data_roots (Optional[str]): Path for unlabeled data. unlabeled_file_list (Optional[str]): Path of unlabeled file list - pseudo_mask_dir (str): Directory to save pseudo masks. Defaults to "detcon_mask". + pseudo_mask_dir (Optional[Path]): Directory to save pseudo masks. Defaults to None. 
Returns: DatumaroDataset: Datumaro Dataset """ + if pseudo_mask_dir is None: + raise ValueError("pseudo_mask_dir must be set.") if train_data_roots is None: raise ValueError("train_data_root must be set.") @@ -199,23 +204,20 @@ def _import_dataset( self.is_train_phase = True # Load pseudo masks - img_dir = None total_labels = [] + os.makedirs(pseudo_mask_dir, exist_ok=True) for item in dataset[Subset.TRAINING]: img_path = item.media.path - if img_dir is None: - # Get image directory - img_dir = train_data_roots.split("/")[-1] - pseudo_mask_path = img_path.replace(img_dir, pseudo_mask_dir) - if pseudo_mask_path.endswith(".jpg"): - pseudo_mask_path = pseudo_mask_path.replace(".jpg", ".png") + pseudo_mask_path = pseudo_mask_dir / os.path.basename(img_path) + if pseudo_mask_path.suffix == ".jpg": + pseudo_mask_path = pseudo_mask_path.with_name(f"{pseudo_mask_path.stem}.png") if not os.path.isfile(pseudo_mask_path): # Create pseudo mask - pseudo_mask = self.create_pseudo_masks(item.media.data, pseudo_mask_path) # type: ignore + pseudo_mask = self.create_pseudo_masks(item.media.data, str(pseudo_mask_path)) # type: ignore else: # Load created pseudo mask - pseudo_mask = cv2.imread(pseudo_mask_path, cv2.IMREAD_GRAYSCALE) + pseudo_mask = cv2.imread(str(pseudo_mask_path), cv2.IMREAD_GRAYSCALE) # Set annotations into each item annotations = [] @@ -229,28 +231,27 @@ def _import_dataset( ) item.annotations = annotations - pseudo_mask_roots = train_data_roots.replace(img_dir, pseudo_mask_dir) # type: ignore - if not os.path.isfile(os.path.join(pseudo_mask_roots, "dataset_meta.json")): + if not os.path.isfile(os.path.join(pseudo_mask_dir, "dataset_meta.json")): # Save dataset_meta.json for newly created pseudo masks # FIXME: Because background class is ignored when generating polygons, meta is set with len(labels)-1. # It must be considered to set the whole labels later. 
 # (-> {i: f"target{i+1}" for i in range(max(total_labels)+1)}) meta = {"label_map": {i + 1: f"target{i+1}" for i in range(max(total_labels))}} - with open(os.path.join(pseudo_mask_roots, "dataset_meta.json"), "w", encoding="UTF-8") as f: + with open(os.path.join(pseudo_mask_dir, "dataset_meta.json"), "w", encoding="UTF-8") as f: json.dump(meta, f, indent=4) # Make categories for pseudo masks - label_map = parse_meta_file(os.path.join(pseudo_mask_roots, "dataset_meta.json")) + label_map = parse_meta_file(os.path.join(pseudo_mask_dir, "dataset_meta.json")) dataset[Subset.TRAINING].define_categories(make_categories(label_map)) return dataset - def create_pseudo_masks(self, img: np.array, pseudo_mask_path: str, mode: str = "FH") -> None: + def create_pseudo_masks(self, img: np.ndarray, pseudo_mask_path: str, mode: str = "FH") -> None: """Create pseudo masks for self-sl for semantic segmentation using DetCon. Args: - img (np.array) : A sample to create a pseudo mask. - pseudo_mask_path (str): The path to save a pseudo mask. + img (np.ndarray) : A sample to create a pseudo mask. + pseudo_mask_path (str): The path to save a pseudo mask. mode (str): The mode to create a pseudo mask. Defaults to "FH". Returns: @@ -261,7 +262,6 @@ def create_pseudo_masks(self, img: np.array, pseudo_mask_path: str, mode: str = else: raise ValueError((f'{mode} is not supported to create pseudo masks for DetCon. 
Choose one of ["FH"].')) - os.makedirs(os.path.dirname(pseudo_mask_path), exist_ok=True) cv2.imwrite(pseudo_mask_path, pseudo_mask.astype(np.uint8)) return pseudo_mask diff --git a/tests/unit/core/data/adapter/test_init.py b/tests/unit/core/data/adapter/test_init.py index 62f676eeb8b..29a66f84ff2 100644 --- a/tests/unit/core/data/adapter/test_init.py +++ b/tests/unit/core/data/adapter/test_init.py @@ -11,6 +11,9 @@ TASK_NAME_TO_TASK_TYPE, ) +from pathlib import Path +import shutil + @e2e_pytest_unit @pytest.mark.parametrize("task_name", TASK_NAME_TO_TASK_TYPE.keys()) @@ -63,19 +66,28 @@ def test_get_dataset_adapter_selfsl_segmentation(task_name, train_type): task_type = TASK_NAME_TO_TASK_TYPE[task_name] data_root = TASK_NAME_TO_DATA_ROOT[task_name] - get_dataset_adapter( - task_type=task_type, - train_type=train_type, - train_data_roots=os.path.join(root_path, data_root["train"]), - ) + with pytest.raises(ValueError, match=r"pseudo_mask_dir must be set."): + get_dataset_adapter( + task_type=task_type, + train_type=train_type, + train_data_roots=os.path.join(root_path, data_root["train"]), + ) - with pytest.raises(ValueError): get_dataset_adapter( task_type=task_type, train_type=train_type, test_data_roots=os.path.join(root_path, data_root["test"]), ) + tmp_supcon_mask_dir = Path("/tmp/selfsl_supcon_unit_test") + get_dataset_adapter( + task_type=task_type, + train_type=train_type, + train_data_roots=os.path.join(root_path, data_root["train"]), + pseudo_mask_dir=tmp_supcon_mask_dir, + ) + shutil.rmtree(str(tmp_supcon_mask_dir)) + # TODO: direct annotation function is only supported in COCO format for now. 
@e2e_pytest_unit diff --git a/tests/unit/core/data/adapter/test_segmentation_adapter.py b/tests/unit/core/data/adapter/test_segmentation_adapter.py index 1d4479d2d0e..7da02388fe0 100644 --- a/tests/unit/core/data/adapter/test_segmentation_adapter.py +++ b/tests/unit/core/data/adapter/test_segmentation_adapter.py @@ -4,6 +4,7 @@ # import os import shutil +from pathlib import Path from typing import Optional import numpy as np @@ -66,7 +67,7 @@ def test_get_otx_dataset(self): class TestSelfSLSegmentationDatasetAdapter: - def setup_method(self, method) -> None: + def setup_class(self) -> None: self.root_path = os.getcwd() task = "segmentation" @@ -74,7 +75,10 @@ def setup_method(self, method) -> None: data_root_dict: dict = TASK_NAME_TO_DATA_ROOT[task] self.train_data_roots: str = os.path.join(self.root_path, data_root_dict["train"], "images") - self.pseudo_mask_roots = os.path.abspath(self.train_data_roots.replace("images", "detcon_mask")) + self.pseudo_mask_dir = Path(os.path.abspath(self.train_data_roots.replace("images", "detcon_mask"))) + + def teardown_class(self) -> None: + shutil.rmtree(self.pseudo_mask_dir, ignore_errors=True) @e2e_pytest_unit def test_import_dataset_create_all_masks(self, mocker): @@ -82,12 +86,11 @@ def test_import_dataset_create_all_masks(self, mocker): This test is for when all masks are not created and it is required to create masks. 
""" - shutil.rmtree(self.pseudo_mask_roots, ignore_errors=True) + shutil.rmtree(self.pseudo_mask_dir, ignore_errors=True) spy_create_pseudo_masks = mocker.spy(SelfSLSegmentationDatasetAdapter, "create_pseudo_masks") dataset_adapter = SelfSLSegmentationDatasetAdapter( - task_type=self.task_type, - train_data_roots=self.train_data_roots, + task_type=self.task_type, train_data_roots=self.train_data_roots, pseudo_mask_dir=self.pseudo_mask_dir ) spy_create_pseudo_masks.assert_called() @@ -102,20 +105,19 @@ def test_import_dataset_create_some_uncreated_masks(self, mocker, idx_remove: in and it is required to either create or just load masks. In this test, remove a mask created before and check if `create_pseudo_masks` is called once. """ - shutil.rmtree(self.pseudo_mask_roots, ignore_errors=True) + shutil.rmtree(self.pseudo_mask_dir, ignore_errors=True) dataset_adapter = SelfSLSegmentationDatasetAdapter( - task_type=self.task_type, - train_data_roots=self.train_data_roots, + task_type=self.task_type, train_data_roots=self.train_data_roots, pseudo_mask_dir=self.pseudo_mask_dir ) - assert os.path.isdir(self.pseudo_mask_roots) - assert len(os.listdir(self.pseudo_mask_roots)) == 4 + assert os.path.isdir(self.pseudo_mask_dir) + assert len(os.listdir(self.pseudo_mask_dir)) == 4 # remove a mask - os.remove(os.path.join(self.pseudo_mask_roots, f"000{idx_remove}.png")) + os.remove(os.path.join(self.pseudo_mask_dir, f"000{idx_remove}.png")) spy_create_pseudo_masks = mocker.spy(SelfSLSegmentationDatasetAdapter, "create_pseudo_masks") _ = dataset_adapter._import_dataset( - train_data_roots=self.train_data_roots, + train_data_roots=self.train_data_roots, pseudo_mask_dir=self.pseudo_mask_dir ) spy_create_pseudo_masks.assert_called() @@ -127,8 +129,7 @@ def test_import_dataset_just_load_masks(self, mocker): spy_create_pseudo_masks = mocker.spy(SelfSLSegmentationDatasetAdapter, "create_pseudo_masks") _ = SelfSLSegmentationDatasetAdapter( - task_type=self.task_type, - 
train_data_roots=self.train_data_roots, + task_type=self.task_type, train_data_roots=self.train_data_roots, pseudo_mask_dir=self.pseudo_mask_dir ) spy_create_pseudo_masks.assert_not_called() @@ -148,8 +149,7 @@ def test_create_pseudo_masks(self, mocker): mocker.patch("otx.core.data.adapter.segmentation_dataset_adapter.os.makedirs") mocker.patch("otx.core.data.adapter.segmentation_dataset_adapter.cv2.imwrite") dataset_adapter = SelfSLSegmentationDatasetAdapter( - task_type=self.task_type, - train_data_roots=self.train_data_roots, + task_type=self.task_type, train_data_roots=self.train_data_roots, pseudo_mask_dir=self.pseudo_mask_dir ) pseudo_mask = dataset_adapter.create_pseudo_masks(img=np.ones((2, 2)), pseudo_mask_path="")