Skip to content
This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

Commit

Permalink
extend reusable training service to support placement constraint (#3897)
Browse files Browse the repository at this point in the history
  • Loading branch information
hzhua authored Jul 26, 2021
1 parent b0de7c9 commit 3bce692
Show file tree
Hide file tree
Showing 18 changed files with 327 additions and 59 deletions.
29 changes: 29 additions & 0 deletions nni/common/device.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@

from dataclasses import dataclass
from typing import Literal

@dataclass
class GPUDevice:
    """Identity and live status of one GPU on a named node.

    Equality, ordering and hashing consider only ``(node_id, gpu_id)``;
    ``status`` is mutable bookkeeping and never affects identity.
    """
    node_id: str
    gpu_id: int
    status: Literal['idle', 'busy', 'unknown'] = 'idle'

    def __eq__(self, o) -> bool:
        # Identity ignores status so a device compares equal to itself
        # even after set_status() changes it.
        return self.node_id == o.node_id and self.gpu_id == o.gpu_id

    def __lt__(self, o) -> bool:
        # Order by node first, then by GPU index within the node.
        if self.node_id != o.node_id:
            return self.node_id < o.node_id
        return self.gpu_id < o.gpu_id

    def __repr__(self) -> str:
        return "{Environment %s, GPU %d, Status %s}" % (self.node_id, self.gpu_id, self.status)

    def __hash__(self) -> int:
        # BUG FIX: gpu_id is an int; str + int raised TypeError whenever a
        # GPUDevice was hashed (e.g. placed in a set/dict). Convert first.
        # Hash depends only on (node_id, gpu_id), consistent with __eq__.
        return hash(self.node_id + '_' + str(self.gpu_id))

    def set_status(self, status):
        # In-place status change; identity (eq/hash/ordering) is unaffected.
        self.status = status
1 change: 1 addition & 0 deletions nni/experiment/config/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
@dataclass(init=False)
class LocalConfig(TrainingServiceConfig):
platform: str = 'local'
reuse_mode: bool = False
use_active_gpu: Optional[bool] = None
max_trial_number_per_gpu: int = 1
gpu_indices: Union[List[int], str, int, None] = None
Expand Down
1 change: 1 addition & 0 deletions nni/retiarii/execution/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

_logger = logging.getLogger(__name__)


class BaseGraphData:
def __init__(self, model_script: str, evaluator: Evaluator) -> None:
self.model_script = model_script
Expand Down
3 changes: 2 additions & 1 deletion nni/retiarii/execution/cgo_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@


class CGOExecutionEngine(AbstractExecutionEngine):
def __init__(self, n_model_per_graph=4) -> None:
def __init__(self, devices=None, n_model_per_graph=4) -> None:
self._listeners: List[AbstractGraphListener] = []
self._running_models: Dict[int, Model] = dict()
self.logical_plan_counter = 0
self.n_model_per_graph = n_model_per_graph
self._optimizers = [DedupInputOptimizer()]
self._original_models = {}
self._original_model_to_multi_model = {}
self.devices = [] if devices is None else devices

# register advisor callbacks
advisor = get_advisor()
Expand Down
15 changes: 14 additions & 1 deletion nni/retiarii/experiment/pytorch.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from nni.experiment.config.base import ConfigBase, PathLike
from nni.experiment.pipe import Pipe
from nni.tools.nnictl.command_utils import kill_command
from nni.common.device import GPUDevice

from ..codegen import model_to_pytorch_script
from ..converter import convert_to_graph
Expand Down Expand Up @@ -193,13 +194,14 @@ def start(self, port: int = 8080, debug: bool = False) -> None:
"""
atexit.register(self.stop)

devices = self._construct_devices()
# we will probably need a execution engine factory to make this clean and elegant
if self.config.execution_engine == 'base':
from ..execution.base import BaseExecutionEngine
engine = BaseExecutionEngine()
elif self.config.execution_engine == 'cgo':
from ..execution.cgo_engine import CGOExecutionEngine
engine = CGOExecutionEngine()
engine = CGOExecutionEngine(devices = devices)
elif self.config.execution_engine == 'py':
from ..execution.python import PurePythonExecutionEngine
engine = PurePythonExecutionEngine()
Expand Down Expand Up @@ -241,6 +243,17 @@ def start(self, port: int = 8080, debug: bool = False) -> None:
_logger.info('Waiting for experiment to become DONE (you can ctrl+c if there is no running trial jobs)...')
exp_status_checker.join()

def _construct_devices(self):
    """Build the list of GPUDevice objects declared by the training-service config.

    Remote-style configs (those with a ``machine_list``) yield one device per
    (machine host, gpu index) pair; otherwise a local config yields one device
    per entry of ``gpu_indices`` on the 'local' node.
    """
    devices = []
    if hasattr(self.config.training_service, 'machine_list'):
        # Remote: one device per configured GPU on each machine.
        # (Fixed: the old enumerate() produced an unused machine_idx.)
        for machine in self.config.training_service.machine_list:
            for gpu_idx in machine.gpu_indices:
                devices.append(GPUDevice(machine.host, gpu_idx))
    else:
        # Local: all GPUs live on the single 'local' node.
        # NOTE(review): assumes gpu_indices is iterable here — confirm it
        # cannot be None for configs that reach this path.
        for gpu_idx in self.config.training_service.gpu_indices:
            devices.append(GPUDevice('local', gpu_idx))
    return devices

def _create_dispatcher(self):
return self._dispatcher

Expand Down
31 changes: 29 additions & 2 deletions nni/retiarii/integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,27 @@ def handle_initialize(self, data):
self.handle_update_search_space(data)
send(CommandType.Initialized, '')

def send_trial(self, parameters):
def _validate_placement_constraint(self, placement_constraint):
if placement_constraint is None:
raise ValueError('placement_constraint is None')
if not 'type' in placement_constraint:
raise ValueError('placement_constraint must have `type`')
if not 'gpus' in placement_constraint:
raise ValueError('placement_constraint must have `gpus`')
if placement_constraint['type'] not in ['None', 'GPUNumber', 'Device']:
raise ValueError('placement_constraint.type must be either `None`,. `GPUNumber` or `Device`')
if placement_constraint['type'] == 'None' and len(placement_constraint['gpus']) > 0:
raise ValueError('placement_constraint.gpus must be an empty list when type == None')
if placement_constraint['type'] == 'Device' and len(placement_constraint['gpus']) != 1:
raise ValueError('placement_constraint.gpus must be a list of number (currently only support one host)')
if placement_constraint['type'] == 'Device':
for e in placement_constraint['gpus']:
if not isinstance(e, tuple):
raise ValueError('placement_constraint.gpus must be a list of tuple when type == Device')
if not (len(e) == 2 and isinstance(e[0], str) and isinstance(e[1], int)):
raise ValueError('placement_constraint.gpus`s tuple must be (str, int)')

def send_trial(self, parameters, placement_constraint=None):
"""
Send parameters to NNI.
Expand All @@ -84,10 +104,17 @@ def send_trial(self, parameters):
which will be used for identification in future.
"""
self.parameters_count += 1
if placement_constraint is None:
placement_constraint = {
'type': 'None',
'gpus': []
}
self._validate_placement_constraint(placement_constraint)
new_trial = {
'parameter_id': self.parameters_count,
'parameters': parameters,
'parameter_source': 'algorithm'
'parameter_source': 'algorithm',
'placement_constraint': placement_constraint
}
_logger.debug('New trial sent: %s', new_trial)
send(CommandType.NewTrialJob, json_dumps(new_trial))
Expand Down
4 changes: 2 additions & 2 deletions nni/retiarii/integration_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ def register_advisor(advisor: 'RetiariiAdvisor'):
_advisor = advisor


def send_trial(parameters: dict) -> int:
def send_trial(parameters: dict, placement_constraint=None) -> int:
"""
Send a new trial. Executed on tuner end.
Return a ID that is the unique identifier for this trial.
"""
return get_advisor().send_trial(parameters)
return get_advisor().send_trial(parameters, placement_constraint)


def receive_trial_parameters() -> dict:
Expand Down
2 changes: 1 addition & 1 deletion nni/runtime/msg_dispatcher_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,4 +242,4 @@ def handle_trial_end(self, data):
hyper_params: the string that is sent by message dispatcher during the creation of trials.
"""
raise NotImplementedError('handle_trial_end not implemented')
raise NotImplementedError('handle_trial_end not implemented')
2 changes: 2 additions & 0 deletions test/retiarii_test/darts/.nniignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
lightning_logs
data
2 changes: 1 addition & 1 deletion test/retiarii_test/darts/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@
exp_config.training_service.use_active_gpu = True
exp_config.training_service.gpu_indices = [1, 2]

exp.run(exp_config, 8081)
exp.run(exp_config, 8081)
64 changes: 64 additions & 0 deletions test/retiarii_test/darts/test_training_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import json
from nni.common.device import GPUDevice
import os
import sys
import torch
from pathlib import Path

import nni.retiarii.evaluator.pytorch.lightning as pl
import nni.retiarii.strategy as strategy
from nni.experiment import RemoteMachineConfig
from nni.retiarii import serialize
from nni.retiarii.experiment.pytorch import RetiariiExperiment, RetiariiExeConfig
from torchvision import transforms
from torchvision.datasets import CIFAR10

from darts_model import CNN

if __name__ == '__main__':
    # Smoke test: DARTS search on CIFAR-10 via a remote training service in
    # reuse mode. Base search space: 32x32 RGB input, 16 initial channels,
    # 10 classes, 8 layers.
    base_model = CNN(32, 3, 16, 10, 8)

    # Standard CIFAR-10 augmentation for training; normalization only for eval.
    train_transform = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])
    valid_transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])

    train_dataset = serialize(CIFAR10, root='data/cifar10', train=True, download=True, transform=train_transform)
    test_dataset = serialize(CIFAR10, root='data/cifar10', train=False, download=True, transform=valid_transform)
    # One epoch, only 20% of training batches, to keep the test fast.
    trainer = pl.Classification(train_dataloader=pl.DataLoader(train_dataset, batch_size=100),
                                val_dataloaders=pl.DataLoader(test_dataset, batch_size=100),
                                max_epochs=1, limit_train_batches=0.2)

    # Random exploration over the search space.
    simple_strategy = strategy.Random()

    exp = RetiariiExperiment(base_model, trainer, [], simple_strategy)

    # Remote training service, 1 GPU per trial, reuse mode enabled.
    exp_config = RetiariiExeConfig('remote')
    exp_config.experiment_name = 'darts_search'
    exp_config.trial_concurrency = 2
    exp_config.max_trial_number = 10
    exp_config.trial_gpu_number = 1
    exp_config.training_service.use_active_gpu = True
    exp_config.training_service.reuse_mode = True
    exp_config.training_service.gpu_indices = [0, 1, 2]

    # Placeholder machine entry: replace host/user/password/python_path with
    # real credentials before running.
    rm_conf = RemoteMachineConfig()
    rm_conf.host = '127.0.0.1'
    rm_conf.user = 'xxx'
    rm_conf.password = 'xxx'
    rm_conf.port = 22
    rm_conf.python_path = '/home/xxx/py38/bin'
    rm_conf.gpu_indices = [0, 1, 2]
    rm_conf.use_active_gpu = True
    rm_conf.max_trial_number_per_gpu = 3

    exp_config.training_service.machine_list = [rm_conf]
    exp_config.execution_engine = 'py'

    exp.run(exp_config, 8081)
