
support hybrid training service v2.0 config #3251

Merged: 12 commits, Jan 6, 2021
@@ -26,7 +26,6 @@ remoteConfig:
reuse: true
machineList:
- ip: 10.1.1.1
username: bob
passwd: bob123
#port can be skipped if using the default ssh port 22
#port: 22
username: xxx
passwd: xxx
port: 22
24 changes: 24 additions & 0 deletions examples/trials/mnist-tfv1/config_hybrid_v2.yml
@@ -0,0 +1,24 @@
experimentName: example_mnist
trialConcurrency: 3
maxExperimentDuration: 1h
maxTrialNumber: 10
searchSpaceFile: search_space.json

trialCodeDirectory: .
trialCommand: python3 mnist.py
trialGpuNumber: 0
tuner:
name: TPE
classArgs:
optimize_mode: maximize

trainingService:
- platform: local
- platform: remote
reuseMode: true
machineList:
- host: 10.1.1.1
user: xxx
password: xxx
#port can be skipped if using the default ssh port 22
port: 22
35 changes: 35 additions & 0 deletions examples/trials/mnist-tfv1/launch_hybrid.py
@@ -0,0 +1,35 @@
# FIXME: For demonstration only. It should not be here

from pathlib import Path

from nni.experiment import Experiment
from nni.experiment import RemoteMachineConfig
from nni.algorithms.hpo.hyperopt_tuner import HyperoptTuner

tuner = HyperoptTuner('tpe')

search_space = {
"dropout_rate": { "_type": "uniform", "_value": [0.5, 0.9] },
"conv_size": { "_type": "choice", "_value": [2, 3, 5, 7] },
"hidden_size": { "_type": "choice", "_value": [124, 512, 1024] },
"batch_size": { "_type": "choice", "_value": [16, 32] },
"learning_rate": { "_type": "choice", "_value": [0.0001, 0.001, 0.01, 0.1] }
}

experiment = Experiment(tuner, ['local', 'remote'])
experiment.config.experiment_name = 'test'
experiment.config.trial_concurrency = 3
experiment.config.max_trial_number = 10
experiment.config.search_space = search_space
experiment.config.trial_command = 'python3 mnist.py'
experiment.config.trial_code_directory = Path(__file__).parent
experiment.config.training_service[0].use_active_gpu = True
experiment.config.training_service[1].reuse_mode = True
rm_conf = RemoteMachineConfig()
rm_conf.host = '10.1.1.1'
rm_conf.user = 'xxx'
rm_conf.password = 'xxx'
rm_conf.port = 22
experiment.config.training_service[1].machine_list = [rm_conf]

experiment.run(26780, debug=True)
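
For completeness, a hedged variant of the machine entry above that uses key-based SSH authentication via the ssh_key_file field of RemoteMachineConfig (see the remote.py change below) instead of a plain-text password; host, user, and key path are placeholders, and this snippet is not part of the PR itself:

```python
# Sketch only: key-based auth instead of a password, using the same
# attribute-assignment style as launch_hybrid.py above.
rm_conf = RemoteMachineConfig()
rm_conf.host = '10.1.1.1'
rm_conf.user = 'xxx'
rm_conf.ssh_key_file = '~/.ssh/id_rsa'   # this path was the default before this PR
rm_conf.port = 22
```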
14 changes: 9 additions & 5 deletions nni/experiment/config/common.py
@@ -65,15 +65,19 @@ class ExperimentConfig(ConfigBase):
tuner: Optional[_AlgorithmConfig] = None
accessor: Optional[_AlgorithmConfig] = None
advisor: Optional[_AlgorithmConfig] = None
training_service: TrainingServiceConfig
training_service: Union[TrainingServiceConfig, List[TrainingServiceConfig]]

def __init__(self, training_service_platform: Optional[str] = None, **kwargs):
def __init__(self, training_service_platform: Optional[Union[str, List[str]]] = None, **kwargs):
kwargs = util.case_insensitive(kwargs)
if training_service_platform is not None:
assert 'trainingservice' not in kwargs
kwargs['trainingservice'] = util.training_service_config_factory(training_service_platform)
elif isinstance(kwargs.get('trainingservice'), dict):
kwargs['trainingservice'] = util.training_service_config_factory(**kwargs['trainingservice'])
kwargs['trainingservice'] = util.training_service_config_factory(platform = training_service_platform)
elif isinstance(kwargs.get('trainingservice'), (dict, list)):
# dict means a single training service
# list means hybrid training service
kwargs['trainingservice'] = util.training_service_config_factory(config = kwargs['trainingservice'])
else:
raise RuntimeError('Unsupported Training service configuration!')
super().__init__(**kwargs)

def validate(self, initialized_tuner: bool = False) -> None:
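With the constructor change above, training_service_platform may be a single platform name or a list of names, and a trainingservice keyword parsed from YAML may be a dict or a list. A minimal sketch of the resulting call styles, assuming ExperimentConfig is importable from nni.experiment.config as laid out in this PR (platform names are illustrative):

```python
from nni.experiment.config import ExperimentConfig

# Single training service: training_service holds one TrainingServiceConfig.
single = ExperimentConfig(training_service_platform='local')

# Hybrid training service: training_service becomes a list of configs,
# matching the Union[TrainingServiceConfig, List[TrainingServiceConfig]] annotation.
hybrid = ExperimentConfig(training_service_platform=['local', 'remote'])
```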
82 changes: 52 additions & 30 deletions nni/experiment/config/convert.py
@@ -18,16 +18,28 @@ def to_v1_yaml(config: ExperimentConfig, skip_nnictl: bool = False) -> Dict[str,
data = config.json()

ts = data.pop('trainingService')
if ts['platform'] == 'openpai':
ts['platform'] = 'pai'
if isinstance(ts, list):
hybrid_names = []
for conf in ts:
if conf['platform'] == 'openpai':
conf['platform'] = 'pai'
hybrid_names.append(conf['platform'])
_handle_training_service(conf, data)
data['trainingServicePlatform'] = 'hybrid'
data['hybridConfig'] = {'trainingServicePlatforms': hybrid_names}
else:
if ts['platform'] == 'openpai':
ts['platform'] = 'pai'
data['trainingServicePlatform'] = ts['platform']
_handle_training_service(ts, data)

data['authorName'] = 'N/A'
data['experimentName'] = data.get('experimentName', 'N/A')
data['maxExecDuration'] = data.pop('maxExperimentDuration', '999d')
if data['debug']:
data['versionCheck'] = False
data['maxTrialNum'] = data.pop('maxTrialNumber', 99999)
data['trainingServicePlatform'] = ts['platform']

ss = data.pop('searchSpace', None)
ss_file = data.pop('searchSpaceFile', None)
if ss is not None:
@@ -66,6 +78,9 @@ def to_v1_yaml(config: ExperimentConfig, skip_nnictl: bool = False) -> Dict[str,
if 'trialGpuNumber' in data:
data['trial']['gpuNum'] = data.pop('trialGpuNumber')

return data

def _handle_training_service(ts, data):
if ts['platform'] == 'local':
data['localConfig'] = {
'useActiveGpu': ts.get('useActiveGpu', False),
@@ -140,8 +155,6 @@ def to_v1_yaml(config: ExperimentConfig, skip_nnictl: bool = False) -> Dict[str,
elif ts['platform'] == 'adl':
data['trial']['image'] = ts['dockerImage']

return data

def _convert_gpu_indices(indices):
return ','.join(str(idx) for idx in indices) if indices is not None else None

@@ -175,19 +188,34 @@ def to_cluster_metadata(config: ExperimentConfig) -> List[Dict[str, Any]]:
experiment_config = to_v1_yaml(config, skip_nnictl=True)
ret = []

if config.training_service.platform == 'local':
if isinstance(config.training_service, list):
hybrid_conf = dict()
hybrid_conf['hybrid_config'] = experiment_config['hybridConfig']
for conf in config.training_service:
metadata = _get_cluster_metadata(conf.platform, experiment_config)
if metadata is not None:
hybrid_conf.update(metadata)
ret.append(hybrid_conf)
else:
metadata = _get_cluster_metadata(config.training_service.platform, experiment_config)
if metadata is not None:
ret.append(metadata)

if experiment_config.get('nniManagerIp') is not None:
ret.append({'nni_manager_ip': {'nniManagerIp': experiment_config['nniManagerIp']}})
ret.append({'trial_config': experiment_config['trial']})
return ret

def _get_cluster_metadata(platform: str, experiment_config) -> Dict:
if platform == 'local':
request_data = dict()
request_data['local_config'] = experiment_config['localConfig']
if request_data['local_config']:
if request_data['local_config'].get('gpuIndices') and isinstance(request_data['local_config'].get('gpuIndices'), int):
request_data['local_config']['gpuIndices'] = str(request_data['local_config'].get('gpuIndices'))
if request_data['local_config'].get('maxTrialNumOnEachGpu'):
request_data['local_config']['maxTrialNumOnEachGpu'] = request_data['local_config'].get('maxTrialNumOnEachGpu')
if request_data['local_config'].get('useActiveGpu'):
request_data['local_config']['useActiveGpu'] = request_data['local_config'].get('useActiveGpu')
ret.append(request_data)
return request_data

elif config.training_service.platform == 'remote':
elif platform == 'remote':
request_data = dict()
if experiment_config.get('remoteConfig'):
request_data['remote_config'] = experiment_config['remoteConfig']
@@ -198,31 +226,25 @@ def to_cluster_metadata(config: ExperimentConfig) -> List[Dict[str, Any]]:
for i in range(len(request_data['machine_list'])):
if isinstance(request_data['machine_list'][i].get('gpuIndices'), int):
request_data['machine_list'][i]['gpuIndices'] = str(request_data['machine_list'][i].get('gpuIndices'))
ret.append(request_data)
return request_data

elif config.training_service.platform == 'openpai':
ret.append({'pai_config': experiment_config['paiConfig']})
elif platform == 'openpai':
return {'pai_config': experiment_config['paiConfig']}

elif config.training_service.platform == 'aml':
ret.append({'aml_config': experiment_config['amlConfig']})
elif platform == 'aml':
return {'aml_config': experiment_config['amlConfig']}

elif config.training_service.platform == 'kubeflow':
ret.append({'kubeflow_config': experiment_config['kubeflowConfig']})
elif platform == 'kubeflow':
return {'kubeflow_config': experiment_config['kubeflowConfig']}

elif config.training_service.platform == 'frameworkcontroller':
ret.append({'frameworkcontroller_config': experiment_config['frameworkcontrollerConfig']})
elif platform == 'frameworkcontroller':
return {'frameworkcontroller_config': experiment_config['frameworkcontrollerConfig']}

elif config.training_service.platform == 'adl':
pass
elif platform == 'adl':
return None

else:
raise RuntimeError('Unsupported training service ' + config.training_service.platform)

if experiment_config.get('nniManagerIp') is not None:
ret.append({'nni_manager_ip': {'nniManagerIp': experiment_config['nniManagerIp']}})
ret.append({'trial_config': experiment_config['trial']})
return ret

raise RuntimeError('Unsupported training service ' + platform)
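
Taken together, to_cluster_metadata and _get_cluster_metadata produce roughly the following list for a local plus remote hybrid experiment; this is a sketch with placeholder values, and the keys follow the code above:

```python
metadata = [
    {
        'hybrid_config': {'trainingServicePlatforms': ['local', 'remote']},
        'local_config': {...},     # merged in from _get_cluster_metadata('local', ...)
        'remote_config': {...},    # merged in from _get_cluster_metadata('remote', ...)
        'machine_list': [{...}],
    },
    {'nni_manager_ip': {'nniManagerIp': '...'}},  # only when nniManagerIp is set
    {'trial_config': {...}},
]
```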

def to_rest_json(config: ExperimentConfig) -> Dict[str, Any]:
experiment_config = to_v1_yaml(config, skip_nnictl=True)
2 changes: 1 addition & 1 deletion nni/experiment/config/remote.py
@@ -18,7 +18,7 @@ class RemoteMachineConfig(ConfigBase):
port: int = 22
user: str
password: Optional[str] = None
ssh_key_file: PathLike = '~/.ssh/id_rsa'
ssh_key_file: PathLike = None #'~/.ssh/id_rsa'
ssh_passphrase: Optional[str] = None
use_active_gpu: bool = False
max_trial_number_per_gpu: int = 1
26 changes: 20 additions & 6 deletions nni/experiment/config/util.py
@@ -8,7 +8,7 @@
import math
import os.path
from pathlib import Path
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, Optional, Union, List

PathLike = Union[Path, str]

@@ -29,12 +29,26 @@ def canonical_path(path: Optional[PathLike]) -> Optional[str]:
def count(*values) -> int:
return sum(value is not None and value is not False for value in values)

def training_service_config_factory(platform: str, **kwargs): # -> TrainingServiceConfig
def training_service_config_factory(platform: Union[str, List[str]] = None, config: Union[List, Dict] = None): # -> TrainingServiceConfig
from .common import TrainingServiceConfig
for cls in TrainingServiceConfig.__subclasses__():
if cls.platform == platform:
return cls(**kwargs)
raise ValueError(f'Unrecognized platform {platform}')
ts_configs = []
if platform is not None:
assert config is None
platforms = platform if isinstance(platform, list) else [platform]
for cls in TrainingServiceConfig.__subclasses__():
if cls.platform in platforms:
ts_configs.append(cls())
if len(ts_configs) < len(platforms):
raise RuntimeError('Unrecognized training service platform!')
else:
assert config is not None
supported_platforms = {cls.platform: cls for cls in TrainingServiceConfig.__subclasses__()}
configs = config if isinstance(config, list) else [config]
for conf in configs:
if conf['platform'] not in supported_platforms:
raise RuntimeError(f'Unrecognized platform {conf["platform"]}')
ts_configs.append(supported_platforms[conf['platform']](**conf))
return ts_configs if len(ts_configs) > 1 else ts_configs[0]
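
A sketch of how the reworked factory is called in each mode; the platform names are illustrative, and the return value follows the final line above (one config object, or a list for hybrid):

```python
# From a platform name, as ExperimentConfig.__init__ now does.
local_ts = training_service_config_factory(platform='local')

# From a list of platform names -> a list of TrainingServiceConfig subclasses (hybrid).
hybrid_ts = training_service_config_factory(platform=['local', 'remote'])

# From raw dict(s), e.g. parsed from a v2 YAML file's trainingService section.
from_dict = training_service_config_factory(config={'platform': 'local'})
```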

def load_config(Type, value):
if isinstance(value, list):
6 changes: 3 additions & 3 deletions nni/experiment/experiment.py
@@ -5,7 +5,7 @@
from subprocess import Popen
from threading import Thread
import time
from typing import Optional, overload
from typing import Optional, Union, List, overload

import colorama
import psutil
@@ -54,7 +54,7 @@ def __init__(self, tuner: Tuner, config: ExperimentConfig) -> None:
...

@overload
def __init__(self, tuner: Tuner, training_service: str) -> None:
def __init__(self, tuner: Tuner, training_service: Union[str, List[str]]) -> None:
"""
Prepare an experiment, leaving configuration fields to be set later.

@@ -86,7 +86,7 @@ def __init__(self, tuner: Tuner, config=None, training_service=None):
self._dispatcher: Optional[MsgDispatcher] = None
self._dispatcher_thread: Optional[Thread] = None

if isinstance(config, str):
if isinstance(config, (str, list)):
config, training_service = None, config

if config is None:
20 changes: 13 additions & 7 deletions nni/experiment/launcher.py
@@ -27,11 +27,13 @@ def start_experiment(exp_id: str, config: ExperimentConfig, port: int, debug: bo

config.validate(initialized_tuner=True)
_ensure_port_idle(port)
if config.training_service.platform == 'openpai':
_ensure_port_idle(port + 1, 'OpenPAI requires an additional port')
if isinstance(config.training_service, list): # hybrid training service
_ensure_port_idle(port + 1, 'Hybrid training service requires an additional port')
elif config.training_service.platform in ['remote', 'openpai', 'kubeflow', 'frameworkcontroller', 'adl']:
_ensure_port_idle(port + 1, f'{config.training_service.platform} requires an additional port')

try:
_logger.info('Creating experiment %s', colorama.Fore.CYAN + exp_id + colorama.Style.RESET_ALL)
_logger.info('Creating experiment, Experiment ID: %s', colorama.Fore.CYAN + exp_id + colorama.Style.RESET_ALL)
pipe = Pipe(exp_id)
start_time, proc = _start_rest_server(config, port, debug, exp_id, pipe.path)
_logger.info('Connecting IPC pipe...')
@@ -40,7 +42,8 @@ def start_experiment(exp_id: str, config: ExperimentConfig, port: int, debug: bo
nni.runtime.protocol._out_file = pipe_file
_logger.info('Starting web server...')
_check_rest_server(port)
_save_experiment_information(exp_id, port, start_time, config.training_service.platform,
platform = 'hybrid' if isinstance(config.training_service, list) else config.training_service.platform
_save_experiment_information(exp_id, port, start_time, platform,
config.experiment_name, proc.pid, config.experiment_working_directory)
_logger.info('Setting up...')
_init_experiment(config, port, debug)
@@ -66,9 +69,12 @@ def _ensure_port_idle(port: int, message: Optional[str] = None) -> None:


def _start_rest_server(config: ExperimentConfig, port: int, debug: bool, experiment_id: str, pipe_path: str) -> Tuple[int, Popen]:
ts = config.training_service.platform
if ts == 'openpai':
ts = 'pai'
if isinstance(config.training_service, list):
ts = 'hybrid'
else:
ts = config.training_service.platform
if ts == 'openpai':
ts = 'pai'

args = {
'port': port,
2 changes: 1 addition & 1 deletion nni/retiarii/experiment.py
@@ -46,7 +46,7 @@ def __init__(self, training_service_platform: Optional[str] = None, **kwargs):
super().__init__(**kwargs)
if training_service_platform is not None:
assert 'training_service' not in kwargs
self.training_service = util.training_service_config_factory(training_service_platform)
self.training_service = util.training_service_config_factory(platform = training_service_platform)

def validate(self, initialized_tuner: bool = False) -> None:
super().validate()
2 changes: 1 addition & 1 deletion nni/tools/nnictl/launcher.py
@@ -607,7 +607,7 @@ def create_experiment(args):
try:
validate_all_content(experiment_config, config_path)
except Exception as e:
print_error(f'Config validation failed. {repr(e)}')
print_error(f'Config in v1 format validation failed. {repr(e)}')
exit(1)

nni_config.set_config('experimentConfig', experiment_config)