diff --git a/tests/unit/pipelines/tiled_ensemble/__init__.py b/tests/unit/pipelines/tiled_ensemble/__init__.py deleted file mode 100644 index a78a1ad659..0000000000 --- a/tests/unit/pipelines/tiled_ensemble/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -"""Tiled ensemble unit tests.""" - -# Copyright (C) 2023-2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 diff --git a/tests/unit/pipelines/tiled_ensemble/conftest.py b/tests/unit/pipelines/tiled_ensemble/conftest.py deleted file mode 100644 index b4fad61ebb..0000000000 --- a/tests/unit/pipelines/tiled_ensemble/conftest.py +++ /dev/null @@ -1,151 +0,0 @@ -"""Fixtures that are used in tiled ensemble testing.""" - -# Copyright (C) 2023-2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import json -from pathlib import Path -from tempfile import TemporaryDirectory - -import pytest -import torch -import yaml - -from anomalib.data import AnomalibDataModule -from anomalib.models import AnomalyModule -from anomalib.pipelines.tiled_ensemble.components.utils.ensemble_tiling import EnsembleTiler -from anomalib.pipelines.tiled_ensemble.components.utils.helper_functions import ( - get_ensemble_datamodule, - get_ensemble_model, - get_ensemble_tiler, -) -from anomalib.pipelines.tiled_ensemble.components.utils.prediction_data import EnsemblePredictions -from anomalib.pipelines.tiled_ensemble.components.utils.prediction_merging import PredictionMergingMechanism - - -@pytest.fixture(scope="module") -def get_ensemble_config(dataset_path: Path) -> dict: - """Return ensemble dummy config dict with corrected dataset path to dummy temp dir.""" - with Path("tests/unit/pipelines/tiled_ensemble/dummy_config.yaml").open(encoding="utf-8") as file: - config = yaml.safe_load(file) - # dummy dataset - config["data"]["init_args"]["root"] = dataset_path / "mvtec" - - return config - - -@pytest.fixture(scope="module") -def get_tiler(get_ensemble_config: dict) -> EnsembleTiler: - """Return EnsembleTiler object based on test dummy config.""" - config = get_ensemble_config - - return get_ensemble_tiler(config["tiling"], config["data"]) - - -@pytest.fixture(scope="module") -def get_model(get_ensemble_config: dict, get_tiler: EnsembleTiler) -> AnomalyModule: - """Return model prepared for tiled ensemble training.""" - config = get_ensemble_config - tiler = get_tiler - - return get_ensemble_model(config["TrainModels"]["model"], tiler) - - -@pytest.fixture(scope="module") -def get_datamodule(get_ensemble_config: dict, get_tiler: EnsembleTiler) -> AnomalibDataModule: - """Return ensemble datamodule.""" - config = get_ensemble_config - tiler = get_tiler - datamodule = get_ensemble_datamodule(config, tiler, (0, 0)) - datamodule.setup() - - return datamodule - - -@pytest.fixture(scope="module") -def get_tile_predictions(get_datamodule: AnomalibDataModule) -> EnsemblePredictions: - """Return tile predictions inside EnsemblePredictions object.""" - datamodule = get_datamodule - - data = EnsemblePredictions() - - for tile_index in [(0, 0), (0, 1), (1, 0), (1, 1)]: - datamodule.collate_fn.tile_index = tile_index - - tile_prediction = [] - batch = next(iter(datamodule.test_dataloader())) - - # make mock labels and scores - batch["pred_scores"] = torch.rand(batch["label"].shape) - batch["pred_labels"] = batch["pred_scores"] > 0.5 - - # set mock maps to just one channel of image - batch["anomaly_maps"] = batch["image"].clone()[:, 0, :, :].unsqueeze(1) - # set mock pred mask to mask but add channel - batch["pred_masks"] = batch["mask"].clone().unsqueeze(1) - - tile_prediction.append(batch) - - # store to prediction storage object - data.add_tile_prediction(tile_index, tile_prediction) - - return data - - -@pytest.fixture(scope="module") -def get_batch_predictions() -> list[dict]: - """Return mock batched predictions.""" - mock_data = { - "image": torch.rand((5, 3, 100, 100)), - "mask": (torch.rand((5, 100, 100)) > 0.5).type(torch.float32), - "anomaly_maps": torch.rand((5, 1, 100, 100)), - "label": torch.Tensor([0, 1, 1, 0, 1]), - "pred_scores": torch.rand(5), - "pred_labels": torch.ones(5), - "pred_masks": torch.zeros((5, 100, 100)), - } - - return [mock_data, mock_data] - - -@pytest.fixture(scope="module") -def get_merging_mechanism( - get_tile_predictions: EnsemblePredictions, - get_tiler: EnsembleTiler, -) -> PredictionMergingMechanism: - """Return ensemble prediction merging mechanism object.""" - tiler = get_tiler - predictions = get_tile_predictions - return PredictionMergingMechanism(predictions, tiler) - - -@pytest.fixture(scope="module") -def get_mock_stats_dir() -> Path: - """Get temp dir containing statistics.""" - with TemporaryDirectory() as temp_dir: - stats = { - "minmax": { - "anomaly_maps": { - "min": 1.9403648376464844, - "max": 209.91940307617188, - }, - "box_scores": { - "min": 0.5, - "max": 0.45, - }, - "pred_scores": { - "min": 9.390382766723633, - "max": 209.91940307617188, - }, - }, - "image_threshold": 0.1111, - "pixel_threshold": 0.1111, - } - stats_path = Path(temp_dir) / "weights" / "lightning" / "stats.json" - stats_path.parent.mkdir(parents=True) - - # save mock statistics - with stats_path.open("w", encoding="utf-8") as stats_file: - json.dump(stats, stats_file, ensure_ascii=False, indent=4) - - yield Path(temp_dir) diff --git a/tests/unit/pipelines/tiled_ensemble/dummy_config.yaml b/tests/unit/pipelines/tiled_ensemble/dummy_config.yaml deleted file mode 100644 index fcd4b7c716..0000000000 --- a/tests/unit/pipelines/tiled_ensemble/dummy_config.yaml +++ /dev/null @@ -1,52 +0,0 @@ -seed: 42 -accelerator: "cpu" -default_root_dir: "results" - -tiling: - tile_size: [50, 50] - stride: 50 - -normalization_stage: image # on what level we normalize, options: [tile, image, none] -thresholding: - method: F1AdaptiveThreshold # refer to documentation for thresholding methods - stage: image # stage at which we apply threshold, options: [tile, image] - -data: - class_path: anomalib.data.MVTec - init_args: - root: toBeSetup - category: dummy - train_batch_size: 32 - eval_batch_size: 32 - num_workers: 0 - task: segmentation - transform: null - train_transform: null - eval_transform: null - test_split_mode: from_dir - test_split_ratio: 0.2 - val_split_mode: same_as_test - val_split_ratio: 0.5 - image_size: [100, 100] - -SeamSmoothing: - apply: True # if this is applied, area around tile seams are is smoothed - sigma: 2 # sigma of gaussian filter used to smooth this area - width: 0.1 # width factor, multiplied by tile dimension gives the region width around seam which will be smoothed - -TrainModels: - model: - class_path: Fastflow - - metrics: - pixel: AUROC - image: AUROC - - trainer: - max_epochs: 1 - callbacks: - - class_path: lightning.pytorch.callbacks.EarlyStopping - init_args: - patience: 1 - monitor: pixel_AUROC - mode: max diff --git a/tests/unit/pipelines/tiled_ensemble/test_components.py b/tests/unit/pipelines/tiled_ensemble/test_components.py deleted file mode 100644 index 0e3c0dcdd4..0000000000 --- a/tests/unit/pipelines/tiled_ensemble/test_components.py +++ /dev/null @@ -1,387 +0,0 @@ -"""Test working of tiled ensemble pipeline components.""" - -# Copyright (C) 2023-2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import copy -from pathlib import Path -from tempfile import TemporaryDirectory - -import pytest -import torch - -from anomalib.data import get_datamodule -from anomalib.metrics import F1AdaptiveThreshold, ManualThreshold -from anomalib.pipelines.tiled_ensemble.components import ( - MergeJobGenerator, - MetricsCalculationJobGenerator, - NormalizationJobGenerator, - SmoothingJobGenerator, - StatisticsJobGenerator, - ThresholdingJobGenerator, -) -from anomalib.pipelines.tiled_ensemble.components.metrics_calculation import MetricsCalculationJob -from anomalib.pipelines.tiled_ensemble.components.smoothing import SmoothingJob -from anomalib.pipelines.tiled_ensemble.components.utils import NormalizationStage -from anomalib.pipelines.tiled_ensemble.components.utils.prediction_data import EnsemblePredictions -from anomalib.pipelines.tiled_ensemble.components.utils.prediction_merging import PredictionMergingMechanism - - -class TestMerging: - """Test merging mechanism and merging job.""" - - @staticmethod - def test_tile_merging(get_ensemble_config: dict, get_merging_mechanism: PredictionMergingMechanism) -> None: - """Test tiled data merging.""" - config = get_ensemble_config - merger = get_merging_mechanism - - # prepared original data - datamodule = get_datamodule(config) - datamodule.prepare_data() - datamodule.setup() - original_data = next(iter(datamodule.test_dataloader())) - - batch = merger.ensemble_predictions.get_batch_tiles(0) - - merged_image = merger.merge_tiles(batch, "image") - assert merged_image.equal(original_data["image"]) - - merged_mask = merger.merge_tiles(batch, "mask") - assert merged_mask.equal(original_data["mask"]) - - @staticmethod - def test_label_and_score_merging(get_merging_mechanism: PredictionMergingMechanism) -> None: - """Test label and score merging.""" - merger = get_merging_mechanism - scores = torch.rand(4, 10) - labels = scores > 0.5 - - mock_data = {(0, 0): {}, (0, 1): {}, (1, 0): {}, (1, 1): {}} - - for i, data in enumerate(mock_data.values()): - data["pred_scores"] = scores[i] - data["pred_labels"] = labels[i] - - merged = merger.merge_labels_and_scores(mock_data) - - assert merged["pred_scores"].equal(scores.mean(dim=0)) - - assert merged["pred_labels"].equal(labels.any(dim=0)) - - @staticmethod - def test_merge_job( - get_tile_predictions: EnsemblePredictions, - get_ensemble_config: dict, - get_merging_mechanism: PredictionMergingMechanism, - ) -> None: - """Test merging job execution.""" - config = get_ensemble_config - predictions = copy.deepcopy(get_tile_predictions) - merging_mechanism = get_merging_mechanism - - merging_job_generator = MergeJobGenerator(tiling_args=config["tiling"], data_args=config["data"]) - merging_job = next(merging_job_generator.generate_jobs(prev_stage_result=predictions)) - - merged_direct = merging_mechanism.merge_tile_predictions(0) - merged_with_job = merging_job.run()[0] - - # check that merging by job is same as with the mechanism directly - for key, value in merged_direct.items(): - if isinstance(value, torch.Tensor): - assert merged_with_job[key].equal(value) - elif isinstance(value, list) and isinstance(value[0], torch.Tensor): - # boxes - assert all(j.equal(d) for j, d in zip(merged_with_job[key], value, strict=False)) - else: - assert merged_with_job[key] == value - - -class TestStatsCalculation: - """Test post-processing statistics calculations.""" - - @staticmethod - @pytest.mark.parametrize( - ("threshold_str", "threshold_cls"), - [("F1AdaptiveThreshold", F1AdaptiveThreshold), ("ManualThreshold", ManualThreshold)], - ) - def test_threshold_method(threshold_str: str, threshold_cls: type, get_ensemble_config: dict) -> None: - """Test that correct thresholding method is used.""" - config = copy.deepcopy(get_ensemble_config) - config["thresholding"]["method"] = threshold_str - - stats_job_generator = StatisticsJobGenerator(Path("mock"), threshold_str) - stats_job = next(stats_job_generator.generate_jobs(None, None)) - - assert isinstance(stats_job.image_threshold, threshold_cls) - - @staticmethod - def test_stats_run(project_path: Path) -> None: - """Test execution of statistics calc. job.""" - mock_preds = [ - { - "pred_scores": torch.rand(4), - "label": torch.ones(4), - "anomaly_maps": torch.rand(4, 1, 50, 50), - "mask": torch.ones(4, 1, 50, 50), - }, - ] - - stats_job_generator = StatisticsJobGenerator(project_path, "F1AdaptiveThreshold") - stats_job = next(stats_job_generator.generate_jobs(None, mock_preds)) - - results = stats_job.run() - - assert "minmax" in results - assert "image_threshold" in results - assert "pixel_threshold" in results - - # save as it's removed from results - save_path = results["save_path"] - stats_job.save(results) - assert Path(save_path).exists() - - @staticmethod - @pytest.mark.parametrize( - ("key", "values"), - [ - ("anomaly_maps", [torch.rand(5, 1, 50, 50), torch.rand(5, 1, 50, 50)]), - ("pred_scores", [torch.rand(5), torch.rand(5)]), - ], - ) - def test_minmax(key: str, values: list) -> None: - """Test minmax stats calculation.""" - # add given keys to test all possible sources of minmax - data = [ - {"pred_scores": torch.rand(5), "label": torch.ones(5), key: values[0]}, - {"pred_scores": torch.rand(5), "label": torch.ones(5), key: values[1]}, - ] - - stats_job_generator = StatisticsJobGenerator(Path("mock"), "F1AdaptiveThreshold") - stats_job = next(stats_job_generator.generate_jobs(None, data)) - results = stats_job.run() - - if isinstance(values[0], list): - values[0] = torch.cat(values[0]) - values[1] = torch.cat(values[1]) - values = torch.stack(values) - - assert results["minmax"][key]["min"] == torch.min(values) - assert results["minmax"][key]["max"] == torch.max(values) - - @staticmethod - @pytest.mark.parametrize( - ("labels", "preds", "target_threshold"), - [ - (torch.Tensor([0, 0, 0, 1, 1]), torch.Tensor([2.3, 1.6, 2.6, 7.9, 3.3]), 3.3), # standard case - (torch.Tensor([1, 0, 0, 0]), torch.Tensor([4, 3, 2, 1]), 4), # 100% recall for all thresholds - ], - ) - def test_threshold(labels: torch.Tensor, preds: torch.Tensor, target_threshold: float) -> None: - """Test threshold calculation job.""" - data = [ - { - "label": labels, - "mask": labels, - "pred_scores": preds, - "anomaly_maps": preds, - }, - ] - - stats_job_generator = StatisticsJobGenerator(Path("mock"), "F1AdaptiveThreshold") - stats_job = next(stats_job_generator.generate_jobs(None, data)) - results = stats_job.run() - - assert round(results["image_threshold"], 5) == target_threshold - assert round(results["pixel_threshold"], 5) == target_threshold - - -class TestMetrics: - """Test ensemble metrics.""" - - @pytest.fixture(scope="class") - @staticmethod - def get_ensemble_metrics_job( - get_ensemble_config: dict, - get_batch_predictions: list[dict], - ) -> tuple[MetricsCalculationJob, str]: - """Return Metrics calculation job and path to directory where metrics csv will be saved.""" - config = get_ensemble_config - with TemporaryDirectory() as tmp_dir: - metrics = MetricsCalculationJobGenerator( - config["accelerator"], - root_dir=Path(tmp_dir), - task=config["data"]["init_args"]["task"], - metrics=config["TrainModels"]["metrics"], - normalization_stage=NormalizationStage(config["normalization_stage"]), - ) - - mock_predictions = get_batch_predictions - - return next(metrics.generate_jobs(prev_stage_result=copy.deepcopy(mock_predictions))), tmp_dir - - @staticmethod - def test_metrics_result(get_ensemble_metrics_job: tuple[MetricsCalculationJob, str]) -> None: - """Test metrics result.""" - metrics_job, _ = get_ensemble_metrics_job - - result = metrics_job.run() - - assert "pixel_AUROC" in result - assert "image_AUROC" in result - - @staticmethod - def test_metrics_saving(get_ensemble_metrics_job: tuple[MetricsCalculationJob, str]) -> None: - """Test metrics saving to csv.""" - metrics_job, tmp_dir = get_ensemble_metrics_job - - result = metrics_job.run() - metrics_job.save(result) - assert (Path(tmp_dir) / "metric_results.csv").exists() - - -class TestJoinSmoothing: - """Test JoinSmoothing job responsible for smoothing area at tile seams.""" - - @pytest.fixture(scope="class") - @staticmethod - def get_join_smoothing_job(get_ensemble_config: dict, get_batch_predictions: list[dict]) -> SmoothingJob: - """Make and return SmoothingJob instance.""" - config = get_ensemble_config - job_gen = SmoothingJobGenerator( - accelerator=config["accelerator"], - tiling_args=config["tiling"], - data_args=config["data"], - ) - # copy since smoothing changes data - mock_predictions = copy.deepcopy(get_batch_predictions) - return next(job_gen.generate_jobs(config["SeamSmoothing"], mock_predictions)) - - @staticmethod - def test_mask(get_join_smoothing_job: SmoothingJob) -> None: - """Test seam mask in case where tiles don't overlap.""" - smooth = get_join_smoothing_job - - join_index = smooth.tiler.tile_size_h, smooth.tiler.tile_size_w - - # seam should be covered by True - assert smooth.seam_mask[join_index] - - # non-seam region should be false - assert not smooth.seam_mask[0, 0] - assert not smooth.seam_mask[-1, -1] - - @staticmethod - def test_mask_overlapping(get_ensemble_config: dict, get_batch_predictions: list[dict]) -> None: - """Test seam mask in case where tiles overlap.""" - config = copy.deepcopy(get_ensemble_config) - # tile size = 50, stride = 25 -> overlapping - config["tiling"]["stride"] = 25 - job_gen = SmoothingJobGenerator( - accelerator=config["accelerator"], - tiling_args=config["tiling"], - data_args=config["data"], - ) - mock_predictions = copy.deepcopy(get_batch_predictions) - smooth = next(job_gen.generate_jobs(config["SeamSmoothing"], mock_predictions)) - - join_index = smooth.tiler.stride_h, smooth.tiler.stride_w - - # overlap seam should be covered by True - assert smooth.seam_mask[join_index] - assert smooth.seam_mask[-join_index[0], -join_index[1]] - - # non-seam region should be false - assert not smooth.seam_mask[0, 0] - assert not smooth.seam_mask[-1, -1] - - @staticmethod - def test_smoothing(get_join_smoothing_job: SmoothingJob, get_batch_predictions: list[dict]) -> None: - """Test smoothing job run.""" - original_data = get_batch_predictions - # fixture makes a copy of data - smooth = get_join_smoothing_job - - # take first batch - smoothed = smooth.run()[0] - join_index = smooth.tiler.tile_size_h, smooth.tiler.tile_size_w - - # join sections should be processed - assert not smoothed["anomaly_maps"][:, :, join_index].equal(original_data[0]["anomaly_maps"][:, :, join_index]) - - # non-join section shouldn't be changed - assert smoothed["anomaly_maps"][:, :, 0, 0].equal(original_data[0]["anomaly_maps"][:, :, 0, 0]) - - -def test_normalization(get_batch_predictions: list[dict], project_path: Path) -> None: - """Test normalization step.""" - original_predictions = copy.deepcopy(get_batch_predictions) - - for batch in original_predictions: - batch["anomaly_maps"] *= 100 - batch["pred_scores"] *= 100 - - # # get and save stats using stats job on predictions - stats_job_generator = StatisticsJobGenerator(project_path, "F1AdaptiveThreshold") - stats_job = next(stats_job_generator.generate_jobs(prev_stage_result=original_predictions)) - stats = stats_job.run() - stats_job.save(stats) - - # normalize predictions based on obtained stats - norm_job_generator = NormalizationJobGenerator(root_dir=project_path) - # copy as this changes preds - norm_job = next(norm_job_generator.generate_jobs(prev_stage_result=original_predictions)) - normalized_predictions = norm_job.run() - - for batch in normalized_predictions: - assert (batch["anomaly_maps"] >= 0).all() - assert (batch["anomaly_maps"] <= 1).all() - - assert (batch["pred_scores"] >= 0).all() - assert (batch["pred_scores"] <= 1).all() - - -class TestThresholding: - """Test tiled ensemble thresholding stage.""" - - @pytest.fixture(scope="class") - @staticmethod - def get_threshold_job(get_mock_stats_dir: Path) -> callable: - """Return a function that takes prediction data and runs threshold job.""" - thresh_job_generator = ThresholdingJobGenerator( - root_dir=get_mock_stats_dir, - normalization_stage=NormalizationStage.IMAGE, - ) - - def thresh_helper(preds: dict) -> list | None: - thresh_job = next(thresh_job_generator.generate_jobs(prev_stage_result=preds)) - return thresh_job.run() - - return thresh_helper - - @staticmethod - def test_score_threshold(get_threshold_job: callable) -> None: - """Test anomaly score thresholding.""" - thresholding = get_threshold_job - - data = [{"pred_scores": torch.tensor([0.7, 0.8, 0.1, 0.33, 0.5])}] - - thresholded = thresholding(data)[0] - - assert thresholded["pred_labels"].equal(torch.tensor([True, True, False, False, True])) - - @staticmethod - def test_anomap_threshold(get_threshold_job: callable) -> None: - """Test anomaly map thresholding.""" - thresholding = get_threshold_job - - data = [ - { - "pred_scores": torch.tensor([0.7, 0.8, 0.1, 0.33, 0.5]), - "anomaly_maps": torch.tensor([[0.7, 0.8, 0.1], [0.33, 0.5, 0.1]]), - }, - ] - - thresholded = thresholding(data)[0] - - assert thresholded["pred_masks"].equal(torch.tensor([[True, True, False], [False, True, False]])) diff --git a/tests/unit/pipelines/tiled_ensemble/test_helper_functions.py b/tests/unit/pipelines/tiled_ensemble/test_helper_functions.py deleted file mode 100644 index 06e5864cef..0000000000 --- a/tests/unit/pipelines/tiled_ensemble/test_helper_functions.py +++ /dev/null @@ -1,113 +0,0 @@ -"""Test ensemble helper functions.""" - -# Copyright (C) 2023-2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -from pathlib import Path - -import pytest -from jsonargparse import Namespace -from lightning.pytorch.callbacks import EarlyStopping - -from anomalib.callbacks.normalization import _MinMaxNormalizationCallback -from anomalib.models import AnomalyModule -from anomalib.pipelines.tiled_ensemble.components.utils import NormalizationStage -from anomalib.pipelines.tiled_ensemble.components.utils.ensemble_tiling import EnsembleTiler, TileCollater -from anomalib.pipelines.tiled_ensemble.components.utils.helper_functions import ( - get_ensemble_datamodule, - get_ensemble_engine, - get_ensemble_model, - get_ensemble_tiler, - get_threshold_values, - parse_trainer_kwargs, -) - - -class TestHelperFunctions: - """Test ensemble helper functions.""" - - @staticmethod - def test_ensemble_datamodule(get_ensemble_config: dict, get_tiler: EnsembleTiler) -> None: - """Test that datamodule is created and has correct collate function.""" - config = get_ensemble_config - tiler = get_tiler - datamodule = get_ensemble_datamodule(config, tiler, (0, 0)) - - assert isinstance(datamodule.collate_fn, TileCollater) - - @staticmethod - def test_ensemble_model(get_ensemble_config: dict, get_tiler: EnsembleTiler) -> None: - """Test that model is successfully created with correct input shape.""" - config = get_ensemble_config - tiler = get_tiler - model = get_ensemble_model(config["TrainModels"]["model"], tiler) - - assert model.input_size == tuple(config["tiling"]["tile_size"]) - - @staticmethod - def test_tiler(get_ensemble_config: dict) -> None: - """Test that tiler is successfully instantiated.""" - config = get_ensemble_config - - tiler = get_ensemble_tiler(config["tiling"], config["data"]) - assert isinstance(tiler, EnsembleTiler) - - @staticmethod - def test_trainer_kwargs(get_ensemble_config: dict) -> None: - """Test that objects are correctly constructed from kwargs.""" - config = get_ensemble_config - - objects = parse_trainer_kwargs(config["TrainModels"]["trainer"]) - assert isinstance(objects, Namespace) - # verify that early stopping is parsed and added to callbacks - assert isinstance(objects.callbacks[0], EarlyStopping) - - @staticmethod - @pytest.mark.parametrize( - "normalization_stage", - [NormalizationStage.NONE, NormalizationStage.IMAGE, NormalizationStage.TILE], - ) - def test_threshold_values(normalization_stage: NormalizationStage, get_mock_stats_dir: Path) -> None: - """Test that threshold values are correctly set based on normalization stage.""" - stats_dir = get_mock_stats_dir - - i_thresh, p_thresh = get_threshold_values(normalization_stage, stats_dir) - - if normalization_stage != NormalizationStage.NONE: - # minmax normalization sets thresholds to 0.5 - assert i_thresh == p_thresh == 0.5 - else: - assert i_thresh == p_thresh == 0.1111 - - -class TestEnsembleEngine: - """Test ensemble engine configuration.""" - - @staticmethod - @pytest.mark.parametrize( - "normalization_stage", - [NormalizationStage.NONE, NormalizationStage.IMAGE, NormalizationStage.TILE], - ) - def test_normalisation(normalization_stage: NormalizationStage, get_model: AnomalyModule) -> None: - """Test that normalization callback is correctly initialized.""" - engine = get_ensemble_engine( - tile_index=(0, 0), - accelerator="cpu", - devices="1", - root_dir=Path("mock"), - normalization_stage=normalization_stage, - ) - - engine._setup_anomalib_callbacks(get_model) # noqa: SLF001 - - # verify that only in case of tile level normalization the callback is present - if normalization_stage == NormalizationStage.TILE: - assert any( - isinstance(x, _MinMaxNormalizationCallback) - for x in engine._cache.args["callbacks"] # noqa: SLF001 - ) - else: - assert not any( - isinstance(x, _MinMaxNormalizationCallback) - for x in engine._cache.args["callbacks"] # noqa: SLF001 - ) diff --git a/tests/unit/pipelines/tiled_ensemble/test_prediction_data.py b/tests/unit/pipelines/tiled_ensemble/test_prediction_data.py deleted file mode 100644 index 7185f1e2ca..0000000000 --- a/tests/unit/pipelines/tiled_ensemble/test_prediction_data.py +++ /dev/null @@ -1,69 +0,0 @@ -"""Test tiled prediction storage class.""" - -# Copyright (C) 2023-2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import copy -from collections.abc import Callable - -import torch -from torch import Tensor - -from anomalib.data import AnomalibDataModule -from anomalib.pipelines.tiled_ensemble.components.utils.prediction_data import EnsemblePredictions - - -class TestPredictionData: - """Test EnsemblePredictions class, used for tiled prediction storage.""" - - @staticmethod - def store_all(data: EnsemblePredictions, datamodule: AnomalibDataModule) -> dict: - """Store the tiled predictions in the EnsemblePredictions object.""" - tile_dict = {} - for tile_index in [(0, 0), (0, 1), (1, 0), (1, 1)]: - datamodule.collate_fn.tile_index = tile_index - - tile_prediction = [] - for batch in iter(datamodule.train_dataloader()): - # set mock maps to just one channel of image - batch["anomaly_maps"] = batch["image"].clone()[:, 0, :, :].unsqueeze(1) - # set mock pred mask to mask but add channel - batch["pred_masks"] = batch["mask"].clone().unsqueeze(1) - tile_prediction.append(batch) - # save original - tile_dict[tile_index] = copy.deepcopy(tile_prediction) - # store to prediction storage object - data.add_tile_prediction(tile_index, tile_prediction) - - return tile_dict - - @staticmethod - def verify_equal(name: str, tile_dict: dict, storage: EnsemblePredictions, eq_funct: Callable) -> bool: - """Verify that all data at same tile index and same batch index matches.""" - batch_num = len(tile_dict[0, 0]) - - for batch_i in range(batch_num): - # batch is dict where key: tile index and val is batched data of that tile - curr_batch = storage.get_batch_tiles(batch_i) - - # go over all indices of current batch of stored data - for tile_index, stored_data_batch in curr_batch.items(): - stored_data = stored_data_batch[name] - # get original data dict at current tile index and batch index - original_data = tile_dict[tile_index][batch_i][name] - if isinstance(original_data, Tensor): - if not eq_funct(original_data, stored_data): - return False - elif original_data != stored_data: - return False - - return True - - def test_prediction_object(self, get_datamodule: AnomalibDataModule) -> None: - """Test prediction storage class.""" - datamodule = get_datamodule - storage = EnsemblePredictions() - original = self.store_all(storage, datamodule) - - for name in original[0, 0][0]: - assert self.verify_equal(name, original, storage, torch.equal), f"{name} doesn't match" diff --git a/tests/unit/pipelines/tiled_ensemble/test_tiler.py b/tests/unit/pipelines/tiled_ensemble/test_tiler.py deleted file mode 100644 index 96b6c0e7bc..0000000000 --- a/tests/unit/pipelines/tiled_ensemble/test_tiler.py +++ /dev/null @@ -1,119 +0,0 @@ -"""Tiling related tests for tiled ensemble.""" - -# Copyright (C) 2023-2024 Intel Corporation -# SPDX-License-Identifier: Apache-2.0 - -import copy - -import pytest -import torch - -from anomalib.data import AnomalibDataModule -from anomalib.pipelines.tiled_ensemble.components.utils.helper_functions import get_ensemble_tiler - -tiler_config = { - "tiling": { - "tile_size": 256, - "stride": 256, - }, - "data": {"init_args": {"image_size": 512}}, -} - -tiler_config_overlap = { - "tiling": { - "tile_size": 256, - "stride": 128, - }, - "data": {"init_args": {"image_size": 512}}, -} - - -class TestTiler: - """EnsembleTiler tests.""" - - @staticmethod - @pytest.mark.parametrize( - ("input_shape", "config", "expected_shape"), - [ - (torch.Size([5, 3, 512, 512]), tiler_config, torch.Size([2, 2, 5, 3, 256, 256])), - (torch.Size([5, 3, 512, 512]), tiler_config_overlap, torch.Size([3, 3, 5, 3, 256, 256])), - (torch.Size([5, 3, 500, 500]), tiler_config, torch.Size([2, 2, 5, 3, 256, 256])), - (torch.Size([5, 3, 500, 500]), tiler_config_overlap, torch.Size([3, 3, 5, 3, 256, 256])), - ], - ) - def test_basic_tile_for_ensemble(input_shape: torch.Size, config: dict, expected_shape: torch.Size) -> None: - """Test basic tiling of data.""" - config = copy.deepcopy(config) - config["data"]["init_args"]["image_size"] = input_shape[-1] - tiler = get_ensemble_tiler(config["tiling"], config["data"]) - - images = torch.rand(size=input_shape) - tiled = tiler.tile(images) - - assert tiled.shape == expected_shape - - @staticmethod - @pytest.mark.parametrize( - ("input_shape", "config"), - [ - (torch.Size([5, 3, 512, 512]), tiler_config), - (torch.Size([5, 3, 512, 512]), tiler_config_overlap), - (torch.Size([5, 3, 500, 500]), tiler_config), - (torch.Size([5, 3, 500, 500]), tiler_config_overlap), - ], - ) - def test_basic_tile_reconstruction(input_shape: torch.Size, config: dict) -> None: - """Test basic reconstruction of tiled data.""" - config = copy.deepcopy(config) - config["data"]["init_args"]["image_size"] = input_shape[-1] - - tiler = get_ensemble_tiler(config["tiling"], config["data"]) - - images = torch.rand(size=input_shape) - tiled = tiler.tile(images.clone()) - untiled = tiler.untile(tiled) - - assert images.shape == untiled.shape - assert images.equal(untiled) - - @staticmethod - @pytest.mark.parametrize( - ("input_shape", "config"), - [ - (torch.Size([5, 3, 512, 512]), tiler_config), - (torch.Size([5, 3, 500, 500]), tiler_config), - ], - ) - def test_untile_different_instance(input_shape: torch.Size, config: dict) -> None: - """Test untiling with different Tiler instance.""" - config = copy.deepcopy(config) - config["data"]["init_args"]["image_size"] = input_shape[-1] - tiler_1 = get_ensemble_tiler(config["tiling"], config["data"]) - - tiler_2 = get_ensemble_tiler(config["tiling"], config["data"]) - - images = torch.rand(size=input_shape) - tiled = tiler_1.tile(images.clone()) - - untiled = tiler_2.untile(tiled) - - # untiling should work even with different instance of tiler - assert images.shape == untiled.shape - assert images.equal(untiled) - - -class TestTileCollater: - """Test tile collater.""" - - @staticmethod - def test_collate_tile_shape(get_ensemble_config: dict, get_datamodule: AnomalibDataModule) -> None: - """Test that collate function successfully tiles the image.""" - config = get_ensemble_config - # datamodule with tile collater - datamodule = get_datamodule - - tile_w, tile_h = config["tiling"]["tile_size"] - - batch = next(iter(datamodule.train_dataloader())) - assert batch["image"].shape[1:] == (3, tile_w, tile_h) - assert batch["mask"].shape[1:] == (tile_w, tile_h)