Make Perf test available to load previous Perf test to skip training stage (#3556)

* symlink to relative path

* skip training if prev perf result exists

* implement missing part

* align with pre-commit

* update test code

* fix typo

* change arg name to resume-from

* revert checkpoint symlink
eunwoosh authored May 30, 2024
1 parent 447cd9c commit ec0f906
Showing 11 changed files with 205 additions and 60 deletions.
2 changes: 1 addition & 1 deletion src/otx/cli/cli.py
@@ -492,7 +492,7 @@ def update_latest(self, work_dir: Path) -> None:
cache_dir = latest_dir / self.subcommand
if cache_dir.exists():
cache_dir.unlink()
cache_dir.symlink_to(work_dir)
cache_dir.symlink_to(Path("..") / work_dir.relative_to(work_dir.parent))

def set_seed(self) -> None:
"""Set the random seed for reproducibility.
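
For context, here is a minimal sketch of what the changed symlink target computes; the concrete work_dir value is illustrative, not taken from the repository:

from pathlib import Path

work_dir = Path("otx-workspace/20240530_120000")  # illustrative path, not from the repo

# relative_to(work_dir.parent) keeps only the last path component, so the
# symlink target becomes "../20240530_120000" instead of an absolute path.
# A relative target keeps the .latest links valid when the whole workspace
# is copied or moved, e.g. when a previous perf run is reused.
target = Path("..") / work_dir.relative_to(work_dir.parent)
print(target)  # ../20240530_120000
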
6 changes: 6 additions & 0 deletions tests/conftest.py
@@ -119,6 +119,12 @@ def pytest_addoption(parser: pytest.Parser):
"`pip install otx[full]@https://github.com/openvinotoolkit/training_extensions.git@{otx_ref}` will be executed before run, "
"and reverted after run. Works only for v2.x assuming CLI compatibility.",
)
parser.addoption(
"--resume-from",
type=str,
help="Previous performance test directory which contains execution results. "
"If training was already done in previous performance test, training is skipped and refer previous result.",
)
parser.addoption(
"--open-subprocess",
action="store_true",
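
A hypothetical way to drive the perf suite with the new option, shown via pytest's Python entry point; the directory path is a placeholder, and only the --resume-from flag itself comes from this commit:

import pytest

# Equivalent to running: pytest tests/perf --resume-from <previous perf work dir>
pytest.main([
    "tests/perf",
    "--resume-from", "/path/to/previous/perf/work_dir",  # placeholder path
])
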
175 changes: 116 additions & 59 deletions tests/perf/benchmark.py
@@ -7,11 +7,12 @@

import gc
import logging
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from time import time
from typing import Any
from typing import Any, Literal

import numpy as np
import pandas as pd
@@ -132,13 +133,17 @@ def run(
model: Model,
dataset: Dataset,
criteria: list[Criterion],
resume_from: Path | None = None,
) -> pd.DataFrame | None:
"""Run configured benchmark with given dataset and model and return the result.
Args:
model (Model): Target model settings
dataset (Dataset): Target dataset settings
criteria (list[Criterion]): Target criteria settings
resume_from (Path | None, optional):
Previous performance test directory to load. If training was already done in the previous performance test,
training is skipped and the previous result is reused.
Returns:
pd.DataFrame | None: Table with benchmark metrics
@@ -168,6 +173,13 @@ def run(
tags["seed"] = str(seed)

# Train & test
copied_train_dir = None
if (
resume_from is not None
and (prev_train_dir := self._find_corresponding_dir(resume_from, tags)) is not None
):
copied_train_dir = self._copy_prev_train_dir(prev_train_dir, sub_work_dir)

command = [
"otx",
"train",
@@ -189,35 +201,27 @@
command.extend(["--deterministic", str(self.deterministic)])
if self.num_epoch > 0:
command.extend(["--max_epochs", str(self.num_epoch)])
start_time = time()
self._run_command(command)
extra_metrics = {"train/e2e_time": time() - start_time}
self._rename_raw_data(
work_dir=sub_work_dir / ".latest" / "train",
replaces={"train_": "train/", "{pre}": "train/"},
)
extra_metrics = {}
if copied_train_dir is not None:
command.append("--print_config")
with (copied_train_dir / "configs.yaml").open("w") as f:
self._run_command(command, stdout=f)  # replace the previous configs.yaml with a newly generated one
else:
start_time = time()
self._run_command(command)
extra_metrics["train/e2e_time"] = time() - start_time
self._rename_raw_data(
work_dir=sub_work_dir / ".latest" / "train",
replaces={"train_": "train/", "{pre}": "train/"},
)
self._log_metrics(
work_dir=sub_work_dir / ".latest" / "train",
tags=tags,
criteria=criteria,
extra_metrics=extra_metrics,
)

command = [
"otx",
"test",
"--work_dir",
str(sub_work_dir),
]
for key, value in dataset.extra_overrides.get("test", {}).items():
command.append(f"--{key}")
command.append(str(value))
self._run_command(command)
self._rename_raw_data(
work_dir=sub_work_dir / ".latest" / "test",
replaces={"test_": "test/", "{pre}": "test/"},
)
self._log_metrics(work_dir=sub_work_dir / ".latest" / "test", tags=tags, criteria=criteria)
self._run_test(sub_work_dir, dataset, tags, criteria, what2test="train")

# Export & test
if self.eval_upto in ["export", "optimize"]:
Expand All @@ -236,24 +240,14 @@ def run(
if not exported_model_path.exists():
exported_model_path = sub_work_dir / ".latest" / "export" / "exported_model_decoder.xml"

command = [ # NOTE: not working for h_label_cls. to be fixed
"otx",
"test",
"--checkpoint",
str(exported_model_path),
"--work_dir",
str(sub_work_dir),
]
for key, value in dataset.extra_overrides.get("test", {}).items():
command.append(f"--{key}")
command.append(str(value))
self._run_command(command)

self._rename_raw_data(
work_dir=sub_work_dir / ".latest" / "test",
replaces={"test": "export", "{pre}": "export/"},
self._run_test(
sub_work_dir,
dataset,
tags,
criteria,
checkpoint=exported_model_path,
what2test="export",
)
self._log_metrics(work_dir=sub_work_dir / ".latest" / "test", tags=tags, criteria=criteria)

# Optimize & test
if self.eval_upto == "optimize":
@@ -274,24 +268,14 @@
if not optimized_model_path.exists():
optimized_model_path = sub_work_dir / ".latest" / "optimize" / "optimized_model_decoder.xml"

command = [
"otx",
"test",
"--checkpoint",
str(optimized_model_path),
"--work_dir",
str(sub_work_dir),
]
for key, value in dataset.extra_overrides.get("test", {}).items():
command.append(f"--{key}")
command.append(str(value))
self._run_command(command)

self._rename_raw_data(
work_dir=sub_work_dir / ".latest" / "test",
replaces={"test": "optimize", "{pre}": "optimize/"},
self._run_test(
sub_work_dir,
dataset,
tags,
criteria,
checkpoint=optimized_model_path,
what2test="optimize",
)
self._log_metrics(work_dir=sub_work_dir / ".latest" / "test", tags=tags, criteria=criteria)

# Force memory clean up
gc.collect()
@@ -308,10 +292,83 @@ def run(
result = summary.average(result, keys=["task", "model", "data_group", "data"]) # Average out seeds
return result.set_index(["task", "model", "data_group", "data"])

def _run_command(self, command: list[str]) -> None:
def _find_corresponding_dir(self, resume_from: Path, tags: dict[str, str]) -> Path | None:
for csv_file in resume_from.rglob("benchmark.raw.csv"):
raw_data = pd.read_csv(csv_file)
if (
"train/epoch" in raw_data.columns # check it's csv of train result
and all( # check meta info is same
str(raw_data.iloc[0].get(key, "NOT_IN_CSV")) == tags.get(key, "NOT_IN_TAG")
for key in ["data_group", "data", "model", "task", "seed"]
)
):
return csv_file.parent
return None

def _copy_prev_train_dir(self, prev_train_dir: Path, work_dir: Path) -> Path:
work_dir.mkdir(parents=True, exist_ok=True)
new_train_dir = work_dir / prev_train_dir.name
shutil.copytree(prev_train_dir, new_train_dir, ignore_dangling_symlinks=True)
cache_dir = work_dir / ".latest" / "train"
cache_dir.parent.mkdir(exist_ok=True)
cache_dir.symlink_to(Path("..") / new_train_dir.relative_to(work_dir))

return new_train_dir

def _run_test(
self,
work_dir: Path | str,
dataset: Dataset,
tags: dict[str, str],
criteria: list[Criterion],
checkpoint: Path | str | None = None,
what2test: Literal["train", "export", "optimize"] = "train",
) -> None:
"""Run otx test and update result csv file to align it's indices to the current task."""
replace_map = {
"train": {"test_": "test/", "{pre}": "export/"},
"export": {"test": "export", "{pre}": "export/"},
"optimize": {"test": "optimize", "{pre}": "optimize/"},
}

command = [
"otx",
"test",
"--work_dir",
str(work_dir),
]
if checkpoint is not None:
command.extend(["--checkpoint", str(checkpoint)])
for key, value in dataset.extra_overrides.get("test", {}).items():
command.append(f"--{key}")
command.append(str(value))

start_time = time()
self._run_command(command)
extra_metrics = {f"test({what2test})/e2e_time": time() - start_time}

self._rename_raw_data(
work_dir=work_dir / ".latest" / "test",
replaces=replace_map[what2test],
)
self._log_metrics(
work_dir=work_dir / ".latest" / "test",
tags=tags,
criteria=criteria,
extra_metrics=extra_metrics,
)

def _run_command(self, command: list[str], **kwargs) -> None:
"""Run command using 'subprocess.run'.
Args:
command (list[str]): command to execute.
kwargs: keyword arguments passed to 'subprocess.run'.
"""
print(" ".join(command))
kwargs["check"] = True
if not self.dry_run:
subprocess.run(command, check=True) # noqa: S603
subprocess.run(command, **kwargs) # noqa: S603, PLW1510

def _log_metrics(
self,
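
For reference, the directory matching performed by _find_corresponding_dir reduces to the check sketched below; this is a standalone restatement of the logic shown above, not additional code from the commit:

import pandas as pd
from pathlib import Path


def matches_previous_train_run(csv_file: Path, tags: dict[str, str]) -> bool:
    """Return True if a benchmark.raw.csv comes from a training run with the same tags."""
    raw_data = pd.read_csv(csv_file)
    # "train/epoch" is only present in training results, so it marks a train csv.
    if "train/epoch" not in raw_data.columns:
        return False
    # The first row's metadata must match the current run's tags exactly.
    return all(
        str(raw_data.iloc[0].get(key, "NOT_IN_CSV")) == tags.get(key, "NOT_IN_TAG")
        for key in ["data_group", "data", "model", "task", "seed"]
    )

When such a directory is found, run() copies it into the new work directory, re-creates the .latest/train symlink, and only re-runs otx train with --print_config to refresh configs.yaml instead of training from scratch.
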
12 changes: 12 additions & 0 deletions tests/perf/conftest.py
@@ -245,6 +245,16 @@ def fxt_tags(fxt_user_name: str, fxt_version_tags: dict[str, str]) -> dict[str,
return tags


@pytest.fixture(scope="session")
def fxt_resume_from(request: pytest.FixtureRequest) -> Path | None:
resume_from = request.config.getoption("--resume-from")
if resume_from is not None:
resume_from = Path(resume_from)
msg = f"{resume_from = }"
log.info(msg)
return resume_from


@pytest.fixture()
def fxt_benchmark(
fxt_data_root: Path,
@@ -356,11 +366,13 @@ def _test_perf(
dataset: Benchmark.Dataset,
benchmark: Benchmark,
criteria: list[Benchmark.Criterion],
resume_from: Path | None,
) -> None:
result = benchmark.run(
model=model,
dataset=dataset,
criteria=criteria,
resume_from=resume_from,
)
benchmark.check(
result=result,
10 changes: 10 additions & 0 deletions tests/perf/test_action.py
@@ -70,6 +70,9 @@ class TestPerfActionClassification(PerfTestBase):
Benchmark.Criterion(name="test/iter_time", summary="mean", compare="<", margin=0.1),
Benchmark.Criterion(name="export/iter_time", summary="mean", compare="<", margin=0.1),
Benchmark.Criterion(name="optimize/iter_time", summary="mean", compare="<", margin=0.1),
Benchmark.Criterion(name="test(train)/e2e_time", summary="max", compare=">", margin=0.1),
Benchmark.Criterion(name="test(export)/e2e_time", summary="max", compare=">", margin=0.1),
Benchmark.Criterion(name="test(optimize)/e2e_time", summary="max", compare=">", margin=0.1),
]

@pytest.mark.parametrize(
@@ -89,12 +92,14 @@ def test_perf(
fxt_model: Benchmark.Model,
fxt_dataset: Benchmark.Dataset,
fxt_benchmark: Benchmark,
fxt_resume_from: Path | None,
):
self._test_perf(
model=fxt_model,
dataset=fxt_dataset,
benchmark=fxt_benchmark,
criteria=self.BENCHMARK_CRITERIA,
resume_from=fxt_resume_from,
)


@@ -154,6 +159,9 @@ class TestPerfActionDetection(PerfTestBase):
Benchmark.Criterion(name="test/iter_time", summary="mean", compare="<", margin=0.1),
Benchmark.Criterion(name="export/iter_time", summary="mean", compare="<", margin=0.1),
Benchmark.Criterion(name="optimize/iter_time", summary="mean", compare="<", margin=0.1),
Benchmark.Criterion(name="test(train)/e2e_time", summary="max", compare=">", margin=0.1),
Benchmark.Criterion(name="test(export)/e2e_time", summary="max", compare=">", margin=0.1),
Benchmark.Criterion(name="test(optimize)/e2e_time", summary="max", compare=">", margin=0.1),
]

@pytest.mark.parametrize(
@@ -173,10 +181,12 @@ def test_perf(
fxt_model: Benchmark.Model,
fxt_dataset: Benchmark.Dataset,
fxt_benchmark: Benchmark,
fxt_resume_from: Path | None,
):
self._test_perf(
model=fxt_model,
dataset=fxt_dataset,
benchmark=fxt_benchmark,
criteria=self.BENCHMARK_CRITERIA,
resume_from=fxt_resume_from,
)