# fo_fl_main.py
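"""Entry point for FedAvg federated learning training.

Builds a FedAvgServer and its FedAvgClient workers from the command-line
settings, runs the training loop, and optionally logs metrics to TensorBoard.
"""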
from os import path

import torch
from tensorboardX import SummaryWriter
from tqdm import tqdm

from cezo_fl.fl_helpers import get_client_name, get_server_name
from cezo_fl.util import model_helpers
from experiment_helper import prepare_settings
from experiment_helper.cli_parser import (
    GeneralSetting,
    DeviceSetting,
    DataSetting,
    ModelSetting,
    OptimizerSetting,
    FederatedLearningSetting,
)
from experiment_helper.data import get_dataloaders
from experiment_helper.device import use_device
from fed_avg.client import FedAvgClient
from fed_avg.server import FedAvgServer


class CliSetting(
    GeneralSetting,
    DeviceSetting,
    DataSetting,
    ModelSetting,
    OptimizerSetting,
    FederatedLearningSetting,
):
    """Replacement for the regular argparse module.

    We use the third-party pydantic-settings library to make the command-line
    interface easier to manage.

    Example:
        if __name__ == "__main__":
            args = CliSetting()

    ``args`` then carries every parameter defined by the component settings above.
    """

    pass
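
# Usage sketch: fields such as num_clients, iterations, and seed used below are
# contributed by the setting mixins above (which mixin owns which field is defined
# in experiment_helper.cli_parser), e.g.
#
#     args = CliSetting()
#     print(args.num_clients, args.seed)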


def setup_server_and_clients(
    args, device_map: dict[str, torch.device], train_loaders
) -> FedAvgServer:
    """Build one FedAvgClient per client and wrap them in a FedAvgServer."""
    model_inferences, metrics = prepare_settings.get_model_inferences_and_metrics(
        args.dataset, args.model_setting
    )

    # One model/optimizer pair per client, each placed on its assigned device.
    clients = []
    for i in range(args.num_clients):
        client_name = get_client_name(i)
        client_device = device_map[client_name]
        client_model = prepare_settings.get_model(args.dataset, args.model_setting, args.seed)
        client_model.to(client_device)
        client_optimizer = prepare_settings.get_optimizer(
            client_model, args.dataset, args.optimizer_setting
        )
        client = FedAvgClient(
            client_model,
            model_inferences.train_inference,
            train_loaders[i],
            client_optimizer,
            metrics.train_loss,
            metrics.train_acc,
            client_device,
        )
        clients.append(client)

    # The server keeps its own model copy (built with the same seed as the clients)
    # and evaluates with the test-time inference function and metrics.
    server_device = device_map[get_server_name()]
    server_model = prepare_settings.get_model(args.dataset, args.model_setting, args.seed)
    server_model.to(server_device)
    server = FedAvgServer(
        clients,
        server_device,
        server_model=server_model,
        server_model_inference=model_inferences.test_inference,
        server_criterion=metrics.test_loss,
        server_accuracy_func=metrics.test_acc,
        num_sample_clients=args.num_sample_clients,
        local_update_steps=args.local_update_steps,
    )
    return server


if __name__ == "__main__":
    args = CliSetting()
    print(args)

    # Assign one device per client (plus the server) and build the data loaders.
    device_map = use_device(args.device_setting, args.num_clients)
    train_loaders, test_loader = get_dataloaders(
        args.data_setting, args.num_clients, args.seed, args.get_hf_model_name()
    )
    server = setup_server_and_clients(args, device_map, train_loaders)

    if args.log_to_tensorboard:
        tensorboard_sub_folder = "-".join(
            [
                server.server_model.model_name,
                model_helpers.get_current_datetime_str(),
            ]
        )
        writer = SummaryWriter(
            path.join(
                "tensorboards",
                "fed_avg",
                args.dataset.value,
                args.log_to_tensorboard,
                tensorboard_sub_folder,
            )
        )

    with tqdm(total=args.iterations, desc="Training:") as t:
        for ite in range(args.iterations):
            step_loss, step_accuracy = server.train_one_step()
            t.set_postfix({"Loss": step_loss, "Accuracy": step_accuracy})
            t.update(1)
            if args.log_to_tensorboard:
                writer.add_scalar("Loss/train", step_loss, ite)
                writer.add_scalar("Accuracy/train", step_accuracy, ite)
            # Periodic evaluation on the held-out test loader.
            if args.eval_iterations != 0 and (ite + 1) % args.eval_iterations == 0:
                eval_loss, eval_accuracy = server.eval_model(test_loader, ite)
                if args.log_to_tensorboard:
                    writer.add_scalar("Loss/test", eval_loss, ite)
                    writer.add_scalar("Accuracy/test", eval_accuracy, ite)