Skip to content
This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

Commit

Permalink
support hybrid training service v2.0 config (#3251)
Browse files Browse the repository at this point in the history
  • Loading branch information
QuanluZhang authored Jan 6, 2021
1 parent 6330df2 commit b177bdc
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ remoteConfig:
reuse: true
machineList:
- ip: 10.1.1.1
username: bob
passwd: bob123
#port can be skip if using default ssh port 22
#port: 22
username: xxx
passwd: xxx
port: 22
24 changes: 24 additions & 0 deletions examples/trials/mnist-tfv1/config_hybrid_v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
experimentName: example_mnist
trialConcurrency: 3
maxExperimentDuration: 1h
maxTrialNumber: 10
searchSpaceFile: search_space.json

trialCodeDirectory: .
trialCommand: python3 mnist.py
trialGpuNumber: 0
tuner:
name: TPE
classArgs:
optimize_mode: maximize

trainingService:
- platform: local
- platform: remote
reuseMode: true
machineList:
- host: 10.1.1.1
user: xxx
password: xxx
#port can be skip if using default ssh port 22
port: 22
35 changes: 35 additions & 0 deletions examples/trials/mnist-tfv1/launch_hybrid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# FIXME: For demonstration only. It should not be here

from pathlib import Path

from nni.experiment import Experiment
from nni.experiment import RemoteMachineConfig
from nni.algorithms.hpo.hyperopt_tuner import HyperoptTuner

tuner = HyperoptTuner('tpe')

search_space = {
"dropout_rate": { "_type": "uniform", "_value": [0.5, 0.9] },
"conv_size": { "_type": "choice", "_value": [2, 3, 5, 7] },
"hidden_size": { "_type": "choice", "_value": [124, 512, 1024] },
"batch_size": { "_type": "choice", "_value": [16, 32] },
"learning_rate": { "_type": "choice", "_value": [0.0001, 0.001, 0.01, 0.1] }
}

experiment = Experiment(tuner, ['local', 'remote'])
experiment.config.experiment_name = 'test'
experiment.config.trial_concurrency = 3
experiment.config.max_trial_number = 10
experiment.config.search_space = search_space
experiment.config.trial_command = 'python3 mnist.py'
experiment.config.trial_code_directory = Path(__file__).parent
experiment.config.training_service[0].use_active_gpu = True
experiment.config.training_service[1].reuse_mode = True
rm_conf = RemoteMachineConfig()
rm_conf.host = '10.1.1.1'
rm_conf.user = 'xxx'
rm_conf.password = 'xxx'
rm_conf.port = 22
experiment.config.training_service[1].machine_list = [rm_conf]

experiment.run(26780, debug=True)
14 changes: 9 additions & 5 deletions nni/experiment/config/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,19 @@ class ExperimentConfig(ConfigBase):
tuner: Optional[_AlgorithmConfig] = None
accessor: Optional[_AlgorithmConfig] = None
advisor: Optional[_AlgorithmConfig] = None
training_service: TrainingServiceConfig
training_service: Union[TrainingServiceConfig, List[TrainingServiceConfig]]

def __init__(self, training_service_platform: Optional[str] = None, **kwargs):
def __init__(self, training_service_platform: Optional[Union[str, List[str]]] = None, **kwargs):
kwargs = util.case_insensitive(kwargs)
if training_service_platform is not None:
assert 'trainingservice' not in kwargs
kwargs['trainingservice'] = util.training_service_config_factory(training_service_platform)
elif isinstance(kwargs.get('trainingservice'), dict):
kwargs['trainingservice'] = util.training_service_config_factory(**kwargs['trainingservice'])
kwargs['trainingservice'] = util.training_service_config_factory(platform = training_service_platform)
elif isinstance(kwargs.get('trainingservice'), (dict, list)):
# dict means a single training service
# list means hybrid training service
kwargs['trainingservice'] = util.training_service_config_factory(config = kwargs['trainingservice'])
else:
raise RuntimeError('Unsupported Training service configuration!')
super().__init__(**kwargs)

def validate(self, initialized_tuner: bool = False) -> None:
Expand Down
82 changes: 52 additions & 30 deletions nni/experiment/config/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,28 @@ def to_v1_yaml(config: ExperimentConfig, skip_nnictl: bool = False) -> Dict[str,
data = config.json()

ts = data.pop('trainingService')
if ts['platform'] == 'openpai':
ts['platform'] = 'pai'
if isinstance(ts, list):
hybrid_names = []
for conf in ts:
if conf['platform'] == 'openpai':
conf['platform'] = 'pai'
hybrid_names.append(conf['platform'])
_handle_training_service(conf, data)
data['trainingServicePlatform'] = 'hybrid'
data['hybridConfig'] = {'trainingServicePlatforms': hybrid_names}
else:
if ts['platform'] == 'openpai':
ts['platform'] = 'pai'
data['trainingServicePlatform'] = ts['platform']
_handle_training_service(ts, data)

data['authorName'] = 'N/A'
data['experimentName'] = data.get('experimentName', 'N/A')
data['maxExecDuration'] = data.pop('maxExperimentDuration', '999d')
if data['debug']:
data['versionCheck'] = False
data['maxTrialNum'] = data.pop('maxTrialNumber', 99999)
data['trainingServicePlatform'] = ts['platform']

ss = data.pop('searchSpace', None)
ss_file = data.pop('searchSpaceFile', None)
if ss is not None:
Expand Down Expand Up @@ -66,6 +78,9 @@ def to_v1_yaml(config: ExperimentConfig, skip_nnictl: bool = False) -> Dict[str,
if 'trialGpuNumber' in data:
data['trial']['gpuNum'] = data.pop('trialGpuNumber')

return data

def _handle_training_service(ts, data):
if ts['platform'] == 'local':
data['localConfig'] = {
'useActiveGpu': ts.get('useActiveGpu', False),
Expand Down Expand Up @@ -140,8 +155,6 @@ def to_v1_yaml(config: ExperimentConfig, skip_nnictl: bool = False) -> Dict[str,
elif ts['platform'] == 'adl':
data['trial']['image'] = ts['dockerImage']

return data

def _convert_gpu_indices(indices):
return ','.join(str(idx) for idx in indices) if indices is not None else None

Expand Down Expand Up @@ -175,19 +188,34 @@ def to_cluster_metadata(config: ExperimentConfig) -> List[Dict[str, Any]]:
experiment_config = to_v1_yaml(config, skip_nnictl=True)
ret = []

if config.training_service.platform == 'local':
if isinstance(config.training_service, list):
hybrid_conf = dict()
hybrid_conf['hybrid_config'] = experiment_config['hybridConfig']
for conf in config.training_service:
metadata = _get_cluster_metadata(conf.platform, experiment_config)
if metadata is not None:
hybrid_conf.update(metadata)
ret.append(hybrid_conf)
else:
metadata = _get_cluster_metadata(config.training_service.platform, experiment_config)
if metadata is not None:
ret.append(metadata)

if experiment_config.get('nniManagerIp') is not None:
ret.append({'nni_manager_ip': {'nniManagerIp': experiment_config['nniManagerIp']}})
ret.append({'trial_config': experiment_config['trial']})
return ret

def _get_cluster_metadata(platform: str, experiment_config) -> Dict:
if platform == 'local':
request_data = dict()
request_data['local_config'] = experiment_config['localConfig']
if request_data['local_config']:
if request_data['local_config'].get('gpuIndices') and isinstance(request_data['local_config'].get('gpuIndices'), int):
request_data['local_config']['gpuIndices'] = str(request_data['local_config'].get('gpuIndices'))
if request_data['local_config'].get('maxTrialNumOnEachGpu'):
request_data['local_config']['maxTrialNumOnEachGpu'] = request_data['local_config'].get('maxTrialNumOnEachGpu')
if request_data['local_config'].get('useActiveGpu'):
request_data['local_config']['useActiveGpu'] = request_data['local_config'].get('useActiveGpu')
ret.append(request_data)
return request_data

elif config.training_service.platform == 'remote':
elif platform == 'remote':
request_data = dict()
if experiment_config.get('remoteConfig'):
request_data['remote_config'] = experiment_config['remoteConfig']
Expand All @@ -198,31 +226,25 @@ def to_cluster_metadata(config: ExperimentConfig) -> List[Dict[str, Any]]:
for i in range(len(request_data['machine_list'])):
if isinstance(request_data['machine_list'][i].get('gpuIndices'), int):
request_data['machine_list'][i]['gpuIndices'] = str(request_data['machine_list'][i].get('gpuIndices'))
ret.append(request_data)
return request_data

elif config.training_service.platform == 'openpai':
ret.append({'pai_config': experiment_config['paiConfig']})
elif platform == 'openpai':
return {'pai_config': experiment_config['paiConfig']}

elif config.training_service.platform == 'aml':
ret.append({'aml_config': experiment_config['amlConfig']})
elif platform == 'aml':
return {'aml_config': experiment_config['amlConfig']}

elif config.training_service.platform == 'kubeflow':
ret.append({'kubeflow_config': experiment_config['kubeflowConfig']})
elif platform == 'kubeflow':
return {'kubeflow_config': experiment_config['kubeflowConfig']}

elif config.training_service.platform == 'frameworkcontroller':
ret.append({'frameworkcontroller_config': experiment_config['frameworkcontrollerConfig']})
elif platform == 'frameworkcontroller':
return {'frameworkcontroller_config': experiment_config['frameworkcontrollerConfig']}

elif config.training_service.platform == 'adl':
pass
elif platform == 'adl':
return None

else:
raise RuntimeError('Unsupported training service ' + config.training_service.platform)

if experiment_config.get('nniManagerIp') is not None:
ret.append({'nni_manager_ip': {'nniManagerIp': experiment_config['nniManagerIp']}})
ret.append({'trial_config': experiment_config['trial']})
return ret

raise RuntimeError('Unsupported training service ' + platform)

def to_rest_json(config: ExperimentConfig) -> Dict[str, Any]:
experiment_config = to_v1_yaml(config, skip_nnictl=True)
Expand Down
2 changes: 1 addition & 1 deletion nni/experiment/config/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class RemoteMachineConfig(ConfigBase):
port: int = 22
user: str
password: Optional[str] = None
ssh_key_file: PathLike = '~/.ssh/id_rsa'
ssh_key_file: PathLike = None #'~/.ssh/id_rsa'
ssh_passphrase: Optional[str] = None
use_active_gpu: bool = False
max_trial_number_per_gpu: int = 1
Expand Down
26 changes: 20 additions & 6 deletions nni/experiment/config/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import math
import os.path
from pathlib import Path
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, Optional, Union, List

PathLike = Union[Path, str]

Expand All @@ -29,12 +29,26 @@ def canonical_path(path: Optional[PathLike]) -> Optional[str]:
def count(*values) -> int:
return sum(value is not None and value is not False for value in values)

def training_service_config_factory(platform: str, **kwargs): # -> TrainingServiceConfig
def training_service_config_factory(platform: Union[str, List[str]] = None, config: Union[List, Dict] = None): # -> TrainingServiceConfig
from .common import TrainingServiceConfig
for cls in TrainingServiceConfig.__subclasses__():
if cls.platform == platform:
return cls(**kwargs)
raise ValueError(f'Unrecognized platform {platform}')
ts_configs = []
if platform is not None:
assert config is None
platforms = platform if isinstance(platform, list) else [platform]
for cls in TrainingServiceConfig.__subclasses__():
if cls.platform in platforms:
ts_configs.append(cls())
if len(ts_configs) < len(platforms):
raise RuntimeError('There is unrecognized platform!')
else:
assert config is not None
supported_platforms = {cls.platform: cls for cls in TrainingServiceConfig.__subclasses__()}
configs = config if isinstance(config, list) else [config]
for conf in configs:
if conf['platform'] not in supported_platforms:
raise RuntimeError(f'Unrecognized platform {conf["platform"]}')
ts_configs.append(supported_platforms[conf['platform']](**conf))
return ts_configs if len(ts_configs) > 1 else ts_configs[0]

def load_config(Type, value):
if isinstance(value, list):
Expand Down
6 changes: 3 additions & 3 deletions nni/experiment/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from subprocess import Popen
from threading import Thread
import time
from typing import Optional, overload
from typing import Optional, Union, List, overload

import colorama
import psutil
Expand Down Expand Up @@ -54,7 +54,7 @@ def __init__(self, tuner: Tuner, config: ExperimentConfig) -> None:
...

@overload
def __init__(self, tuner: Tuner, training_service: str) -> None:
def __init__(self, tuner: Tuner, training_service: Union[str, List[str]]) -> None:
"""
Prepare an experiment, leaving configuration fields to be set later.
Expand Down Expand Up @@ -86,7 +86,7 @@ def __init__(self, tuner: Tuner, config=None, training_service=None):
self._dispatcher: Optional[MsgDispatcher] = None
self._dispatcher_thread: Optional[Thread] = None

if isinstance(config, str):
if isinstance(config, (str, list)):
config, training_service = None, config

if config is None:
Expand Down
20 changes: 13 additions & 7 deletions nni/experiment/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ def start_experiment(exp_id: str, config: ExperimentConfig, port: int, debug: bo

config.validate(initialized_tuner=True)
_ensure_port_idle(port)
if config.training_service.platform == 'openpai':
_ensure_port_idle(port + 1, 'OpenPAI requires an additional port')
if isinstance(config.training_service, list): # hybrid training service
_ensure_port_idle(port + 1, 'Hybrid training service requires an additional port')
elif config.training_service.platform in ['remote', 'openpai', 'kubeflow', 'frameworkcontroller', 'adl']:
_ensure_port_idle(port + 1, f'{config.training_service.platform} requires an additional port')

try:
_logger.info('Creating experiment %s', colorama.Fore.CYAN + exp_id + colorama.Style.RESET_ALL)
_logger.info('Creating experiment, Experiment ID: %s', colorama.Fore.CYAN + exp_id + colorama.Style.RESET_ALL)
pipe = Pipe(exp_id)
start_time, proc = _start_rest_server(config, port, debug, exp_id, pipe.path)
_logger.info('Connecting IPC pipe...')
Expand All @@ -40,7 +42,8 @@ def start_experiment(exp_id: str, config: ExperimentConfig, port: int, debug: bo
nni.runtime.protocol._out_file = pipe_file
_logger.info('Statring web server...')
_check_rest_server(port)
_save_experiment_information(exp_id, port, start_time, config.training_service.platform,
platform = 'hybrid' if isinstance(config.training_service, list) else config.training_service.platform
_save_experiment_information(exp_id, port, start_time, platform,
config.experiment_name, proc.pid, config.experiment_working_directory)
_logger.info('Setting up...')
_init_experiment(config, port, debug)
Expand All @@ -66,9 +69,12 @@ def _ensure_port_idle(port: int, message: Optional[str] = None) -> None:


def _start_rest_server(config: ExperimentConfig, port: int, debug: bool, experiment_id: str, pipe_path: str) -> Tuple[int, Popen]:
ts = config.training_service.platform
if ts == 'openpai':
ts = 'pai'
if isinstance(config.training_service, list):
ts = 'hybrid'
else:
ts = config.training_service.platform
if ts == 'openpai':
ts = 'pai'

args = {
'port': port,
Expand Down
2 changes: 1 addition & 1 deletion nni/retiarii/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def __init__(self, training_service_platform: Optional[str] = None, **kwargs):
super().__init__(**kwargs)
if training_service_platform is not None:
assert 'training_service' not in kwargs
self.training_service = util.training_service_config_factory(training_service_platform)
self.training_service = util.training_service_config_factory(platform = training_service_platform)

def validate(self, initialized_tuner: bool = False) -> None:
super().validate()
Expand Down
2 changes: 1 addition & 1 deletion nni/tools/nnictl/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ def create_experiment(args):
try:
validate_all_content(experiment_config, config_path)
except Exception as e:
print_error(f'Config validation failed. {repr(e)}')
print_error(f'Config in v1 format validation failed. {repr(e)}')
exit(1)

nni_config.set_config('experimentConfig', experiment_config)
Expand Down

0 comments on commit b177bdc

Please sign in to comment.