Commit
Fix the incorrect progress information when AdaptiveRepeatDataHook is enabled (#2510)

* Fix hook ordering issue: AdaptiveRepeatDataHook changes runner.max_iters before the ProgressHook

* Change the expression

* Fix typo

* Fix multi-label, h-label issue

* Make black happy

* Fix precommit

* Fix auto_bs issue

* Apply suggestions from code review

Co-authored-by: Eunwoo Shin <eunwoo.shin@intel.com>

* Reflecting reviews

* Refactor the name of get_data_cfg

* Revert adaptive hook sampler init

* Refactor the function name: get_data_cfg -> get_subset_data_cfg

* Fix unit test errors

* Fix black

* Remove useless line

* Do not add AdaptiveRepeatDataHook for auto_bs

* Remove unused import

---------

Co-authored-by: Eunwoo Shin <eunwoo.shin@intel.com>
sungmanc and eunwoosh authored Oct 4, 2023
1 parent f5dc66a commit 0e82e1e
Showing 19 changed files with 171 additions and 97 deletions.
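For orientation, a minimal sketch of the fix described in the commit message above (hypothetical numbers, not the OTX code itself): the repeat factor is now known when the hook is constructed, so before_run can correct the iteration total before any progress reporting reads it.

import math

class FakeRunner:
    """Stand-in for an mmcv runner; only the attribute the hook touches."""
    def __init__(self, max_iters):
        self._max_iters = max_iters
    @property
    def max_iters(self):
        return self._max_iters

# Repeat factor computed up front (same formula the commit adds as get_proper_repeat_times)
n_iters_per_epoch = math.ceil(64 / 32)  # tiny dataset of 64 samples, batch size 32
n_repeats = math.floor(max(-0.7 * math.sqrt(n_iters_per_epoch - 1) + 5, 1.0))  # -> 4

runner = FakeRunner(max_iters=1000)
# before_run: scale the schedule so ProgressHook and ETA estimates see the repeated total
runner._max_iters = int(runner.max_iters * n_repeats)  # 1000 -> 4000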
@@ -213,13 +213,6 @@ def get_sampler_type(self, cfg):
sampler_type = "cls_incr"
return sampler_type

def use_adaptive_repeat(self, cfg) -> bool:
"""Return whether using adaptive repeat.
Currently, only multi class classification supports adaptive repeat.
"""
return self._is_multiclass(cfg)

@staticmethod
def _is_multiclass(cfg) -> bool:
return not cfg.model.get("multilabel", False) and not cfg.model.get("hierarchical", False)
8 changes: 0 additions & 8 deletions src/otx/algorithms/common/adapters/mmcv/clsincr_mixin.py
@@ -36,7 +36,6 @@ def configure_task_adapt_hook(self, cfg):
sampler_flag=sampler_flag,
sampler_type=self.get_sampler_type(cfg),
efficient_mode=cfg["task_adapt"].get("efficient_mode", False),
use_adaptive_repeat=self.use_adaptive_repeat(cfg),
priority="NORMAL",
),
)
@@ -50,10 +49,3 @@ def is_incremental(self) -> bool:
def get_sampler_type(self, cfg) -> str:
"""Return sampler type."""
return "cls_incr"

def use_adaptive_repeat(self, cfg) -> bool:
"""Return whether using adaptive repeat.
Currently, only multi class classification supports adaptive repeat.
"""
return False
29 changes: 21 additions & 8 deletions src/otx/algorithms/common/adapters/mmcv/configurer.py
@@ -17,6 +17,7 @@
patch_adaptive_interval_training,
patch_early_stopping,
patch_persistent_workers,
remove_from_configs_by_type,
)
from otx.algorithms.common.adapters.mmcv.utils.config_utils import (
InputSizeManager,
@@ -111,8 +112,8 @@ def merge_configs(self, cfg, data_cfg, data_pipeline_path, hyperparams_from_otx,
if data_cfg:
for subset in data_cfg.data:
if subset in cfg.data:
src_data_cfg = self.get_data_cfg(cfg, subset)
new_data_cfg = self.get_data_cfg(data_cfg, subset)
src_data_cfg = self.get_subset_data_cfg(cfg, subset)
new_data_cfg = self.get_subset_data_cfg(data_cfg, subset)
for key in new_data_cfg:
src_data_cfg[key] = new_data_cfg[key]
else:
@@ -198,13 +199,12 @@ def configure_samples_per_gpu(
samples_per_gpu can be changed if it is larger than the length of the dataset
"""

for subset in subsets:
if cfg.data.get(subset, None):
dataloader_cfg = cfg.data.get(f"{subset}_dataloader", ConfigDict())
samples_per_gpu = dataloader_cfg.get("samples_per_gpu", cfg.data.get("samples_per_gpu", 1))

data_cfg = self.get_data_cfg(cfg, subset)
data_cfg = self.get_subset_data_cfg(cfg, subset)
if data_cfg.get("otx_dataset") is not None:
dataset_len = len(data_cfg.otx_dataset)

@@ -269,7 +269,7 @@ def configure_model(self, cfg, data_classes, model_classes, ir_options, **kwargs
self.model_classes = model_classes
self.data_classes = data_classes
if data_classes is not None:
train_data_cfg = self.get_data_cfg(cfg, "train")
train_data_cfg = self.get_subset_data_cfg(cfg, "train")
train_data_cfg["data_classes"] = data_classes
new_classes = np.setdiff1d(data_classes, model_classes).tolist()
train_data_cfg["new_classes"] = new_classes
@@ -413,6 +413,19 @@ def configure_hooks(
if hasattr(cfg, "algo_backend"):
self._update_caching_modules(cfg)

# Update adaptive repeat
if not self.training:
remove_from_configs_by_type(cfg.custom_hooks, "AdaptiveRepeatDataHook")
return
for custom_hook in cfg.custom_hooks:
if custom_hook["type"] == "AdaptiveRepeatDataHook":
data_cfg = cfg.get("data", {})
bs = data_cfg.get("train_dataloader", {}).get("samples_per_gpu", None)
bs = bs if bs is not None else data_cfg.get("samples_per_gpu", 0)
custom_hook["train_batch_size"] = bs
custom_hook["train_data_size"] = len(data_cfg.get("train", {}).get("otx_dataset", []))
break

@staticmethod
def _update_caching_modules(cfg: Config) -> None:
def _find_max_num_workers(cfg: dict):
@@ -478,15 +491,15 @@ def get_model_meta(cfg)
def get_data_classes(self, cfg):
"""Get data classes from train cfg."""
data_classes = []
train_cfg = self.get_data_cfg(cfg, "train")
train_cfg = self.get_subset_data_cfg(cfg, "train")
if "data_classes" in train_cfg:
data_classes = list(train_cfg.pop("data_classes", []))
elif "classes" in train_cfg:
data_classes = list(train_cfg.classes)
return data_classes

@staticmethod
def get_data_cfg(cfg, subset):
def get_subset_data_cfg(cfg, subset):
"""Get subset's data cfg."""
assert subset in ["train", "val", "test", "unlabeled"], f"Unknown subset:{subset}"
if "dataset" in cfg.data[subset]: # Concat|RepeatDataset
@@ -512,7 +525,7 @@ def adapt_input_size_to_dataset(
Tuple[int, int]: (width, height) or None
"""

data_cfg = BaseConfigurer.get_data_cfg(cfg, "train")
data_cfg = BaseConfigurer.get_subset_data_cfg(cfg, "train")
dataset = data_cfg.get("otx_dataset", None)
if dataset is None:
return None
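To make the configure_hooks change above concrete, a rough sketch with plain dicts (hypothetical shapes and values; the real objects are mmcv ConfigDicts) of how the AdaptiveRepeatDataHook entry is filled in during training, or removed otherwise:

cfg = dict(
    data=dict(
        train_dataloader=dict(samples_per_gpu=16),
        train=dict(otx_dataset=list(range(480))),  # stand-in for the OTX dataset, 480 items
    ),
    custom_hooks=[dict(type="AdaptiveRepeatDataHook")],
)

training = True
if not training:
    cfg["custom_hooks"] = [h for h in cfg["custom_hooks"] if h["type"] != "AdaptiveRepeatDataHook"]
else:
    for custom_hook in cfg["custom_hooks"]:
        if custom_hook["type"] == "AdaptiveRepeatDataHook":
            data_cfg = cfg.get("data", {})
            bs = data_cfg.get("train_dataloader", {}).get("samples_per_gpu", None)
            custom_hook["train_batch_size"] = bs if bs is not None else data_cfg.get("samples_per_gpu", 0)
            custom_hook["train_data_size"] = len(data_cfg.get("train", {}).get("otx_dataset", []))
            break
# custom_hooks[0] is now:
# {"type": "AdaptiveRepeatDataHook", "train_batch_size": 16, "train_data_size": 480}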
@@ -6,6 +6,7 @@
from mmcv.runner import HOOKS, Hook, get_dist_info
from torch.utils.data import DataLoader

from otx.algorithms.common.adapters.mmcv.utils.config_utils import get_proper_repeat_times
from otx.algorithms.common.adapters.torch.dataloaders.samplers import OTXSampler
from otx.algorithms.common.utils.logger import get_logger

@@ -17,38 +18,61 @@ class AdaptiveRepeatDataHook(Hook):
"""Hook that adaptively repeats the dataset to control the number of iterations.
Args:
train_batch_size (int) : The batch size of the train dataloader
train_data_size (int) : The number of the training dataset
coef (float, optional) : coefficient that affects the number of repeats
(coef * math.sqrt(num_iters-1)) + 5
min_repeat (float, optional) : minimum repeats
"""

def __init__(self, coef: float = -0.7, min_repeat: float = 1.0):
def __init__(self, train_batch_size: int, train_data_size: int, coef: float = -0.7, min_repeat: float = 1.0):
self.coef = coef
self.min_repeat = min_repeat

self.train_batch_size = train_batch_size
self.train_data_size = train_data_size

self.n_repeats = get_proper_repeat_times(
self.train_data_size, self.train_batch_size, self.coef, self.min_repeat
)
self.rank, self.world_size = get_dist_info()

def before_run(self, runner):
"""Change the runner's max_iter."""
if self.n_repeats > 1:
iter_per_epoch = int(self.train_data_size / self.train_batch_size)

logger.info("Adaptive repeat is enabled")
logger.info(f"- Repeat times: {self.n_repeats}")
logger.info(f"- Batch size: {self.train_batch_size}")
logger.info(f"- Num iters per epoch: {iter_per_epoch} -> {iter_per_epoch * self.n_repeats}")
logger.info(f"- Total iters: {runner.max_iters} -> {runner.max_iters * self.n_repeats}")

# FIXME, although runner._max_iters is the protected attribute,
# There is no way to control the max_iters of runner.
runner._max_iters = int(runner.max_iters * self.n_repeats)

def before_epoch(self, runner):
"""Convert to OTX Sampler."""
dataset = runner.data_loader.dataset
batch_size = runner.data_loader.batch_size
num_workers = runner.data_loader.num_workers
collate_fn = runner.data_loader.collate_fn
worker_init_fn = runner.data_loader.worker_init_fn
rank, world_size = get_dist_info()

sampler = OTXSampler(
dataset=dataset,
samples_per_gpu=batch_size,
use_adaptive_repeats=True,
num_replicas=world_size,
rank=rank,
samples_per_gpu=self.train_batch_size,
num_replicas=self.world_size,
rank=self.rank,
shuffle=True,
coef=self.coef,
min_repeat=self.min_repeat,
n_repeats=self.n_repeats,
)

runner.data_loader = DataLoader(
dataset,
batch_size=batch_size,
batch_size=self.train_batch_size,
sampler=sampler,
num_workers=num_workers,
collate_fn=collate_fn,
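A small usage sketch of the updated hook signature (values are made up; the import path is assumed from the OTX package layout and may differ). Because n_repeats is computed in __init__, both the max_iters patch in before_run and the per-epoch OTXSampler reuse the same factor:

from otx.algorithms.common.adapters.mmcv.hooks import AdaptiveRepeatDataHook  # assumed path

hook = AdaptiveRepeatDataHook(train_batch_size=32, train_data_size=64)
print(hook.n_repeats)  # 4 with the defaults coef=-0.7, min_repeat=1.0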
@@ -36,21 +36,18 @@ def __init__(
sampler_flag=False,
sampler_type="cls_incr",
efficient_mode=False,
use_adaptive_repeat=False,
):
self.src_classes = src_classes
self.dst_classes = dst_classes
self.model_type = model_type
self.sampler_flag = sampler_flag
self.sampler_type = sampler_type
self.efficient_mode = efficient_mode
self.use_adaptive_repeat = use_adaptive_repeat

logger.info(f"Task Adaptation: {self.src_classes} => {self.dst_classes}")
logger.info(f"- Efficient Mode: {self.efficient_mode}")
logger.info(f"- Sampler type: {self.sampler_type}")
logger.info(f"- Sampler flag: {self.sampler_flag}")
logger.info(f"- Adaptive repeat: {self.use_adaptive_repeat}")

def before_epoch(self, runner):
"""Produce a proper sampler for task-adaptation."""
@@ -68,7 +65,6 @@ def before_epoch(self, runner):
efficient_mode=self.efficient_mode,
num_replicas=world_size,
rank=rank,
use_adaptive_repeats=self.use_adaptive_repeat,
)
else:
sampler = ClsIncrSampler(
@@ -77,7 +73,6 @@ def before_epoch(self, runner):
efficient_mode=self.efficient_mode,
num_replicas=world_size,
rank=rank,
use_adaptive_repeats=self.use_adaptive_repeat,
)
runner.data_loader = DataLoader(
dataset,
2 changes: 0 additions & 2 deletions src/otx/algorithms/common/adapters/mmcv/utils/__init__.py
@@ -12,7 +12,6 @@
InputSizeManager,
OTXConfig,
config_from_string,
get_data_cfg,
get_dataset_configs,
is_epoch_based_runner,
patch_adaptive_interval_training,
@@ -45,7 +44,6 @@
"patch_early_stopping",
"patch_persistent_workers",
"prepare_work_dir",
"get_data_cfg",
"OTXConfig",
"adapt_batch_size",
"InputSizeManager",
@@ -90,7 +90,6 @@ def train_func_single_iter(batch_size):
)

default_bs = _get_batch_size(cfg)

bs_search_algo = BsSearchAlgo(
train_func=train_func_single_iter,
default_bs=default_bs,
@@ -126,6 +125,9 @@ def _set_batch_size(cfg, batch_size: int):
cfg.data.videos_per_gpu = batch_size
else:
cfg.data.train_dataloader["samples_per_gpu"] = batch_size
for custom_hook in cfg.custom_hooks:
if custom_hook["type"] == "AdaptiveRepeatDataHook":
custom_hook["train_batch_size"] = batch_size


def _set_max_epoch(cfg, max_epoch: int):
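The _set_batch_size change is the same bookkeeping from the auto batch-size side: when the batch-size search settles on a new samples_per_gpu, the hook's train_batch_size has to follow, otherwise the repeat factor and the patched max_iters would be derived from a stale value. A rough sketch with hypothetical numbers:

cfg = dict(
    data=dict(train_dataloader=dict(samples_per_gpu=8)),
    custom_hooks=[dict(type="AdaptiveRepeatDataHook", train_batch_size=8, train_data_size=480)],
)

new_bs = 32  # e.g. the value chosen by the batch-size search
cfg["data"]["train_dataloader"]["samples_per_gpu"] = new_bs
for custom_hook in cfg["custom_hooks"]:
    if custom_hook["type"] == "AdaptiveRepeatDataHook":
        custom_hook["train_batch_size"] = new_bs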
46 changes: 28 additions & 18 deletions src/otx/algorithms/common/adapters/mmcv/utils/config_utils.py
@@ -4,6 +4,7 @@

import copy
import glob
import math
import multiprocessing
import os
import os.path as osp
@@ -517,15 +518,11 @@ def patch_from_hyperparams(config: Config, hyperparams, **kwargs):
algo_backend = hyperparams.algo_backend
warmup_iters = int(params.learning_rate_warmup_iters)

model_label_type = config.filename.split("/")[-1]
if "multilabel" in model_label_type:
lr_config = ConfigDict(max_lr=params.learning_rate, warmup=None)
else:
lr_config = (
ConfigDict(warmup_iters=warmup_iters)
if warmup_iters > 0
else ConfigDict(warmup_iters=warmup_iters, warmup=None)
)
lr_config = (
ConfigDict(warmup_iters=warmup_iters)
if warmup_iters > 0
else ConfigDict(warmup_iters=warmup_iters, warmup=None)
)

if params.enable_early_stopping and config.get("evaluation", None):
early_stop = ConfigDict(
@@ -599,14 +596,6 @@ def prepare_work_dir(config: Union[Config, ConfigDict]) -> str:
return train_round_checkpoint_dir


def get_data_cfg(config: Union[Config, ConfigDict], subset: str = "train") -> Config:
"""Return dataset configs."""
data_cfg = config.data[subset]
while "dataset" in data_cfg:
data_cfg = data_cfg.dataset
return data_cfg


class InputSizeManager:
"""Class for changing input size and getting input size value by checking data pipeline.
@@ -899,7 +888,6 @@ def get_configured_input_size(

if input_size == InputSizePreset.DEFAULT.value:
return None

logger.info("Given model weight was trained with {} input size.".format(input_size))

else:
@@ -984,3 +972,25 @@ def area(x):
input_size = self.select_closest_size(input_size, input_size_preset)
logger.info(f"-> Closest preset: {input_size}")
return input_size


def get_proper_repeat_times(
data_size: int,
batch_size: int,
coef: float,
min_repeat: float,
) -> float:
"""Get proper repeat times for adaptive training.
Args:
data_size (int): The total number of the training dataset
batch_size (int): The batch size for the training data loader
coef (float) : coefficient that affects the number of repeats
(coef * math.sqrt(num_iters-1)) + 5
min_repeat (float) : minimum repeats
"""
if data_size == 0 or batch_size == 0:
logger.info("Repeat dataset enabled, but not a train mode. repeat times set to 1.")
return 1
n_iters_per_epoch = math.ceil(data_size / batch_size)
return math.floor(max(coef * math.sqrt(n_iters_per_epoch - 1) + 5, min_repeat))
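
For intuition about the formula, a quick sanity check with the default coef=-0.7 and min_repeat=1.0 (example values only):

import math

def repeats(data_size, batch_size, coef=-0.7, min_repeat=1.0):
    # mirrors get_proper_repeat_times above, without the logging branch
    n_iters_per_epoch = math.ceil(data_size / batch_size)
    return math.floor(max(coef * math.sqrt(n_iters_per_epoch - 1) + 5, min_repeat))

print(repeats(64, 32))    # 4 -> a tiny dataset gets repeated 4 times per epoch
print(repeats(6400, 32))  # 1 -> enough iterations per epoch already, no repetition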
@@ -4,6 +4,7 @@
#

import math
from typing import Union

import numpy as np
from torch.utils.data import Dataset
@@ -37,7 +38,7 @@ class BalancedSampler(OTXSampler): # pylint: disable=too-many-instance-attribut
tail of the data to make it evenly divisible across the number of
replicas. If ``False``, the sampler will add extra indices to make
the data evenly divisible across the replicas. Default: ``False``.
use_adaptive_repeats (bool, optional): Flag about using adaptive repeats
n_repeats (Union[float, int, str], optional) : number of repeats for manual setting
"""

def __init__(
Expand All @@ -48,14 +49,14 @@ def __init__(
num_replicas: int = 1,
rank: int = 0,
drop_last: bool = False,
use_adaptive_repeats: bool = False,
n_repeats: Union[float, int, str] = "auto",
):
self.samples_per_gpu = samples_per_gpu
self.num_replicas = num_replicas
self.rank = rank
self.drop_last = drop_last

super().__init__(dataset, samples_per_gpu, use_adaptive_repeats)
super().__init__(dataset, samples_per_gpu, n_repeats=n_repeats)

self.img_indices = self.dataset.img_indices # type: ignore[attr-defined]
self.num_cls = len(self.img_indices.keys())
