This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

support multi-phase in hyperband #1257

Merged Jul 12, 2019 · 9 commits
Changes from 7 commits
2 changes: 0 additions & 2 deletions src/sdk/pynni/nni/__main__.py
@@ -130,8 +130,6 @@ def main():

if args.advisor_class_name:
# advisor is enabled and starts to run
if args.multi_phase:
raise AssertionError('multi_phase has not been supported in advisor')
if args.advisor_class_name in AdvisorModuleName:
dispatcher = create_builtin_class_instance(
args.advisor_class_name,
144 changes: 105 additions & 39 deletions src/sdk/pynni/nni/bohb_advisor/bohb_advisor.py
@@ -32,6 +32,7 @@
from nni.protocol import CommandType, send
from nni.msg_dispatcher_base import MsgDispatcherBase
from nni.utils import OptimizeMode, extract_scalar_reward, randint_to_quniform
from nni.common import multi_phase_enabled

from .config_generator import CG_BOHB

@@ -328,6 +329,13 @@ def __init__(self,
# config generator
self.cg = None

# record the latest parameter_id of each trial job, keyed by trial_job_id.
# if a trial job has no running parameter_id, self.job_id_para_id_map[trial_job_id] == None.
# a new trial job is added to this dict; a finished trial job is removed from it.
self.job_id_para_id_map = dict()
# record parameter requests from trial jobs that cannot be satisfied yet
self.unsatisfied_jobs = []
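
A minimal sketch (editor's illustration, not part of the diff) of how this bookkeeping evolves over one trial's lifetime, with hypothetical ids:

# hypothetical trial_job_id 'abc' and parameter_ids, for illustration only
job_id_para_id_map = {}
unsatisfied_jobs = []

# first metric from trial 'abc' arrives: the mapping is recorded lazily,
# because trial_job_id is unknown when the parameter is generated
job_id_para_id_map['abc'] = '0_0_1'

# trial 'abc' requests another parameter but no config is ready yet
unsatisfied_jobs.append({'trial_job_id': 'abc', 'parameter_index': 1})

# a new bracket later produces configs; the queued request is served first
# and the map is updated to the trial's latest parameter_id
job_id_para_id_map['abc'] = '0_1_0'

# trial 'abc' finally ends: its entry is dropped
del job_id_para_id_map['abc']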

def load_checkpoint(self):
pass
@chicm-ms (Contributor), Jul 10, 2019:
load_checkpoint and save_checkpoint can be removed if they are empty.

Author:
removed, thx!


@@ -398,7 +406,7 @@ def handle_request_trial_jobs(self, data):
for _ in range(self.credit):
self._request_one_trial_job()

def _request_one_trial_job(self):
def _get_one_trial_job(self):
"""get one trial job, i.e., one hyperparameter configuration.

If this function is called, Command will be sent by BOHB:
@@ -422,7 +430,7 @@ def _request_one_trial_job(self):
'parameters': ''
}
send(CommandType.NoMoreTrialJobs, json_tricks.dumps(ret))
return
return None
assert self.generated_hyper_configs
params = self.generated_hyper_configs.pop()
ret = {
@@ -431,8 +439,29 @@
'parameters': params[1]
}
self.parameters[params[0]] = params[1]
send(CommandType.NewTrialJob, json_tricks.dumps(ret))
self.credit -= 1
return ret

def _request_one_trial_job(self):
"""request one trial job, i.e., one hyperparameter configuration.

If this function is called, a command will be sent by BOHB:
a. If there is a parameter that needs to run, "NewTrialJob" is sent with a dict:
{
'parameter_id': id of new hyperparameter
'parameter_source': 'algorithm'
'parameters': value of new hyperparameter
}
b. If BOHB has no parameter waiting, "NoMoreTrialJobs" is sent with
{
'parameter_id': '-1_0_0',
'parameter_source': 'algorithm',
'parameters': ''
}
"""
ret = self._get_one_trial_job()
if ret is not None:
send(CommandType.NewTrialJob, json_tricks.dumps(ret))
self.credit -= 1
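
Note the design choice in this refactoring: _get_one_trial_job only produces (or declines) a configuration, while _request_one_trial_job sends it and consumes credit. That split is what lets the multi-phase REQUEST_PARAMETER path below reuse configuration generation through SendTrialJobParameter without touching the credit counter.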

def handle_update_search_space(self, data):
"""change json format to ConfigSpace format dict<dict> -> configspace
@@ -501,23 +530,38 @@ def handle_trial_end(self, data):
hyper_params: the hyperparameters (a string) generated and returned by tuner
"""
logger.debug('Tuner handle trial end, result is %s', data)

hyper_params = json_tricks.loads(data['hyper_params'])
s, i, _ = hyper_params['parameter_id'].split('_')
self._handle_trial_end(hyper_params['parameter_id'])
if data['trial_job_id'] in self.job_id_para_id_map:
del self.job_id_para_id_map[data['trial_job_id']]

def _send_new_trial(self):
while self.unsatisfied_jobs:
ret = self._get_one_trial_job()
if ret is None:
break
one_unsatisfied = self.unsatisfied_jobs.pop(0)
ret['trial_job_id'] = one_unsatisfied['trial_job_id']
ret['parameter_index'] = one_unsatisfied['parameter_index']
# update parameter_id in self.job_id_para_id_map
self.job_id_para_id_map[ret['trial_job_id']] = ret['parameter_id']
send(CommandType.SendTrialJobParameter, json_tricks.dumps(ret))
for _ in range(self.credit):
self._request_one_trial_job()

def _handle_trial_end(self, parameter_id):
s, i, _ = parameter_id.split('_')
hyper_configs = self.brackets[int(s)].inform_trial_end(int(i))

if hyper_configs is not None:
logger.debug(
'bracket %s next round %s, hyper_configs: %s', s, i, hyper_configs)
self.generated_hyper_configs = self.generated_hyper_configs + hyper_configs
for _ in range(self.credit):
self._request_one_trial_job()
# Finish this bracket and generate a new bracket
elif self.brackets[int(s)].no_more_trial:
self.curr_s -= 1
self.generate_new_bracket()
for _ in range(self.credit):
self._request_one_trial_job()
self._send_new_trial()
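
The parameter_id strings split throughout these handlers encode the bracket and round. A small sketch with a hypothetical id (the third field is assumed here to be a per-round index):

# parameter_id has the form '<bracket>_<round>_<index>', e.g. '3_1_7' (hypothetical)
parameter_id = '3_1_7'
s, i, _ = parameter_id.split('_')
assert (int(s), int(i)) == (3, 1)  # bracket 3, round 1 of that bracket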

def handle_report_metric_data(self, data):
"""receive the metric data and update Bayesian optimization with final result
@@ -534,36 +578,58 @@
"""
logger.debug('handle report metric data = %s', data)

assert 'value' in data
value = extract_scalar_reward(data['value'])
if self.optimize_mode is OptimizeMode.Maximize:
reward = -value
else:
reward = value
assert 'parameter_id' in data
s, i, _ = data['parameter_id'].split('_')

logger.debug('bracket id = %s, metrics value = %s, type = %s', s, value, data['type'])
s = int(s)

assert 'type' in data
if data['type'] == 'FINAL':
# and PERIODICAL metric are independent, thus, not comparable.
assert 'sequence' in data
self.brackets[s].set_config_perf(
int(i), data['parameter_id'], sys.maxsize, value)
self.completed_hyper_configs.append(data)

_parameters = self.parameters[data['parameter_id']]
_parameters.pop(_KEY)
# update BO with loss, max_s budget, hyperparameters
self.cg.new_result(loss=reward, budget=data['sequence'], parameters=_parameters, update_model=True)
elif data['type'] == 'PERIODICAL':
self.brackets[s].set_config_perf(
int(i), data['parameter_id'], data['sequence'], value)
if data['type'] == 'REQUEST_PARAMETER':
Contributor:
abstract these commands into a command class instead of using strings?

Author:
done, thx!

assert multi_phase_enabled()
assert data['trial_job_id'] is not None
assert data['parameter_index'] is not None
assert data['trial_job_id'] in self.job_id_para_id_map
self._handle_trial_end(self.job_id_para_id_map[data['trial_job_id']])
ret = self._get_one_trial_job()
if ret is None:
self.unsatisfied_jobs.append({'trial_job_id': data['trial_job_id'], 'parameter_index': data['parameter_index']})
else:
ret['trial_job_id'] = data['trial_job_id']
ret['parameter_index'] = data['parameter_index']
# update parameter_id in self.job_id_para_id_map
self.job_id_para_id_map[data['trial_job_id']] = ret['parameter_id']
send(CommandType.SendTrialJobParameter, json_tricks.dumps(ret))
else:
raise ValueError(
'Data type not supported: {}'.format(data['type']))
assert 'value' in data
value = extract_scalar_reward(data['value'])
if self.optimize_mode is OptimizeMode.Maximize:
reward = -value
else:
reward = value
assert 'parameter_id' in data
Contributor:
you could assert after line 591.

s, i, _ = data['parameter_id'].split('_')
logger.debug('bracket id = %s, metrics value = %s, type = %s', s, value, data['type'])
s = int(s)

# add <trial_job_id, parameter_id> to self.job_id_para_id_map here,
# because when the first parameter_id is created, trial_job_id is not known yet.
if data['trial_job_id'] in self.job_id_para_id_map:
assert self.job_id_para_id_map[data['trial_job_id']] == data['parameter_id']
else:
self.job_id_para_id_map[data['trial_job_id']] = data['parameter_id']

assert 'type' in data
if data['type'] == 'FINAL':
# and PERIODICAL metric are independent, thus, not comparable.
assert 'sequence' in data
self.brackets[s].set_config_perf(
int(i), data['parameter_id'], sys.maxsize, value)
self.completed_hyper_configs.append(data)

_parameters = self.parameters[data['parameter_id']]
_parameters.pop(_KEY)
# update BO with loss, max_s budget, hyperparameters
self.cg.new_result(loss=reward, budget=data['sequence'], parameters=_parameters, update_model=True)
elif data['type'] == 'PERIODICAL':
self.brackets[s].set_config_perf(
int(i), data['parameter_id'], data['sequence'], value)
else:
raise ValueError(
'Data type not supported: {}'.format(data['type']))
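
For context, the REQUEST_PARAMETER branch above fires when a multi-phase trial asks the advisor for another configuration. A hedged sketch of what such a trial might look like with NNI's trial API (train_and_evaluate is a hypothetical user function; exact multi-phase trial code is outside this diff):

import nni

# each nni.get_next_parameter() call in a multi-phase trial surfaces as a
# REQUEST_PARAMETER message on the advisor side, answered (when a config is
# available) with SendTrialJobParameter
while True:
    params = nni.get_next_parameter()
    if not params:  # assumed: nothing left for this trial
        break
    accuracy = train_and_evaluate(params)  # hypothetical user function
    nni.report_final_result(accuracy)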

def handle_add_customized_trial(self, data):
pass
92 changes: 60 additions & 32 deletions src/sdk/pynni/nni/hyperband_advisor/hyperband_advisor.py
@@ -30,7 +30,7 @@

from nni.protocol import CommandType, send
from nni.msg_dispatcher_base import MsgDispatcherBase
from nni.common import init_logger
from nni.common import init_logger, multi_phase_enabled
from nni.utils import NodeType, OptimizeMode, extract_scalar_reward, randint_to_quniform
import nni.parameter_expressions as parameter_expressions

@@ -277,7 +277,7 @@ class Hyperband(MsgDispatcherBase):
optimize_mode: str
optimize mode, 'maximize' or 'minimize'
"""
def __init__(self, R, eta=3, optimize_mode='maximize'):
def __init__(self, R=60, eta=3, optimize_mode='maximize'):
"""B = (s_max + 1)R"""
super(Hyperband, self).__init__()
self.R = R # pylint: disable=invalid-name
@@ -296,6 +296,11 @@ def __init__(self, R=60, eta=3, optimize_mode='maximize'):
# In this case, tuner increases self.credit to issue a trial config sometime later.
self.credit = 0

# record the latest parameter_id of each trial job, keyed by trial_job_id.
# if a trial job has no running parameter_id, self.job_id_para_id_map[trial_job_id] == None.
# a new trial job is added to this dict; a finished trial job is removed from it.
self.job_id_para_id_map = dict()
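
Since the constructor now defaults to R=60 and eta=3, a quick sketch of the bracket schedule those defaults imply, following the Hyperband paper (NNI's implementation may round slightly differently):

import math

R, eta = 60, 3
s_max = math.floor(math.log(R, eta))  # 3, since 3**3 = 27 <= 60 < 81
B = (s_max + 1) * R                   # budget per bracket: 240

for s in range(s_max, -1, -1):
    n = math.ceil((B / R) * eta ** s / (s + 1))  # initial number of configs
    r = R * eta ** (-s)                          # initial budget per config
    print(s, n, round(r, 1))  # s=3: 27 configs at ~2.2; s=0: 4 configs at 60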

def load_checkpoint(self):
pass
Contributor:
load_checkpoint and save_checkpoint can be removed if they are empty.

Author:
removed, thx!


@@ -321,9 +326,10 @@ def handle_request_trial_jobs(self, data):
number of trial jobs
"""
for _ in range(data):
self._request_one_trial_job()
ret = self._get_one_trial_job()
send(CommandType.NewTrialJob, json_tricks.dumps(ret))

def _request_one_trial_job(self):
def _get_one_trial_job(self):
"""get one trial job, i.e., one hyperparameter configuration."""
if not self.generated_hyper_configs:
if self.curr_s < 0:
@@ -346,7 +352,8 @@ def _request_one_trial_job(self):
'parameter_source': 'algorithm',
'parameters': params[1]
}
send(CommandType.NewTrialJob, json_tricks.dumps(ret))
return ret


def handle_update_search_space(self, data):
"""data: JSON object, which is search space
@@ -360,6 +367,18 @@
randint_to_quniform(self.searchspace_json)
self.random_state = np.random.RandomState()

def _handle_trial_end(self, parameter_id):
"""
Parameters
----------
parameter_id: parameter id of the finished config
"""
bracket_id, i, _ = parameter_id.split('_')
hyper_configs = self.brackets[int(bracket_id)].inform_trial_end(int(i))
if hyper_configs is not None:
_logger.debug('bracket %s next round %s, hyper_configs: %s', bracket_id, i, hyper_configs)
self.generated_hyper_configs = self.generated_hyper_configs + hyper_configs

def handle_trial_end(self, data):
"""
Parameters
@@ -371,22 +390,9 @@ def handle_trial_end(self, data):
hyper_params: the hyperparameters (a string) generated and returned by tuner
"""
hyper_params = json_tricks.loads(data['hyper_params'])
bracket_id, i, _ = hyper_params['parameter_id'].split('_')
hyper_configs = self.brackets[int(bracket_id)].inform_trial_end(int(i))
if hyper_configs is not None:
_logger.debug('bracket %s next round %s, hyper_configs: %s', bracket_id, i, hyper_configs)
self.generated_hyper_configs = self.generated_hyper_configs + hyper_configs
for _ in range(self.credit):
if not self.generated_hyper_configs:
break
params = self.generated_hyper_configs.pop()
ret = {
'parameter_id': params[0],
'parameter_source': 'algorithm',
'parameters': params[1]
}
send(CommandType.NewTrialJob, json_tricks.dumps(ret))
self.credit -= 1
self._handle_trial_end(hyper_params['parameter_id'])
if data['trial_job_id'] in self.job_id_para_id_map:
del self.job_id_para_id_map[data['trial_job_id']]

def handle_report_metric_data(self, data):
"""
@@ -400,18 +406,40 @@ def handle_report_metric_data(self, data):
ValueError
Data type not supported
"""
value = extract_scalar_reward(data['value'])
bracket_id, i, _ = data['parameter_id'].split('_')
bracket_id = int(bracket_id)
if data['type'] == 'FINAL':
# sys.maxsize indicates this value is from FINAL metric data, because data['sequence'] from FINAL metric
# and PERIODICAL metric are independent, thus, not comparable.
self.brackets[bracket_id].set_config_perf(int(i), data['parameter_id'], sys.maxsize, value)
self.completed_hyper_configs.append(data)
elif data['type'] == 'PERIODICAL':
self.brackets[bracket_id].set_config_perf(int(i), data['parameter_id'], data['sequence'], value)
if data['type'] == 'REQUEST_PARAMETER':
Contributor:
same here.

Author:
done, thx!

assert multi_phase_enabled()
assert data['trial_job_id'] is not None
assert data['parameter_index'] is not None
assert data['trial_job_id'] in self.job_id_para_id_map
self._handle_trial_end(self.job_id_para_id_map[data['trial_job_id']])
ret = self._get_one_trial_job()
if data['trial_job_id'] is not None:
ret['trial_job_id'] = data['trial_job_id']
if data['parameter_index'] is not None:
ret['parameter_index'] = data['parameter_index']
self.job_id_para_id_map[data['trial_job_id']] = ret['parameter_id']
send(CommandType.SendTrialJobParameter, json_tricks.dumps(ret))
else:
raise ValueError('Data type not supported: {}'.format(data['type']))
value = extract_scalar_reward(data['value'])
bracket_id, i, _ = data['parameter_id'].split('_')
bracket_id = int(bracket_id)

# add <trial_job_id, parameter_id> to self.job_id_para_id_map here,
# because when the first parameter_id is created, trial_job_id is not known yet.
if data['trial_job_id'] in self.job_id_para_id_map:
assert self.job_id_para_id_map[data['trial_job_id']] == data['parameter_id']
else:
self.job_id_para_id_map[data['trial_job_id']] = data['parameter_id']

if data['type'] == 'FINAL':
Contributor:
same here.

Author:
done, thx!

# sys.maxsize indicates this value is from FINAL metric data, because data['sequence'] from FINAL metric
# and PERIODICAL metric are independent, thus, not comparable.
self.brackets[bracket_id].set_config_perf(int(i), data['parameter_id'], sys.maxsize, value)
self.completed_hyper_configs.append(data)
elif data['type'] == 'PERIODICAL':
Contributor:
same here.

Author:
done, thx!

self.brackets[bracket_id].set_config_perf(int(i), data['parameter_id'], data['sequence'], value)
else:
raise ValueError('Data type not supported: {}'.format(data['type']))
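
The reply assembled above for a REQUEST_PARAMETER message carries the new configuration plus the requesting trial's id and parameter index. A hypothetical payload (all field values are illustrative; the budget key name is an assumption):

ret = {
    'parameter_id': '2_0_5',           # bracket 2, round 0 (hypothetical id)
    'parameter_source': 'algorithm',
    'parameters': {'learning_rate': 0.01, 'TRIAL_BUDGET': 6},  # assumed keys
    'trial_job_id': 'GxVkr',           # echoed from the request
    'parameter_index': 2,              # echoed from the request
}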

def handle_add_customized_trial(self, data):
pass