Fix the incorrect progress information when AdaptiveRepeatDataHook is enabled #2510

Merged
merged 20 commits on Oct 4, 2023
Changes from 17 commits
@@ -213,13 +213,6 @@ def get_sampler_type(self, cfg):
sampler_type = "cls_incr"
return sampler_type

def use_adaptive_repeat(self, cfg) -> bool:
"""Return whether using adaptive repeat.

Currently, only multi class classification supports adaptive repeat.
"""
return self._is_multiclass(cfg)

@staticmethod
def _is_multiclass(cfg) -> bool:
return not cfg.model.get("multilabel", False) and not cfg.model.get("hierarchical", False)
8 changes: 0 additions & 8 deletions src/otx/algorithms/common/adapters/mmcv/clsincr_mixin.py
@@ -36,7 +36,6 @@ def configure_task_adapt_hook(self, cfg):
sampler_flag=sampler_flag,
sampler_type=self.get_sampler_type(cfg),
efficient_mode=cfg["task_adapt"].get("efficient_mode", False),
use_adaptive_repeat=self.use_adaptive_repeat(cfg),
priority="NORMAL",
),
)
@@ -50,10 +49,3 @@ def is_incremental(self) -> bool:
def get_sampler_type(self, cfg) -> str:
"""Return sampler type."""
return "cls_incr"

def use_adaptive_repeat(self, cfg) -> bool:
"""Return whether using adaptive repeat.

Currently, only multi class classification supports adaptive repeat.
"""
return False
29 changes: 21 additions & 8 deletions src/otx/algorithms/common/adapters/mmcv/configurer.py
@@ -17,6 +17,7 @@
patch_adaptive_interval_training,
patch_early_stopping,
patch_persistent_workers,
remove_from_configs_by_type,
)
from otx.algorithms.common.adapters.mmcv.utils.config_utils import (
InputSizeManager,
@@ -111,8 +112,8 @@ def merge_configs(self, cfg, data_cfg, data_pipeline_path, hyperparams_from_otx,
if data_cfg:
for subset in data_cfg.data:
if subset in cfg.data:
src_data_cfg = self.get_data_cfg(cfg, subset)
new_data_cfg = self.get_data_cfg(data_cfg, subset)
src_data_cfg = self.get_subset_data_cfg(cfg, subset)
new_data_cfg = self.get_subset_data_cfg(data_cfg, subset)
for key in new_data_cfg:
src_data_cfg[key] = new_data_cfg[key]
else:
@@ -198,13 +199,12 @@ def configure_samples_per_gpu(

samples_per_gpu can be changed if it is larger than the length of the dataset
"""

for subset in subsets:
if cfg.data.get(subset, None):
dataloader_cfg = cfg.data.get(f"{subset}_dataloader", ConfigDict())
samples_per_gpu = dataloader_cfg.get("samples_per_gpu", cfg.data.get("samples_per_gpu", 1))

data_cfg = self.get_data_cfg(cfg, subset)
data_cfg = self.get_subset_data_cfg(cfg, subset)
if data_cfg.get("otx_dataset") is not None:
dataset_len = len(data_cfg.otx_dataset)

@@ -269,7 +269,7 @@ def configure_model(self, cfg, data_classes, model_classes, ir_options, **kwargs
self.model_classes = model_classes
self.data_classes = data_classes
if data_classes is not None:
train_data_cfg = self.get_data_cfg(cfg, "train")
train_data_cfg = self.get_subset_data_cfg(cfg, "train")
train_data_cfg["data_classes"] = data_classes
new_classes = np.setdiff1d(data_classes, model_classes).tolist()
train_data_cfg["new_classes"] = new_classes
@@ -413,6 +413,19 @@ def configure_hooks(
if hasattr(cfg, "algo_backend"):
self._update_caching_modules(cfg)

# Update adaptive repeat
if not self.training:
remove_from_configs_by_type(cfg.custom_hooks, "AdaptiveRepeatDataHook")
return
for custom_hook in cfg.custom_hooks:
if custom_hook["type"] == "AdaptiveRepeatDataHook":
data_cfg = cfg.get("data", {})
bs = data_cfg.get("train_dataloader", {}).get("samples_per_gpu", None)
bs = bs if bs is not None else data_cfg.get("samples_per_gpu", 0)
custom_hook["train_batch_size"] = bs
custom_hook["train_data_size"] = len(data_cfg.get("train", {}).get("otx_dataset", []))
break

@staticmethod
def _update_caching_modules(cfg: Config) -> None:
def _find_max_num_workers(cfg: dict):
@@ -478,15 +491,15 @@ def get_model_meta(cfg):
def get_data_classes(self, cfg):
"""Get data classes from train cfg."""
data_classes = []
train_cfg = self.get_data_cfg(cfg, "train")
train_cfg = self.get_subset_data_cfg(cfg, "train")
if "data_classes" in train_cfg:
data_classes = list(train_cfg.pop("data_classes", []))
elif "classes" in train_cfg:
data_classes = list(train_cfg.classes)
return data_classes

@staticmethod
def get_data_cfg(cfg, subset):
def get_subset_data_cfg(cfg, subset):
"""Get subset's data cfg."""
assert subset in ["train", "val", "test", "unlabeled"], f"Unknown subset:{subset}"
if "dataset" in cfg.data[subset]: # Concat|RepeatDataset
@@ -512,7 +525,7 @@ def adapt_input_size_to_dataset(
Tuple[int, int]: (width, height) or None
"""

data_cfg = BaseConfigurer.get_data_cfg(cfg, "train")
data_cfg = BaseConfigurer.get_subset_data_cfg(cfg, "train")
dataset = data_cfg.get("otx_dataset", None)
if dataset is None:
return None
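The configure_hooks hunk above wires the AdaptiveRepeatDataHook up at configuration time: the hook is dropped for non-training runs, and otherwise receives the actual train batch size and dataset length it now requires. A minimal sketch of that logic against a plain dict-like config (the helper name is made up for illustration; the real code is the loop added above):

def update_adaptive_repeat_hook(cfg, training):
    # Sketch only: mirrors the configure_hooks branch added in this PR.
    hooks = cfg.get("custom_hooks", [])
    if not training:
        # There is no train dataloader outside training, so drop the hook.
        cfg["custom_hooks"] = [h for h in hooks if h["type"] != "AdaptiveRepeatDataHook"]
        return
    for hook in hooks:
        if hook["type"] == "AdaptiveRepeatDataHook":
            data_cfg = cfg.get("data", {})
            bs = data_cfg.get("train_dataloader", {}).get("samples_per_gpu")
            hook["train_batch_size"] = bs if bs is not None else data_cfg.get("samples_per_gpu", 0)
            hook["train_data_size"] = len(data_cfg.get("train", {}).get("otx_dataset", []))
            break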
@@ -6,6 +6,7 @@
from mmcv.runner import HOOKS, Hook, get_dist_info
from torch.utils.data import DataLoader

from otx.algorithms.common.adapters.mmcv.utils.config_utils import get_proper_repeat_times
from otx.algorithms.common.adapters.torch.dataloaders.samplers import OTXSampler
from otx.algorithms.common.utils.logger import get_logger

@@ -17,38 +18,61 @@ class AdaptiveRepeatDataHook(Hook):
"""Hook that adaptively repeats the dataset to control the number of iterations.

Args:
train_batch_size (int) : The batch size of the train dataloader
train_data_size (int) : The size of the training dataset
coef (float, optional) : coefficient that affects the number of repeats,
(coef * math.sqrt(num_iters-1)) + 5
min_repeat (float, optional) : minimum number of repeats
"""

def __init__(self, coef: float = -0.7, min_repeat: float = 1.0):
def __init__(self, train_batch_size: int, train_data_size: int, coef: float = -0.7, min_repeat: float = 1.0):
self.coef = coef
self.min_repeat = min_repeat

self.train_batch_size = train_batch_size
self.train_data_size = train_data_size

self.n_repeats = get_proper_repeat_times(
self.train_data_size, self.train_batch_size, self.coef, self.min_repeat
)
self.rank, self.world_size = get_dist_info()

def before_run(self, runner):
"""Change the runner's max_iter."""
if self.n_repeats > 1:
iter_per_epoch = int(self.train_data_size / self.train_batch_size)

logger.info("Adaptive repeat is enabled")
logger.info(f"- Repeat times: {self.n_repeats}")
logger.info(f"- Batch size: {self.train_batch_size}")
logger.info(f"- Num iters per epoch: {iter_per_epoch} -> {iter_per_epoch * self.n_repeats}")
logger.info(f"- Total iters: {runner.max_iters} -> {runner.max_iters * self.n_repeats}")

# FIXME: although runner._max_iters is a protected attribute,
# there is no other way to control the runner's max_iters.
runner._max_iters = int(runner.max_iters * self.n_repeats)

def before_epoch(self, runner):
"""Convert to OTX Sampler."""
dataset = runner.data_loader.dataset
batch_size = runner.data_loader.batch_size
num_workers = runner.data_loader.num_workers
collate_fn = runner.data_loader.collate_fn
worker_init_fn = runner.data_loader.worker_init_fn
rank, world_size = get_dist_info()

sampler = OTXSampler(
dataset=dataset,
samples_per_gpu=batch_size,
use_adaptive_repeats=True,
num_replicas=world_size,
rank=rank,
samples_per_gpu=self.train_batch_size,
num_replicas=self.world_size,
rank=self.rank,
shuffle=True,
coef=self.coef,
min_repeat=self.min_repeat,
n_repeats=self.n_repeats,
)

runner.data_loader = DataLoader(
dataset,
batch_size=batch_size,
batch_size=self.train_batch_size,
sampler=sampler,
num_workers=num_workers,
collate_fn=collate_fn,
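With train_batch_size and train_data_size available in __init__, the hook can compute the repeat factor up front and enlarge runner.max_iters in before_run, which is what keeps the reported progress and total-iteration count correct when the dataset is repeated. A small numeric walk-through under assumed toy values (none of these numbers come from the PR):

import math

train_data_size, train_batch_size = 64, 16   # assumed toy values
coef, min_repeat = -0.7, 1.0                 # the hook's defaults

n_iters_per_epoch = math.ceil(train_data_size / train_batch_size)                     # 4
n_repeats = math.floor(max(coef * math.sqrt(n_iters_per_epoch - 1) + 5, min_repeat))  # 3

max_iters = 400  # runner.max_iters as originally configured
if n_repeats > 1:
    # before_run scales the iteration budget once, so progress bars and ETA
    # reflect the repeated dataset.
    max_iters = int(max_iters * n_repeats)  # 1200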
@@ -36,21 +36,18 @@ def __init__(
sampler_flag=False,
sampler_type="cls_incr",
efficient_mode=False,
use_adaptive_repeat=False,
):
self.src_classes = src_classes
self.dst_classes = dst_classes
self.model_type = model_type
self.sampler_flag = sampler_flag
self.sampler_type = sampler_type
self.efficient_mode = efficient_mode
self.use_adaptive_repeat = use_adaptive_repeat

logger.info(f"Task Adaptation: {self.src_classes} => {self.dst_classes}")
logger.info(f"- Efficient Mode: {self.efficient_mode}")
logger.info(f"- Sampler type: {self.sampler_type}")
logger.info(f"- Sampler flag: {self.sampler_flag}")
logger.info(f"- Adaptive repeat: {self.use_adaptive_repeat}")

def before_epoch(self, runner):
"""Produce a proper sampler for task-adaptation."""
@@ -68,7 +65,6 @@ def before_epoch(self, runner):
efficient_mode=self.efficient_mode,
num_replicas=world_size,
rank=rank,
use_adaptive_repeats=self.use_adaptive_repeat,
)
else:
sampler = ClsIncrSampler(
@@ -77,7 +73,6 @@ def before_epoch(self, runner):
efficient_mode=self.efficient_mode,
num_replicas=world_size,
rank=rank,
use_adaptive_repeats=self.use_adaptive_repeat,
)
runner.data_loader = DataLoader(
dataset,
2 changes: 0 additions & 2 deletions src/otx/algorithms/common/adapters/mmcv/utils/__init__.py
@@ -12,7 +12,6 @@
InputSizeManager,
OTXConfig,
config_from_string,
get_data_cfg,
get_dataset_configs,
is_epoch_based_runner,
patch_adaptive_interval_training,
@@ -45,7 +44,6 @@
"patch_early_stopping",
"patch_persistent_workers",
"prepare_work_dir",
"get_data_cfg",
"OTXConfig",
"adapt_batch_size",
"InputSizeManager",
@@ -10,6 +10,7 @@
import numpy as np
from torch.cuda import is_available as cuda_available

from otx.algorithms.common.adapters.mmcv.utils.config_utils import update_or_add_custom_hook
from otx.algorithms.common.adapters.torch.utils import BsSearchAlgo
from otx.algorithms.common.utils.logger import get_logger

@@ -68,7 +69,7 @@ def train_func_single_iter(batch_size):
# earlystoppinghook => if eval hook is excluded, this hook makes an error due to absence of score history
# CustomEvalHook => exclude validation in classification task
idx_hooks_to_remove = []
hooks_to_remove = ["OTXProgressHook", "earlystoppinghook", "CustomEvalHook"]
hooks_to_remove = ["OTXProgressHook", "earlystoppinghook", "CustomEvalHook", "AdaptiveRepeatDataHook"]
for i, hook in enumerate(copied_cfg.custom_hooks):
if not validate and hook["type"] == "AdaptiveTrainSchedulingHook":
hook["enable_eval_before_run"] = False
@@ -79,6 +80,7 @@
if idx_hooks_to_remove:
idx_hooks_to_remove.sort()
for i in reversed(idx_hooks_to_remove):
print(f"{copied_cfg.custom_hooks[i]} to be deleted.")
del copied_cfg.custom_hooks[i]

new_datasets = [SubDataset(datasets[0], batch_size)]
@@ -90,7 +92,6 @@
)

default_bs = _get_batch_size(cfg)

bs_search_algo = BsSearchAlgo(
train_func=train_func_single_iter,
default_bs=default_bs,
@@ -126,6 +127,10 @@ def _set_batch_size(cfg, batch_size: int):
cfg.data.videos_per_gpu = batch_size
else:
cfg.data.train_dataloader["samples_per_gpu"] = batch_size
update_or_add_custom_hook(cfg, {"type": "AdaptiveRepeatDataHook", "train_batch_size": batch_size})
for custom_hook in cfg.custom_hooks:
if custom_hook["type"] == "AdaptiveRepeatDataHook":
custom_hook["train_batch_size"] = batch_size


def _set_max_epoch(cfg, max_epoch: int):
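Because the automatic batch-size search rewrites samples_per_gpu, the hunk above also pushes the chosen value into the AdaptiveRepeatDataHook config so that its repeat computation is not based on a stale batch size. A hedged sketch of that sync step (the standalone helper below is invented for illustration; in the diff the loop is inlined in _set_batch_size):

def sync_adaptive_repeat_batch_size(cfg, batch_size):
    # Keep the hook's train_batch_size in line with the searched batch size.
    for custom_hook in cfg.get("custom_hooks", []):
        if custom_hook["type"] == "AdaptiveRepeatDataHook":
            custom_hook["train_batch_size"] = batch_size
            break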
46 changes: 28 additions & 18 deletions src/otx/algorithms/common/adapters/mmcv/utils/config_utils.py
@@ -4,6 +4,7 @@

import copy
import glob
import math
import multiprocessing
import os
import os.path as osp
@@ -517,15 +518,11 @@ def patch_from_hyperparams(config: Config, hyperparams, **kwargs):
algo_backend = hyperparams.algo_backend
warmup_iters = int(params.learning_rate_warmup_iters)

model_label_type = config.filename.split("/")[-1]
if "multilabel" in model_label_type:
lr_config = ConfigDict(max_lr=params.learning_rate, warmup=None)
else:
lr_config = (
ConfigDict(warmup_iters=warmup_iters)
if warmup_iters > 0
else ConfigDict(warmup_iters=warmup_iters, warmup=None)
)
lr_config = (
ConfigDict(warmup_iters=warmup_iters)
if warmup_iters > 0
else ConfigDict(warmup_iters=warmup_iters, warmup=None)
)

if params.enable_early_stopping and config.get("evaluation", None):
early_stop = ConfigDict(
@@ -599,14 +596,6 @@ def prepare_work_dir(config: Union[Config, ConfigDict]) -> str:
return train_round_checkpoint_dir


def get_data_cfg(config: Union[Config, ConfigDict], subset: str = "train") -> Config:
"""Return dataset configs."""
data_cfg = config.data[subset]
while "dataset" in data_cfg:
data_cfg = data_cfg.dataset
return data_cfg


class InputSizeManager:
"""Class for changing input size and getting input size value by checking data pipeline.

@@ -899,7 +888,6 @@ def get_configured_input_size(

if input_size == InputSizePreset.DEFAULT.value:
return None

logger.info("Given model weight was trained with {} input size.".format(input_size))

else:
@@ -984,3 +972,25 @@ def area(x):
input_size = self.select_closest_size(input_size, input_size_preset)
logger.info(f"-> Closest preset: {input_size}")
return input_size


def get_proper_repeat_times(
data_size: int,
batch_size: int,
coef: float,
min_repeat: float,
) -> float:
"""Get proper repeat times for adaptive training.

Args:
data_size (int): The size of the training dataset
batch_size (int): The batch size for the training data loader
coef (float) : coefficient that affects the number of repeats,
(coef * math.sqrt(num_iters-1)) + 5
min_repeat (float) : minimum number of repeats
"""
if data_size == 0 or batch_size == 0:
logger.info("Repeat dataset enabled, but not a train mode. repeat times set to 1.")
return 1
n_iters_per_epoch = math.ceil(data_size / batch_size)
return math.floor(max(coef * math.sqrt(n_iters_per_epoch - 1) + 5, min_repeat))
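A couple of illustrative calls (values chosen only to show the behaviour: small datasets get repeated more, larger ones are left almost untouched):

from otx.algorithms.common.adapters.mmcv.utils.config_utils import get_proper_repeat_times

# 64 samples with batch size 32 -> 2 iterations per epoch -> repeat 4 times
print(get_proper_repeat_times(64, 32, coef=-0.7, min_repeat=1.0))    # 4
# 1000 samples with batch size 32 -> 32 iterations per epoch -> repeat once
print(get_proper_repeat_times(1000, 32, coef=-0.7, min_repeat=1.0))  # 1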