This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

support multi-phase in hyperband #1257

Merged · 9 commits · Jul 12, 2019

2 changes: 0 additions & 2 deletions src/sdk/pynni/nni/__main__.py
@@ -130,8 +130,6 @@ def main():

if args.advisor_class_name:
# advisor is enabled and starts to run
if args.multi_phase:
raise AssertionError('multi_phase has not been supported in advisor')
if args.advisor_class_name in AdvisorModuleName:
dispatcher = create_builtin_class_instance(
args.advisor_class_name,
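Removing this assertion is what allows an advisor (Hyperband/BOHB) to run with multi-phase enabled. For context, here is a minimal sketch of what a multi-phase trial loop can look like on the trial side. It assumes the standard `nni` trial API and that multi-phase is switched on in the experiment configuration (I believe the YAML key is `multiPhase`, but treat that, the phase count, and the behaviour of `get_next_parameter()` when the advisor has nothing left as assumptions, not part of this diff):

```python
# Hypothetical multi-phase trial: one trial process asks the advisor for
# several configurations in sequence instead of exiting after a single one.
import nni

def train(params):
    # placeholder for a real training run; returns a dummy score
    return 0.5

MAX_PHASES = 4                              # illustrative, not an NNI constant
for _ in range(MAX_PHASES):
    params = nni.get_next_parameter()       # after the first call, answered via SendTrialJobParameter
    if not params:                          # assumed behaviour when the advisor has no config left
        break
    score = train(params)
    nni.report_final_result(score)          # FINAL metric closes out this configuration
```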
152 changes: 106 additions & 46 deletions src/sdk/pynni/nni/bohb_advisor/bohb_advisor.py
@@ -31,7 +31,8 @@

from nni.protocol import CommandType, send
from nni.msg_dispatcher_base import MsgDispatcherBase
from nni.utils import OptimizeMode, extract_scalar_reward, randint_to_quniform
from nni.utils import OptimizeMode, MetricType, extract_scalar_reward, randint_to_quniform
from nni.common import multi_phase_enabled

from .config_generator import CG_BOHB

@@ -79,7 +80,7 @@ def create_bracket_parameter_id(brackets_id, brackets_curr_decay, increased_id=-
return params_id


class Bracket():
class Bracket(object):
"""
A bracket in BOHB. All the information of a bracket is managed by
an instance of this class.
@@ -328,11 +329,12 @@ def __init__(self,
# config generator
self.cg = None

def load_checkpoint(self):
pass

def save_checkpoint(self):
pass
# record the latest parameter_id of the trial job trial_job_id.
# if there is no running parameter_id, self.job_id_para_id_map[trial_job_id] == None
# new trial job is added to this dict and finished trial job is removed from it.
self.job_id_para_id_map = dict()
# record the unsatisfied parameter request from trial jobs
self.unsatisfied_jobs = []
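The two attributes just added do the multi-phase bookkeeping: `job_id_para_id_map` remembers which parameter_id each running trial job is currently executing, and `unsatisfied_jobs` queues parameter requests that could not be answered immediately. A tiny illustration of the map's lifecycle (the ids are invented; the transitions mirror the handlers later in this diff):

```python
# Illustrative lifecycle of job_id_para_id_map (ids are made up):
job_id_para_id_map = {}

# the first metric reported by trial job 'JobA' registers its current parameter_id
job_id_para_id_map['JobA'] = '0_0_3'

# a REQUEST_PARAMETER metric ends config 0_0_3 and swaps in the newly issued id
job_id_para_id_map['JobA'] = '0_1_7'

# handle_trial_end removes the entry once the trial job itself finishes
del job_id_para_id_map['JobA']
```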

def handle_initialize(self, data):
"""Initialize Tuner, including creating Bayesian optimization-based parametric models
Expand Down Expand Up @@ -398,7 +400,7 @@ def handle_request_trial_jobs(self, data):
for _ in range(self.credit):
self._request_one_trial_job()

def _request_one_trial_job(self):
def _get_one_trial_job(self):
"""get one trial job, i.e., one hyperparameter configuration.

If this function is called, Command will be sent by BOHB:
@@ -422,7 +424,7 @@ def _request_one_trial_job(self):
'parameters': ''
}
send(CommandType.NoMoreTrialJobs, json_tricks.dumps(ret))
return
return None
assert self.generated_hyper_configs
params = self.generated_hyper_configs.pop()
ret = {
@@ -431,8 +433,29 @@ def _request_one_trial_job(self):
'parameters': params[1]
}
self.parameters[params[0]] = params[1]
send(CommandType.NewTrialJob, json_tricks.dumps(ret))
self.credit -= 1
return ret

def _request_one_trial_job(self):
"""get one trial job, i.e., one hyperparameter configuration.

If this function is called, a command will be sent by BOHB:
a. If there is a parameter that needs to run, it will return "NewTrialJob" with a dict:
{
'parameter_id': id of new hyperparameter
'parameter_source': 'algorithm'
'parameters': value of new hyperparameter
}
b. If BOHB doesn't have a parameter waiting, it will return "NoMoreTrialJobs" with
{
'parameter_id': '-1_0_0',
'parameter_source': 'algorithm',
'parameters': ''
}
"""
ret = self._get_one_trial_job()
if ret is not None:
send(CommandType.NewTrialJob, json_tricks.dumps(ret))
self.credit -= 1

def handle_update_search_space(self, data):
"""change json format to ConfigSpace format dict<dict> -> configspace
@@ -501,23 +524,38 @@ def handle_trial_end(self, data):
hyper_params: the hyperparameters (a string) generated and returned by tuner
"""
logger.debug('Tuner handle trial end, result is %s', data)

hyper_params = json_tricks.loads(data['hyper_params'])
s, i, _ = hyper_params['parameter_id'].split('_')
self._handle_trial_end(hyper_params['parameter_id'])
if data['trial_job_id'] in self.job_id_para_id_map:
del self.job_id_para_id_map[data['trial_job_id']]

def _send_new_trial(self):
while self.unsatisfied_jobs:
ret = self._get_one_trial_job()
if ret is None:
break
one_unsatisfied = self.unsatisfied_jobs.pop(0)
ret['trial_job_id'] = one_unsatisfied['trial_job_id']
ret['parameter_index'] = one_unsatisfied['parameter_index']
# update parameter_id in self.job_id_para_id_map
self.job_id_para_id_map[ret['trial_job_id']] = ret['parameter_id']
send(CommandType.SendTrialJobParameter, json_tricks.dumps(ret))
for _ in range(self.credit):
self._request_one_trial_job()

def _handle_trial_end(self, parameter_id):
s, i, _ = parameter_id.split('_')
hyper_configs = self.brackets[int(s)].inform_trial_end(int(i))

if hyper_configs is not None:
logger.debug(
'bracket %s next round %s, hyper_configs: %s', s, i, hyper_configs)
self.generated_hyper_configs = self.generated_hyper_configs + hyper_configs
for _ in range(self.credit):
self._request_one_trial_job()
# Finish this bracket and generate a new bracket
elif self.brackets[int(s)].no_more_trial:
self.curr_s -= 1
self.generate_new_bracket()
for _ in range(self.credit):
self._request_one_trial_job()
self._send_new_trial()

def handle_report_metric_data(self, data):
"""reveice the metric data and update Bayesian optimization with final result
@@ -534,36 +572,58 @@ def handle_report_metric_data(self, data):
"""
logger.debug('handle report metric data = %s', data)

assert 'value' in data
value = extract_scalar_reward(data['value'])
if self.optimize_mode is OptimizeMode.Maximize:
reward = -value
else:
reward = value
assert 'parameter_id' in data
s, i, _ = data['parameter_id'].split('_')

logger.debug('bracket id = %s, metrics value = %s, type = %s', s, value, data['type'])
s = int(s)

assert 'type' in data
if data['type'] == 'FINAL':
# sys.maxsize indicates this value is from FINAL metric data, because data['sequence'] from FINAL metric
# and PERIODICAL metric are independent, thus, not comparable.
assert 'sequence' in data
self.brackets[s].set_config_perf(
int(i), data['parameter_id'], sys.maxsize, value)
self.completed_hyper_configs.append(data)

_parameters = self.parameters[data['parameter_id']]
_parameters.pop(_KEY)
# update BO with loss, max_s budget, hyperparameters
self.cg.new_result(loss=reward, budget=data['sequence'], parameters=_parameters, update_model=True)
elif data['type'] == 'PERIODICAL':
self.brackets[s].set_config_perf(
int(i), data['parameter_id'], data['sequence'], value)
if data['type'] == MetricType.REQUEST_PARAMETER:
assert multi_phase_enabled()
assert data['trial_job_id'] is not None
assert data['parameter_index'] is not None
assert data['trial_job_id'] in self.job_id_para_id_map
self._handle_trial_end(self.job_id_para_id_map[data['trial_job_id']])
ret = self._get_one_trial_job()
if ret is None:
self.unsatisfied_jobs.append({'trial_job_id': data['trial_job_id'], 'parameter_index': data['parameter_index']})
else:
ret['trial_job_id'] = data['trial_job_id']
ret['parameter_index'] = data['parameter_index']
# update parameter_id in self.job_id_para_id_map
self.job_id_para_id_map[data['trial_job_id']] = ret['parameter_id']
send(CommandType.SendTrialJobParameter, json_tricks.dumps(ret))
else:
raise ValueError(
'Data type not supported: {}'.format(data['type']))
assert 'value' in data
value = extract_scalar_reward(data['value'])
if self.optimize_mode is OptimizeMode.Maximize:
reward = -value
else:
reward = value
assert 'parameter_id' in data
Contributor comment: you could assert after line 591.

s, i, _ = data['parameter_id'].split('_')
logger.debug('bracket id = %s, metrics value = %s, type = %s', s, value, data['type'])
s = int(s)

# add <trial_job_id, parameter_id> to self.job_id_para_id_map here,
# because when the first parameter_id is created, trial_job_id is not known yet.
if data['trial_job_id'] in self.job_id_para_id_map:
assert self.job_id_para_id_map[data['trial_job_id']] == data['parameter_id']
else:
self.job_id_para_id_map[data['trial_job_id']] = data['parameter_id']

assert 'type' in data
if data['type'] == MetricType.FINAL:
# sys.maxsize indicates this value is from FINAL metric data, because data['sequence'] from FINAL metric
# and PERIODICAL metric are independent, thus, not comparable.
assert 'sequence' in data
self.brackets[s].set_config_perf(
int(i), data['parameter_id'], sys.maxsize, value)
self.completed_hyper_configs.append(data)

_parameters = self.parameters[data['parameter_id']]
_parameters.pop(_KEY)
# update BO with loss, max_s budget, hyperparameters
self.cg.new_result(loss=reward, budget=data['sequence'], parameters=_parameters, update_model=True)
elif data['type'] == MetricType.PERIODICAL:
self.brackets[s].set_config_perf(
int(i), data['parameter_id'], data['sequence'], value)
else:
raise ValueError(
'Data type not supported: {}'.format(data['type']))

def handle_add_customized_trial(self, data):
pass
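For reference, this is roughly the payload a REQUEST_PARAMETER metric gets back over `SendTrialJobParameter` in the branch above. The dict keys come from the code; the ids and hyperparameter values are purely illustrative:

```python
# Sketch of the SendTrialJobParameter reply built in handle_report_metric_data
# (values are invented; only the keys are taken from the diff):
reply = {
    'parameter_id': '2_1_15',                     # produced by _get_one_trial_job()
    'parameter_source': 'algorithm',
    'parameters': {'lr': 0.01, 'dropout': 0.3},   # hypothetical sampled configuration
    'trial_job_id': 'JobA',                       # echoed from the requesting trial
    'parameter_index': 1,                         # echoed so the trial can match the answer to its request
}
# the advisor then calls: send(CommandType.SendTrialJobParameter, json_tricks.dumps(reply))
```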
98 changes: 60 additions & 38 deletions src/sdk/pynni/nni/hyperband_advisor/hyperband_advisor.py
@@ -30,8 +30,8 @@

from nni.protocol import CommandType, send
from nni.msg_dispatcher_base import MsgDispatcherBase
from nni.common import init_logger
from nni.utils import NodeType, OptimizeMode, extract_scalar_reward, randint_to_quniform
from nni.common import init_logger, multi_phase_enabled
from nni.utils import NodeType, OptimizeMode, MetricType, extract_scalar_reward, randint_to_quniform
import nni.parameter_expressions as parameter_expressions

_logger = logging.getLogger(__name__)
@@ -277,7 +277,7 @@ class Hyperband(MsgDispatcherBase):
optimize_mode: str
optimize mode, 'maximize' or 'minimize'
"""
def __init__(self, R, eta=3, optimize_mode='maximize'):
def __init__(self, R=60, eta=3, optimize_mode='maximize'):
"""B = (s_max + 1)R"""
super(Hyperband, self).__init__()
self.R = R # pylint: disable=invalid-name
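The signature change above gives `R` a default of 60. As a quick check of the `B = (s_max + 1)R` docstring, here is the standard Hyperband bracket arithmetic for these defaults; these are the textbook formulas from the Hyperband paper, and nni's `Bracket` class may round slightly differently:

```python
# Standard Hyperband schedule for R=60, eta=3 (a sketch of the arithmetic,
# not a copy of nni's implementation).
import math

R, eta = 60, 3
s_max = math.floor(math.log(R, eta))   # 3, so brackets s = 3, 2, 1, 0 are generated
B = (s_max + 1) * R                    # 240: total budget across one pass over all brackets

for s in range(s_max, -1, -1):
    n = math.ceil((B / R) * eta ** s / (s + 1))   # configs in the bracket's first round
    r = R * eta ** (-s)                           # budget given to each of those configs
    print(f'bracket s={s}: n={n} configs at budget r={r:.2f}')
```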
@@ -296,11 +296,10 @@ def __init__(self, R=60, eta=3, optimize_mode='maximize'):
# In this case, tuner increases self.credit to issue a trial config sometime later.
self.credit = 0

def load_checkpoint(self):
pass

def save_checkpoint(self):
pass
# record the latest parameter_id of the trial job trial_job_id.
# if there is no running parameter_id, self.job_id_para_id_map[trial_job_id] == None
# new trial job is added to this dict and finished trial job is removed from it.
self.job_id_para_id_map = dict()

def handle_initialize(self, data):
"""data is search space
@@ -321,9 +320,10 @@ def handle_request_trial_jobs(self, data):
number of trial jobs
"""
for _ in range(data):
self._request_one_trial_job()
ret = self._get_one_trial_job()
send(CommandType.NewTrialJob, json_tricks.dumps(ret))

def _request_one_trial_job(self):
def _get_one_trial_job(self):
"""get one trial job, i.e., one hyperparameter configuration."""
if not self.generated_hyper_configs:
if self.curr_s < 0:
@@ -346,7 +346,8 @@ def _request_one_trial_job(self):
'parameter_source': 'algorithm',
'parameters': params[1]
}
send(CommandType.NewTrialJob, json_tricks.dumps(ret))
return ret


def handle_update_search_space(self, data):
"""data: JSON object, which is search space
@@ -360,6 +361,18 @@ def handle_update_search_space(self, data):
randint_to_quniform(self.searchspace_json)
self.random_state = np.random.RandomState()

def _handle_trial_end(self, parameter_id):
"""
Parameters
----------
parameter_id: parameter id of the finished config
"""
bracket_id, i, _ = parameter_id.split('_')
hyper_configs = self.brackets[int(bracket_id)].inform_trial_end(int(i))
if hyper_configs is not None:
_logger.debug('bracket %s next round %s, hyper_configs: %s', bracket_id, i, hyper_configs)
self.generated_hyper_configs = self.generated_hyper_configs + hyper_configs

def handle_trial_end(self, data):
"""
Parameters
@@ -371,22 +384,9 @@ def handle_trial_end(self, data):
hyper_params: the hyperparameters (a string) generated and returned by tuner
"""
hyper_params = json_tricks.loads(data['hyper_params'])
bracket_id, i, _ = hyper_params['parameter_id'].split('_')
hyper_configs = self.brackets[int(bracket_id)].inform_trial_end(int(i))
if hyper_configs is not None:
_logger.debug('bracket %s next round %s, hyper_configs: %s', bracket_id, i, hyper_configs)
self.generated_hyper_configs = self.generated_hyper_configs + hyper_configs
for _ in range(self.credit):
if not self.generated_hyper_configs:
break
params = self.generated_hyper_configs.pop()
ret = {
'parameter_id': params[0],
'parameter_source': 'algorithm',
'parameters': params[1]
}
send(CommandType.NewTrialJob, json_tricks.dumps(ret))
self.credit -= 1
self._handle_trial_end(hyper_params['parameter_id'])
if data['trial_job_id'] in self.job_id_para_id_map:
del self.job_id_para_id_map[data['trial_job_id']]

def handle_report_metric_data(self, data):
"""
@@ -400,18 +400,40 @@ def handle_report_metric_data(self, data):
ValueError
Data type not supported
"""
value = extract_scalar_reward(data['value'])
bracket_id, i, _ = data['parameter_id'].split('_')
bracket_id = int(bracket_id)
if data['type'] == 'FINAL':
# sys.maxsize indicates this value is from FINAL metric data, because data['sequence'] from FINAL metric
# and PERIODICAL metric are independent, thus, not comparable.
self.brackets[bracket_id].set_config_perf(int(i), data['parameter_id'], sys.maxsize, value)
self.completed_hyper_configs.append(data)
elif data['type'] == 'PERIODICAL':
self.brackets[bracket_id].set_config_perf(int(i), data['parameter_id'], data['sequence'], value)
if data['type'] == MetricType.REQUEST_PARAMETER:
assert multi_phase_enabled()
assert data['trial_job_id'] is not None
assert data['parameter_index'] is not None
assert data['trial_job_id'] in self.job_id_para_id_map
self._handle_trial_end(self.job_id_para_id_map[data['trial_job_id']])
ret = self._get_one_trial_job()
if data['trial_job_id'] is not None:
ret['trial_job_id'] = data['trial_job_id']
if data['parameter_index'] is not None:
ret['parameter_index'] = data['parameter_index']
self.job_id_para_id_map[data['trial_job_id']] = ret['parameter_id']
send(CommandType.SendTrialJobParameter, json_tricks.dumps(ret))
else:
raise ValueError('Data type not supported: {}'.format(data['type']))
value = extract_scalar_reward(data['value'])
bracket_id, i, _ = data['parameter_id'].split('_')
bracket_id = int(bracket_id)

# add <trial_job_id, parameter_id> to self.job_id_para_id_map here,
# because when the first parameter_id is created, trial_job_id is not known yet.
if data['trial_job_id'] in self.job_id_para_id_map:
assert self.job_id_para_id_map[data['trial_job_id']] == data['parameter_id']
else:
self.job_id_para_id_map[data['trial_job_id']] = data['parameter_id']

if data['type'] == MetricType.FINAL:
# sys.maxsize indicates this value is from FINAL metric data, because data['sequence'] from FINAL metric
# and PERIODICAL metric are independent, thus, not comparable.
self.brackets[bracket_id].set_config_perf(int(i), data['parameter_id'], sys.maxsize, value)
self.completed_hyper_configs.append(data)
elif data['type'] == MetricType.PERIODICAL:
self.brackets[bracket_id].set_config_perf(int(i), data['parameter_id'], data['sequence'], value)
else:
raise ValueError('Data type not supported: {}'.format(data['type']))

def handle_add_customized_trial(self, data):
pass
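Taken together, the two advisors now implement essentially the same handshake for a mid-run parameter request. The following condensed sketch restates that shared logic in one place; the method and attribute names are taken from this diff, while the standalone function wrapper is just for illustration and is not part of the PR:

```python
# Condensed restatement of the REQUEST_PARAMETER branch added in this PR
# (advisor-side view; not new behaviour, just the shared shape of it).
import json_tricks
from nni.protocol import CommandType, send
from nni.utils import MetricType

def answer_parameter_request(advisor, data):
    assert data['type'] == MetricType.REQUEST_PARAMETER
    # close out the configuration the trial was running before this request
    previous_id = advisor.job_id_para_id_map[data['trial_job_id']]
    advisor._handle_trial_end(previous_id)
    # try to draw the next configuration
    ret = advisor._get_one_trial_job()
    if ret is None:
        # BOHB parks the request until a bracket round frees up new configs;
        # the Hyperband version does not keep such a queue
        advisor.unsatisfied_jobs.append({
            'trial_job_id': data['trial_job_id'],
            'parameter_index': data['parameter_index'],
        })
        return
    ret['trial_job_id'] = data['trial_job_id']
    ret['parameter_index'] = data['parameter_index']
    advisor.job_id_para_id_map[data['trial_job_id']] = ret['parameter_id']
    send(CommandType.SendTrialJobParameter, json_tricks.dumps(ret))
```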