Anomalib Pipelines #2005
@@ -0,0 +1,34 @@
"""Subcommand for pipelines."""

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from jsonargparse import ArgumentParser, Namespace

from anomalib.utils.exceptions import try_import

if try_import("anomalib.pipelines"):
    from anomalib.pipelines import Benchmark
    from anomalib.pipelines.components.base import Pipeline

    PIPELINE_REGISTRY: dict[str, Pipeline] | None = {"benchmark": Benchmark()}
else:
    PIPELINE_REGISTRY = None


def add_pipeline_subparsers(parser: ArgumentParser) -> None:
    """Add subparsers for pipelines."""
    if PIPELINE_REGISTRY is not None:
        subcommands = parser.add_subcommands(dest="subcommand", help="Run Pipelines", required=True)
        for name, pipeline in PIPELINE_REGISTRY.items():
            subcommands.add_subcommand(name, pipeline.get_parser(), help=f"Run {name} pipeline")


def run_pipeline(args: Namespace) -> None:
    """Run pipeline."""
    if PIPELINE_REGISTRY is not None:
        config = args.pipeline[args.pipeline.subcommand]
        PIPELINE_REGISTRY[args.pipeline.subcommand].run(config)
    else:
        msg = "Pipeline is not available"
        raise ValueError(msg)
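
For context, a minimal sketch of how these helpers could be wired into a top-level CLI. The main() function, the program name, and the "pipeline" subcommand name are illustrative assumptions inferred from how run_pipeline reads args.pipeline.subcommand; they are not part of this diff.

# Hypothetical wiring of the pipeline subcommand into a CLI entry point (illustrative only).
from jsonargparse import ArgumentParser


def main() -> None:
    parser = ArgumentParser(prog="anomalib")
    subcommands = parser.add_subcommands(dest="subcommand")

    # "pipeline" becomes a subcommand whose own subcommands come from PIPELINE_REGISTRY.
    pipeline_parser = ArgumentParser()
    add_pipeline_subparsers(pipeline_parser)
    subcommands.add_subcommand("pipeline", pipeline_parser, help="Run pipelines")

    args = parser.parse_args()
    if args.subcommand == "pipeline":
        # run_pipeline reads the nested namespace stored under args.pipeline.
        run_pipeline(args)


if __name__ == "__main__":
    main()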
@@ -0,0 +1,8 @@
"""Pipelines for end-to-end usecases."""

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from .benchmark import Benchmark

__all__ = ["Benchmark"]
@@ -0,0 +1,8 @@
"""Benchmarking."""

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from .pipeline import Benchmark

__all__ = ["Benchmark"]
@@ -0,0 +1,86 @@
"""Benchmark job generator."""

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from argparse import SUPPRESS
from collections.abc import Generator

from jsonargparse import ArgumentParser, Namespace
from jsonargparse._optionals import get_doc_short_description

from anomalib.data import AnomalibDataModule, get_datamodule
from anomalib.models import AnomalyModule, get_model
from anomalib.pipelines.components import JobGenerator
from anomalib.pipelines.components.actions import GridSearchAction, get_iterator_from_grid_dict
from anomalib.utils.config import dict_from_namespace
from anomalib.utils.logging import hide_output

from .job import BenchmarkJob


class BenchmarkJobGenerator(JobGenerator):
    """Generate BenchmarkJob."""

    def __init__(self, accelerator: str) -> None:
        self.accelerator = accelerator

    @property
    def job_class(self) -> type:
        """Return the job class."""
        return BenchmarkJob

    @staticmethod
    def add_arguments(parser: ArgumentParser) -> None:
        """Add job specific arguments to the parser."""
        group = parser.add_argument_group("Benchmark job specific arguments.")
        group.add_argument(
            f"--{BenchmarkJob.name}.seed",
            type=int | dict[str, list[int]],
            default=42,
            help="Seed for reproducibility.",
        )
        BenchmarkJobGenerator._add_subclass_arguments(group, AnomalyModule, f"{BenchmarkJob.name}.model")
        BenchmarkJobGenerator._add_subclass_arguments(group, AnomalibDataModule, f"{BenchmarkJob.name}.data")

    @hide_output
    def generate_jobs(self, args: Namespace) -> Generator[BenchmarkJob, None, None]:
        """Return iterator based on the arguments."""
        container = {
            "seed": args.seed,
            "data": dict_from_namespace(args.data),
            "model": dict_from_namespace(args.model),
        }
        for _container in get_iterator_from_grid_dict(container):
            yield BenchmarkJob(
                accelerator=self.accelerator,
                seed=_container["seed"],
                model=get_model(_container["model"]),
                datamodule=get_datamodule(_container["data"]),
            )

    @staticmethod
    def _add_subclass_arguments(parser: ArgumentParser, base_class: type, key: str) -> None:
        """Adds the subclass of the provided class to the parser under nested_key."""
        doc_group = get_doc_short_description(base_class, logger=parser.logger)
        group = parser._create_group_if_requested(  # noqa: SLF001
            base_class,
            nested_key=key,
            as_group=True,
            doc_group=doc_group,
            config_load=False,
            instantiate=False,
        )

        with GridSearchAction.allow_default_instance_context():
            action = group.add_argument(
                f"--{key}",
                metavar="CONFIG | CLASS_PATH_OR_NAME | .INIT_ARG_NAME VALUE",
                help=(
                    'One or more arguments specifying "class_path"'
                    f' and "init_args" for any subclass of {base_class.__name__}.'
                ),
                default=SUPPRESS,
                action=GridSearchAction(typehint=base_class, enable_path=True, logger=parser.logger),
            )
        action.sub_add_kwargs = {"fail_untyped": True, "sub_configs": True, "instantiate": True}

Review thread on the accelerator argument in __init__:
- I'm not sure about the terminology here. We use this variable mainly to distinguish between CPU and GPU, but I'm not sure if CPU is technically considered to be an accelerator.
- Originally this was called device. I think we discussed changing this to accelerator to be in line with Lightning's terminology. I have no preference here, so I can rename it once we finalise the name.
- In Lightning, accelerator seems to be also used for CPUs.
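
Returning to generate_jobs: get_iterator_from_grid_dict expands grid-style values in the container into individual job configurations. As a rough illustration of that idea only (this is not the actual anomalib implementation, and the {"grid": [...]} convention here is an assumption), a standalone sketch:

# Illustrative only: expand values wrapped in a {"grid": [...]} marker into a
# cartesian product of flat configurations (assumed convention, top-level keys only).
from itertools import product
from typing import Any


def expand_grid(container: dict[str, Any]) -> list[dict[str, Any]]:
    grid_keys = [k for k, v in container.items() if isinstance(v, dict) and "grid" in v]
    fixed = {k: v for k, v in container.items() if k not in grid_keys}
    if not grid_keys:
        return [container]
    combos = product(*(container[k]["grid"] for k in grid_keys))
    return [{**fixed, **dict(zip(grid_keys, combo, strict=True))} for combo in combos]


# Example: two seeds produce two flat configurations.
print(expand_grid({"seed": {"grid": [0, 42]}, "model": {"class_path": "Padim"}}))

Under this assumption, each expanded configuration then maps to one BenchmarkJob.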
@@ -0,0 +1,99 @@
"""Benchmarking job."""

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import logging
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any

import pandas as pd
from lightning import seed_everything
from rich.console import Console
from rich.table import Table

from anomalib.data import AnomalibDataModule
from anomalib.engine import Engine
from anomalib.models import AnomalyModule
from anomalib.pipelines.components import Job
from anomalib.utils.logging import hide_output

logger = logging.getLogger(__name__)


class BenchmarkJob(Job):
    """Benchmarking job."""

    name = "benchmark"

    def __init__(self, accelerator: str, model: AnomalyModule, datamodule: AnomalibDataModule, seed: int) -> None:
        super().__init__()
        self.accelerator = accelerator
        self.model = model
        self.datamodule = datamodule
        self.seed = seed

    @hide_output
    def run(
        self,
        task_id: int | None = None,
    ) -> dict[str, Any]:
        """Run the benchmark."""
        devices: str | list[int] = "auto"
        if task_id is not None:
            devices = [task_id]
            logger.info(f"Running job {self.model.__class__.__name__} with device {task_id}")
        with TemporaryDirectory() as temp_dir:
            seed_everything(self.seed)
            engine = Engine(
                accelerator=self.accelerator,
                devices=devices,
                default_root_dir=temp_dir,
            )
            engine.fit(self.model, self.datamodule)
            test_results = engine.test(self.model, self.datamodule)
        output = {
            "seed": self.seed,
            "accelerator": self.accelerator,
            "model": self.model.__class__.__name__,
            "data": self.datamodule.__class__.__name__,
            "category": self.datamodule.category,
            **test_results[0],
        }
        logger.info(f"Completed with result {output}")
        return output

    @staticmethod
    def collect(results: list[dict[str, Any]]) -> pd.DataFrame:
        """Gather the results returned from run."""
        output: dict[str, Any] = {}
        for key in results[0]:
            output[key] = []
        for result in results:
            for key, value in result.items():
                output[key].append(value)
        return pd.DataFrame(output)

    @staticmethod
    def save(result: pd.DataFrame) -> None:
        """Save the result to a csv file."""
        BenchmarkJob._print_tabular_results(result)
        file_path = Path("runs") / BenchmarkJob.name / datetime.now().strftime("%Y-%m-%d-%H:%M:%S") / "results.csv"
        file_path.parent.mkdir(parents=True, exist_ok=True)
        result.to_csv(file_path, index=False)
        logger.info(f"Saved results to {file_path}")

    @staticmethod
    def _print_tabular_results(gathered_result: pd.DataFrame) -> None:
        """Print the tabular results."""
        if gathered_result is not None:
            console = Console()
            table = Table(title=f"{BenchmarkJob.name} Results", show_header=True, header_style="bold magenta")
            _results = gathered_result.to_dict("list")
            for column in _results:
                table.add_column(column)
            for row in zip(*_results.values(), strict=False):
                table.add_row(*[str(value) for value in row])
            console.print(table)
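
A small usage sketch of the collect and print path, assuming a list of result dicts shaped like the output of run(). The model names and metric values below are placeholders for illustration, not measured results.

# Illustrative only: fabricated result dicts, combined and rendered as a rich table.
results = [
    {"seed": 42, "accelerator": "cpu", "model": "Padim", "data": "MVTec", "category": "bottle", "image_AUROC": 0.0},
    {"seed": 42, "accelerator": "cpu", "model": "Patchcore", "data": "MVTec", "category": "bottle", "image_AUROC": 0.0},
]
df = BenchmarkJob.collect(results)  # one column per key, one row per job
BenchmarkJob._print_tabular_results(df)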
@@ -0,0 +1,42 @@
"""Benchmarking."""

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import torch
from jsonargparse import ArgumentParser, Namespace

from anomalib.pipelines.components.base import Pipeline, Runner
from anomalib.pipelines.components.runners import ParallelRunner, SerialRunner

from .generator import BenchmarkJobGenerator


class Benchmark(Pipeline):
    """Benchmarking pipeline."""

    def _setup_runners(self, args: Namespace) -> list[Runner]:
        """Setup the runners for the pipeline."""
        accelerators = args.accelerator if isinstance(args.accelerator, list) else [args.accelerator]
        runners: list[Runner] = []
        for accelerator in accelerators:
            if accelerator == "cpu":
                runners.append(SerialRunner(BenchmarkJobGenerator("cpu")))
            elif accelerator == "cuda":
                runners.append(ParallelRunner(BenchmarkJobGenerator("cuda"), n_jobs=torch.cuda.device_count()))
            else:
                msg = f"Unsupported accelerator: {accelerator}"
                raise ValueError(msg)
        return runners

    def get_parser(self, parser: ArgumentParser | None = None) -> ArgumentParser:
        """Add arguments to the parser."""
        parser = super().get_parser(parser)
        parser.add_argument(
            "--accelerator",
            type=str | list[str],
            default="cuda",
            help="Hardware to run the benchmark on.",
        )
        BenchmarkJobGenerator.add_arguments(parser)
        return parser

Review thread on BenchmarkJobGenerator.add_arguments(parser):
- Would an implementer always need to pass the parser to add_arguments?
- They have to pass it manually each time, as job specific arguments are added to the same parser.
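
Separately, a sketch of driving the pipeline programmatically, assuming Pipeline.run accepts the namespace produced by its own parser (that is how run_pipeline uses it in the CLI subcommand above). The flag syntax for the model and data subclass options is an assumption based on jsonargparse-style class-name arguments, and the values are placeholders.

# Illustrative only: run the benchmark pipeline outside the CLI.
from anomalib.pipelines import Benchmark

pipeline = Benchmark()
parser = pipeline.get_parser()
args = parser.parse_args(
    [
        "--accelerator=cpu",
        "--benchmark.seed=42",
        "--benchmark.model=Padim",  # assumed class-name shorthand for an AnomalyModule subclass
        "--benchmark.data=MVTec",  # assumed class-name shorthand for an AnomalibDataModule subclass
    ],
)
pipeline.run(args)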
@@ -0,0 +1,13 @@
"""Utilities for the pipeline modules."""

# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from .base import Job, JobGenerator, Pipeline, Runner

__all__ = [
    "Job",
    "JobGenerator",
    "Pipeline",
    "Runner",
]
Which approach would be better?

anomalib pipeline benchmark --arg1 val1 --arg2 val2

or

anomalib benchmark --arg1 val1 --arg2 val2

Just setting the stage here for discussion.

I would prefer anomalib benchmark ..., but if someone implements a custom pipeline then I feel they should be able to run it without making changes to the CLI. In that case they might have to use anomalib pipeline custom_pipeline?