1 change: 1 addition & 0 deletions ts/nni_manager/common/experimentConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export interface TrainingServiceConfig {

export interface LocalConfig extends TrainingServiceConfig {
platform: 'local';
reuseMode: boolean;
useActiveGpu?: boolean;
maxTrialNumberPerGpu: number;
gpuIndices?: number[];
Expand Down
21 changes: 20 additions & 1 deletion ts/nni_manager/common/trainingService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,31 @@ interface HyperParameters {
readonly index: number;
}

type PlacementConstraintType = 'None' | 'GPUNumber' | 'Device'
interface PlacementConstraint{
    readonly type: PlacementConstraintType;
    readonly gpus: Array<number> | Array<[string,number]>;
    /**
     * 'None' constraint: `gpus` must be an empty array.
     *
     * 'GPUNumber' constraint is in form of Array<number>, e.g., [3] means it must be placed on a node of 3 GPUs
     *
     * 'Device' constraint is in form of Array<[string,number]>, e.g., [['Node-0',1],['Node-1',0]] means it must be placed on
     * Node-0's GPU-1 and Node-1's GPU-0
     */
}
/**
 * Data submitted when requesting a new trial job: the sequence id, the
 * hyper-parameter payload, and an optional GPU placement constraint.
 */
interface TrialJobApplicationForm {
    readonly sequenceId: number;
    readonly hyperParameters: HyperParameters;
    readonly placementConstraint?: PlacementConstraint;
}

/**
 * Shape of the JSON payload of a NewTrialJob command sent by the tuner;
 * snake_case field names mirror the dict built on the Python side.
 */
interface TrialCommandContent {
    readonly parameter_id: string;
    readonly parameters: string;
    readonly parameter_source: string;
    readonly placement_constraint?: PlacementConstraint;
}

/**
Expand Down Expand Up @@ -101,5 +120,5 @@ class NNIManagerIpConfig {
export {
TrainingService, TrainingServiceError, TrialJobStatus, TrialJobApplicationForm,
TrainingServiceMetadata, TrialJobDetail, TrialJobMetric, HyperParameters,
NNIManagerIpConfig
NNIManagerIpConfig, PlacementConstraint, TrialCommandContent
};
30 changes: 18 additions & 12 deletions ts/nni_manager/core/nnimanager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { ExperimentConfig, toSeconds, toCudaVisibleDevices } from '../common/exp
import { ExperimentManager } from '../common/experimentManager';
import { TensorboardManager } from '../common/tensorboardManager';
import {
TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus
TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, TrialJobStatus, TrialCommandContent
} from '../common/trainingService';
import { delay, getCheckpointDir, getExperimentRootDir, getLogDir, getMsgDispatcherCommand, mkDirP, getTunerProc, getLogLevel, isAlive, killPid } from '../common/utils';
import {
Expand Down Expand Up @@ -178,7 +178,7 @@ class NNIManager implements Manager {
this.config = config;
this.log.info(`Starting experiment: ${this.experimentProfile.id}`);
await this.storeExperimentProfile();

if (this.trainingService === undefined) {
this.log.info('Setup training service...');
this.trainingService = await this.initTrainingService(config);
Expand Down Expand Up @@ -449,8 +449,12 @@ class NNIManager implements Manager {
if (!platform) {
throw new Error('Cannot detect training service platform');
}
const reuseMode = Array.isArray(config.trainingService) || (config.trainingService as any).reuseMode;

if (platform === 'local') {
if (reuseMode) {
const module_ = await import('../training_service/reusable/routerTrainingService');
return await module_.RouterTrainingService.construct(config);
} else if (platform === 'local') {
const module_ = await import('../training_service/local/localTrainingService');
return new module_.LocalTrainingService(config);
} else if (platform === 'frameworkcontroller') {
Expand Down Expand Up @@ -542,13 +546,13 @@ class NNIManager implements Manager {

private async stopTrialJobIfOverMaxDurationTimer(trialJobId: string): Promise<void> {
const trialJobDetail: TrialJobDetail | undefined = this.trialJobs.get(trialJobId);
if(undefined !== trialJobDetail &&
if (undefined !== trialJobDetail &&
trialJobDetail.status === 'RUNNING' &&
trialJobDetail.startTime !== undefined){
const isEarlyStopped = true;
await this.trainingService.cancelTrialJob(trialJobId, isEarlyStopped);
this.log.info(`Trial job ${trialJobId} has stoped because it is over maxTrialDuration.`);
}
trialJobDetail.startTime !== undefined) {
const isEarlyStopped = true;
await this.trainingService.cancelTrialJob(trialJobId, isEarlyStopped);
this.log.info(`Trial job ${trialJobId} has stoped because it is over maxTrialDuration.`);
}
}

private async requestTrialJobsStatus(): Promise<number> {
Expand Down Expand Up @@ -674,7 +678,7 @@ class NNIManager implements Manager {
this.currSubmittedTrialNum++;
this.log.info('submitTrialJob: form:', form);
const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(form);
setTimeout(async ()=> this.stopTrialJobIfOverMaxDurationTimer(trialJobDetail.id), 1000 * this.maxTrialDuration);
setTimeout(async () => this.stopTrialJobIfOverMaxDurationTimer(trialJobDetail.id), 1000 * this.maxTrialDuration);
const Snapshot: TrialJobDetail = Object.assign({}, trialJobDetail);
await this.storeExperimentProfile();
this.trialJobs.set(trialJobDetail.id, Snapshot);
Expand Down Expand Up @@ -747,7 +751,7 @@ class NNIManager implements Manager {

private async onTrialJobMetrics(metric: TrialJobMetric): Promise<void> {
this.log.debug('NNIManager received trial job metrics:', metric);
if (this.trialJobs.has(metric.id)){
if (this.trialJobs.has(metric.id)) {
await this.dataStore.storeMetricData(metric.id, metric.data);
if (this.dispatcher === undefined) {
throw new Error('Error: tuner has not been setup');
Expand Down Expand Up @@ -796,12 +800,14 @@ class NNIManager implements Manager {
this.log.warning('It is not supposed to receive more trials after NO_MORE_TRIAL is set');
this.setStatus('RUNNING');
}
const trialRequestContent: TrialCommandContent = JSON.parse(content);
const form: TrialJobApplicationForm = {
sequenceId: this.experimentProfile.nextSequenceId++,
hyperParameters: {
value: content,
index: 0
}
},
placementConstraint: trialRequestContent.placement_constraint
};
this.waitingTrials.push(form);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ describe('Unit Test for RemoteMachineTrainingService', () => {
hyperParameters: {
value: 'mock hyperparameters',
index: 0
},
placementConstraint: {
type: "None",
gpus: []
}
};
const jobDetail: TrialJobDetail = await remoteMachineTrainingService.submitTrialJob(form);
Expand Down
Loading

0 comments on commit 3bce692

Please sign in to comment.