-
Notifications
You must be signed in to change notification settings - Fork 714
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
π Anomalib Pipelines #2005
Merged
ashwinvaidya17
merged 18 commits into
openvinotoolkit:feature/pipelines
from
ashwinvaidya17:feature/pipeline_v2
May 15, 2024
+992
β12
Merged
π Anomalib Pipelines #2005
Changes from 6 commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
a4778d7
Add initial design
ashwinvaidya17 29af8fd
Refactor + add to CLI
ashwinvaidya17 246ca2b
Support grid search on class path
ashwinvaidya17 051566a
redirect outputs
ashwinvaidya17 0641e16
design v2
ashwinvaidya17 a9cb590
remove commented code
ashwinvaidya17 b4b981b
add dummy experiment
ashwinvaidya17 eaca521
add config
ashwinvaidya17 d279e75
Refactor
ashwinvaidya17 979934c
Add tests
ashwinvaidya17 f45a0fd
Merge branch 'main' of github.com:openvinotoolkit/anomalib into featuβ¦
ashwinvaidya17 6efaf42
Apply suggestions from code review
ashwinvaidya17 b4db761
address pr comments
ashwinvaidya17 59a480d
Apply suggestions from code review
ashwinvaidya17 c505236
refactor
ashwinvaidya17 8d64be8
Simplify argparse
ashwinvaidya17 997fb40
modify logger redirect
ashwinvaidya17 b7ef7dc
update docstrings
ashwinvaidya17 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
"""Subcommand for pipelines.""" | ||
|
||
# Copyright (C) 2024 Intel Corporation | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
from jsonargparse import ArgumentParser, Namespace | ||
|
||
from anomalib.utils.exceptions import try_import | ||
|
||
if try_import("anomalib.pipelines.orchestrators"): | ||
from anomalib.pipelines.orchestrators import Benchmark, Orchestrator | ||
|
||
PIPELINE_REGISTRY: dict[str, Orchestrator] | None = { | ||
"benchmark": Benchmark(), | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this fit into a single line? |
||
else: | ||
PIPELINE_REGISTRY = None | ||
|
||
|
||
def add_pipeline_subparsers(parser: ArgumentParser) -> None: | ||
"""Add subparsers for pipelines.""" | ||
if PIPELINE_REGISTRY is not None: | ||
subcommands = parser.add_subcommands(dest="subcommand", help="Run Pipelines", required=True) | ||
for name, pipeline in PIPELINE_REGISTRY.items(): | ||
subcommands.add_subcommand(name, pipeline.get_parser(), help=f"Run {name} pipeline") | ||
|
||
|
||
def run_pipeline(args: Namespace) -> None: | ||
samet-akcay marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"""Run pipeline.""" | ||
if PIPELINE_REGISTRY is not None: | ||
config = args.pipeline[args.pipeline.subcommand] | ||
PIPELINE_REGISTRY[args.pipeline.subcommand].run(config) | ||
else: | ||
msg = "Pipeline is not available" | ||
raise ValueError(msg) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
"""Pipelines for end-to-end usecases.""" | ||
|
||
# Copyright (C) 2024 Intel Corporation | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
from .runners import ParallelRunner, SerialRunner | ||
|
||
__all__ = ["ParallelRunner", "SerialRunner"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
"""Pipeline jobs.""" | ||
|
||
# Copyright (C) 2024 Intel Corporation | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
from .benchmark import BenchmarkJob | ||
|
||
__all__ = ["BenchmarkJob"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
"""Job from which all the jobs inherit from.""" | ||
|
||
# Copyright (C) 2024 Intel Corporation | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
import logging | ||
from abc import ABC, abstractmethod | ||
from collections.abc import Iterator | ||
|
||
from jsonargparse import ArgumentParser, Namespace | ||
|
||
from anomalib.pipelines.types import GATHERED_RESULTS, RUN_RESULTS | ||
|
||
|
||
class Job(ABC): | ||
"""A job is an atomic unit of work that can be run in parallel with other jobs.""" | ||
|
||
name: str | ||
|
||
def __init__(self) -> None: | ||
self.logger = logging.getLogger(self.name) | ||
|
||
@abstractmethod | ||
def run(self, task_id: int | None = None, **kwargs) -> RUN_RESULTS: | ||
"""A job is a single unit of work that can be run in parallel with other jobs. | ||
|
||
``task_id`` is optional and is only passed when the job is run in parallel. | ||
""" | ||
|
||
@abstractmethod | ||
def on_collect(self, results: list[RUN_RESULTS]) -> GATHERED_RESULTS: | ||
"""Gather the results returned from run. | ||
|
||
This can be used to combine the results from multiple runs or to save/process individual job results. | ||
|
||
Args: | ||
results (list): List of results returned from run. | ||
""" | ||
|
||
@abstractmethod | ||
def on_save(self, results: GATHERED_RESULTS) -> None: | ||
"""Save the gathered results. | ||
|
||
This can be used to save the results in a file or a database. | ||
|
||
Args: | ||
results: The gathered result returned from gather_results. | ||
""" | ||
|
||
@staticmethod | ||
@abstractmethod | ||
def add_arguments(parser: ArgumentParser) -> None: | ||
"""Add arguments to the parser. | ||
|
||
This can be used to add arguments that are specific to the job. | ||
""" | ||
|
||
@staticmethod | ||
@abstractmethod | ||
def get_iterator(args: Namespace) -> Iterator: | ||
"""Return an iterator based on the arguments. | ||
|
||
This can be used to generate the configurations that will be passed to run. | ||
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
"""Benchmarking.""" | ||
|
||
# Copyright (C) 2024 Intel Corporation | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
from .benchmark import BenchmarkJob | ||
|
||
__all__ = ["BenchmarkJob"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
"""Benchmarking job.""" | ||
|
||
# Copyright (C) 2024 Intel Corporation | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
from argparse import SUPPRESS | ||
from collections.abc import Iterator | ||
from itertools import product | ||
from pathlib import Path | ||
from tempfile import TemporaryDirectory | ||
from typing import Any | ||
|
||
import pandas as pd | ||
from jsonargparse import ArgumentParser, Namespace | ||
from jsonargparse._optionals import get_doc_short_description | ||
from lightning import seed_everything | ||
from rich.console import Console | ||
from rich.table import Table | ||
|
||
from anomalib.data import AnomalibDataModule, get_datamodule | ||
from anomalib.engine import Engine | ||
from anomalib.models import AnomalyModule, get_model | ||
from anomalib.pipelines.jobs.base import Job | ||
from anomalib.pipelines.utils import hide_output | ||
|
||
from .utils import _GridSearchAction, convert_to_tuple, dict_from_namespace, flatten_dict, to_nested_dict | ||
|
||
|
||
class BenchmarkJob(Job): | ||
"""Benchmarking job.""" | ||
|
||
name = "benchmark" | ||
|
||
def __init__(self, accelerator: str) -> None: | ||
super().__init__() | ||
self.accelerator = accelerator | ||
|
||
@hide_output | ||
def run( | ||
self, | ||
model: AnomalyModule, | ||
datamodule: AnomalibDataModule, | ||
seed: int, | ||
task_id: int | None = None, | ||
) -> dict[str, Any]: | ||
"""Run the benchmark.""" | ||
devices: str | list[int] = "auto" | ||
if task_id is not None: | ||
devices = [task_id] | ||
with TemporaryDirectory() as temp_dir: | ||
seed_everything(seed) | ||
engine = Engine( | ||
accelerator=self.accelerator, | ||
devices=devices, | ||
default_root_dir=temp_dir, | ||
) | ||
engine.fit(model, datamodule) | ||
test_results = engine.test(model, datamodule) | ||
output = { | ||
"seed": seed, | ||
"model": model.__class__.__name__, | ||
"data": datamodule.__class__.__name__, | ||
"category": datamodule.category, | ||
**test_results[0], | ||
} | ||
self.logger.info(f"Completed with result {output}") | ||
return output | ||
|
||
def on_collect(self, results: list[dict[str, Any]]) -> pd.DataFrame: | ||
"""Gather the results returned from run.""" | ||
output: dict[str, Any] = {} | ||
for key in results[0]: | ||
output[key] = [] | ||
for result in results: | ||
for key, value in result.items(): | ||
output[key].append(value) | ||
result = pd.DataFrame(output) | ||
self._print_tabular_results(result) | ||
return result | ||
|
||
def on_save(self, result: pd.DataFrame) -> None: | ||
"""Save the result to a csv file.""" | ||
file_path = Path("runs") / self.accelerator / self.name / "results.csv" | ||
file_path.parent.mkdir(parents=True, exist_ok=True) | ||
result.to_csv(file_path, index=False) | ||
self.logger.info(f"Saved results to {file_path}") | ||
|
||
def _print_tabular_results(self, gathered_result: pd.DataFrame) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would this be used anywhere else? If so, maybe we could move this to a util function to keep this class cleaner? |
||
"""Print the tabular results.""" | ||
if gathered_result is not None: | ||
console = Console() | ||
table = Table(title=f"{self.name} Results", show_header=True, header_style="bold magenta") | ||
_results = gathered_result.to_dict("list") | ||
for column in _results: | ||
table.add_column(column) | ||
for row in zip(*_results.values(), strict=False): | ||
table.add_row(*[str(value) for value in row]) | ||
console.print(table) | ||
|
||
@staticmethod | ||
def add_arguments(parser: ArgumentParser) -> None: | ||
"""Add job specific arguments to the parser.""" | ||
group = parser.add_argument_group("Benchmark job specific arguments.") | ||
group.add_argument( | ||
f"--{BenchmarkJob.name}.seed", | ||
type=int | dict[str, list[int]], | ||
default=42, | ||
help="Seed for reproducibility.", | ||
) | ||
BenchmarkJob._add_subclass_arguments(group, AnomalyModule, f"{BenchmarkJob.name}.model") | ||
BenchmarkJob._add_subclass_arguments(group, AnomalibDataModule, f"{BenchmarkJob.name}.data") | ||
|
||
@staticmethod | ||
def get_iterator(args: Namespace) -> Iterator: | ||
"""Return iterator based on the arguments.""" | ||
container = { | ||
"seed": args.seed, | ||
"data": dict_from_namespace(args.data), | ||
"model": dict_from_namespace(args.model), | ||
} | ||
# extract all grid keys and return cross product of all grid values | ||
container = flatten_dict(container) | ||
grid_dict = {key: value for key, value in container.items() if "grid" in key} | ||
container = {key: value for key, value in container.items() if key not in grid_dict} | ||
combinations = list(product(*convert_to_tuple(grid_dict.values()))) | ||
for combination in combinations: | ||
_container = container.copy() | ||
for key, value in zip(grid_dict.keys(), combination, strict=True): | ||
_container[key.removesuffix(".grid")] = value | ||
_container = to_nested_dict(_container) | ||
yield { | ||
"seed": _container["seed"], | ||
"model": get_model(_container["model"]), | ||
"datamodule": get_datamodule(_container["data"]), | ||
} | ||
|
||
@staticmethod | ||
def _add_subclass_arguments(parser: ArgumentParser, baseclass: type, key: str) -> None: | ||
"""Adds the subclass of the provided class to the parser under nested_key.""" | ||
doc_group = get_doc_short_description(baseclass, logger=parser.logger) | ||
group = parser._create_group_if_requested( # noqa: SLF001 | ||
baseclass, | ||
nested_key=key, | ||
as_group=True, | ||
doc_group=doc_group, | ||
config_load=False, | ||
instantiate=False, | ||
) | ||
|
||
with _GridSearchAction.allow_default_instance_context(): | ||
action = group.add_argument( | ||
f"--{key}", | ||
metavar="CONFIG | CLASS_PATH_OR_NAME | .INIT_ARG_NAME VALUE", | ||
help=( | ||
'One or more arguments specifying "class_path"' | ||
f' and "init_args" for any subclass of {baseclass.__name__}.' | ||
), | ||
default=SUPPRESS, | ||
action=_GridSearchAction(typehint=baseclass, enable_path=True, logger=parser.logger), | ||
) | ||
action.sub_add_kwargs = {"fail_untyped": True, "sub_configs": True, "instantiate": True} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which approach would be better?
anomalib pipeline benchmark --arg1 val1 --arg2 val2
or
anomalib benchmark
--arg1 val --arg2 val2`Just setting the stage here for discussion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer
anomalib benchmark ...
but if someone implements a custom pipeline then I feel they should be able to run it without making changes to the cli. In this case they might have to useanomalib pipeline cutom_pipeline
?