diff --git a/src/otx/algorithms/anomaly/adapters/anomalib/exportable_code/__init__.py b/src/otx/algorithms/anomaly/adapters/anomalib/exportable_code/__init__.py
deleted file mode 100644
index 5bcf71d66ca..00000000000
--- a/src/otx/algorithms/anomaly/adapters/anomalib/exportable_code/__init__.py
+++ /dev/null
@@ -1,12 +0,0 @@
-"""Exportable code for Anomaly tasks."""
-
-# Copyright (C) 2021-2022 Intel Corporation
-# SPDX-License-Identifier: Apache-2.0
-#
-
-from .anomaly_classification import AnomalyClassification
-from .anomaly_detection import AnomalyDetection
-from .anomaly_segmentation import AnomalySegmentation
-from .base import AnomalyBase
-
-__all__ = ["AnomalyBase", "AnomalyClassification", "AnomalyDetection", "AnomalySegmentation"]
diff --git a/src/otx/algorithms/anomaly/adapters/anomalib/exportable_code/anomaly_classification.py b/src/otx/algorithms/anomaly/adapters/anomalib/exportable_code/anomaly_classification.py
deleted file mode 100644
index bd61d0f3c05..00000000000
--- a/src/otx/algorithms/anomaly/adapters/anomalib/exportable_code/anomaly_classification.py
+++ /dev/null
@@ -1,47 +0,0 @@
-"""Wrapper for Open Model Zoo for Anomaly Classification tasks."""
-
-# Copyright (C) 2021-2022 Intel Corporation
-# SPDX-License-Identifier: Apache-2.0
-#
-
-from typing import Any, Dict
-
-import cv2
-import numpy as np
-
-from .base import AnomalyBase
-
-
-class AnomalyClassification(AnomalyBase):
- """Wrapper for anomaly classification task."""
-
- __model__ = "anomaly_classification"
-
- def postprocess(self, outputs: Dict[str, np.ndarray], meta: Dict[str, Any]) -> float:
- """Resize the outputs of the model to original image size.
-
- Args:
- outputs (Dict[str, np.ndarray]): Raw outputs of the model after ``infer_sync`` is called.
- meta (Dict[str, Any]): Metadata which contains values such as threshold, original image size.
-
- Returns:
- float: Normalized anomaly score
- """
- anomaly_map: np.ndarray = outputs[self.output_blob_name].squeeze()
- pred_score = anomaly_map.reshape(-1).max()
-
- meta["image_threshold"] = self.metadata["image_threshold"] # pylint: disable=no-member
- meta["min"] = self.metadata["min"] # pylint: disable=no-member
- meta["max"] = self.metadata["max"] # pylint: disable=no-member
- meta["threshold"] = self.threshold # pylint: disable=no-member
-
- anomaly_map = self._normalize(anomaly_map, meta["image_threshold"], meta["min"], meta["max"])
- pred_score = self._normalize(pred_score, meta["image_threshold"], meta["min"], meta["max"])
-
- input_image_height = meta["original_shape"][0]
- input_image_width = meta["original_shape"][1]
- result = cv2.resize(anomaly_map, (input_image_width, input_image_height))
-
- meta["anomaly_map"] = result
-
- return np.array(pred_score)
diff --git a/src/otx/algorithms/anomaly/adapters/anomalib/exportable_code/anomaly_detection.py b/src/otx/algorithms/anomaly/adapters/anomalib/exportable_code/anomaly_detection.py
deleted file mode 100644
index 47e8b49697e..00000000000
--- a/src/otx/algorithms/anomaly/adapters/anomalib/exportable_code/anomaly_detection.py
+++ /dev/null
@@ -1,43 +0,0 @@
-"""Wrapper for Open Model Zoo for Anomaly Detection tasks."""
-
-# Copyright (C) 2021-2022 Intel Corporation
-# SPDX-License-Identifier: Apache-2.0
-#
-
-from typing import Any, Dict
-
-import cv2
-import numpy as np
-
-from .base import AnomalyBase
-
-
-class AnomalyDetection(AnomalyBase):
- """Wrapper for anomaly detection task."""
-
- __model__ = "anomaly_detection"
-
- def postprocess(self, outputs: Dict[str, np.ndarray], meta: Dict[str, Any]) -> np.ndarray:
- """Resize the outputs of the model to original image size.
-
- Args:
- outputs (Dict[str, np.ndarray]): Raw outputs of the model after ``infer_sync`` is called.
- meta (Dict[str, Any]): Metadata which contains values such as threshold, original image size.
-
- Returns:
- np.ndarray: Detection Mask
- """
- anomaly_map: np.ndarray = outputs[self.output_blob_name].squeeze()
-
- meta["pixel_threshold"] = self.metadata["pixel_threshold"] # pylint: disable=no-member
- meta["min"] = self.metadata["min"] # pylint: disable=no-member
- meta["max"] = self.metadata["max"] # pylint: disable=no-member
- meta["threshold"] = self.threshold # pylint: disable=no-member
-
- anomaly_map = self._normalize(anomaly_map, meta["pixel_threshold"], meta["min"], meta["max"])
-
- input_image_height = meta["original_shape"][0]
- input_image_width = meta["original_shape"][1]
- result = cv2.resize(anomaly_map, (input_image_width, input_image_height))
-
- return result
diff --git a/src/otx/algorithms/anomaly/adapters/anomalib/exportable_code/anomaly_segmentation.py b/src/otx/algorithms/anomaly/adapters/anomalib/exportable_code/anomaly_segmentation.py
deleted file mode 100644
index 7335e914024..00000000000
--- a/src/otx/algorithms/anomaly/adapters/anomalib/exportable_code/anomaly_segmentation.py
+++ /dev/null
@@ -1,43 +0,0 @@
-"""Wrapper for Open Model Zoo for Anomaly Segmentation tasks."""
-
-# Copyright (C) 2021-2022 Intel Corporation
-# SPDX-License-Identifier: Apache-2.0
-#
-
-from typing import Any, Dict
-
-import cv2
-import numpy as np
-
-from .base import AnomalyBase
-
-
-class AnomalySegmentation(AnomalyBase):
- """Wrapper for anomaly segmentation task."""
-
- __model__ = "anomaly_segmentation"
-
- def postprocess(self, outputs: Dict[str, np.ndarray], meta: Dict[str, Any]) -> np.ndarray:
- """Resize the outputs of the model to original image size.
-
- Args:
- outputs (Dict[str, np.ndarray]): Raw outputs of the model after ``infer_sync`` is called.
- meta (Dict[str, Any]): Metadata which contains values such as threshold, original image size.
-
- Returns:
- np.ndarray: Segmentation Mask
- """
- anomaly_map: np.ndarray = outputs[self.output_blob_name].squeeze()
-
- meta["pixel_threshold"] = self.metadata["pixel_threshold"] # pylint: disable=no-member
- meta["min"] = self.metadata["min"] # pylint: disable=no-member
- meta["max"] = self.metadata["max"] # pylint: disable=no-member
- meta["threshold"] = self.threshold # pylint: disable=no-member
-
- anomaly_map = self._normalize(anomaly_map, meta["pixel_threshold"], meta["min"], meta["max"])
-
- input_image_height = meta["original_shape"][0]
- input_image_width = meta["original_shape"][1]
- result = cv2.resize(anomaly_map, (input_image_width, input_image_height))
-
- return result
diff --git a/src/otx/algorithms/anomaly/adapters/anomalib/exportable_code/base.py b/src/otx/algorithms/anomaly/adapters/anomalib/exportable_code/base.py
deleted file mode 100644
index cf25d59fff2..00000000000
--- a/src/otx/algorithms/anomaly/adapters/anomalib/exportable_code/base.py
+++ /dev/null
@@ -1,47 +0,0 @@
-"""Wrapper for Open Model Zoo for Anomaly tasks."""
-
-# Copyright (C) 2021-2022 Intel Corporation
-# SPDX-License-Identifier: Apache-2.0
-#
-
-from typing import Union
-
-import numpy as np
-from openvino.model_api.models import SegmentationModel
-from openvino.model_api.models.types import DictValue, NumericalValue
-
-
-class AnomalyBase(SegmentationModel):
- """Wrapper for anomaly tasks."""
-
- __model__ = "anomaly_base"
-
- @classmethod
- def parameters(cls):
- """Dictionary containing model parameters."""
- parameters = super().parameters()
- parameters["resize_type"].update_default_value("standard")
- parameters.update(
- {
- "metadata": DictValue(description="Metadata for inference"),
- "threshold": NumericalValue(description="Threshold used to classify anomaly"),
- }
- )
-
- return parameters
-
- @staticmethod
- def _normalize(
- targets: Union[np.ndarray, np.float32],
- threshold: Union[np.ndarray, float],
- min_val: Union[np.ndarray, float],
- max_val: Union[np.ndarray, float],
- ) -> np.ndarray:
- """Apply min-max normalization and shift the values such that the threshold value is centered at 0.5."""
- normalized = ((targets - threshold) / (max_val - min_val)) + 0.5
- if isinstance(targets, (np.ndarray, np.float32)):
- normalized = np.minimum(normalized, 1)
- normalized = np.maximum(normalized, 0)
- else:
- raise ValueError(f"Targets must be either Tensor or Numpy array. Received {type(targets)}")
- return normalized
diff --git a/src/otx/algorithms/anomaly/tasks/inference.py b/src/otx/algorithms/anomaly/tasks/inference.py
index 0dc871c4cca..50c5a4b81f7 100644
--- a/src/otx/algorithms/anomaly/tasks/inference.py
+++ b/src/otx/algorithms/anomaly/tasks/inference.py
@@ -357,7 +357,7 @@ def _add_metadata_to_ir(self, model_file: str, export_type: ExportType) -> None:
if "min" in metadata and "max" in metadata:
extra_model_data[("model_info", "normalization_scale")] = metadata["max"] - metadata["min"]
- extra_model_data[("model_info", "reverse_input_channels")] = True
+ extra_model_data[("model_info", "reverse_input_channels")] = False
extra_model_data[("model_info", "model_type")] = "AnomalyDetection"
extra_model_data[("model_info", "labels")] = "Normal Anomaly"
if export_type == ExportType.OPENVINO:
diff --git a/src/otx/algorithms/anomaly/tasks/openvino.py b/src/otx/algorithms/anomaly/tasks/openvino.py
index 3800b264e0d..ecdc8e7cd28 100644
--- a/src/otx/algorithms/anomaly/tasks/openvino.py
+++ b/src/otx/algorithms/anomaly/tasks/openvino.py
@@ -19,7 +19,7 @@
import os
import random
import tempfile
-from typing import Any, Dict, Optional
+from typing import Any, Dict, List, Optional, Tuple, Union
from zipfile import ZipFile
import nncf
@@ -27,14 +27,14 @@
import openvino.runtime as ov
from addict import Dict as ADDict
from anomalib.data.utils.transform import get_transforms
-from anomalib.deploy import OpenVINOInferencer
from nncf.common.quantization.structs import QuantizationPreset
from omegaconf import OmegaConf
+from openvino.model_api.models import AnomalyDetection, AnomalyResult
-import otx.algorithms.anomaly.adapters.anomalib.exportable_code
from otx.algorithms.anomaly.adapters.anomalib.config import get_anomalib_config
from otx.algorithms.anomaly.adapters.anomalib.logger import get_logger
from otx.algorithms.anomaly.configs.base.configuration import BaseAnomalyConfig
+from otx.algorithms.common.utils import embed_ir_model_data
from otx.algorithms.common.utils.ir import check_if_quantized
from otx.algorithms.common.utils.utils import read_py_config
from otx.api.configuration.configurable_parameters import ConfigurableParameters
@@ -80,17 +80,15 @@ class OTXOpenVINOAnomalyDataloader:
Args:
dataset (DatasetEntity): OTX dataset entity
- inferencer (OpenVINOInferencer): OpenVINO Inferencer
+ shuffle (bool, optional): Shuffle dataset. Defaults to True.
"""
def __init__(
self,
dataset: DatasetEntity,
- inferencer: OpenVINOInferencer,
shuffle: bool = True,
):
self.dataset = dataset
- self.inferencer = inferencer
self.shuffler = None
if shuffle:
self.shuffler = list(range(len(dataset)))
@@ -110,9 +108,8 @@ def __getitem__(self, index: int):
image = self.dataset[index].numpy
annotation = self.dataset[index].annotation_scene
- inputs = self.inferencer.pre_process(image)
- return (index, annotation), inputs
+ return (index, annotation), image
def __len__(self) -> int:
"""Get size of the dataset.
@@ -135,7 +132,7 @@ def __init__(self, task_environment: TaskEnvironment) -> None:
self.task_environment = task_environment
self.task_type = self.task_environment.model_template.task_type
self.config = self.get_config()
- self.inferencer = self.load_inferencer()
+ self.inference_model = self.get_openvino_model()
labels = self.task_environment.get_labels()
self.normal_label = [label for label in labels if not label.is_anomalous][0]
@@ -173,15 +170,13 @@ def infer(self, dataset: DatasetEntity, inference_parameters: InferenceParameter
if inference_parameters is not None:
update_progress_callback = inference_parameters.update_progress # type: ignore
- # This always assumes that threshold is available in the task environment's model
- meta_data = self.get_metadata()
for idx, dataset_item in enumerate(dataset):
- image_result = self.inferencer.predict(dataset_item.numpy, metadata=meta_data)
+ image_result: AnomalyResult = self.inference_model(dataset_item.numpy)
# TODO: inferencer should return predicted label and mask
- pred_label = image_result.pred_score >= 0.5
- pred_mask = (image_result.anomaly_map >= 0.5).astype(np.uint8)
- probability = image_result.pred_score if pred_label else 1 - image_result.pred_score
+ pred_label = image_result.pred_label
+ pred_mask = image_result.pred_mask
+ probability = image_result.pred_score if pred_label == "Anomaly" else 1 - image_result.pred_score
if self.task_type == TaskType.ANOMALY_CLASSIFICATION:
label = self.anomalous_label if image_result.pred_score >= 0.5 else self.normal_label
elif self.task_type == TaskType.ANOMALY_SEGMENTATION:
@@ -320,7 +315,7 @@ def optimize(
)
logger.info("Starting PTQ optimization.")
- data_loader = OTXOpenVINOAnomalyDataloader(dataset=dataset, inferencer=self.inferencer)
+ data_loader = OTXOpenVINOAnomalyDataloader(dataset=dataset)
quantization_dataset = nncf.Dataset(data_loader, lambda data: data[1])
with tempfile.TemporaryDirectory() as tempdir:
@@ -355,34 +350,100 @@ def optimize(
self.__load_weights(path=os.path.join(tempdir, "model.bin"), output_model=output_model, key="openvino.bin")
output_model.set_data("label_schema.json", label_schema_to_bytes(self.task_environment.label_schema))
- output_model.set_data("metadata", self.task_environment.model.get_data("metadata"))
output_model.model_format = ModelFormat.OPENVINO
output_model.optimization_type = ModelOptimizationType.POT
output_model.optimization_methods = [OptimizationMethod.QUANTIZATION]
output_model.precision = [ModelPrecision.INT8]
self.task_environment.model = output_model
- self.inferencer = self.load_inferencer()
+ self.inference_model = self.get_openvino_model()
if optimization_parameters is not None:
optimization_parameters.update_progress(100, None)
logger.info("PTQ optimization completed")
- def load_inferencer(self) -> OpenVINOInferencer:
+ def get_openvino_model(self) -> AnomalyDetection:
"""Create the OpenVINO inferencer object.
Returns:
- OpenVINOInferencer object
+ AnomalyDetection model
"""
if self.task_environment.model is None:
raise Exception("task_environment.model is None. Cannot load weights.")
- return OpenVINOInferencer(
- path=(
- self.task_environment.model.get_data("openvino.xml"),
- self.task_environment.model.get_data("openvino.bin"),
- ),
- metadata=self.get_metadata(),
- )
+ try:
+ model = AnomalyDetection.create_model(
+ model=self.task_environment.model.get_data("openvino.xml"),
+ weights_path=self.task_environment.model.get_data("openvino.bin"),
+ )
+ except RuntimeError as exception:
+ logger.exception(exception)
+ logger.info("Possibly a legacy model is being loaded.")
+ self._create_from_legacy()
+ model = AnomalyDetection.create_model(
+ model=self.task_environment.model.get_data("openvino.xml"),
+ weights_path=self.task_environment.model.get_data("openvino.bin"),
+ )
+
+ return model
+
+ def _create_from_legacy(self) -> None:
+ """Generates an OpenVINO model in new format from the legacy model.
+
+ TODO: This needs to be removed once all projects in Geti have been migrated to the newer version.
+
+ Args:
+ model_file (str): The XML model file.
+ """
+ metadata = self.get_metadata()
+ extra_model_data: Dict[Tuple[str, str], Any] = {}
+ for key, value in metadata.items():
+ if key in ("transform", "min", "max"):
+ continue
+ extra_model_data[("model_info", key)] = value
+ # Add transforms
+ if "transform" in metadata:
+ for transform_dict in metadata["transform"]["transform"]["transforms"]:
+ transform = transform_dict.pop("__class_fullname__")
+ if transform == "Normalize":
+ extra_model_data[("model_info", "mean_values")] = self._serialize_list(
+ [x * 255.0 for x in transform_dict["mean"]]
+ )
+ extra_model_data[("model_info", "scale_values")] = self._serialize_list(
+ [x * 255.0 for x in transform_dict["std"]]
+ )
+ elif transform == "Resize":
+ extra_model_data[("model_info", "orig_height")] = transform_dict["height"]
+ extra_model_data[("model_info", "orig_width")] = transform_dict["width"]
+ else:
+ logger.warn(f"Transform {transform} is not supported currently")
+ # Since we only need the diff of max and min, we fuse the min and max into one op
+ if "min" in metadata and "max" in metadata:
+ extra_model_data[("model_info", "normalization_scale")] = metadata["max"] - metadata["min"]
+
+ extra_model_data[("model_info", "reverse_input_channels")] = False
+ extra_model_data[("model_info", "model_type")] = "AnomalyDetection"
+ extra_model_data[("model_info", "labels")] = "Normal Anomaly"
+
+ for key, value in extra_model_data.items():
+ if isinstance(value, np.ndarray):
+ extra_model_data[key] = value.tolist()
+
+ with tempfile.TemporaryDirectory() as temp_dir:
+ xml_data = self.task_environment.model.get_data("openvino.xml")
+ bin_data = self.task_environment.model.get_data("openvino.bin")
+ with open(f"{temp_dir}/openvino.xml", "wb") as file:
+ file.write(xml_data)
+ with open(f"{temp_dir}/openvino.bin", "wb") as file:
+ file.write(bin_data)
+ embed_ir_model_data(f"{temp_dir}/openvino.xml", extra_model_data)
+ with open(f"{temp_dir}/openvino.xml", "rb") as file:
+ self.task_environment.model.set_data("openvino.xml", file.read())
+ with open(f"{temp_dir}/openvino.bin", "rb") as file:
+ self.task_environment.model.set_data("openvino.bin", file.read())
+
+ def _serialize_list(self, arr: Union[Tuple, List]) -> str:
+ """Converts a list to space separated string."""
+ return " ".join(map(str, arr))
@staticmethod
def __save_weights(path: str, data: bytes) -> None:
@@ -412,19 +473,10 @@ def _get_openvino_configuration(self) -> Dict[str, Any]:
if self.task_environment.model is None:
raise Exception("task_environment.model is None. Cannot get configuration.")
- configuration = {
- "metadata": self.get_metadata(),
+ configuration: Dict[str, Any] = {
"labels": LabelSchemaMapper.forward(self.task_environment.label_schema),
- "threshold": 0.5,
}
- if "transforms" not in self.config.keys():
- configuration["mean_values"] = list(np.array([0.485, 0.456, 0.406]) * 255)
- configuration["scale_values"] = list(np.array([0.229, 0.224, 0.225]) * 255)
- else:
- configuration["mean_values"] = self.config.transforms.mean
- configuration["scale_values"] = self.config.transforms.std
-
return configuration
def deploy(self, output_model: ModelEntity) -> None:
@@ -446,7 +498,7 @@ def deploy(self, output_model: ModelEntity) -> None:
task_type = str(self.task_type).lower()
- parameters["type_of_model"] = task_type
+ parameters["type_of_model"] = "AnomalyDetection"
parameters["converter_type"] = task_type.upper()
parameters["model_parameters"] = self._get_openvino_configuration()
zip_buffer = io.BytesIO()
@@ -455,17 +507,6 @@ def deploy(self, output_model: ModelEntity) -> None:
arch.writestr(os.path.join("model", "model.xml"), self.task_environment.model.get_data("openvino.xml"))
arch.writestr(os.path.join("model", "model.bin"), self.task_environment.model.get_data("openvino.bin"))
arch.writestr(os.path.join("model", "config.json"), json.dumps(parameters, ensure_ascii=False, indent=4))
- # model_wrappers files
- for root, _, files in os.walk(
- os.path.dirname(otx.algorithms.anomaly.adapters.anomalib.exportable_code.__file__)
- ):
- if "__pycache__" in root:
- continue
- for file in files:
- file_path = os.path.join(root, file)
- arch.write(
- file_path, os.path.join("python", "model_wrappers", file_path.split("exportable_code/")[1])
- )
# other python files
arch.write(os.path.join(work_dir, "requirements.txt"), os.path.join("python", "requirements.txt"))
arch.write(os.path.join(work_dir, "LICENSE"), os.path.join("python", "LICENSE"))
diff --git a/src/otx/api/usecases/exportable_code/prediction_to_annotation_converter.py b/src/otx/api/usecases/exportable_code/prediction_to_annotation_converter.py
index d31964a80fe..7ea4c568b63 100644
--- a/src/otx/api/usecases/exportable_code/prediction_to_annotation_converter.py
+++ b/src/otx/api/usecases/exportable_code/prediction_to_annotation_converter.py
@@ -11,10 +11,11 @@
import numpy as np
from openvino.model_api.models import utils
from openvino.model_api.models.utils import (
+ AnomalyResult,
ClassificationResult,
+ DetectionResult,
ImageResultWithSoftPrediction,
InstanceSegmentationResult,
- DetectionResult,
)
from otx.api.entities.annotation import (
@@ -26,16 +27,14 @@
from otx.api.entities.label import Domain
from otx.api.entities.label_schema import LabelSchemaEntity
from otx.api.entities.scored_label import ScoredLabel
-from otx.api.entities.shapes.polygon import Point, Polygon
from otx.api.entities.shapes.ellipse import Ellipse
+from otx.api.entities.shapes.polygon import Point, Polygon
from otx.api.entities.shapes.rectangle import Rectangle
-from otx.api.utils.anomaly_utils import create_detection_annotation_from_anomaly_heatmap
+from otx.api.utils.detection_utils import detection2array
from otx.api.utils.labels_utils import get_empty_label
from otx.api.utils.segmentation_utils import create_annotation_from_segmentation_map
from otx.api.utils.time_utils import now
-from otx.api.utils.detection_utils import detection2array
-
def convert_bbox_to_ellipse(x1, y1, x2, y2) -> Ellipse:
"""Convert bbox to ellipse."""
@@ -332,7 +331,7 @@ def __init__(self, label_schema: LabelSchemaEntity):
self.normal_label = [label for label in labels if not label.is_anomalous][0]
self.anomalous_label = [label for label in labels if label.is_anomalous][0]
- def convert_to_annotation(self, predictions: np.ndarray, metadata: Dict[str, Any]) -> AnnotationSceneEntity:
+ def convert_to_annotation(self, predictions: AnomalyResult, metadata: Dict[str, Any]) -> AnnotationSceneEntity:
"""Convert predictions to OTX Annotation Scene using the metadata.
Args:
@@ -342,15 +341,14 @@ def convert_to_annotation(self, predictions: np.ndarray, metadata: Dict[str, Any
Returns:
AnnotationSceneEntity: OTX annotation scene entity object.
"""
- pred_label = predictions >= metadata.get("threshold", 0.5)
-
- label = self.anomalous_label if pred_label else self.normal_label
- probability = (1 - predictions) if predictions < 0.5 else predictions
+ assert predictions.pred_score is not None
+ assert predictions.pred_label is not None
+ label = self.anomalous_label if predictions.pred_label == "Anomaly" else self.normal_label
annotations = [
Annotation(
Rectangle.generate_full_box(),
- labels=[ScoredLabel(label=label, probability=float(probability))],
+ labels=[ScoredLabel(label=label, probability=float(predictions.pred_score))],
)
]
return AnnotationSceneEntity(kind=AnnotationSceneKind.PREDICTION, annotations=annotations)
@@ -369,7 +367,7 @@ def __init__(self, label_schema: LabelSchemaEntity):
self.anomalous_label = [label for label in labels if label.is_anomalous][0]
self.label_map = {0: self.normal_label, 1: self.anomalous_label}
- def convert_to_annotation(self, predictions: np.ndarray, metadata: Dict[str, Any]) -> AnnotationSceneEntity:
+ def convert_to_annotation(self, predictions: AnomalyResult, metadata: Dict[str, Any]) -> AnnotationSceneEntity:
"""Convert predictions to OTX Annotation Scene using the metadata.
Args:
@@ -379,9 +377,11 @@ def convert_to_annotation(self, predictions: np.ndarray, metadata: Dict[str, Any
Returns:
AnnotationSceneEntity: OTX annotation scene entity object.
"""
- pred_mask = predictions >= 0.5
- mask = pred_mask.squeeze().astype(np.uint8)
- annotations = create_annotation_from_segmentation_map(mask, predictions, self.label_map)
+ assert predictions.pred_mask is not None
+ assert predictions.anomaly_map is not None
+ annotations = create_annotation_from_segmentation_map(
+ predictions.pred_mask, predictions.anomaly_map, self.label_map
+ )
if len(annotations) == 0:
# TODO: add confidence to this label
annotations = [
@@ -411,7 +411,7 @@ def __init__(self, label_schema: LabelSchemaEntity):
self.anomalous_label = [label for label in labels if label.is_anomalous][0]
self.label_map = {0: self.normal_label, 1: self.anomalous_label}
- def convert_to_annotation(self, predictions: np.ndarray, metadata: Dict[str, Any]) -> AnnotationSceneEntity:
+ def convert_to_annotation(self, predictions: AnomalyResult, metadata: Dict[str, Any]) -> AnnotationSceneEntity:
"""Convert predictions to OTX Annotation Scene using the metadata.
Args:
@@ -421,9 +421,18 @@ def convert_to_annotation(self, predictions: np.ndarray, metadata: Dict[str, Any
Returns:
AnnotationSceneEntity: OTX annotation scene entity object.
"""
- pred_mask = predictions >= 0.5
- mask = pred_mask.squeeze().astype(np.uint8)
- annotations = create_detection_annotation_from_anomaly_heatmap(mask, predictions, self.label_map)
+ assert predictions.pred_boxes is not None
+ assert predictions.pred_score is not None
+ assert predictions.pred_mask is not None
+ annotations = []
+ image_h, image_w = predictions.pred_mask.shape
+ for box in predictions.pred_boxes:
+ annotations.append(
+ Annotation(
+ Rectangle(box[0] / image_w, box[1] / image_h, box[2] / image_w, box[3] / image_h),
+ labels=[ScoredLabel(label=self.anomalous_label, probability=predictions.pred_score)],
+ )
+ )
if len(annotations) == 0:
# TODO: add confidence to this label
annotations = [
diff --git a/tests/unit/algorithms/anomaly/tasks/test_openvino.py b/tests/unit/algorithms/anomaly/tasks/test_openvino.py
index 82d1174bb97..8fb222189aa 100644
--- a/tests/unit/algorithms/anomaly/tasks/test_openvino.py
+++ b/tests/unit/algorithms/anomaly/tasks/test_openvino.py
@@ -91,7 +91,7 @@ def test_openvino(self, tmpdir, setup_task_environment):
openvino_task.deploy(output_model)
assert output_model.exportable_code is not None
- @patch.multiple(OpenVINOTask, get_config=MagicMock(), load_inferencer=MagicMock())
+ @patch.multiple(OpenVINOTask, get_config=MagicMock(), get_openvino_model=MagicMock())
@patch("otx.algorithms.anomaly.tasks.openvino.get_transforms", MagicMock())
def test_anomaly_legacy_keys(self, mocker, tmp_dir):
"""Checks whether the model is loaded correctly with legacy and current keys."""
diff --git a/tests/unit/api/usecases/exportable_code/test_prediction_to_annotation_converter.py b/tests/unit/api/usecases/exportable_code/test_prediction_to_annotation_converter.py
index 5a6518e8689..f48729b1775 100644
--- a/tests/unit/api/usecases/exportable_code/test_prediction_to_annotation_converter.py
+++ b/tests/unit/api/usecases/exportable_code/test_prediction_to_annotation_converter.py
@@ -6,9 +6,11 @@
import numpy as np
import pytest
-from openvino.model_api.models.utils import Detection
-from openvino.model_api.models.utils import ClassificationResult
-from openvino.model_api.models.utils import ImageResultWithSoftPrediction
+from openvino.model_api.models.utils import (
+ ClassificationResult,
+ Detection,
+ ImageResultWithSoftPrediction,
+)
from otx.api.entities.annotation import (
Annotation,
@@ -943,69 +945,3 @@ def test_anomaly_classification_to_annotation_init(
converter = AnomalyClassificationToAnnotationConverter(label_schema=label_schema)
assert converter.normal_label == non_empty_labels[0]
assert converter.anomalous_label == non_empty_labels[2]
-
- @pytest.mark.priority_medium
- @pytest.mark.unit
- @pytest.mark.reqids(Requirements.REQ_1)
- def test_anomaly_classification_to_annotation_convert(
- self,
- ):
- """
- Description:
- Check "AnomalyClassificationToAnnotationConverter" class "convert_to_annotation" method
-
- Input data:
- "AnomalyClassificationToAnnotationConverter" class object, "predictions" array
-
- Expected results:
- Test passes if "AnnotationSceneEntity" object returned by "convert_to_annotation" method is equal to
- expected
-
- Steps
- 1. Check attributes of "AnnotationSceneEntity" object returned by "convert_to_annotation" method for
- "metadata" dictionary with specified "threshold" key
- 2. Check attributes of "AnnotationSceneEntity" object returned by "convert_to_annotation" method for
- "metadata" dictionary without specified "threshold" key
- """
-
- def check_annotation(actual_annotation: Annotation, expected_labels: list):
- assert isinstance(actual_annotation, Annotation)
- assert actual_annotation.get_labels() == expected_labels
- assert isinstance(actual_annotation.shape, Rectangle)
- assert Rectangle.is_full_box(rectangle=actual_annotation.shape)
-
- non_empty_labels = [
- LabelEntity(name="Normal", domain=Domain.CLASSIFICATION, id=ID("1")),
- LabelEntity(
- name="Anomalous",
- domain=Domain.CLASSIFICATION,
- id=ID("2"),
- is_anomalous=True,
- ),
- ]
- label_group = LabelGroup(name="Anomaly classification labels group", labels=non_empty_labels)
- label_schema = LabelSchemaEntity(label_groups=[label_group])
- converter = AnomalyClassificationToAnnotationConverter(label_schema=label_schema)
- predictions = np.array([0.7])
- # Checking attributes of "AnnotationSceneEntity" returned by "convert_to_annotation" for "metadata" with
- # specified "threshold" key
- metadata = {
- "non-required key": 1,
- "other non-required key": 2,
- "threshold": 0.8,
- }
- predictions_to_annotations = converter.convert_to_annotation(predictions=predictions, metadata=metadata)
- check_annotation_scene(annotation_scene=predictions_to_annotations, expected_length=1)
- check_annotation(
- predictions_to_annotations.annotations[0],
- expected_labels=[ScoredLabel(label=non_empty_labels[0], probability=0.7)],
- )
- # Checking attributes of "AnnotationSceneEntity" returned by "convert_to_annotation" for "metadata" without
- # specified "threshold" key
- metadata = {"non-required key": 1, "other non-required key": 2}
- predictions_to_annotations = converter.convert_to_annotation(predictions=predictions, metadata=metadata)
- check_annotation_scene(annotation_scene=predictions_to_annotations, expected_length=1)
- check_annotation(
- predictions_to_annotations.annotations[0],
- expected_labels=[ScoredLabel(label=non_empty_labels[1], probability=0.7)],
- )