
support hybrid training service v2.0 config #3251

Merged · 12 commits · Jan 6, 2021
@@ -7,6 +7,8 @@ trainingServicePlatform: hybrid
searchSpacePath: search_space.json
#choice: true, false
useAnnotation: false
nniManagerIp: 10.150.144.59
versionCheck: false
Contributor Author: remove ip and version check

tuner:
#choice: TPE, Random, Anneal, Evolution, BatchTuner, MetisTuner, GPTuner
#SMAC (SMAC should be installed through nnictl)
@@ -25,8 +27,7 @@ hybridConfig:
remoteConfig:
reuse: true
machineList:
- ip: 10.1.1.1
username: bob
passwd: bob123
#port can be skipped if using default ssh port 22
#port: 22
- ip: 10.190.175.223
username: xxx
passwd: xxx
port: 22
26 changes: 26 additions & 0 deletions examples/trials/mnist-tfv1/config_hybrid_v2.yml
@@ -0,0 +1,26 @@
experimentName: example_mnist
trialConcurrency: 3
maxExperimentDuration: 1h
maxTrialNumber: 10
searchSpaceFile: search_space.json
nniManagerIp: 10.150.144.59
debug: true

trialCodeDirectory: .
trialCommand: python3 mnist.py
trialGpuNumber: 0
tuner:
name: TPE
classArgs:
optimize_mode: maximize

trainingService:
- platform: local
- platform: remote
reuseMode: true
machineList:
- host: 10.190.175.223
user: xxx
password: xxx
#port can be skipped if using default ssh port 22
port: 22
36 changes: 36 additions & 0 deletions examples/trials/mnist-tfv1/launch_hybrid.py
@@ -0,0 +1,36 @@
# FIXME: For demonstration only. It should not be here

from pathlib import Path

from nni.experiment import Experiment
from nni.experiment import RemoteMachineConfig
from nni.algorithms.hpo.hyperopt_tuner import HyperoptTuner

tuner = HyperoptTuner('tpe')

search_space = {
"dropout_rate": { "_type": "uniform", "_value": [0.5, 0.9] },
"conv_size": { "_type": "choice", "_value": [2, 3, 5, 7] },
"hidden_size": { "_type": "choice", "_value": [124, 512, 1024] },
"batch_size": { "_type": "choice", "_value": [16, 32] },
"learning_rate": { "_type": "choice", "_value": [0.0001, 0.001, 0.01, 0.1] }
}

experiment = Experiment(tuner, ['local', 'remote'])
experiment.config.experiment_name = 'test'
experiment.config.trial_concurrency = 3
experiment.config.max_trial_number = 10
experiment.config.search_space = search_space
experiment.config.nni_manager_ip = '10.150.144.59'
experiment.config.trial_command = 'python3 mnist.py'
experiment.config.trial_code_directory = Path(__file__).parent
experiment.config.training_service[0].use_active_gpu = True
experiment.config.training_service[1].reuse_mode = True
rm_conf = RemoteMachineConfig()
rm_conf.host = '10.190.175.223'
rm_conf.user = 'xxx'
rm_conf.password = 'xxx'
rm_conf.port = 22
experiment.config.training_service[1].machine_list = [rm_conf]

experiment.run(26780, debug=True)
14 changes: 9 additions & 5 deletions nni/experiment/config/common.py
@@ -65,15 +65,19 @@ class ExperimentConfig(ConfigBase):
tuner: Optional[_AlgorithmConfig] = None
assessor: Optional[_AlgorithmConfig] = None
advisor: Optional[_AlgorithmConfig] = None
training_service: TrainingServiceConfig
training_service: Union[TrainingServiceConfig, List[TrainingServiceConfig]]

def __init__(self, training_service_platform: Optional[str] = None, **kwargs):
def __init__(self, training_service_platform: Optional[Union[str, List[str]]] = None, **kwargs):
kwargs = util.case_insensitive(kwargs)
if training_service_platform is not None:
assert 'trainingservice' not in kwargs
kwargs['trainingservice'] = util.training_service_config_factory(training_service_platform)
elif isinstance(kwargs.get('trainingservice'), dict):
kwargs['trainingservice'] = util.training_service_config_factory(**kwargs['trainingservice'])
kwargs['trainingservice'] = util.training_service_config_factory(platform = training_service_platform)
elif isinstance(kwargs.get('trainingservice'), (dict, list)):
# dict means a single training service
# list means hybrid training service
kwargs['trainingservice'] = util.training_service_config_factory(config = kwargs['trainingservice'])
else:
raise RuntimeError('Unsupported Training service configuration!')
super().__init__(**kwargs)

def validate(self, initialized_tuner: bool = False) -> None:
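With this change, ExperimentConfig accepts either one platform name or a list of names for a hybrid experiment. A minimal sketch of both constructor forms (the import path and field access are assumed from this PR, not verified against a released package):

```python
# Minimal sketch (assumed usage, not part of this PR's files): one platform
# keeps the old single-service behaviour, a list of platforms produces a
# list of TrainingServiceConfig objects in config.training_service.
from nni.experiment.config import ExperimentConfig

single = ExperimentConfig('local')              # training_service is one config object
hybrid = ExperimentConfig(['local', 'remote'])  # training_service is a list of config objects

print(type(hybrid.training_service))  # expected: <class 'list'>
```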
78 changes: 52 additions & 26 deletions nni/experiment/config/convert.py
@@ -18,16 +18,28 @@ def to_v1_yaml(config: ExperimentConfig, skip_nnictl: bool = False) -> Dict[str,
data = config.json()

ts = data.pop('trainingService')
if ts['platform'] == 'openpai':
ts['platform'] = 'pai'
if isinstance(ts, list):
hybrid_names = []
for conf in ts:
if conf['platform'] == 'openpai':
conf['platform'] = 'pai'
hybrid_names.append(conf['platform'])
_handle_training_service(conf, data)
data['trainingServicePlatform'] = 'hybrid'
data['hybridConfig'] = {'trainingServicePlatforms': hybrid_names}
else:
if ts['platform'] == 'openpai':
ts['platform'] = 'pai'
data['trainingServicePlatform'] = ts['platform']
_handle_training_service(ts, data)

data['authorName'] = 'N/A'
data['experimentName'] = data.get('experimentName', 'N/A')
data['maxExecDuration'] = data.pop('maxExperimentDuration', '999d')
if data['debug']:
data['versionCheck'] = False
data['maxTrialNum'] = data.pop('maxTrialNumber', 99999)
data['trainingServicePlatform'] = ts['platform']

ss = data.pop('searchSpace', None)
ss_file = data.pop('searchSpaceFile', None)
if ss is not None:
@@ -66,6 +78,9 @@ def to_v1_yaml(config: ExperimentConfig, skip_nnictl: bool = False) -> Dict[str,
if 'trialGpuNumber' in data:
data['trial']['gpuNum'] = data.pop('trialGpuNumber')

return data

def _handle_training_service(ts, data):
if ts['platform'] == 'local':
data['localConfig'] = {
'useActiveGpu': ts.get('useActiveGpu', False),
@@ -140,8 +155,6 @@ def to_v1_yaml(config: ExperimentConfig, skip_nnictl: bool = False) -> Dict[str,
elif ts['platform'] == 'adl':
data['trial']['image'] = ts['dockerImage']

return data

def _convert_gpu_indices(indices):
return ','.join(str(idx) for idx in indices) if indices is not None else None

@@ -175,7 +188,26 @@ def to_cluster_metadata(config: ExperimentConfig) -> List[Dict[str, Any]]:
experiment_config = to_v1_yaml(config, skip_nnictl=True)
ret = []

if config.training_service.platform == 'local':
if isinstance(config.training_service, list):
hybrid_conf = dict()
hybrid_conf['hybrid_config'] = experiment_config['hybridConfig']
for conf in config.training_service:
metadata = _get_cluster_metadata(conf.platform, experiment_config)
if metadata is not None:
hybrid_conf.update(metadata)
ret.append(hybrid_conf)
else:
metadata = _get_cluster_metadata(config.training_service.platform, experiment_config)
if metadata is not None:
ret.append(metadata)

if experiment_config.get('nniManagerIp') is not None:
ret.append({'nni_manager_ip': {'nniManagerIp': experiment_config['nniManagerIp']}})
ret.append({'trial_config': experiment_config['trial']})
return ret

def _get_cluster_metadata(platform: str, experiment_config) -> Dict:
if platform == 'local':
request_data = dict()
request_data['local_config'] = experiment_config['localConfig']
if request_data['local_config']:
@@ -185,9 +217,9 @@ def to_cluster_metadata(config: ExperimentConfig) -> List[Dict[str, Any]]:
request_data['local_config']['maxTrialNumOnEachGpu'] = request_data['local_config'].get('maxTrialNumOnEachGpu')
if request_data['local_config'].get('useActiveGpu'):
request_data['local_config']['useActiveGpu'] = request_data['local_config'].get('useActiveGpu')
ret.append(request_data)
return request_data

elif config.training_service.platform == 'remote':
elif platform == 'remote':
request_data = dict()
if experiment_config.get('remoteConfig'):
request_data['remote_config'] = experiment_config['remoteConfig']
@@ -198,31 +230,25 @@ def to_cluster_metadata(config: ExperimentConfig) -> List[Dict[str, Any]]:
for i in range(len(request_data['machine_list'])):
if isinstance(request_data['machine_list'][i].get('gpuIndices'), int):
request_data['machine_list'][i]['gpuIndices'] = str(request_data['machine_list'][i].get('gpuIndices'))
ret.append(request_data)
return request_data

elif config.training_service.platform == 'openpai':
ret.append({'pai_config': experiment_config['paiConfig']})
elif platform == 'openpai':
return {'pai_config': experiment_config['paiConfig']}

elif config.training_service.platform == 'aml':
ret.append({'aml_config': experiment_config['amlConfig']})
elif platform == 'aml':
return {'aml_config': experiment_config['amlConfig']}

elif config.training_service.platform == 'kubeflow':
ret.append({'kubeflow_config': experiment_config['kubeflowConfig']})
elif platform == 'kubeflow':
return {'kubeflow_config': experiment_config['kubeflowConfig']}

elif config.training_service.platform == 'frameworkcontroller':
ret.append({'frameworkcontroller_config': experiment_config['frameworkcontrollerConfig']})
elif platform == 'frameworkcontroller':
return {'frameworkcontroller_config': experiment_config['frameworkcontrollerConfig']}

elif config.training_service.platform == 'adl':
pass
elif platform == 'adl':
return None

else:
raise RuntimeError('Unsupported training service ' + config.training_service.platform)

if experiment_config.get('nniManagerIp') is not None:
ret.append({'nni_manager_ip': {'nniManagerIp': experiment_config['nniManagerIp']}})
ret.append({'trial_config': experiment_config['trial']})
return ret

raise RuntimeError('Unsupported training service ' + platform)

def to_rest_json(config: ExperimentConfig) -> Dict[str, Any]:
experiment_config = to_v1_yaml(config, skip_nnictl=True)
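For a local + remote hybrid config, to_v1_yaml now emits trainingServicePlatform: hybrid plus a hybridConfig section, with each service handled by _handle_training_service. A rough sketch of the resulting dictionary shape (values are placeholders taken from the examples earlier in this PR, not output captured from the code):

```python
# Approximate shape of the v1 data produced for a ['local', 'remote'] hybrid
# experiment; keys follow the v1 YAML schema shown earlier in this PR.
expected_v1 = {
    'trainingServicePlatform': 'hybrid',
    'hybridConfig': {'trainingServicePlatforms': ['local', 'remote']},
    'localConfig': {'useActiveGpu': False},
    'remoteConfig': {'reuse': True},
    'machineList': [
        {'ip': '10.190.175.223', 'username': 'xxx', 'passwd': 'xxx', 'port': 22},
    ],
    'trial': {'command': 'python3 mnist.py', 'codeDir': '.', 'gpuNum': 0},
}
```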
26 changes: 20 additions & 6 deletions nni/experiment/config/util.py
@@ -8,7 +8,7 @@
import math
import os.path
from pathlib import Path
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, Optional, Union, List

PathLike = Union[Path, str]

@@ -29,12 +29,26 @@ def canonical_path(path: Optional[PathLike]) -> Optional[str]:
def count(*values) -> int:
return sum(value is not None and value is not False for value in values)

def training_service_config_factory(platform: str, **kwargs): # -> TrainingServiceConfig
def training_service_config_factory(platform: Union[str, List[str]] = None, config: Union[List, Dict] = None): # -> TrainingServiceConfig
from .common import TrainingServiceConfig
for cls in TrainingServiceConfig.__subclasses__():
if cls.platform == platform:
return cls(**kwargs)
raise ValueError(f'Unrecognized platform {platform}')
ts_configs = []
if platform is not None:
assert config is None
platforms = platform if isinstance(platform, list) else [platform]
for cls in TrainingServiceConfig.__subclasses__():
if cls.platform in platforms:
ts_configs.append(cls())
if len(ts_configs) < len(platforms):
raise RuntimeError('There is unrecognized platform!')
else:
assert config is not None
supported_platforms = {cls.platform: cls for cls in TrainingServiceConfig.__subclasses__()}
configs = config if isinstance(config, list) else [config]
for conf in configs:
if conf['platform'] not in supported_platforms:
raise RuntimeError(f'Unrecognized platform {conf["platform"]}')
ts_configs.append(supported_platforms[conf['platform']](**conf))
return ts_configs if len(ts_configs) > 1 else ts_configs[0]

def load_config(Type, value):
if isinstance(value, list):
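The factory now supports two mutually exclusive call forms: platform (a name or list of names, producing default config objects) and config (a dict or list of dicts parsed from a v2 YAML file, each carrying a platform key). A small sketch of both forms, assuming the module is importable as nni.experiment.config.util:

```python
from nni.experiment.config import util

# Platform-name form: one name returns a single TrainingServiceConfig,
# a list of names returns a list (hybrid mode).
local_only = util.training_service_config_factory(platform='local')
hybrid = util.training_service_config_factory(platform=['local', 'remote'])

# Dict form, e.g. the 'trainingService' section loaded from a v2 YAML file;
# each entry must name a supported platform or a RuntimeError is raised.
from_yaml = util.training_service_config_factory(
    config=[{'platform': 'local'}, {'platform': 'remote'}])
```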
7 changes: 4 additions & 3 deletions nni/experiment/experiment.py
@@ -4,7 +4,7 @@
from subprocess import Popen
from threading import Thread
import time
from typing import Optional, overload
from typing import Optional, Union, List, overload

import colorama
import psutil
@@ -52,7 +52,7 @@ def __init__(self, tuner: Tuner, config: ExperimentConfig) -> None:
...

@overload
def __init__(self, tuner: Tuner, training_service: str) -> None:
def __init__(self, tuner: Tuner, training_service: Union[str, List[str]]) -> None:
"""
Prepare an experiment, leaving configuration fields to be set later.

@@ -83,7 +83,7 @@ def __init__(self, tuner: Tuner, config=None, training_service=None):
self._dispatcher: Optional[MsgDispatcher] = None
self._dispatcher_thread: Optional[Thread] = None

if isinstance(config, str):
if isinstance(config, (str, list)):
config, training_service = None, config

if config is None:
@@ -171,6 +171,7 @@ def run(self, port: int = 8080, debug: bool = False) -> bool:
while True:
time.sleep(10)
status = self.get_status()
print('zqlllll: ', status)
Contributor: What's this line used for?

Contributor Author: sorry, forgot to remove it...

if status == 'DONE' or status == 'STOPPED':
return True
if status == 'ERROR':
20 changes: 13 additions & 7 deletions nni/experiment/launcher.py
@@ -28,12 +28,14 @@ def start_experiment(config: ExperimentConfig, port: int, debug: bool) -> Tuple[

config.validate(initialized_tuner=True)
_ensure_port_idle(port)
if config.training_service.platform == 'openpai':
_ensure_port_idle(port + 1, 'OpenPAI requires an additional port')
if isinstance(config.training_service, list): # hybrid training service
_ensure_port_idle(port + 1, 'Hybrid training service requires an additional port')
elif config.training_service.platform in ['remote', 'openpai', 'kubeflow', 'frameworkcontroller', 'adl']:
_ensure_port_idle(port + 1, f'{config.training_service.platform} requires an additional port')
exp_id = management.generate_experiment_id()

try:
_logger.info('Creating experiment %s%s', colorama.Fore.CYAN, exp_id)
_logger.info('Creating experiment, Experiment ID: %s%s', colorama.Fore.CYAN, exp_id)
pipe = Pipe(exp_id)
start_time, proc = _start_rest_server(config, port, debug, exp_id, pipe.path)
_logger.info('Connecting IPC pipe...')
@@ -42,7 +44,8 @@
nni.runtime.protocol._out_file = pipe_file
_logger.info('Starting web server...')
_check_rest_server(port)
_save_experiment_information(exp_id, port, start_time, config.training_service.platform,
platform = 'hybrid' if isinstance(config.training_service, list) else config.training_service.platform
_save_experiment_information(exp_id, port, start_time, platform,
config.experiment_name, proc.pid, config.experiment_working_directory)
_logger.info('Setting up...')
_init_experiment(config, port, debug)
@@ -68,9 +71,12 @@


def _start_rest_server(config: ExperimentConfig, port: int, debug: bool, experiment_id: str, pipe_path: str) -> Tuple[int, Popen]:
ts = config.training_service.platform
if ts == 'openpai':
ts = 'pai'
if isinstance(config.training_service, list):
ts = 'hybrid'
else:
ts = config.training_service.platform
if ts == 'openpai':
ts = 'pai'

args = {
'port': port,
2 changes: 1 addition & 1 deletion nni/retiarii/experiment.py
@@ -51,7 +51,7 @@ def __init__(self, training_service_platform: Optional[str] = None, **kwargs):
super().__init__(**kwargs)
if training_service_platform is not None:
assert 'training_service' not in kwargs
self.training_service = util.training_service_config_factory(training_service_platform)
self.training_service = util.training_service_config_factory(platform = training_service_platform)

def validate(self, initialized_tuner: bool = False) -> None:
super().validate()
2 changes: 1 addition & 1 deletion nni/runtime/log.py
@@ -47,7 +47,7 @@ def init_logger_experiment() -> None:
formatter.format = _colorful_format

log_path = _prepare_log_dir(dispatcher_env_vars.NNI_LOG_DIRECTORY) / 'dispatcher.log'
_setup_root_logger(FileHandler(log_path), logging.DEBUG)
_setup_root_logger(FileHandler(log_path), logging.INFO)


time_format = '%Y-%m-%d %H:%M:%S'