[Feature] Modification of the finetune mechanism #177

Merged · 17 commits · Jul 11, 2022
2 changes: 1 addition & 1 deletion README.md
@@ -94,7 +94,7 @@ Note that FederatedScope provides a unified interface for both standalone mode a

The standalone mode in FederatedScope means to simulate multiple participants (servers and clients) in a single device, while participants' data are isolated from each other and their models might be shared via message passing.

Here we demonstrate how to run a standard FL task with FederatedScope, with setting `cfg.data.type = 'FEMNIST'` and `cfg.model.type = 'ConvNet2'` to run vanilla FedAvg for an image classification task. Users can customize training configurations, such as `cfg.federate.total_round_num`, `cfg.data.batch_size`, and `cfg.optimizer.lr`, in the configuration (a .yaml file), and run a standard FL task as:
Here we demonstrate how to run a standard FL task with FederatedScope, with setting `cfg.data.type = 'FEMNIST'` and `cfg.model.type = 'ConvNet2'` to run vanilla FedAvg for an image classification task. Users can customize training configurations, such as `cfg.federate.total_round_num`, `cfg.data.batch_size`, and `cfg.train.optimizer.lr`, in the configuration (a .yaml file), and run a standard FL task as:

```bash
# Run with default configurations
2 changes: 1 addition & 1 deletion demo/bbo.py
@@ -25,7 +25,7 @@ def eval_fl_algo(x):
init_cfg = global_cfg.clone()
init_cfg.merge_from_file(
"federatedscope/example_configs/single_process.yaml")
init_cfg.merge_from_list(["optimizer.lr", float(x[0])])
init_cfg.merge_from_list(["train.optimizer.lr", float(x[0])])

update_logger(init_cfg, True)
setup_seed(init_cfg.seed)
4 changes: 2 additions & 2 deletions demo/hpbandster/rs.py
@@ -47,8 +47,8 @@ def eval_fl_algo(x, b):
"federatedscope/example_configs/single_process.yaml")
# specify the configuration of interest
init_cfg.merge_from_list([
"optimizer.lr",
float(x['lr']), "optimizer.weight_decay",
"train.optimizer.lr",
float(x['lr']), "train.optimizer.weight_decay",
float(x['wd']), "model.dropout",
float(x["dropout"])
])
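For readers updating their own scripts, a minimal sketch of the new key paths when overriding the optimizer from Python; the `global_cfg` import path and the values are illustrative, not part of this PR:

```python
from federatedscope.core.configs.config import global_cfg

# Sketch: optimizer settings moved from `optimizer.*` to `train.optimizer.*`,
# so programmatic overrides via merge_from_list must use the new prefix.
init_cfg = global_cfg.clone()
init_cfg.merge_from_list([
    "train.optimizer.lr", 0.05,            # formerly "optimizer.lr"
    "train.optimizer.weight_decay", 1e-4,  # formerly "optimizer.weight_decay"
])
```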
6 changes: 3 additions & 3 deletions federatedscope/attack/worker_as_attacker/server_attacker.py
@@ -67,7 +67,7 @@ def _get_reconstructor(self):
lr=self._cfg.attack.reconstruct_lr,
federate_loss_fn=self.model_criterion,
device=self.device,
federate_lr=self._cfg.optimizer.lr,
federate_lr=self._cfg.train.optimizer.lr,
optim=self._cfg.attack.reconstruct_optim,
info_diff_type=self._cfg.attack.info_diff_type,
federate_method=self._cfg.federate.method,
@@ -166,9 +166,9 @@ def __init__(self,
device=self.device,
grad_clip=self._cfg.grad.grad_clip,
dataset_name=self._cfg.data.type,
fl_local_update_num=self._cfg.federate.local_update_steps,
fl_local_update_num=self._cfg.train.local_update_steps,
fl_type_optimizer=self._cfg.fedopt.optimizer.type,
fl_lr=self._cfg.optimizer.lr,
fl_lr=self._cfg.train.optimizer.lr,
batch_size=100)

# self.optimizer = get_optimizer(type=self._cfg.fedopt.type_optimizer, model=self.model,lr=self._cfg.fedopt.optimizer.lr)
2 changes: 1 addition & 1 deletion federatedscope/autotune/fedex/client.py
@@ -67,7 +67,7 @@ def callback_funcs_for_evaluate(self, message: Message):
if message.content != None:
model_params = message.content["model_param"]
self.trainer.update(model_params)
if self._cfg.trainer.finetune.before_eval:
if self._cfg.finetune.before_eval:
self.trainer.finetune()
metrics = {}
for split in self._cfg.eval.split:
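To make the control-flow change explicit, here is a hedged sketch of the evaluation callback after this PR: fine-tuning before evaluation is now gated by `cfg.finetune.before_eval` instead of `cfg.trainer.finetune.before_eval`. Function and variable names below are simplified for illustration, not a verbatim copy of the client code:

```python
def callback_funcs_for_evaluate_sketch(trainer, cfg, message):
    # Simplified mirror of the diff above: update the local model, optionally
    # fine-tune with the relocated flag, then evaluate every configured split.
    if message.content is not None:
        trainer.update(message.content["model_param"])
    if cfg.finetune.before_eval:       # was: cfg.trainer.finetune.before_eval
        trainer.finetune()
    metrics = {}
    for split in cfg.eval.split:
        metrics.update(trainer.evaluate(target_data_split_name=split))
    return metrics
```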
20 changes: 20 additions & 0 deletions federatedscope/core/auxiliaries/decorators.py
@@ -0,0 +1,20 @@
Review comment (Collaborator): @rayrayraykk Please help me check this modification. Make sure it is consistent with my original implementation. Thanks!
Review comment (Collaborator): This part looks good to me.

def use_diff(func):
def wrapper(self, *args, **kwargs):
if self.cfg.federate.use_diff:
# TODO: any issue for subclasses?
before_metric = self.evaluate(target_data_split_name='val')

num_samples_train, model_para, result_metric = func(
self, *args, **kwargs)

if self.cfg.federate.use_diff:
# TODO: any issue for subclasses?
after_metric = self.evaluate(target_data_split_name='val')
result_metric['val_total'] = before_metric['val_total']
result_metric['val_avg_loss_before'] = before_metric[
'val_avg_loss']
result_metric['val_avg_loss_after'] = after_metric['val_avg_loss']

return num_samples_train, model_para, result_metric

return wrapper
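A short usage sketch for the decorator above: applying `@use_diff` to a trainer's training routine records the validation loss before and after local training whenever `cfg.federate.use_diff` is enabled. `ToyTrainer` and its fixed metrics are hypothetical placeholders, not part of this PR:

```python
from federatedscope.core.auxiliaries.decorators import use_diff


class ToyTrainer:
    """Hypothetical trainer used only to illustrate the decorator contract."""

    def __init__(self, cfg):
        self.cfg = cfg  # expects cfg.federate.use_diff to exist

    def evaluate(self, target_data_split_name='val'):
        # A real trainer runs evaluation here; we return fixed metrics.
        return {'val_total': 100, 'val_avg_loss': 0.42}

    @use_diff
    def train(self):
        # The wrapped routine must return (num_samples_train, model_para, metrics).
        return 100, {}, {}
```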
35 changes: 35 additions & 0 deletions federatedscope/core/auxiliaries/eunms.py
@@ -0,0 +1,35 @@
class MODE:
"""

Note:
StrEnum is not available when `sys.version_info < (3, 11)`, so we simply define a plain MODE class here.
"""
TRAIN = 'train'
TEST = 'test'
VAL = 'val'
FINETUNE = 'finetune'


class TRIGGER:
ON_FIT_START = 'on_fit_start'
ON_EPOCH_START = 'on_epoch_start'
ON_BATCH_START = 'on_batch_start'
ON_BATCH_FORWARD = 'on_batch_forward'
ON_BATCH_BACKWARD = 'on_batch_backward'
ON_BATCH_END = 'on_batch_end'
ON_EPOCH_END = 'on_epoch_end'
ON_FIT_END = 'on_fit_end'

@classmethod
def contains(cls, item):
return item in [
"on_fit_start",
"on_epoch_start",
"on_batch_start",
"on_batch_forward",
"on_batch_backward",
"on_batch_end",
"on_epoch_end",
"on_fit_end"
]
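A small, self-contained sketch of how these constants are intended to be used; the assertions simply restate the definitions above:

```python
from federatedscope.core.auxiliaries.eunms import MODE, TRIGGER

# MODE members are plain string constants, so they compare directly to strings.
assert MODE.FINETUNE == 'finetune'

# TRIGGER.contains can validate a hook point name before registering a hook.
assert TRIGGER.contains('on_batch_forward')
assert not TRIGGER.contains('on_shutdown')
```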
2 changes: 1 addition & 1 deletion federatedscope/core/auxiliaries/utils.py
@@ -65,7 +65,7 @@ def update_logger(cfg, clear_before_add=False):
if cfg.outdir == "":
cfg.outdir = os.path.join(os.getcwd(), "exp")
if cfg.expname == "":
cfg.expname = f"{cfg.federate.method}_{cfg.model.type}_on_{cfg.data.type}_lr{cfg.optimizer.lr}_lstep{cfg.federate.local_update_steps}"
cfg.expname = f"{cfg.federate.method}_{cfg.model.type}_on_{cfg.data.type}_lr{cfg.train.optimizer.lr}_lstep{cfg.train.local_update_steps}"
cfg.expname = f"{cfg.expname}_{cfg.expname_tag}"
cfg.outdir = os.path.join(cfg.outdir, cfg.expname)

4 changes: 2 additions & 2 deletions federatedscope/core/configs/cfg_fl_algo.py
@@ -86,11 +86,11 @@ def extend_fl_algo_cfg(cfg):
def assert_fl_algo_cfg(cfg):
if cfg.personalization.local_update_steps == -1:
# By default, use the same steps as the normal mode
cfg.personalization.local_update_steps = cfg.federate.local_update_steps
cfg.personalization.local_update_steps = cfg.train.local_update_steps

if cfg.personalization.lr <= 0.0:
# By default, use the same lr as the normal mode
cfg.personalization.lr = cfg.optimizer.lr
cfg.personalization.lr = cfg.train.optimizer.lr


register_config("fl_algo", extend_fl_algo_cfg)
7 changes: 0 additions & 7 deletions federatedscope/core/configs/cfg_fl_setting.py
@@ -17,8 +17,6 @@ def extend_fl_setting_cfg(cfg):
cfg.federate.sample_client_rate = -1.0
cfg.federate.total_round_num = 50
cfg.federate.mode = 'standalone'
cfg.federate.local_update_steps = 1
cfg.federate.batch_or_epoch = 'batch'
cfg.federate.share_local_model = False
cfg.federate.data_weighted_aggr = False # If True, the weight of aggr is the number of training samples in dataset.
cfg.federate.online_aggr = False
@@ -64,11 +62,6 @@ def extend_fl_setting_cfg(cfg):


def assert_fl_setting_cfg(cfg):
if cfg.federate.batch_or_epoch not in ['batch', 'epoch']:
raise ValueError(
"Value of 'cfg.federate.batch_or_epoch' must be chosen from ['batch', 'epoch']."
)

assert cfg.federate.mode in ["standalone", "distributed"], \
f"Please specify the cfg.federate.mode as the string standalone or distributed. But got {cfg.federate.mode}."

2 changes: 1 addition & 1 deletion federatedscope/core/configs/cfg_hpo.py
@@ -74,7 +74,7 @@ def assert_hpo_cfg(cfg):

assert not (cfg.hpo.fedex.use and cfg.federate.use_ss
), "Cannot use secret sharing and FedEx at the same time"
assert cfg.optimizer.type == 'SGD' or not cfg.hpo.fedex.use, "SGD is required if FedEx is considered"
assert cfg.train.optimizer.type == 'SGD' or not cfg.hpo.fedex.use, "SGD is required if FedEx is considered"
assert cfg.hpo.fedex.sched in [
'adaptive', 'aggressive', 'auto', 'constant', 'scale'
], "schedule of FedEx must be choice from {}".format(
54 changes: 35 additions & 19 deletions federatedscope/core/configs/cfg_training.py
@@ -9,33 +9,38 @@ def extend_training_cfg(cfg):
cfg.trainer = CN()

cfg.trainer.type = 'general'
cfg.trainer.finetune = CN()
cfg.trainer.finetune.before_eval = False
cfg.trainer.finetune.steps = 5
cfg.trainer.finetune.lr = 0.01
cfg.trainer.finetune.freeze_param = "" # parameters frozen in fine-tuning stage
# cfg.trainer.finetune.only_psn = True

# ------------------------------------------------------------------------ #
# Optimizer related options
# Training related options
# ------------------------------------------------------------------------ #
cfg.optimizer = CN(new_allowed=True)
cfg.train = CN()

cfg.optimizer.type = 'SGD'
cfg.optimizer.lr = 0.1
cfg.train.local_update_steps = 1
cfg.train.batch_or_epoch = 'batch'

cfg.train.optimizer = CN(new_allowed=True)
cfg.train.optimizer.type = 'SGD'
cfg.train.optimizer.lr = 0.1

# ------------------------------------------------------------------------ #
# Gradient related options
# Finetune related options
# ------------------------------------------------------------------------ #
cfg.grad = CN()
cfg.grad.grad_clip = -1.0 # negative numbers indicate we do not clip grad
cfg.finetune = CN()

cfg.finetune.before_eval = False
cfg.finetune.local_update_steps = 1
cfg.finetune.batch_or_epoch = 'epoch'
cfg.finetune.freeze_param = ""

cfg.finetune.optimizer = CN(new_allowed=True)
cfg.finetune.optimizer.type = 'SGD'
cfg.finetune.optimizer.lr = 0.1

# ------------------------------------------------------------------------ #
# lr_scheduler related options
# Gradient related options
# ------------------------------------------------------------------------ #
# cfg.lr_scheduler = CN()
# cfg.lr_scheduler.type = 'StepLR'
# cfg.lr_scheduler.schlr_params = dict()
cfg.grad = CN()
cfg.grad.grad_clip = -1.0 # negative numbers indicate we do not clip grad

# ------------------------------------------------------------------------ #
# Early stopping related options
@@ -57,6 +62,17 @@ def extend_training_cfg(cfg):


def assert_training_cfg(cfg):
if cfg.train.batch_or_epoch not in ['batch', 'epoch']:
raise ValueError(
"Value of 'cfg.train.batch_or_epoch' must be chosen from ['batch', 'epoch']."
)

if cfg.finetune.batch_or_epoch not in ['batch', 'epoch']:
raise ValueError(
"Value of 'cfg.finetune.batch_or_epoch' must be chosen from ['batch', 'epoch']."
)

# TODO: should not be here?
if cfg.backend not in ['torch', 'tensorflow']:
raise ValueError(
"Value of 'cfg.backend' must be chosen from ['torch', 'tensorflow']."
@@ -69,9 +85,9 @@ def assert_training_cfg(cfg):
raise ValueError(
"We only support run with cpu when backend is tensorflow")

if cfg.trainer.finetune.before_eval is False and cfg.trainer.finetune.steps <= 0:
if cfg.finetune.before_eval is False and cfg.finetune.local_update_steps <= 0:
raise ValueError(
f"When adopting fine-tuning, please set a valid local fine-tune steps, got {cfg.trainer.finetune.steps}"
f"When adopting fine-tuning, please set a valid local fine-tune steps, got {cfg.finetune.local_update_steps}"
)


Expand Down
33 changes: 26 additions & 7 deletions federatedscope/core/trainers/context.py
@@ -1,11 +1,12 @@
import logging

import math
import logging

from federatedscope.core.auxiliaries.criterion_builder import get_criterion
from federatedscope.core.auxiliaries.optimizer_builder import get_optimizer
from federatedscope.core.auxiliaries.model_builder import get_trainable_para_names
from federatedscope.core.auxiliaries.regularizer_builder import get_regularizer
from federatedscope.core.auxiliaries.eunms import MODE

logger = logging.getLogger(__name__)


class Context(dict):
@@ -102,7 +103,6 @@ def setup_vars(self):
self.criterion = get_criterion(self.cfg.criterion.type,
self.device)
self.regularizer = get_regularizer(self.cfg.regularizer.type)
self.optimizer = get_optimizer(self.model, **self.cfg.optimizer)
self.grad_clip = self.cfg.grad.grad_clip
elif self.cfg.backend == 'tensorflow':
self.trainable_para_names = self.model.trainable_variables()
@@ -118,7 +118,7 @@ def setup_vars(self):
if self.train_data is not None or self.train_loader is not None:
# Calculate the number of update steps during training given the local_update_steps
num_train_batch, num_train_batch_last_epoch, num_train_epoch, num_total_train_batch = self.pre_calculate_batch_epoch_num(
self.cfg.federate.local_update_steps)
self.cfg.train.local_update_steps)

self.num_train_epoch = num_train_epoch
self.num_train_batch = num_train_batch
@@ -142,7 +142,7 @@ def pre_calculate_batch_epoch_num(self, local_update_steps):
num_train_batch = self.num_train_data // self.cfg.data.batch_size + int(
not self.cfg.data.drop_last
and bool(self.num_train_data % self.cfg.data.batch_size))
if self.cfg.federate.batch_or_epoch == "epoch":
if self.cfg.train.batch_or_epoch == "epoch":
num_train_epoch = local_update_steps
num_train_batch_last_epoch = num_train_batch
num_total_train_batch = local_update_steps * num_train_batch
@@ -170,7 +170,9 @@ def pop_mode(self):
def change_mode(self, mode):
# change state
if self.cfg.backend == 'torch':
getattr(self.model, mode if mode == 'train' else 'eval')()
getattr(
self.model, 'train'
if mode == MODE.TRAIN or mode == MODE.FINETUNE else 'eval')()
else:
pass

@@ -183,3 +185,20 @@ def reset_used_dataset(self):
self.cur_data_splits_used_by_routine.pop()
self.cur_data_split = self.cur_data_splits_used_by_routine[-1] if \
len(self.cur_data_splits_used_by_routine) != 0 else None

def check_data_split(self, target_data_split_name, skip=False):
if self.get(
f"{target_data_split_name}_data") is None and self.get(
f"{target_data_split_name}_loader") is None:
if skip:
logger.warning(
f"No {target_data_split_name}_data or {target_data_split_name}_loader in the trainer, will skip evaluation. "
f"If this is not what you expect, please check whether there is a typo in the split name"
)
return False
else:
raise ValueError(
f"No {target_data_split_name}_data or {target_data_split_name}_loader in the trainer"
)
else:
return True
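Two small sketches of the behavior added above (names outside the diff are illustrative): the updated `change_mode` keeps the torch model in training mode for both `MODE.TRAIN` and `MODE.FINETUNE`, and `check_data_split` lets an evaluation routine skip a missing split instead of raising.

```python
from federatedscope.core.auxiliaries.eunms import MODE

# Mirrors the updated change_mode logic: TRAIN and FINETUNE -> model.train(),
# everything else -> model.eval().
def expected_model_state(mode):
    return 'train' if mode in (MODE.TRAIN, MODE.FINETUNE) else 'eval'

assert expected_model_state(MODE.FINETUNE) == 'train'
assert expected_model_state(MODE.TEST) == 'eval'


# Illustrative guard using the new helper: skip gracefully when the split is absent.
def evaluate_split(ctx, split):
    if not ctx.check_data_split(split, skip=True):
        return {}                   # missing split: helper logs a warning, we skip
    return {f'{split}_total': 0}    # placeholder for the real metric computation
```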