diff --git a/README.md b/README.md index d39ba80f..09a9519f 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ If you have [Anaconda](https://www.anaconda.com/products/distribution#download-s cd FedScale # Please replace ~/.bashrc with ~/.bash_profile for MacOS +FEDSCALE_HOME=$(pwd) echo export FEDSCALE_HOME=$(pwd) >> ~/.bashrc echo alias fedscale=\'bash $FEDSCALE_HOME/fedscale.sh\' >> ~/.bashrc conda init bash diff --git a/fedscale/core/aggregation/aggregator.py b/fedscale/core/aggregation/aggregator.py index 769ca8fe..970cb1cf 100755 --- a/fedscale/core/aggregation/aggregator.py +++ b/fedscale/core/aggregation/aggregator.py @@ -236,7 +236,7 @@ def client_register_handler(self, executorId, info): clientId = ( self.num_of_clients+1) if self.experiment_mode == commons.SIMULATION_MODE else executorId - self.client_manager.registerClient( + self.client_manager.register_client( executorId, clientId, size=_size, speed=systemProfile) self.client_manager.registerDuration(clientId, batch_size=self.args.batch_size, upload_step=self.args.local_steps, upload_size=self.model_update_size, download_size=self.model_update_size) @@ -356,7 +356,7 @@ def select_participants(self, select_num_participants, overcommitment=1.3): list of int: The list of sampled clients id. """ - return sorted(self.client_manager.resampleClients( + return sorted(self.client_manager.select_participants( int(select_num_participants*overcommitment), cur_time=self.global_virtual_clock), ) @@ -379,7 +379,7 @@ def client_completion_handler(self, results): self.stats_util_accumulator.append(results['utility']) self.loss_accumulator.append(results['moving_loss']) - self.client_manager.registerScore(results['clientId'], results['utility'], + self.client_manager.register_feedback(results['clientId'], results['utility'], auxi=math.sqrt( results['moving_loss']), time_stamp=self.round, @@ -508,7 +508,7 @@ def round_completion_handler(self): max(1, len(self.stats_util_accumulator)) # assign avg reward to explored, but not ran workers for clientId in self.round_stragglers: - self.client_manager.registerScore(clientId, avgUtilLastround, + self.client_manager.register_feedback(clientId, avgUtilLastround, time_stamp=self.round, duration=self.virtual_client_clock[clientId]['computation'] + self.virtual_client_clock[clientId]['communication'], diff --git a/fedscale/core/client_manager.py b/fedscale/core/client_manager.py index a3e0cff7..630e3c3d 100644 --- a/fedscale/core/client_manager.py +++ b/fedscale/core/client_manager.py @@ -2,6 +2,7 @@ import math import pickle from random import Random +from typing import Dict, List from fedscale.core.internal.client import Client @@ -41,7 +42,19 @@ def __init__(self, mode, args, sample_seed=233): self.user_trace_keys = list(self.user_trace.keys()) def registerClient(self, hostId, clientId, size, speed, duration=1): + self.register_client(hostId, clientId, size, speed, duration) + def register_client(self, hostId: int, clientId: int, size: int, speed: Dict[str, float], duration: float=1) -> None: + """Register client information to the client manager. + + Args: + hostId (int): executor Id. + clientId (int): client Id. + size (int): number of samples on this client. + speed (Dict[str, float]): device speed (e.g., compuutation and communication). + duration (float): execution latency. + + """ uniqueId = self.getUniqueId(hostId, clientId) user_trace = None if self.user_trace is None else self.user_trace[self.user_trace_keys[int( clientId) % len(self.user_trace)]] @@ -60,7 +73,7 @@ def registerClient(self, hostId, clientId, size, speed, duration=1): self.ucbSampler.register_client(clientId, feedbacks=feedbacks) else: del self.Clients[uniqueId] - + def getAllClients(self): return self.feasibleClients @@ -80,7 +93,6 @@ def registerDuration(self, clientId, batch_size, upload_step, upload_size, downl clientId, exe_cost['computation']+exe_cost['communication']) def getCompletionTime(self, clientId, batch_size, upload_step, upload_size, download_size): - return self.Clients[self.getUniqueId(0, clientId)].getCompletionTime( batch_size=batch_size, upload_step=upload_step, upload_size=upload_size, download_size=download_size @@ -91,6 +103,20 @@ def registerSpeed(self, hostId, clientId, speed): self.Clients[uniqueId].speed = speed def registerScore(self, clientId, reward, auxi=1.0, time_stamp=0, duration=1., success=True): + self.register_feedback(clientId, reward, auxi=auxi, time_stamp=time_stamp, duration=duration, success=success) + + def register_feedback(self, clientId: int, reward: float, auxi: float=1.0, time_stamp: float=0, duration: float=1., success: bool=True) -> None: + """Collect client execution feedbacks of last round. + + Args: + clientId (int): client Id. + reward (float): execution utilities (processed feedbacks). + auxi (float): unprocessed feedbacks. + time_stamp (float): current wall clock time. + duration (float): system execution duration. + success (bool): whether this client runs successfully. + + """ # currently, we only use distance as reward if self.mode == "oort": feedbacks = { @@ -180,12 +206,22 @@ def getFeasibleClients(self, cur_time): def isClientActive(self, clientId, cur_time): return self.Clients[self.getUniqueId(0, clientId)].isActive(cur_time) - def resampleClients(self, numOfClients, cur_time=0): + def select_participants(self, num_of_clients: int, cur_time: float=0) -> List[int]: + """Select participating clients for current execution task. + + Args: + num_of_clients (int): number of participants to select. + cur_time (float): current wall clock time. + + Returns: + List[int]: indices of selected clients. + + """ self.count += 1 clients_online = self.getFeasibleClients(cur_time) - if len(clients_online) <= numOfClients: + if len(clients_online) <= num_of_clients: return clients_online pickled_clients = None @@ -193,14 +229,17 @@ def resampleClients(self, numOfClients, cur_time=0): if self.mode == "oort" and self.count > 1: pickled_clients = self.ucbSampler.select_participant( - numOfClients, feasible_clients=clients_online_set) + num_of_clients, feasible_clients=clients_online_set) else: self.rng.shuffle(clients_online) - client_len = min(numOfClients, len(clients_online) - 1) + client_len = min(num_of_clients, len(clients_online) - 1) pickled_clients = clients_online[:client_len] return pickled_clients + def resampleClients(self, numOfClients, cur_time=0): + return self.select_participants(numOfClients, cur_time) + def getAllMetrics(self): if self.mode == "oort": return self.ucbSampler.getAllMetrics() diff --git a/install.sh b/install.sh index e50c374d..ed914bf6 100644 --- a/install.sh +++ b/install.sh @@ -1,6 +1,7 @@ #!/bin/bash +FEDSCALE_HOME=$(pwd) echo export FEDSCALE_HOME=$(pwd) >> ~/.bashrc -echo alias fedscale=\'bash ${FEDSCALE_HOME}/fedscale.sh\' >> ~/.bashrc +echo alias fedscale=\'bash ${FEDSCALE_HOME}/fedscale.sh\' >> ~/.bashrc isPackageNotInstalled() { $1 --version &> /dev/null