From 0bae247ffcc22162cd25b01328d0d47aee2f1c88 Mon Sep 17 00:00:00 2001 From: chicm-ms <38930155+chicm-ms@users.noreply.github.com> Date: Wed, 29 Jul 2020 10:37:11 +0800 Subject: [PATCH 1/5] upgrade tensorflow version for docker image (#2732) (#2735) --- deployment/docker/Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deployment/docker/Dockerfile b/deployment/docker/Dockerfile index 1f3b75b519..5e33cc6047 100644 --- a/deployment/docker/Dockerfile +++ b/deployment/docker/Dockerfile @@ -44,9 +44,9 @@ RUN python3 -m pip --no-cache-dir install \ numpy==1.14.3 scipy==1.1.0 # -# Tensorflow 1.10.0 +# Tensorflow 1.15 # -RUN python3 -m pip --no-cache-dir install tensorflow-gpu==1.10.0 +RUN python3 -m pip --no-cache-dir install tensorflow-gpu==1.15 # # Keras 2.1.6 From 98e86841cea50c1e0183d2c96be23507562c029f Mon Sep 17 00:00:00 2001 From: Junwei Sun <30487595+JunweiSUN@users.noreply.github.com> Date: Wed, 29 Jul 2020 10:54:08 +0800 Subject: [PATCH 2/5] update package version of docker doc (#2720) --- deployment/docker/README.md | 14 ++++++++------ docs/en_US/TrainingService/AMLMode.md | 4 ++-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/deployment/docker/README.md b/deployment/docker/README.md index f15b9ddd6a..aa8dae38a4 100644 --- a/deployment/docker/README.md +++ b/deployment/docker/README.md @@ -4,15 +4,17 @@ Dockerfile This is the Dockerfile of NNI project. It includes serveral popular deep learning frameworks and NNI. It is tested on `Ubuntu 16.04 LTS`: ``` -CUDA 9.0, CuDNN 7.0 -numpy 1.14.3,scipy 1.1.0 -TensorFlow-gpu 1.10.0 -Keras 2.1.6 -PyTorch 0.4.1 +CUDA 9.0 +CuDNN 7.0 +numpy 1.14.3 +scipy 1.1.0 +tensorflow-gpu 1.15.0 +keras 2.1.6 +torch 1.4.0 scikit-learn 0.20.0 pandas 0.23.4 lightgbm 2.2.2 -NNI v0.7 +nni ``` You can take this Dockerfile as a reference for your own customized Dockerfile. diff --git a/docs/en_US/TrainingService/AMLMode.md b/docs/en_US/TrainingService/AMLMode.md index b77da982af..0907f29a02 100644 --- a/docs/en_US/TrainingService/AMLMode.md +++ b/docs/en_US/TrainingService/AMLMode.md @@ -22,8 +22,8 @@ Step 6. Create an AML cluster as the computeTarget. Step 7. Open a command line and install AML package environment. 
``` -python3 -m pip install azureml --user -python3 -m pip install azureml-sdk --user +python3 -m pip install azureml +python3 -m pip install azureml-sdk ``` ## Run an experiment From 2e4a9d983a20c2b301263549d66f9c20eabe659c Mon Sep 17 00:00:00 2001 From: SparkSnail Date: Wed, 29 Jul 2020 11:08:51 +0800 Subject: [PATCH 3/5] Fix pipeline (#2741) --- test/config/integration_tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/config/integration_tests.yml b/test/config/integration_tests.yml index 6e7e469309..b3802239da 100644 --- a/test/config/integration_tests.yml +++ b/test/config/integration_tests.yml @@ -158,7 +158,7 @@ testCases: configFile: test/config/examples/sklearn-regression.yml setExperimentIdtoVar: $resumeExpId # for subfolder in codedir test - launchCommand: mkdir ../examples/trials/sklearn/regression/subfolder && touch ../examples/trials/sklearn/regression/subfolder/subfile && nnictl create --config $configFile --debug + launchCommand: mkdir -p ../examples/trials/sklearn/regression/subfolder && touch ../examples/trials/sklearn/regression/subfolder/subfile && nnictl create --config $configFile --debug # Experiment resume test part 2 - name: nnictl-resume-2 From 8a20c3484309df3352bdf003950bc7f4a81dffdd Mon Sep 17 00:00:00 2001 From: SparkSnail Date: Thu, 30 Jul 2020 19:47:31 +0800 Subject: [PATCH 4/5] add trial job detail link (#2703) --- src/nni_manager/training_service/pai/paiConfig.ts | 4 +++- src/nni_manager/training_service/pai/paiJobInfoCollector.ts | 2 +- .../training_service/pai/paiK8S/paiK8STrainingService.ts | 4 +++- src/nni_manager/training_service/reusable/trialDispatcher.ts | 1 + 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/nni_manager/training_service/pai/paiConfig.ts b/src/nni_manager/training_service/pai/paiConfig.ts index eceda619c7..fa38d007d7 100644 --- a/src/nni_manager/training_service/pai/paiConfig.ts +++ b/src/nni_manager/training_service/pai/paiConfig.ts @@ -45,9 +45,10 @@ export class PAITrialJobDetail implements TrialJobDetail { public form: TrialJobApplicationForm; public logPath: string; public isEarlyStopped?: boolean; + public paiJobDetailUrl?: string; constructor(id: string, status: TrialJobStatus, paiJobName: string, - submitTime: number, workingDirectory: string, form: TrialJobApplicationForm, logPath: string) { + submitTime: number, workingDirectory: string, form: TrialJobApplicationForm, logPath: string, paiJobDetailUrl?: string) { this.id = id; this.status = status; this.paiJobName = paiJobName; @@ -56,5 +57,6 @@ export class PAITrialJobDetail implements TrialJobDetail { this.form = form; this.tags = []; this.logPath = logPath; + this.paiJobDetailUrl = paiJobDetailUrl; } } diff --git a/src/nni_manager/training_service/pai/paiJobInfoCollector.ts b/src/nni_manager/training_service/pai/paiJobInfoCollector.ts index eb15765a4f..2590547849 100644 --- a/src/nni_manager/training_service/pai/paiJobInfoCollector.ts +++ b/src/nni_manager/training_service/pai/paiJobInfoCollector.ts @@ -84,7 +84,7 @@ export class PAIJobInfoCollector { if (response.body.jobStatus.appTrackingUrl) { paiTrialJob.url = response.body.jobStatus.appTrackingUrl; } else { - paiTrialJob.url = paiTrialJob.logPath; + paiTrialJob.url = paiTrialJob.paiJobDetailUrl; } } break; diff --git a/src/nni_manager/training_service/pai/paiK8S/paiK8STrainingService.ts b/src/nni_manager/training_service/pai/paiK8S/paiK8STrainingService.ts index 85dcf66752..59bd994535 100644 --- a/src/nni_manager/training_service/pai/paiK8S/paiK8STrainingService.ts +++ 
b/src/nni_manager/training_service/pai/paiK8S/paiK8STrainingService.ts @@ -124,6 +124,7 @@ class PAIK8STrainingService extends PAITrainingService { const trialWorkingFolder: string = path.join(this.expRootDir, 'trials', trialJobId); const paiJobName: string = `nni_exp_${this.experimentId}_trial_${trialJobId}`; const logPath: string = path.join(this.paiTrialConfig.nniManagerNFSMountPath, this.experimentId, trialJobId); + const paiJobDetailUrl: string = `${this.protocol}://${this.paiClusterConfig.host}/job-detail.html?username=${this.paiClusterConfig.userName}&jobName=${paiJobName}`; const trialJobDetail: PAITrialJobDetail = new PAITrialJobDetail( trialJobId, 'WAITING', @@ -131,7 +132,8 @@ class PAIK8STrainingService extends PAITrainingService { Date.now(), trialWorkingFolder, form, - logPath); + logPath, + paiJobDetailUrl); this.trialJobsMap.set(trialJobId, trialJobDetail); this.jobQueue.push(trialJobId); diff --git a/src/nni_manager/training_service/reusable/trialDispatcher.ts b/src/nni_manager/training_service/reusable/trialDispatcher.ts index 156909e129..ff324899c6 100644 --- a/src/nni_manager/training_service/reusable/trialDispatcher.ts +++ b/src/nni_manager/training_service/reusable/trialDispatcher.ts @@ -362,6 +362,7 @@ class TrialDispatcher implements TrainingService { liveTrialsCount++; continue; } + trial.url = environment.trackingUrl; const environmentStatus = environment.status; // any node exit, then make sure the whole trial stopped. From 143c6615a18cd9dbc1d84a56cbfcbe325fb9ac58 Mon Sep 17 00:00:00 2001 From: Chi Song <27178119+squirrelsc@users.noreply.github.com> Date: Thu, 30 Jul 2020 23:02:47 +0800 Subject: [PATCH 5/5] Reusable environment support GPU scheduler, add test cases and refactoring. (#2627) --- src/nni_manager/common/utils.ts | 7 +- src/nni_manager/package.json | 1 + .../rest_server/restValidationSchemas.ts | 5 + .../training_service/common/gpuData.ts | 24 + .../training_service/common/trialConfig.ts | 4 + .../training_service/pai/paiConfig.ts | 15 +- .../remote_machine/gpuScheduler.ts | 26 +- .../remote_machine/remoteMachineData.ts | 25 +- .../remoteMachineTrainingService.ts | 9 +- .../reusable/channels/amlCommandChannel.ts | 11 +- .../training_service/reusable/environment.ts | 125 ++- .../environments/amlEnvironmentService.ts | 48 +- .../environments/openPaiEnvironmentService.ts | 74 +- .../training_service/reusable/gpuScheduler.ts | 235 ++++++ .../reusable/storageService.ts | 4 +- .../storages/mountedStorageService.ts | 10 +- .../test/mountedStorageService.test.ts | 125 +++ .../reusable/test/trialDispatcher.test.ts | 712 ++++++++++++++++++ .../reusable/test/utCommandChannel.ts | 57 ++ .../reusable/test/utEnvironmentService.ts | 76 ++ .../training_service/reusable/trial.ts | 2 +- .../reusable/trialDispatcher.ts | 271 +++++-- src/nni_manager/yarn.lock | 16 +- test/nni_test/nnitest/generate_ts_config.py | 3 + test/pipelines/pipelines-it-pai.yml | 2 +- tools/nni_cmd/config_schema.py | 103 +-- tools/nni_trial_tool/base_channel.py | 6 +- tools/nni_trial_tool/log_utils.py | 6 +- tools/nni_trial_tool/trial.py | 10 +- tools/nni_trial_tool/web_channel.py | 4 +- 30 files changed, 1750 insertions(+), 266 deletions(-) create mode 100644 src/nni_manager/training_service/reusable/gpuScheduler.ts create mode 100644 src/nni_manager/training_service/reusable/test/mountedStorageService.test.ts create mode 100644 src/nni_manager/training_service/reusable/test/trialDispatcher.test.ts create mode 100644 src/nni_manager/training_service/reusable/test/utCommandChannel.ts create mode 
100644 src/nni_manager/training_service/reusable/test/utEnvironmentService.ts diff --git a/src/nni_manager/common/utils.ts b/src/nni_manager/common/utils.ts index 7150ca19f9..99c2d4e0c3 100644 --- a/src/nni_manager/common/utils.ts +++ b/src/nni_manager/common/utils.ts @@ -222,15 +222,16 @@ function getIPV4Address(): string { return cachedipv4Address; } - if (os.networkInterfaces().eth0) { - for (const item of os.networkInterfaces().eth0) { + const networkInterfaces = os.networkInterfaces(); + if (networkInterfaces.eth0) { + for (const item of networkInterfaces.eth0) { if (item.family === 'IPv4') { cachedipv4Address = item.address; return cachedipv4Address; } } } else { - throw Error('getIPV4Address() failed because os.networkInterfaces().eth0 is undefined.'); + throw Error(`getIPV4Address() failed because os.networkInterfaces().eth0 is undefined. Please specify NNI manager IP in config.`); } throw Error('getIPV4Address() failed because no valid IPv4 address found.') diff --git a/src/nni_manager/package.json b/src/nni_manager/package.json index fb0340e5ab..21d7a7f02a 100644 --- a/src/nni_manager/package.json +++ b/src/nni_manager/package.json @@ -39,6 +39,7 @@ "@types/express": "^4.16.0", "@types/glob": "^7.1.1", "@types/js-base64": "^2.3.1", + "@types/js-yaml": "^3.12.5", "@types/mocha": "^5.2.5", "@types/node": "10.12.18", "@types/request": "^2.47.1", diff --git a/src/nni_manager/rest_server/restValidationSchemas.ts b/src/nni_manager/rest_server/restValidationSchemas.ts index 302073707e..a480501a79 100644 --- a/src/nni_manager/rest_server/restValidationSchemas.ts +++ b/src/nni_manager/rest_server/restValidationSchemas.ts @@ -107,6 +107,11 @@ export namespace ValidationSchemas { token: joi.string().min(1), host: joi.string().min(1).required(), reuse: joi.boolean(), + cpuNum: joi.number().min(1), + memoryMB: joi.number().min(100), + gpuNum: joi.number().min(1), + maxTrialNumPerGpu: joi.number(), + useActiveGpu: joi.boolean(), }), kubeflow_config: joi.object({ // eslint-disable-line @typescript-eslint/camelcase operator: joi.string().min(1).required(), diff --git a/src/nni_manager/training_service/common/gpuData.ts b/src/nni_manager/training_service/common/gpuData.ts index 655c885d59..835569255e 100644 --- a/src/nni_manager/training_service/common/gpuData.ts +++ b/src/nni_manager/training_service/common/gpuData.ts @@ -3,6 +3,17 @@ 'use strict'; +export enum ScheduleResultType { + // Schedule succeeded + SUCCEED, + + // Temporarily, no enough available GPU right now + TMP_NO_AVAILABLE_GPU, + + // Cannot match requirement even if all GPU are a + REQUIRE_EXCEED_TOTAL +} + /** * GPU Infromation class * Representing the dynamic and static information retrieved from Nvidia-smi @@ -52,6 +63,19 @@ export class GPUSummary { } } + +export function parseGpuIndices(gpuIndices?: string): Set | undefined { + if (gpuIndices !== undefined) { + const indices: number[] = gpuIndices.split(',') + .map((x: string) => parseInt(x, 10)); + if (indices.length > 0) { + return new Set(indices); + } else { + throw new Error('gpuIndices can not be empty if specified.'); + } + } +} + export const GPU_INFO_COLLECTOR_FORMAT_WINDOWS: string = ` $env:METRIC_OUTPUT_DIR="{0}" diff --git a/src/nni_manager/training_service/common/trialConfig.ts b/src/nni_manager/training_service/common/trialConfig.ts index ae827fa6be..5458eaa325 100644 --- a/src/nni_manager/training_service/common/trialConfig.ts +++ b/src/nni_manager/training_service/common/trialConfig.ts @@ -17,6 +17,10 @@ export class TrialConfig { // Required GPU number for 
trial job. The number should be in [0,100] public readonly gpuNum: number; + // this flag uses for UT now. + // in future, all environments should be reusable, and this can be configurable by user. + public reuseEnvironment: boolean | undefined = true; + /** * Constructor * @param command Trail command diff --git a/src/nni_manager/training_service/pai/paiConfig.ts b/src/nni_manager/training_service/pai/paiConfig.ts index fa38d007d7..689f506483 100644 --- a/src/nni_manager/training_service/pai/paiConfig.ts +++ b/src/nni_manager/training_service/pai/paiConfig.ts @@ -3,7 +3,7 @@ 'use strict'; -import { TrialJobApplicationForm, TrialJobDetail, TrialJobStatus } from '../../common/trainingService'; +import { TrialJobApplicationForm, TrialJobDetail, TrialJobStatus } from '../../common/trainingService'; export class PAIClusterConfig { public readonly userName: string; @@ -12,6 +12,13 @@ export class PAIClusterConfig { public readonly token?: string; public readonly reuse?: boolean; + public cpuNum?: number; + public memoryMB?: number; + public gpuNum?: number; + + public useActiveGpu?: boolean; + public maxTrialNumPerGpu?: number; + /** * Constructor * @param userName User name of PAI Cluster @@ -20,12 +27,16 @@ export class PAIClusterConfig { * @param token PAI token of PAI Cluster * @param reuse If job is reusable for multiple trials */ - constructor(userName: string, host: string, passWord?: string, token?: string, reuse?: boolean) { + constructor(userName: string, host: string, passWord?: string, token?: string, reuse?: boolean, + cpuNum?: number, memoryMB?: number, gpuNum?: number) { this.userName = userName; this.passWord = passWord; this.host = host; this.token = token; this.reuse = reuse; + this.cpuNum = cpuNum; + this.memoryMB = memoryMB; + this.gpuNum = gpuNum; } } diff --git a/src/nni_manager/training_service/remote_machine/gpuScheduler.ts b/src/nni_manager/training_service/remote_machine/gpuScheduler.ts index 62ef3312bd..e0ca826b85 100644 --- a/src/nni_manager/training_service/remote_machine/gpuScheduler.ts +++ b/src/nni_manager/training_service/remote_machine/gpuScheduler.ts @@ -6,10 +6,8 @@ import * as assert from 'assert'; import { getLogger, Logger } from '../../common/log'; import { randomSelect } from '../../common/utils'; -import { GPUInfo } from '../common/gpuData'; -import { - parseGpuIndices, RemoteMachineMeta, RemoteMachineScheduleResult, RemoteMachineTrialJobDetail, ScheduleResultType, ExecutorManager -} from './remoteMachineData'; +import { GPUInfo, parseGpuIndices, ScheduleResultType } from '../common/gpuData'; +import { ExecutorManager, RemoteMachineMeta, RemoteMachineScheduleResult, RemoteMachineTrialJobDetail } from './remoteMachineData'; type SCHEDULE_POLICY_NAME = 'random' | 'round-robin'; @@ -39,7 +37,7 @@ export class GPUScheduler { * @param requiredGPUNum required GPU number */ public scheduleMachine(requiredGPUNum: number | undefined, trialJobDetail: RemoteMachineTrialJobDetail): RemoteMachineScheduleResult { - if(requiredGPUNum === undefined) { + if (requiredGPUNum === undefined) { requiredGPUNum = 0; } assert(requiredGPUNum >= 0); @@ -48,7 +46,7 @@ export class GPUScheduler { // Step 1: Check if required GPU number not exceeds the total GPU number in all machines const eligibleRM: RemoteMachineMeta[] = allRMs.filter((rmMeta: RemoteMachineMeta) => - rmMeta.gpuSummary === undefined || requiredGPUNum === 0 || (requiredGPUNum !== undefined && rmMeta.gpuSummary.gpuCount >= requiredGPUNum)); + rmMeta.gpuSummary === undefined || requiredGPUNum === 0 || (requiredGPUNum 
!== undefined && rmMeta.gpuSummary.gpuCount >= requiredGPUNum)); if (eligibleRM.length === 0) { // If the required gpu number exceeds the upper limit of all machine's GPU number // Return REQUIRE_EXCEED_TOTAL directly @@ -75,8 +73,8 @@ export class GPUScheduler { this.log.warning(`Scheduler: trialJob id ${trialJobDetail.id}, no machine can be scheduled, return TMP_NO_AVAILABLE_GPU `); return { - resultType : ScheduleResultType.TMP_NO_AVAILABLE_GPU, - scheduleInfo : undefined + resultType: ScheduleResultType.TMP_NO_AVAILABLE_GPU, + scheduleInfo: undefined }; } @@ -159,7 +157,7 @@ export class GPUScheduler { const num: number | undefined = rmMeta.occupiedGpuIndexMap.get(gpuInfo.index); const maxTrialNumPerGpu: number = rmMeta.maxTrialNumPerGpu ? rmMeta.maxTrialNumPerGpu : 1; if ((num === undefined && (!rmMeta.useActiveGpu && gpuInfo.activeProcessNum === 0 || rmMeta.useActiveGpu)) || - (num !== undefined && num < maxTrialNumPerGpu)) { + (num !== undefined && num < maxTrialNumPerGpu)) { availableGPUs.push(gpuInfo); } } else { @@ -200,7 +198,7 @@ export class GPUScheduler { } private allocateHost(requiredGPUNum: number, rmMeta: RemoteMachineMeta, - gpuInfos: GPUInfo[], trialJobDetail: RemoteMachineTrialJobDetail): RemoteMachineScheduleResult { + gpuInfos: GPUInfo[], trialJobDetail: RemoteMachineTrialJobDetail): RemoteMachineScheduleResult { assert(gpuInfos.length >= requiredGPUNum); const allocatedGPUs: GPUInfo[] = this.selectGPUsForTrial(gpuInfos, requiredGPUNum); allocatedGPUs.forEach((gpuInfo: GPUInfo) => { @@ -222,10 +220,10 @@ export class GPUScheduler { scheduleInfo: { rmMeta: rmMeta, cudaVisibleDevice: allocatedGPUs - .map((gpuInfo: GPUInfo) => { - return gpuInfo.index; - }) - .join(',') + .map((gpuInfo: GPUInfo) => { + return gpuInfo.index; + }) + .join(',') } }; } diff --git a/src/nni_manager/training_service/remote_machine/remoteMachineData.ts b/src/nni_manager/training_service/remote_machine/remoteMachineData.ts index 28d49762f4..61024c1fdd 100644 --- a/src/nni_manager/training_service/remote_machine/remoteMachineData.ts +++ b/src/nni_manager/training_service/remote_machine/remoteMachineData.ts @@ -4,7 +4,7 @@ 'use strict'; import { TrialJobApplicationForm, TrialJobDetail, TrialJobStatus } from '../../common/trainingService'; -import { GPUInfo, GPUSummary } from '../common/gpuData'; +import { GPUInfo, GPUSummary, ScheduleResultType } from '../common/gpuData'; import { ShellExecutor } from './shellExecutor'; /** @@ -25,18 +25,6 @@ export class RemoteMachineMeta { public readonly useActiveGpu?: boolean = false; } -export function parseGpuIndices(gpuIndices?: string): Set | undefined { - if (gpuIndices !== undefined) { - const indices: number[] = gpuIndices.split(',') - .map((x: string) => parseInt(x, 10)); - if (indices.length > 0) { - return new Set(indices); - } else { - throw new Error('gpuIndices can not be empty if specified.'); - } - } -} - /** * The execution result for command executed on remote machine */ @@ -168,14 +156,3 @@ export class ExecutorManager { export type RemoteMachineScheduleResult = { scheduleInfo: RemoteMachineScheduleInfo | undefined; resultType: ScheduleResultType }; export type RemoteMachineScheduleInfo = { rmMeta: RemoteMachineMeta; cudaVisibleDevice: string }; - -export enum ScheduleResultType { - // Schedule succeeded - SUCCEED, - - // Temporarily, no enough available GPU right now - TMP_NO_AVAILABLE_GPU, - - // Cannot match requirement even if all GPU are a - REQUIRE_EXCEED_TOTAL -} diff --git 
a/src/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts b/src/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts index 0fa8e1305b..c997b03a01 100644 --- a/src/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts +++ b/src/nni_manager/training_service/remote_machine/remoteMachineTrainingService.ts @@ -7,6 +7,7 @@ import * as assert from 'assert'; import { EventEmitter } from 'events'; import * as fs from 'fs'; import * as path from 'path'; +import { ShellExecutor } from 'training_service/remote_machine/shellExecutor'; import { Deferred } from 'ts-deferred'; import * as component from '../../common/component'; import { NNIError, NNIErrorNames } from '../../common/errors'; @@ -22,18 +23,16 @@ import { getVersion, uniqueString } from '../../common/utils'; import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData'; -import { GPUSummary } from '../common/gpuData'; +import { GPUSummary, ScheduleResultType } from '../common/gpuData'; import { TrialConfig } from '../common/trialConfig'; import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey'; import { execMkdir, validateCodeDir } from '../common/util'; import { GPUScheduler } from './gpuScheduler'; import { - RemoteMachineMeta, - RemoteMachineScheduleInfo, RemoteMachineScheduleResult, RemoteMachineTrialJobDetail, - ScheduleResultType, ExecutorManager + ExecutorManager, RemoteMachineMeta, + RemoteMachineScheduleInfo, RemoteMachineScheduleResult, RemoteMachineTrialJobDetail } from './remoteMachineData'; import { RemoteMachineJobRestServer } from './remoteMachineJobRestServer'; -import { ShellExecutor } from 'training_service/remote_machine/shellExecutor'; /** * Training Service implementation for Remote Machine (Linux) diff --git a/src/nni_manager/training_service/reusable/channels/amlCommandChannel.ts b/src/nni_manager/training_service/reusable/channels/amlCommandChannel.ts index 6fbdf40cef..5816a9c780 100644 --- a/src/nni_manager/training_service/reusable/channels/amlCommandChannel.ts +++ b/src/nni_manager/training_service/reusable/channels/amlCommandChannel.ts @@ -3,7 +3,6 @@ 'use strict'; -import { EventEmitter } from 'events'; import { delay } from "../../../common/utils"; import { AMLEnvironmentInformation } from '../aml/amlConfig'; import { CommandChannel, RunnerConnection } from "../commandChannel"; @@ -15,11 +14,7 @@ class AMLRunnerConnection extends RunnerConnection { export class AMLCommandChannel extends CommandChannel { private stopping: boolean = false; private sendQueues: [EnvironmentInformation, string][] = []; - private readonly NNI_METRICS_PATTERN: string = `NNISDK_MEb'(?.*?)'`; - - public constructor(commandEmitter: EventEmitter) { - super(commandEmitter); - } + public get channelName(): Channel { return "aml"; } @@ -99,11 +94,11 @@ export class AMLCommandChannel extends CommandChannel { const messages = command['trial_runner']; if (messages) { if (messages instanceof Object && currentMessageIndex < messages.length - 1) { - for (let index = currentMessageIndex + 1; index < messages.length; index ++) { + for (let index = currentMessageIndex + 1; index < messages.length; index++) { this.handleCommand(runnerConnection.environment, messages[index]); } currentMessageIndex = messages.length - 1; - } else if (currentMessageIndex === -1){ + } else if (currentMessageIndex === -1) { this.handleCommand(runnerConnection.environment, messages); currentMessageIndex += 1; } diff --git 
a/src/nni_manager/training_service/reusable/environment.ts b/src/nni_manager/training_service/reusable/environment.ts index 547ddc43ca..ee9e2251ee 100644 --- a/src/nni_manager/training_service/reusable/environment.ts +++ b/src/nni_manager/training_service/reusable/environment.ts @@ -3,10 +3,10 @@ 'use strict'; -import { GPUSummary } from "training_service/common/gpuData"; +import { EventEmitter } from "events"; import { getLogger, Logger } from "../../common/log"; import { TrialJobStatus } from "../../common/trainingService"; -import { EventEmitter } from "events"; +import { GPUInfo } from "../../training_service/common/gpuData"; import { WebCommandChannel } from "./channels/webCommandChannel"; import { CommandChannel } from "./commandChannel"; @@ -14,24 +14,50 @@ import { CommandChannel } from "./commandChannel"; export type EnvironmentStatus = 'UNKNOWN' | 'WAITING' | 'RUNNING' | 'SUCCEEDED' | 'FAILED' | 'USER_CANCELED'; export type Channel = "web" | "file" | "aml" | "ut"; + +export class TrialGpuSummary { + // GPU count on the machine + public gpuCount: number; + // The timestamp when GPU summary data queried + public timestamp: string; + // The array of GPU information for each GPU card + public gpuInfos: GPUInfo[]; + // GPU assigned status + public assignedGpuIndexMap: Map = new Map(); + + constructor(gpuCount: number, timestamp: string, gpuInfos: GPUInfo[]) { + this.gpuCount = gpuCount; + this.timestamp = timestamp; + this.gpuInfos = gpuInfos; + } +} + export class EnvironmentInformation { + // node id is 5 chars, so won't conflict. + private readonly defaultNodeId = "default"; private log: Logger; - - // NNI environment ID - public id: string; - // training platform unique job ID. - public jobId: string; - // training platform job friendly name, in case it's different with job ID. - public jobName: string; + private isNoGpuWarned: boolean = false; // key states - // true: environment is ready to run trial. - public isIdle: boolean = false; // true: environment is running, waiting, or unknown. public isAlive: boolean = true; + // true: Runner is initialized, and can receive trials. + public isRunnerReady: boolean = false; // don't set status in environment directly, use setFinalState function to set a final state. public status: EnvironmentStatus = "UNKNOWN"; + // true: environment is ready to run trial. + public runningTrialCount: number = 0; + // uses to count how many trial runs on this environment. + // it can be used in many scenarios, but for now, it uses for reusable. + public assignedTrialCount: number = 0; + + // NNI environment ID + public id: string; + // training platform unique job ID. + public envId: string; + // training platform job friendly name, in case it's different with job ID. + public name: string; public trackingUrl: string = ""; public workingFolder: string = ""; public runnerWorkingFolder: string = ""; @@ -40,41 +66,82 @@ export class EnvironmentInformation { // it's used to aggregate node status for multiple node trial public nodes: Map; - public gpuSummary: Map = new Map(); + public gpuSummaries: Map = new Map(); - constructor(id: string, jobName: string, jobId?: string) { + // use can specify which gpus can be used by NNI. + // it's usable for sharable environment like remote machine. + public usableGpus?: number[]; + // user can specify how to use GPU resource for an environment, like local and remote. 
+ public maxTrialNumberPerGpu?: number; + public useActiveGpu?: boolean; + + constructor(id: string, name: string, envId?: string) { this.log = getLogger(); this.id = id; - this.jobName = jobName; - this.jobId = jobId ? jobId : jobName; + this.name = name; + this.envId = envId ? envId : name; this.nodes = new Map(); } - public setFinalStatus(status: EnvironmentStatus): void { - switch (status) { - case 'WAITING': - case 'SUCCEEDED': - case 'FAILED': - case 'USER_CANCELED': - this.status = status; - break; - default: - this.log.error(`Environment: job ${this.jobId} set an invalid final state ${status}.`); - break; + public setStatus(status: EnvironmentStatus): void { + if (this.status !== status) { + this.log.info(`EnvironmentInformation: ${this.envId} change status from ${this.status} to ${status}.`) + this.status = status; + } + } + + public setGpuSummary(nodeId: string, newGpuSummary: TrialGpuSummary): void { + if (nodeId === null || nodeId === undefined) { + nodeId = this.defaultNodeId; + } + + const originalGpuSummary = this.gpuSummaries.get(nodeId); + if (undefined === originalGpuSummary) { + newGpuSummary.assignedGpuIndexMap = new Map(); + this.gpuSummaries.set(nodeId, newGpuSummary); + } else { + originalGpuSummary.gpuCount = newGpuSummary.gpuCount; + originalGpuSummary.timestamp = newGpuSummary.timestamp; + originalGpuSummary.gpuInfos = newGpuSummary.gpuInfos; + } + } + + public get defaultGpuSummary(): TrialGpuSummary | undefined { + const gpuSummary = this.gpuSummaries.get(this.defaultNodeId); + if (gpuSummary === undefined) { + if (false === this.isNoGpuWarned) { + this.log.warning(`EnvironmentInformation: ${this.envId} no default gpu found. current gpu info ${JSON.stringify(this.gpuSummaries)}`); + this.isNoGpuWarned = true; + } + } else { + this.isNoGpuWarned = false; } + return gpuSummary; } } export abstract class EnvironmentService { public abstract get hasStorageService(): boolean; - public abstract config(key: string, value: string): Promise; public abstract refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise; public abstract startEnvironment(environment: EnvironmentInformation): Promise; public abstract stopEnvironment(environment: EnvironmentInformation): Promise; - public getCommandChannel(commandEmitter: EventEmitter): CommandChannel { + // It depends on environment pressure and settings + // for example, OpenPAI relies on API calls, and there is an limitation for frequence, so it need to be bigger. + public get environmentMaintenceLoopInterval(): number { + return 5000; + } + + // it's needed in two scenario + // 1. remote machine has fixed number, so it can return false, when all environment are assigned. + // 2. If there are consistent error on requested environments, for example, authentication failure on platform. + public get hasMoreEnvironments(): boolean { + return true; + } + + public createCommandChannel(commandEmitter: EventEmitter): CommandChannel { return new WebCommandChannel(commandEmitter); } @@ -101,7 +168,7 @@ export class RunnerSettings { public nniManagerVersion: string = ""; public logCollection: string = "none"; public command: string = ""; - public enableGpuCollector: boolean = false; + public enableGpuCollector: boolean = true; // specify which communication channel is used by runner. 
// supported channel includes: rest, storage, aml diff --git a/src/nni_manager/training_service/reusable/environments/amlEnvironmentService.ts b/src/nni_manager/training_service/reusable/environments/amlEnvironmentService.ts index 7b17c5cc4d..fea393e75d 100644 --- a/src/nni_manager/training_service/reusable/environments/amlEnvironmentService.ts +++ b/src/nni_manager/training_service/reusable/environments/amlEnvironmentService.ts @@ -3,24 +3,20 @@ 'use strict'; +import { EventEmitter } from "events"; import * as fs from 'fs'; import * as path from 'path'; import * as component from '../../../common/component'; import { getExperimentId } from '../../../common/experimentStartupInfo'; import { getLogger, Logger } from '../../../common/log'; +import { getExperimentRootDir } from '../../../common/utils'; import { TrialConfigMetadataKey } from '../../common/trialConfigMetadataKey'; -import { AMLClusterConfig, AMLTrialConfig } from '../aml/amlConfig'; -import { EnvironmentInformation, EnvironmentService } from '../environment'; -import { AMLEnvironmentInformation } from '../aml/amlConfig'; -import { AMLClient } from '../aml/amlClient'; -import { - NNIManagerIpConfig, -} from '../../../common/trainingService'; import { validateCodeDir } from '../../common/util'; -import { getExperimentRootDir } from '../../../common/utils'; +import { AMLClient } from '../aml/amlClient'; +import { AMLClusterConfig, AMLEnvironmentInformation, AMLTrialConfig } from '../aml/amlConfig'; import { AMLCommandChannel } from '../channels/amlCommandChannel'; import { CommandChannel } from "../commandChannel"; -import { EventEmitter } from "events"; +import { EnvironmentInformation, EnvironmentService, EnvironmentStatus } from '../environment'; /** @@ -28,17 +24,11 @@ import { EventEmitter } from "events"; */ @component.Singleton export class AMLEnvironmentService extends EnvironmentService { - + private readonly log: Logger = getLogger(); public amlClusterConfig: AMLClusterConfig | undefined; public amlTrialConfig: AMLTrialConfig | undefined; - private amlJobConfig: any; - private stopping: boolean = false; - private versionCheck: boolean = true; - private isMultiPhase: boolean = false; - private nniVersion?: string; private experimentId: string; - private nniManagerIpConfig?: NNIManagerIpConfig; private experimentRootDir: string; constructor() { @@ -51,7 +41,7 @@ export class AMLEnvironmentService extends EnvironmentService { return false; } - public getCommandChannel(commandEmitter: EventEmitter): CommandChannel { + public createCommandChannel(commandEmitter: EventEmitter): CommandChannel { return new AMLCommandChannel(commandEmitter); } @@ -83,29 +73,31 @@ export class AMLEnvironmentService extends EnvironmentService { public async refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise { environments.forEach(async (environment) => { const amlClient = (environment as AMLEnvironmentInformation).amlClient; - if (!amlClient) { - throw new Error('AML client not initialized!'); + if (!amlClient) { + throw new Error('AML client not initialized!'); } - const status = await amlClient.updateStatus(environment.status); - switch (status.toUpperCase()) { + const newStatus = await amlClient.updateStatus(environment.status); + switch (newStatus.toUpperCase()) { case 'WAITING': - case 'RUNNING': case 'QUEUED': - // RUNNING status is set by runner, and ignore waiting status + environment.setStatus('WAITING'); + break; + case 'RUNNING': + environment.setStatus('RUNNING'); break; case 'COMPLETED': case 'SUCCEEDED': - 
environment.setFinalStatus('SUCCEEDED'); + environment.setStatus('SUCCEEDED'); break; case 'FAILED': - environment.setFinalStatus('FAILED'); + environment.setStatus(newStatus.toUpperCase() as EnvironmentStatus); break; case 'STOPPED': case 'STOPPING': - environment.setFinalStatus('USER_CANCELED'); + environment.setStatus('USER_CANCELED'); break; default: - environment.setFinalStatus('UNKNOWN'); + environment.setStatus('UNKNOWN'); } }); } @@ -120,7 +112,7 @@ export class AMLEnvironmentService extends EnvironmentService { const amlEnvironment: AMLEnvironmentInformation = environment as AMLEnvironmentInformation; const environmentLocalTempFolder = path.join(this.experimentRootDir, this.experimentId, "environment-temp"); environment.command = `import os\nos.system('${amlEnvironment.command}')`; - await fs.promises.writeFile(path.join(environmentLocalTempFolder, 'nni_script.py'), amlEnvironment.command ,{ encoding: 'utf8' }); + await fs.promises.writeFile(path.join(environmentLocalTempFolder, 'nni_script.py'), amlEnvironment.command, { encoding: 'utf8' }); const amlClient = new AMLClient( this.amlClusterConfig.subscriptionId, this.amlClusterConfig.resourceGroup, diff --git a/src/nni_manager/training_service/reusable/environments/openPaiEnvironmentService.ts b/src/nni_manager/training_service/reusable/environments/openPaiEnvironmentService.ts index 0d935a6a37..3d92df5c99 100644 --- a/src/nni_manager/training_service/reusable/environments/openPaiEnvironmentService.ts +++ b/src/nni_manager/training_service/reusable/environments/openPaiEnvironmentService.ts @@ -4,6 +4,7 @@ 'use strict'; import * as fs from 'fs'; +import * as yaml from 'js-yaml'; import * as request from 'request'; import { Deferred } from 'ts-deferred'; import * as component from '../../../common/component'; @@ -15,7 +16,6 @@ import { NNIPAIK8STrialConfig } from '../../pai/paiK8S/paiK8SConfig'; import { EnvironmentInformation, EnvironmentService } from '../environment'; import { StorageService } from '../storageService'; -const yaml = require('js-yaml'); /** * Collector PAI jobs info from PAI cluster, and update pai job status locally @@ -40,6 +40,10 @@ export class OpenPaiEnvironmentService extends EnvironmentService { this.experimentId = getExperimentId(); } + public get environmentMaintenceLoopInterval(): number { + return 5000; + } + public get hasStorageService(): boolean { return true; } @@ -72,6 +76,16 @@ export class OpenPaiEnvironmentService extends EnvironmentService { if (this.paiTrialConfig.paiConfigPath) { this.paiJobConfig = yaml.safeLoad(fs.readFileSync(this.paiTrialConfig.paiConfigPath, 'utf8')); } + + if (this.paiClusterConfig.gpuNum === undefined) { + this.paiClusterConfig.gpuNum = this.paiTrialConfig.gpuNum; + } + if (this.paiClusterConfig.cpuNum === undefined) { + this.paiClusterConfig.cpuNum = this.paiTrialConfig.cpuNum; + } + if (this.paiClusterConfig.memoryMB === undefined) { + this.paiClusterConfig.memoryMB = this.paiTrialConfig.memoryMB; + } break; } default: @@ -111,37 +125,35 @@ export class OpenPaiEnvironmentService extends EnvironmentService { }); environments.forEach((environment) => { - if (jobInfos.has(environment.jobId)) { - const jobResponse = jobInfos.get(environment.jobId); + if (jobInfos.has(environment.envId)) { + const jobResponse = jobInfos.get(environment.envId); if (jobResponse && jobResponse.state) { const oldEnvironmentStatus = environment.status; switch (jobResponse.state) { case 'RUNNING': case 'WAITING': - // RUNNING status is set by runner, and ignore waiting status - break; case 
'SUCCEEDED': case 'FAILED': - environment.setFinalStatus(jobResponse.state); + environment.setStatus(jobResponse.state); break; case 'STOPPED': case 'STOPPING': - environment.setFinalStatus('USER_CANCELED'); + environment.setStatus('USER_CANCELED'); break; default: - this.log.error(`OpenPAI: job ${environment.jobId} returns unknown state ${jobResponse.state}.`); - environment.setFinalStatus('UNKNOWN'); + this.log.error(`OpenPAI: job ${environment.envId} returns unknown state ${jobResponse.state}.`); + environment.setStatus('UNKNOWN'); } if (oldEnvironmentStatus !== environment.status) { - this.log.debug(`OpenPAI: job ${environment.jobId} change status ${oldEnvironmentStatus} to ${environment.status} due to job is ${jobResponse.state}.`) + this.log.debug(`OpenPAI: job ${environment.envId} change status ${oldEnvironmentStatus} to ${environment.status} due to job is ${jobResponse.state}.`) } } else { - this.log.error(`OpenPAI: job ${environment.jobId} has no state returned. body:${JSON.stringify(jobResponse)}`); + this.log.error(`OpenPAI: job ${environment.envId} has no state returned. body:${JSON.stringify(jobResponse)}`); // some error happens, and mark this environment environment.status = 'FAILED'; } } else { - this.log.error(`OpenPAI job ${environment.jobId} is not found in job list.`); + this.log.error(`OpenPAI job ${environment.envId} is not found in job list.`); environment.status = 'UNKNOWN'; } }); @@ -169,8 +181,10 @@ export class OpenPaiEnvironmentService extends EnvironmentService { // Step 1. Prepare PAI job configuration const environmentRoot = `${this.paiTrialConfig.containerNFSMountPath}/${this.experimentId}`; environment.runnerWorkingFolder = `${environmentRoot}/envs/${environment.id}`; - environment.command = `cd ${environmentRoot} && ${environment.command}` - environment.trackingUrl = `${this.protocol}://${this.paiClusterConfig.host}/job-detail.html?username=${this.paiClusterConfig.userName}&jobName=${environment.jobId}` + environment.command = `cd ${environmentRoot} && ${environment.command}`; + environment.trackingUrl = `${this.protocol}://${this.paiClusterConfig.host}/job-detail.html?username=${this.paiClusterConfig.userName}&jobName=${environment.envId}`; + environment.useActiveGpu = this.paiClusterConfig.useActiveGpu; + environment.maxTrialNumberPerGpu = this.paiClusterConfig.maxTrialNumPerGpu; // Step 2. Generate Job Configuration in yaml format const paiJobConfig = this.generateJobConfigInYamlFormat(environment); @@ -189,7 +203,7 @@ export class OpenPaiEnvironmentService extends EnvironmentService { request(submitJobRequest, (error, response, body) => { if ((error !== undefined && error !== null) || response.statusCode >= 400) { const errorMessage: string = (error !== undefined && error !== null) ? 
error.message : - `start environment ${environment.jobId} failed, http code:${response.statusCode}, http body: ${body}`; + `start environment ${environment.envId} failed, http code:${response.statusCode}, http body: ${body}`; this.log.error(errorMessage); environment.status = 'FAILED'; @@ -211,7 +225,7 @@ export class OpenPaiEnvironmentService extends EnvironmentService { } const stopJobRequest: request.Options = { - uri: `${this.protocol}://${this.paiClusterConfig.host}/rest-server/api/v2/jobs/${this.paiClusterConfig.userName}~${environment.jobId}/executionType`, + uri: `${this.protocol}://${this.paiClusterConfig.host}/rest-server/api/v2/jobs/${this.paiClusterConfig.userName}~${environment.envId}/executionType`, method: 'PUT', json: true, body: { value: 'STOP' }, @@ -222,17 +236,17 @@ export class OpenPaiEnvironmentService extends EnvironmentService { } }; - this.log.debug(`stopping OpenPAI environment ${environment.jobId}, ${stopJobRequest.uri}`); + this.log.debug(`stopping OpenPAI environment ${environment.envId}, ${stopJobRequest.uri}`); try { request(stopJobRequest, (error, response, _body) => { try { if ((error !== undefined && error !== null) || (response && response.statusCode >= 400)) { - this.log.error(`OpenPAI: stop job ${environment.jobId} failed with ${response.statusCode}\n${error}`); + this.log.error(`OpenPAI: stop job ${environment.envId} failed with ${response.statusCode}\n${error}`); deferred.reject((error !== undefined && error !== null) ? error : `Stop trial failed, http code: ${response.statusCode}`); } else { - this.log.info(`OpenPAI job ${environment.jobId} stopped.`); + this.log.info(`OpenPAI job ${environment.envId} stopped.`); } deferred.resolve(); } catch (error) { @@ -265,7 +279,7 @@ export class OpenPaiEnvironmentService extends EnvironmentService { if (this.paiTrialConfig === undefined) { throw new Error('trial config is not initialized'); } - const jobName = environment.jobId; + const jobName = environment.envId; let nniJobConfig: any = undefined; if (this.paiTrialConfig.paiConfigPath) { @@ -284,7 +298,6 @@ export class OpenPaiEnvironmentService extends EnvironmentService { environment.nodeCount += instanceCount; } - // Each taskRole will generate new command in NNI's command format // Each command will be formatted to NNI style for (const taskRoleName in nniJobConfig.taskRoles) { @@ -298,6 +311,19 @@ export class OpenPaiEnvironmentService extends EnvironmentService { } } else { + if (this.paiClusterConfig === undefined) { + throw new Error('PAI Cluster config is not initialized'); + } + if (this.paiClusterConfig.gpuNum === undefined) { + throw new Error('PAI Cluster gpuNum is not initialized'); + } + if (this.paiClusterConfig.cpuNum === undefined) { + throw new Error('PAI Cluster cpuNum is not initialized'); + } + if (this.paiClusterConfig.memoryMB === undefined) { + throw new Error('PAI Cluster memoryMB is not initialized'); + } + nniJobConfig = { protocolVersion: 2, name: jobName, @@ -320,9 +346,9 @@ export class OpenPaiEnvironmentService extends EnvironmentService { taskRetryCount: 0, dockerImage: 'docker_image_0', resourcePerInstance: { - gpu: this.paiTrialConfig.gpuNum, - cpu: this.paiTrialConfig.cpuNum, - memoryMB: this.paiTrialConfig.memoryMB + gpu: this.paiClusterConfig.gpuNum, + cpu: this.paiClusterConfig.cpuNum, + memoryMB: this.paiClusterConfig.memoryMB }, commands: [ environment.command diff --git a/src/nni_manager/training_service/reusable/gpuScheduler.ts b/src/nni_manager/training_service/reusable/gpuScheduler.ts new file mode 100644 index 
0000000000..84f8ca4234 --- /dev/null +++ b/src/nni_manager/training_service/reusable/gpuScheduler.ts @@ -0,0 +1,235 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +'use strict'; + +import * as assert from 'assert'; +import { getLogger, Logger } from '../../common/log'; +import { randomSelect } from '../../common/utils'; +import { GPUInfo, ScheduleResultType } from '../common/gpuData'; +import { EnvironmentInformation } from './environment'; +import { TrialDetail } from './trial'; + +type SCHEDULE_POLICY_NAME = 'random' | 'round-robin'; + +export class GpuSchedulerSetting { + public useActiveGpu: boolean = false; + public maxTrialNumberPerGpu: number = 1; +} + +export type GpuScheduleResult = { + resultType: ScheduleResultType; + environment: EnvironmentInformation | undefined; + gpuIndices: GPUInfo[] | undefined; +}; + +/** + * A simple GPU scheduler implementation + */ +export class GpuScheduler { + + // private readonly machineExecutorMap: Set; + private readonly log: Logger = getLogger(); + private readonly policyName: SCHEDULE_POLICY_NAME = 'round-robin'; + private defaultSetting: GpuSchedulerSetting; + private roundRobinIndex: number = 0; + + /** + * Constructor + * @param environments map from remote machine to executor + */ + constructor(gpuSchedulerSetting: GpuSchedulerSetting | undefined = undefined) { + if (undefined === gpuSchedulerSetting) { + gpuSchedulerSetting = new GpuSchedulerSetting(); + } + this.defaultSetting = gpuSchedulerSetting; + } + + public setSettings(gpuSchedulerSetting: GpuSchedulerSetting): void { + this.defaultSetting = gpuSchedulerSetting; + } + + /** + * Schedule a machine according to the constraints (requiredGPUNum) + * @param requiredGPUNum required GPU number + */ + public scheduleMachine(environments: EnvironmentInformation[], requiredGPUNum: number | undefined, trialDetail: TrialDetail): GpuScheduleResult { + if (requiredGPUNum === undefined) { + requiredGPUNum = 0; + } + assert(requiredGPUNum >= 0); + // Step 1: Check if required GPU number not exceeds the total GPU number in all machines + const eligibleEnvironments: EnvironmentInformation[] = environments.filter((environment: EnvironmentInformation) => + environment.defaultGpuSummary === undefined || requiredGPUNum === 0 || (requiredGPUNum !== undefined && environment.defaultGpuSummary.gpuCount >= requiredGPUNum)); + if (eligibleEnvironments.length === 0) { + // If the required gpu number exceeds the upper limit of all machine's GPU number + // Return REQUIRE_EXCEED_TOTAL directly + return ({ + resultType: ScheduleResultType.REQUIRE_EXCEED_TOTAL, + gpuIndices: undefined, + environment: undefined, + }); + } + + // Step 2: Allocate Host/GPU for specified trial job + // Currenty the requireGPUNum parameter for all trial jobs are identical. 
+ if (requiredGPUNum > 0) { + // Trial job requires GPU + const result: GpuScheduleResult | undefined = this.scheduleGPUHost(environments, requiredGPUNum, trialDetail); + if (result !== undefined) { + return result; + } + } else { + // Trail job does not need GPU + const allocatedRm: EnvironmentInformation = this.selectMachine(environments, environments); + + return this.allocateHost(requiredGPUNum, allocatedRm, [], trialDetail); + } + + return { + resultType: ScheduleResultType.TMP_NO_AVAILABLE_GPU, + gpuIndices: undefined, + environment: undefined, + }; + } + + /** + * remove the job's gpu reversion + */ + public removeGpuReservation(trial: TrialDetail): void { + if (trial.environment !== undefined && + trial.environment.defaultGpuSummary !== undefined && + trial.assignedGpus !== undefined && + trial.assignedGpus.length > 0) { + for (const gpuInfo of trial.assignedGpus) { + const defaultGpuSummary = trial.environment.defaultGpuSummary; + const num: number | undefined = defaultGpuSummary.assignedGpuIndexMap.get(gpuInfo.index); + if (num !== undefined) { + if (num === 1) { + defaultGpuSummary.assignedGpuIndexMap.delete(gpuInfo.index); + } else { + defaultGpuSummary.assignedGpuIndexMap.set(gpuInfo.index, num - 1); + } + } + } + } + } + + private scheduleGPUHost(environments: EnvironmentInformation[], requiredGPUNumber: number, trial: TrialDetail): GpuScheduleResult | undefined { + const totalResourceMap: Map = this.gpuResourceDetection(environments); + const qualifiedEnvironments: EnvironmentInformation[] = []; + totalResourceMap.forEach((gpuInfos: GPUInfo[], environment: EnvironmentInformation) => { + if (gpuInfos !== undefined && gpuInfos.length >= requiredGPUNumber) { + qualifiedEnvironments.push(environment); + } + }); + if (qualifiedEnvironments.length > 0) { + const allocatedEnvironment: EnvironmentInformation = this.selectMachine(qualifiedEnvironments, environments); + const gpuInfos: GPUInfo[] | undefined = totalResourceMap.get(allocatedEnvironment); + if (gpuInfos !== undefined) { // should always true + return this.allocateHost(requiredGPUNumber, allocatedEnvironment, gpuInfos, trial); + } else { + assert(false, 'gpuInfos is undefined'); + } + } + } + + /** + * Detect available GPU resource for an environment + * @returns Available GPUs on environments + */ + private gpuResourceDetection(environments: EnvironmentInformation[]): Map { + const totalResourceMap: Map = new Map(); + environments.forEach((environment: EnvironmentInformation) => { + // Assgin totoal GPU count as init available GPU number + if (environment.defaultGpuSummary !== undefined) { + const defaultGpuSummary = environment.defaultGpuSummary; + const availableGPUs: GPUInfo[] = []; + const designatedGpuIndices: Set = new Set(environment.usableGpus); + if (designatedGpuIndices.size > 0) { + for (const gpuIndex of designatedGpuIndices) { + if (gpuIndex >= environment.defaultGpuSummary.gpuCount) { + throw new Error(`Specified GPU index not found: ${gpuIndex}`); + } + } + } + + if (undefined !== defaultGpuSummary.gpuInfos) { + defaultGpuSummary.gpuInfos.forEach((gpuInfo: GPUInfo) => { + // if the GPU has active process, OR be reserved by a job, + // or index not in gpuIndices configuration in machineList, + // or trial number on a GPU reach max number, + // We should NOT allocate this GPU + // if users set useActiveGpu, use the gpu whether there is another activeProcess + if (designatedGpuIndices.size === 0 || designatedGpuIndices.has(gpuInfo.index)) { + if (defaultGpuSummary.assignedGpuIndexMap !== undefined) { + const 
num: number | undefined = defaultGpuSummary.assignedGpuIndexMap.get(gpuInfo.index); + const maxTrialNumberPerGpu: number = environment.maxTrialNumberPerGpu ? environment.maxTrialNumberPerGpu : this.defaultSetting.maxTrialNumberPerGpu; + const useActiveGpu: boolean = environment.useActiveGpu ? environment.useActiveGpu : this.defaultSetting.useActiveGpu; + if ((num === undefined && (!useActiveGpu && gpuInfo.activeProcessNum === 0 || useActiveGpu)) || + (num !== undefined && num < maxTrialNumberPerGpu)) { + availableGPUs.push(gpuInfo); + } + } else { + throw new Error(`occupiedGpuIndexMap is undefined!`); + } + } + }); + } + totalResourceMap.set(environment, availableGPUs); + } + }); + + return totalResourceMap; + } + + private selectMachine(qualifiedEnvironments: EnvironmentInformation[], allEnvironments: EnvironmentInformation[]): EnvironmentInformation { + assert(qualifiedEnvironments !== undefined && qualifiedEnvironments.length > 0); + + if (this.policyName === 'random') { + return randomSelect(qualifiedEnvironments); + } else if (this.policyName === 'round-robin') { + return this.roundRobinSelect(qualifiedEnvironments, allEnvironments); + } else { + throw new Error(`Unsupported schedule policy: ${this.policyName}`); + } + } + + private roundRobinSelect(qualifiedEnvironments: EnvironmentInformation[], allEnvironments: EnvironmentInformation[]): EnvironmentInformation { + while (!qualifiedEnvironments.includes(allEnvironments[this.roundRobinIndex % allEnvironments.length])) { + this.roundRobinIndex++; + } + + return allEnvironments[this.roundRobinIndex++ % allEnvironments.length]; + } + + private selectGPUsForTrial(gpuInfos: GPUInfo[], requiredGPUNum: number): GPUInfo[] { + // Sequentially allocate GPUs + return gpuInfos.slice(0, requiredGPUNum); + } + + private allocateHost(requiredGPUNum: number, environment: EnvironmentInformation, + gpuInfos: GPUInfo[], trialDetails: TrialDetail): GpuScheduleResult { + assert(gpuInfos.length >= requiredGPUNum); + const allocatedGPUs: GPUInfo[] = this.selectGPUsForTrial(gpuInfos, requiredGPUNum); + const defaultGpuSummary = environment.defaultGpuSummary; + if (undefined === defaultGpuSummary) { + throw new Error(`Environment ${environment.id} defaultGpuSummary shouldn't be undefined!`); + } + + allocatedGPUs.forEach((gpuInfo: GPUInfo) => { + let num: number | undefined = defaultGpuSummary.assignedGpuIndexMap.get(gpuInfo.index); + if (num === undefined) { + num = 0; + } + defaultGpuSummary.assignedGpuIndexMap.set(gpuInfo.index, num + 1); + }); + trialDetails.assignedGpus = allocatedGPUs; + + return { + resultType: ScheduleResultType.SUCCEED, + environment: environment, + gpuIndices: allocatedGPUs, + }; + } +} diff --git a/src/nni_manager/training_service/reusable/storageService.ts b/src/nni_manager/training_service/reusable/storageService.ts index 75b64c2606..ec54a6792a 100644 --- a/src/nni_manager/training_service/reusable/storageService.ts +++ b/src/nni_manager/training_service/reusable/storageService.ts @@ -83,7 +83,7 @@ export abstract class StorageService { localPath = this.expandPath(false, localPath); remotePath = this.expandPath(true, remotePath); this.logger.debug(`copy remotePath: ${remotePath} to localPath: ${localPath}`); - return await this.internalCopy(localPath, remotePath, true, true, false); + return await this.internalCopy(remotePath, localPath, true, true, false); } public async removeDirectory(remotePath: string, isRecursive: boolean): Promise { @@ -151,7 +151,7 @@ export abstract class StorageService { localPath = 
this.expandPath(false, localPath); remotePath = this.expandPath(true, remotePath); this.logger.debug(`copy file remotePath: ${remotePath} to localPath: ${localPath}`); - await this.internalCopy(localPath, remotePath, false, true, false); + await this.internalCopy(remotePath, localPath, false, true, false); } public async removeFile(remotePath: string): Promise { diff --git a/src/nni_manager/training_service/reusable/storages/mountedStorageService.ts b/src/nni_manager/training_service/reusable/storages/mountedStorageService.ts index a3e592e74c..9cc56402f4 100644 --- a/src/nni_manager/training_service/reusable/storages/mountedStorageService.ts +++ b/src/nni_manager/training_service/reusable/storages/mountedStorageService.ts @@ -17,12 +17,12 @@ export class MountedStorageService extends StorageService { if (isRecursive) { const children = await fs.promises.readdir(path); for (const file of children) { - const stat = await fs.promises.lstat(file); - this.internalRemove(file, stat.isDirectory(), isRecursive); + const filePath = this.internalJoin(path, file); + const stat = await fs.promises.lstat(filePath); + await this.internalRemove(filePath, stat.isDirectory(), isRecursive); } - } else { - await fs.promises.rmdir(path); } + await fs.promises.rmdir(path); } else { await fs.promises.unlink(path); } @@ -98,7 +98,7 @@ export class MountedStorageService extends StorageService { { encoding: "utf8", start: current, - end: readLength + current, + end: readLength + current - 1, }).on("data", (data) => { result += data; }).on("end", () => { diff --git a/src/nni_manager/training_service/reusable/test/mountedStorageService.test.ts b/src/nni_manager/training_service/reusable/test/mountedStorageService.test.ts new file mode 100644 index 0000000000..b57649fa33 --- /dev/null +++ b/src/nni_manager/training_service/reusable/test/mountedStorageService.test.ts @@ -0,0 +1,125 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. 
+ +'use strict'; + +import * as chai from 'chai'; +import * as fs from 'fs'; +import * as path from 'path'; +import { getLogger, Logger } from "../../../common/log"; +import { cleanupUnitTest, prepareUnitTest } from '../../../common/utils'; +import { MountedStorageService } from "../storages/mountedStorageService"; +import chaiAsPromised = require("chai-as-promised"); + + +async function remove(removedPath: string, isDirectory: boolean, isRecursive: boolean): Promise { + if (isDirectory) { + if (isRecursive) { + const children = await fs.promises.readdir(removedPath); + for (const fileName of children) { + const filePath = path.join(removedPath, fileName); + const stat = await fs.promises.lstat(filePath); + await remove(filePath, stat.isDirectory(), isRecursive); + } + } + await fs.promises.rmdir(removedPath); + } else { + await fs.promises.unlink(removedPath); + } +} + +describe('Unit Test for MountedStorageService', () => { + + let service: MountedStorageService; + let log: Logger; + let localPath = "reusableut/local"; + let mountedPath = "reusableut/mounted"; + + const testPath = "testpath"; + const testFileName = "testfile.txt"; + let localCopiedPath: string; + let localFileName: string; + let mountedFileName: string; + + before(() => { + chai.should(); + chai.use(chaiAsPromised); + prepareUnitTest(); + log = getLogger(); + + const testRoot = path.dirname(__filename); + localPath = path.join(testRoot, localPath); + mountedPath = path.join(testRoot, mountedPath); + service = new MountedStorageService(); + service.initialize(localPath, mountedPath); + + localCopiedPath = path.join(localPath, testPath); + localFileName = path.join(localCopiedPath, testFileName); + mountedFileName = path.join(testPath, testFileName); + }); + + after(() => { + cleanupUnitTest(); + }); + + beforeEach(async () => { + if (!fs.existsSync(localPath)) { + await fs.promises.mkdir(localPath, { recursive: true }); + } + if (!fs.existsSync(mountedPath)) { + await fs.promises.mkdir(mountedPath, { recursive: true }); + } + log.info(`localFileName: ${localFileName}`); + + await fs.promises.mkdir(localCopiedPath, { recursive: true }); + await fs.promises.writeFile(localFileName, "hello world"); + }); + + afterEach(async () => { + const testRootPath = path.normalize(`${localPath}/../../reusableut`); + await remove(testRootPath, true, true); + }); + + it('copyAndRename', async () => { + await service.copyDirectory(localCopiedPath, "."); + chai.expect(fs.existsSync(mountedPath)); + + const newName = `${testFileName}new`; + await service.rename(mountedFileName, newName); + chai.assert.isFalse(fs.existsSync(testPath)); + const newTestPath = `${mountedFileName}new`; + chai.assert.isTrue(await service.exists(newTestPath)); + + await service.copyFileBack(newTestPath, "."); + const localNewFileName = `${localPath}/${newName}`; + chai.assert.isTrue(fs.existsSync(localNewFileName)); + + fs.unlinkSync(`${localFileName}`); + fs.rmdirSync(`${localPath}/${testPath}`); + await service.copyDirectoryBack(`${mountedPath}/${testPath}`, `.`); + const localNewName = `${localFileName}new`; + chai.assert.isTrue(fs.existsSync(localNewName)); + }) + + it('FileContentTest', async () => { + const savedFileName = "savedfile.txt"; + await service.save("01234", savedFileName); + chai.expect(fs.existsSync(savedFileName)); + + let content = await service.readFileContent(savedFileName, 0, -1); + chai.assert.equal(content, "01234"); + + await service.save("56789", savedFileName, true); + content = await service.readFileContent(savedFileName, 0, -1); + 
chai.assert.equal(content, "0123456789"); + + content = await service.readFileContent(savedFileName, -1, 1); + chai.assert.equal(content, "0"); + + content = await service.readFileContent(savedFileName, 5, 1); + chai.assert.equal(content, "5"); + + content = await service.readFileContent(savedFileName, 5, -1); + chai.assert.equal(content, "56789"); + }); +}); diff --git a/src/nni_manager/training_service/reusable/test/trialDispatcher.test.ts b/src/nni_manager/training_service/reusable/test/trialDispatcher.test.ts new file mode 100644 index 0000000000..958738de54 --- /dev/null +++ b/src/nni_manager/training_service/reusable/test/trialDispatcher.test.ts @@ -0,0 +1,712 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import * as chai from 'chai'; +import * as path from 'path'; +import { Scope } from "typescript-ioc"; +import * as component from '../../../common/component'; +import { getLogger, Logger } from "../../../common/log"; +import { TrialJobApplicationForm, TrialJobStatus } from '../../../common/trainingService'; +import { cleanupUnitTest, delay, prepareUnitTest, uniqueString } from '../../../common/utils'; +import { INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, SEND_TRIAL_JOB_PARAMETER, TRIAL_END, GPU_INFO } from '../../../core/commands'; +import { TrialConfigMetadataKey } from '../../../training_service/common/trialConfigMetadataKey'; +import { Command } from '../commandChannel'; +import { EnvironmentInformation, EnvironmentService } from "../environment"; +import { TrialDetail } from '../trial'; +import { TrialDispatcher } from "../trialDispatcher"; +import { UtCommandChannel } from './utCommandChannel'; +import { UtEnvironmentService } from "./utEnvironmentService"; +import chaiAsPromised = require("chai-as-promised"); +import { promises } from 'fs'; +import { Deferred } from 'ts-deferred'; +import { NNIErrorNames, NNIError, MethodNotImplementedError } from '../../../common/errors'; + +function createTrialForm(content: any = undefined): TrialJobApplicationForm { + if (content === undefined) { + content = { + "test": 1 + }; + } + const trialForm = { + sequenceId: 0, + hyperParameters: { + value: JSON.stringify(content), + index: 0 + } + }; + return trialForm; +} + +async function waitResult(callback: () => Promise, waitMs: number = 1000, interval: number = 1, throwError: boolean = false): Promise { + while (waitMs > 0) { + const result = await callback(); + if (result !== undefined) { + return result; + } + await delay(interval); + waitMs -= interval; + }; + + if (throwError) { + throw new Error(`wait result timeout!\n${callback.toString()}`); + } + + return undefined; +} + +async function waitResultMust(callback: () => Promise, waitMs: number = 1000, interval: number = 1): Promise { + const result = await waitResult(callback, waitMs, interval, true); + // this error should be thrown in waitResult already. 
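+ // defensive double-check: waitResult was invoked with throwError = true above,
+ // so an undefined result here would indicate a bug rather than an ordinary timeout.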
+ if (result === undefined) { + throw new Error(`wait result timeout!`); + } + return result; +} + +async function newTrial(trialDispatcher: TrialDispatcher): Promise { + const trialDetail = await trialDispatcher.submitTrialJob(createTrialForm()); + + return trialDetail; +} + +function newGpuInfo(gpuCount: Number = 2, nodeId: string | undefined = undefined): any { + let gpuInfos = []; + for (let index = 0; index < gpuCount; index++) { + gpuInfos.push({ + index: index, + activeProcessNum: 0, + }); + } + const gpuInfo = { + gpuInfos: gpuInfos, + gpuCount: gpuInfos.length, + node: nodeId + } + return gpuInfo; +} + +async function verifyTrialRunning(commandChannel: UtCommandChannel, trialDetail: TrialDetail): Promise { + + let command = await waitResultMust(async () => { + return await commandChannel.testReceiveCommandFromTrialDispatcher(); + }); + chai.assert.equal(command.command, NEW_TRIAL_JOB, "verifyTrialRunning command type"); + chai.assert.equal(command.data["trialId"], trialDetail.id, "verifyTrialRunning trialDetail.id should be equal."); + + return command; +} + +async function verifyTrialResult(commandChannel: UtCommandChannel, trialDetail: TrialDetail, returnCode: number = 0): Promise { + let trialResult = { + trial: trialDetail.id, + code: returnCode, + timestamp: Date.now(), + }; + if (trialDetail.environment === undefined) { + throw new Error(`environment shouldn't be undefined.`) + } + + await commandChannel.testSendCommandToTrialDispatcher(trialDetail.environment, TRIAL_END, trialResult); + await waitResultMust(async () => { + return trialDetail.status !== 'RUNNING' ? true : undefined; + }); + if (returnCode === 0) { + chai.assert.equal(trialDetail.status, 'SUCCEEDED', "trial should be succeeded"); + } else { + chai.assert.equal(trialDetail.status, 'FAILED', "trial should be failed"); + } +} + +async function waitEnvironment(waitCount: number, + previousEnvironments: Map, + environmentService: UtEnvironmentService, commandChannel: UtCommandChannel, + gpuCount: number = 2, nodeCount: number = 1, + callback: ((environment: EnvironmentInformation) => Promise) | undefined = undefined): Promise { + const waitRequestEnvironment = await waitResultMust(async () => { + const environments = environmentService.testGetEnvironments(); + if (environments.size === waitCount) { + for (const [id, environment] of environments) { + if (!previousEnvironments.has(id)) { + previousEnvironments.set(id, environment); + return environment; + } + } + } + return undefined; + }); + + if (waitRequestEnvironment === undefined) { + throw new Error(`waitRequestEnvironment is not defined.`); + } + + const nodeIds = []; + waitRequestEnvironment.nodeCount = nodeCount; + if (nodeCount > 1) { + for (let index = 0; index < nodeCount; index++) { + nodeIds.push(uniqueString(5)); + } + } else { + nodeIds.push(undefined); + } + for (const nodeId of nodeIds) { + // set runner is ready. + await commandChannel.testSendCommandToTrialDispatcher(waitRequestEnvironment, INITIALIZED, { node: nodeId }); + + if (gpuCount > 0) { + await commandChannel.testSendCommandToTrialDispatcher(waitRequestEnvironment, GPU_INFO, newGpuInfo(gpuCount, nodeId)); + } + } + + if (callback) { + await callback(waitRequestEnvironment); + } + + // set env to running + environmentService.testSetEnvironmentStatus(waitRequestEnvironment, 'RUNNING'); + + await waitResultMust(async () => { + return waitRequestEnvironment.isRunnerReady ? 
true : undefined; + }); + + return waitRequestEnvironment; +} + +describe('Unit Test for TrialDispatcher', () => { + + let trialRunPromise: Promise; + let trialDispatcher: TrialDispatcher; + let commandChannel: UtCommandChannel; + let environmentService: UtEnvironmentService; + let log: Logger; + let previousEnvironments: Map = new Map(); + const currentDir = path.dirname(__filename); + + before(() => { + chai.should(); + chai.use(chaiAsPromised); + prepareUnitTest(); + log = getLogger(); + }); + + after(() => { + cleanupUnitTest(); + }); + + beforeEach(async () => { + const trialConfig = { + codeDir: currentDir, + command: "echo", + } + const nniManagerIpConfig = { + nniManagerIp: "127.0.0.1", + } + trialDispatcher = new TrialDispatcher(); + component.Container.bind(EnvironmentService) + .to(UtEnvironmentService) + .scope(Scope.Singleton); + + await trialDispatcher.setClusterMetadata(TrialConfigMetadataKey.TRIAL_CONFIG, JSON.stringify(trialConfig)); + await trialDispatcher.setClusterMetadata(TrialConfigMetadataKey.NNI_MANAGER_IP, JSON.stringify(nniManagerIpConfig)); + trialRunPromise = trialDispatcher.run(); + + environmentService = component.get(EnvironmentService) as UtEnvironmentService; + commandChannel = environmentService.testGetCommandChannel(); + }); + + afterEach(async () => { + previousEnvironments.clear(); + await trialDispatcher.cleanUp(); + environmentService.testReset(); + await trialRunPromise; + }); + + it('reuse env', async () => { + + let trialDetail = await newTrial(trialDispatcher); + await waitEnvironment(1, previousEnvironments, environmentService, commandChannel); + await verifyTrialRunning(commandChannel, trialDetail); + await verifyTrialResult(commandChannel, trialDetail, 0); + + trialDetail = await newTrial(trialDispatcher); + await verifyTrialRunning(commandChannel, trialDetail); + await verifyTrialResult(commandChannel, trialDetail, -1); + + chai.assert.equal(environmentService.testGetEnvironments().size, 1, "as env reused, so only 1 env should be here."); + const trials = await trialDispatcher.listTrialJobs(); + chai.assert.equal(trials.length, 2, "there should be 2 trials"); + }); + + it('not reusable env', async () => { + trialDispatcher.setClusterMetadata( + TrialConfigMetadataKey.TRIAL_CONFIG, + JSON.stringify({ + reuseEnvironment: false, + codeDir: currentDir, + })); + + let trialDetail = await newTrial(trialDispatcher); + + let environment = await waitEnvironment(1, previousEnvironments, environmentService, commandChannel); + await verifyTrialRunning(commandChannel, trialDetail); + await verifyTrialResult(commandChannel, trialDetail, 0); + await waitResultMust(async () => { + return environment.status === 'USER_CANCELED' ? true : undefined; + }); + + trialDetail = await newTrial(trialDispatcher); + + await waitEnvironment(2, previousEnvironments, environmentService, commandChannel); + await verifyTrialRunning(commandChannel, trialDetail); + await verifyTrialResult(commandChannel, trialDetail, -1); + await waitResultMust(async () => { + return environment.status === 'USER_CANCELED' ? 
true : undefined; + }); + + chai.assert.equal(environmentService.testGetEnvironments().size, 2, "as env not reused, so only 2 envs should be here."); + const trials = await trialDispatcher.listTrialJobs(); + chai.assert.equal(trials.length, 2, "there should be 2 trials"); + }); + + it('no more env', async () => { + + const trialDetail1 = await newTrial(trialDispatcher); + await waitEnvironment(1, previousEnvironments, environmentService, commandChannel); + + // set to no more environment + environmentService.testSetNoMoreEnvironment(false); + + const trialDetail2 = await newTrial(trialDispatcher); + + await verifyTrialRunning(commandChannel, trialDetail1); + await verifyTrialResult(commandChannel, trialDetail1, 0); + + await verifyTrialRunning(commandChannel, trialDetail2); + await verifyTrialResult(commandChannel, trialDetail2, -1); + + chai.assert.equal(environmentService.testGetEnvironments().size, 1, "as env not reused, so only 1 envs should be here."); + const trials = await trialDispatcher.listTrialJobs(); + chai.assert.equal(trials.length, 2, "there should be 2 trials"); + }); + + + it('2trial2env', async () => { + + let trialDetail1 = await newTrial(trialDispatcher); + let trialDetail2 = await newTrial(trialDispatcher); + + await waitEnvironment(2, previousEnvironments, environmentService, commandChannel); + await verifyTrialRunning(commandChannel, trialDetail1); + await verifyTrialResult(commandChannel, trialDetail1, 0); + await verifyTrialRunning(commandChannel, trialDetail2); + await verifyTrialResult(commandChannel, trialDetail2, 0); + + chai.assert.equal(environmentService.testGetEnvironments().size, 2, "2 envs should be here."); + const trials = await trialDispatcher.listTrialJobs(); + chai.assert.equal(trials.length, 2, "there should be 2 trials"); + }); + + it('3trial2env', async () => { + + let trialDetail1 = await newTrial(trialDispatcher); + let trialDetail2 = await newTrial(trialDispatcher); + + await waitEnvironment(2, previousEnvironments, environmentService, commandChannel); + await verifyTrialRunning(commandChannel, trialDetail1); + await verifyTrialResult(commandChannel, trialDetail1, 0); + await verifyTrialRunning(commandChannel, trialDetail2); + await verifyTrialResult(commandChannel, trialDetail2, 0); + + chai.assert.equal(environmentService.testGetEnvironments().size, 2, "2 envs should be here."); + let trials = await trialDispatcher.listTrialJobs(); + chai.assert.equal(trials.length, 2, "there should be 2 trials"); + + + let trialDetail3 = await newTrial(trialDispatcher); + await verifyTrialRunning(commandChannel, trialDetail3); + await verifyTrialResult(commandChannel, trialDetail3, 0); + + chai.assert.equal(environmentService.testGetEnvironments().size, 2, "2 envs should be here."); + trials = await trialDispatcher.listTrialJobs(); + chai.assert.equal(trials.length, 3, "there should be 2 trials"); + }); + + it('stop trial', async () => { + + let trialDetail1 = await newTrial(trialDispatcher); + await waitEnvironment(1, previousEnvironments, environmentService, commandChannel); + await verifyTrialRunning(commandChannel, trialDetail1); + await trialDispatcher.cancelTrialJob(trialDetail1.id, false); + + let command = await waitResultMust(async () => { + return await commandChannel.testReceiveCommandFromTrialDispatcher(); + }); + chai.assert.equal(command.command, KILL_TRIAL_JOB); + log.info(`command: ${JSON.stringify(command)}`); + chai.assert.equal(command.data, trialDetail1.id); + + await waitResultMust(async () => { + return trialDetail1.status !== 'RUNNING' 
? true : undefined; + }); + + let trialDetail2 = await newTrial(trialDispatcher); + await verifyTrialRunning(commandChannel, trialDetail2); + await trialDispatcher.cancelTrialJob(trialDetail2.id, true); + command = await waitResultMust(async () => { + return await commandChannel.testReceiveCommandFromTrialDispatcher(); + }); + chai.assert.equal(command.command, KILL_TRIAL_JOB); + log.info(`command: ${JSON.stringify(command)}`); + chai.assert.equal(command.data, trialDetail2.id); + await waitResultMust(async () => { + return trialDetail2.status !== 'RUNNING' ? true : undefined; + }); + + chai.assert.equal(environmentService.testGetEnvironments().size, 1, "only one trial, so one env"); + const trials = await trialDispatcher.listTrialJobs(); + + chai.assert.equal(trials.length, 2, "there should be 1 stopped trial only"); + let trial = await trialDispatcher.getTrialJob(trialDetail1.id); + chai.assert.equal(trial.status, 'USER_CANCELED', `trial is canceled.`); + trial = await trialDispatcher.getTrialJob(trialDetail2.id); + chai.assert.equal(trial.status, 'EARLY_STOPPED', `trial is earlier stopped.`); + }); + + it('multi phase', async () => { + let trialDetail = await newTrial(trialDispatcher); + + await waitEnvironment(1, previousEnvironments, environmentService, commandChannel); + await verifyTrialRunning(commandChannel, trialDetail); + + let content = { + test: 2, + } + await trialDispatcher.updateTrialJob(trialDetail.id, createTrialForm(content)); + + let command = await waitResultMust(async () => { + return await commandChannel.testReceiveCommandFromTrialDispatcher(); + }); + + chai.assert.equal(command.command, SEND_TRIAL_JOB_PARAMETER); + chai.assert.equal(command.data["trialId"], trialDetail.id); + chai.assert.equal(command.data.parameters.index, 0); + chai.assert.equal(command.data.parameters.value, JSON.stringify(content)); + + content = { + test: 3, + } + await trialDispatcher.updateTrialJob(trialDetail.id, createTrialForm(content)); + command = await waitResultMust(async () => { + return await commandChannel.testReceiveCommandFromTrialDispatcher(); + }); + chai.assert.equal(command.command, SEND_TRIAL_JOB_PARAMETER); + chai.assert.equal(command.data["trialId"], trialDetail.id); + chai.assert.equal(command.data.parameters.index, 0); + chai.assert.equal(command.data.parameters.value, JSON.stringify(content)); + + await verifyTrialResult(commandChannel, trialDetail, 0); + + chai.assert.equal(environmentService.testGetEnvironments().size, 1, "only one trial, so one env"); + const trials = await trialDispatcher.listTrialJobs(); + chai.assert.equal(trials.length, 1, "there should be 1 stopped trial only"); + }); + + it('multi node', async () => { + let trialDetail = await newTrial(trialDispatcher); + + const environment = await waitEnvironment(1, previousEnvironments, environmentService, commandChannel, 2, 2); + log.debug(`environment ${JSON.stringify(environment)}`); + await verifyTrialRunning(commandChannel, trialDetail); + await verifyTrialResult(commandChannel, trialDetail, 0); + + chai.assert.equal(environment.nodes.size, 2); + let command = await waitResultMust(async () => { + return await commandChannel.testReceiveCommandFromTrialDispatcher(); + }); + chai.assert.equal(command.command, KILL_TRIAL_JOB); + chai.assert.equal(environmentService.testGetEnvironments().size, 1, "only one trial, so one env"); + const trials = await trialDispatcher.listTrialJobs(); + chai.assert.equal(trials.length, 1, "there should be 1 stopped trial only"); + }); + + it('env timeout', async () => { + let 
trialDetail = await newTrial(trialDispatcher); + let environment = await waitEnvironment(1, previousEnvironments, environmentService, commandChannel); + await verifyTrialRunning(commandChannel, trialDetail); + await verifyTrialResult(commandChannel, trialDetail, 0); + + environmentService.testSetEnvironmentStatus(environment, 'SUCCEEDED'); + await waitResultMust(async () => { + return environment.status === 'SUCCEEDED' ? true : undefined; + }); + + trialDetail = await newTrial(trialDispatcher); + await waitEnvironment(2, previousEnvironments, environmentService, commandChannel); + await verifyTrialRunning(commandChannel, trialDetail); + await verifyTrialResult(commandChannel, trialDetail, 0); + + chai.assert.equal(previousEnvironments.size, 2, "as an env timeout, so 2 envs should be here."); + const trials = await trialDispatcher.listTrialJobs(); + chai.assert.equal(trials.length, 2, "there should be 2 trials"); + }); + + it('env failed with trial', async () => { + let trialDetail = await newTrial(trialDispatcher); + let environment = await waitEnvironment(1, previousEnvironments, environmentService, commandChannel); + await verifyTrialRunning(commandChannel, trialDetail); + + environmentService.testSetEnvironmentStatus(environment, 'FAILED'); + await waitResultMust(async () => { + return environment.status === 'FAILED' ? true : undefined; + }); + + await waitResultMust(async () => { + return trialDetail.status === 'FAILED' ? true : undefined; + }); + + chai.assert.equal(trialDetail.status, 'FAILED', "env failed, so trial also failed."); + }); + + it('GPUScheduler disabled gpuNum === undefined', async () => { + + let trialDetail = await newTrial(trialDispatcher); + await waitEnvironment(1, previousEnvironments, environmentService, commandChannel); + const command = await verifyTrialRunning(commandChannel, trialDetail); + await verifyTrialResult(commandChannel, trialDetail, 0); + + chai.assert.equal(command.data["gpuIndices"], undefined); + }); + + it('GPUScheduler disabled gpuNum === 0', async () => { + trialDispatcher.setClusterMetadata( + TrialConfigMetadataKey.TRIAL_CONFIG, + JSON.stringify({ + reuseEnvironment: false, + codeDir: currentDir, + gpuNum: 0, + })); + + let trialDetail = await newTrial(trialDispatcher); + await waitEnvironment(1, previousEnvironments, environmentService, commandChannel); + const command = await verifyTrialRunning(commandChannel, trialDetail); + await verifyTrialResult(commandChannel, trialDetail, 0); + + chai.assert.equal(command.data["gpuIndices"], ""); + }); + + it('GPUScheduler enable no cluster gpu config', async () => { + trialDispatcher.setClusterMetadata( + TrialConfigMetadataKey.TRIAL_CONFIG, + JSON.stringify({ + reuseEnvironment: false, + codeDir: currentDir, + gpuNum: 1, + })); + + let trialDetail = await newTrial(trialDispatcher); + await waitEnvironment(1, previousEnvironments, environmentService, commandChannel); + const command = await verifyTrialRunning(commandChannel, trialDetail); + await verifyTrialResult(commandChannel, trialDetail, 0); + + chai.assert.equal(command.data["gpuIndices"], "0"); + }); + + it('GPUScheduler skipped no GPU info', async () => { + trialDispatcher.setClusterMetadata( + TrialConfigMetadataKey.TRIAL_CONFIG, + JSON.stringify({ + reuseEnvironment: false, + codeDir: currentDir, + })); + + let trialDetail = await newTrial(trialDispatcher); + await waitEnvironment(1, previousEnvironments, environmentService, commandChannel); + const command = await verifyTrialRunning(commandChannel, trialDetail); + await 
verifyTrialResult(commandChannel, trialDetail, 0); + + chai.assert.equal(command.data["gpuIndices"], undefined); + }); + + it('GPUScheduler disabled multi-node', async () => { + trialDispatcher.setClusterMetadata( + TrialConfigMetadataKey.TRIAL_CONFIG, + JSON.stringify({ + reuseEnvironment: false, + codeDir: currentDir, + gpuNum: 0, + })); + + let trialDetail = await newTrial(trialDispatcher); + await waitEnvironment(1, previousEnvironments, environmentService, commandChannel); + const command = await verifyTrialRunning(commandChannel, trialDetail); + await verifyTrialResult(commandChannel, trialDetail, 0); + + chai.assert.equal(command.data["gpuIndices"], ""); + }); + + it('GPUScheduler enabled 2 gpus 2 trial', async () => { + trialDispatcher.setClusterMetadata( + TrialConfigMetadataKey.TRIAL_CONFIG, + JSON.stringify({ + reuseEnvironment: false, + codeDir: currentDir, + gpuNum: 1, + })); + + const trialDetail1 = await newTrial(trialDispatcher); + const trialDetail2 = await newTrial(trialDispatcher); + await waitEnvironment(1, previousEnvironments, environmentService, commandChannel); + let command = await verifyTrialRunning(commandChannel, trialDetail1); + chai.assert.equal(command.data["gpuIndices"], "0"); + command = await verifyTrialRunning(commandChannel, trialDetail2); + chai.assert.equal(command.data["gpuIndices"], "1"); + + await verifyTrialResult(commandChannel, trialDetail1, 0); + await verifyTrialResult(commandChannel, trialDetail2, 0); + + chai.assert.equal(environmentService.testGetEnvironments().size, 1); + const trials = await trialDispatcher.listTrialJobs(); + chai.assert.equal(trials.length, 2, "there should be 2 trials"); + }); + + it('GPUScheduler enabled 4 gpus 2 trial(need 2 gpus)', async () => { + trialDispatcher.setClusterMetadata( + TrialConfigMetadataKey.TRIAL_CONFIG, + JSON.stringify({ + reuseEnvironment: false, + codeDir: currentDir, + gpuNum: 2, + })); + + const trialDetail1 = await newTrial(trialDispatcher); + const trialDetail2 = await newTrial(trialDispatcher); + await waitEnvironment(1, previousEnvironments, environmentService, commandChannel, 4); + let command = await verifyTrialRunning(commandChannel, trialDetail1); + chai.assert.equal(command.data["gpuIndices"], "0,1"); + command = await verifyTrialRunning(commandChannel, trialDetail2); + chai.assert.equal(command.data["gpuIndices"], "2,3"); + + await verifyTrialResult(commandChannel, trialDetail1, 0); + await verifyTrialResult(commandChannel, trialDetail2, 0); + + chai.assert.equal(environmentService.testGetEnvironments().size, 1); + const trials = await trialDispatcher.listTrialJobs(); + chai.assert.equal(trials.length, 2, "there should be 2 trials"); + }); + + it('GPUScheduler enabled use 4 gpus but only 1 usable(4)', async () => { + trialDispatcher.setClusterMetadata( + TrialConfigMetadataKey.TRIAL_CONFIG, + JSON.stringify({ + reuseEnvironment: false, + codeDir: currentDir, + gpuNum: 1, + })); + + const trialDetail = await newTrial(trialDispatcher); + await waitEnvironment(1, previousEnvironments, environmentService, commandChannel, 4, 1, async (environment) => { + environment.usableGpus = [3]; + }); + let command = await verifyTrialRunning(commandChannel, trialDetail); + chai.assert.equal(command.data["gpuIndices"], "3"); + + await verifyTrialResult(commandChannel, trialDetail, 0); + + chai.assert.equal(environmentService.testGetEnvironments().size, 1); + const trials = await trialDispatcher.listTrialJobs(); + chai.assert.equal(trials.length, 1); + }); + + it('GPUScheduler enabled 
TMP_NO_AVAILABLE_GPU, request new env', async () => { + trialDispatcher.setClusterMetadata( + TrialConfigMetadataKey.TRIAL_CONFIG, + JSON.stringify({ + reuseEnvironment: false, + codeDir: currentDir, + gpuNum: 1, + })); + + const trialDetail1 = await newTrial(trialDispatcher); + await waitEnvironment(1, previousEnvironments, environmentService, commandChannel, 1); + let command = await verifyTrialRunning(commandChannel, trialDetail1); + chai.assert.equal(command.data["gpuIndices"], "0"); + + const trialDetail2 = await newTrial(trialDispatcher); + await waitEnvironment(2, previousEnvironments, environmentService, commandChannel, 1); + + await verifyTrialResult(commandChannel, trialDetail1, 0); + + command = await verifyTrialRunning(commandChannel, trialDetail2); + await verifyTrialResult(commandChannel, trialDetail2, 0); + chai.assert.equal(command.data["gpuIndices"], "0"); + + chai.assert.equal(environmentService.testGetEnvironments().size, 2, 'environments'); + const trials = await trialDispatcher.listTrialJobs(); + chai.assert.equal(trials.length, 2, 'trials'); + }); + + it('GPUScheduler enabled REQUIRE_EXCEED_TOTAL, need fail', async () => { + trialDispatcher.setClusterMetadata( + TrialConfigMetadataKey.TRIAL_CONFIG, + JSON.stringify({ + reuseEnvironment: false, + codeDir: currentDir, + gpuNum: 8, + })); + + await newTrial(trialDispatcher); + await waitEnvironment(1, previousEnvironments, environmentService, commandChannel); + await chai.expect(trialRunPromise).rejectedWith(NNIError, "REQUIRE_EXCEED_TOTAL"); + const deferred = new Deferred(); + trialRunPromise = deferred.promise; + deferred.resolve(); + }); + + it('GPUScheduler enabled maxTrialNumberPerGpu=2, 4 trials, 2 gpus', async () => { + trialDispatcher.setClusterMetadata( + TrialConfigMetadataKey.TRIAL_CONFIG, + JSON.stringify({ + reuseEnvironment: false, + codeDir: currentDir, + gpuNum: 1, + })); + const trials = []; + + // last two trials shouldn't be in first environment. 
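+ // with maxTrialNumberPerGpu = 2 and 2 GPUs per environment, a single environment
+ // can host at most 4 concurrent trials, so submitting 6 forces a second environment.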
+ for (let index = 0; index < 6; index++) { + const trial = await newTrial(trialDispatcher); + trials.push(trial); + } + await waitEnvironment(1, previousEnvironments, environmentService, commandChannel, 2, 1, async (environment) => { + environment.maxTrialNumberPerGpu = 2; + }); + await waitEnvironment(2, previousEnvironments, environmentService, commandChannel, 2, 1, async (environment) => { + environment.maxTrialNumberPerGpu = 2; + }); + const gpuIndexMap = new Map(); + for (let index = 0; index < 6; index++) { + const trial = trials[index]; + let command = await verifyTrialRunning(commandChannel, trial); + const gpuIndex = command.data["gpuIndices"]; + const trialNumbers = gpuIndexMap.get(gpuIndex); + if (index < 4) { + if (undefined === trialNumbers) { + gpuIndexMap.set(gpuIndex, 1); + } else { + gpuIndexMap.set(gpuIndex, trialNumbers + 1); + } + } + } + chai.assert.equal(gpuIndexMap.size, 2); + chai.assert.equal(gpuIndexMap.get("0"), 2); + chai.assert.equal(gpuIndexMap.get("1"), 2); + + for (let index = 0; index < 6; index++) { + const trial = trials[index]; + await verifyTrialResult(commandChannel, trial, 0); + } + + chai.assert.equal(environmentService.testGetEnvironments().size, 2); + const listedTrials = await trialDispatcher.listTrialJobs(); + chai.assert.equal(listedTrials.length, 6); + }); +}); diff --git a/src/nni_manager/training_service/reusable/test/utCommandChannel.ts b/src/nni_manager/training_service/reusable/test/utCommandChannel.ts new file mode 100644 index 0000000000..17295b5002 --- /dev/null +++ b/src/nni_manager/training_service/reusable/test/utCommandChannel.ts @@ -0,0 +1,57 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { encodeCommand } from "../../../core/ipcInterface"; +import { Command, CommandChannel, RunnerConnection } from "../commandChannel"; +import { Channel, EnvironmentInformation } from "../environment"; + +class UtRunnerConnection extends RunnerConnection { + +} + +export class UtCommandChannel extends CommandChannel { + private readonly receivedCommands: Command[] = []; + + public get channelName(): Channel { + return "ut"; + } + + public async testSendCommandToTrialDispatcher(environment: EnvironmentInformation, commandType: string, commandData: any) { + const content = encodeCommand(commandType, JSON.stringify(commandData)); + this.log.debug(`UtCommandChannel: env ${environment.id} send test command ${content}`); + this.handleCommand(environment, content.toString("utf8")); + } + + public async testReceiveCommandFromTrialDispatcher(): Promise { + return this.receivedCommands.shift(); + } + + public async config(_key: string, value: any): Promise { + // do nothing + } + + public async start(): Promise { + // do nothing + } + + public async stop(): Promise { + // do nothing + } + + public async run(): Promise { + // do nothing + } + + protected async sendCommandInternal(environment: EnvironmentInformation, message: string): Promise { + const parsedCommands = this.parseCommands(message); + for (const parsedCommand of parsedCommands) { + const command = new Command(environment, parsedCommand[0], parsedCommand[1]); + this.receivedCommands.push(command); + } + } + + protected createRunnerConnection(environment: EnvironmentInformation): RunnerConnection { + // do nothing + return new UtRunnerConnection(environment); + } +} diff --git a/src/nni_manager/training_service/reusable/test/utEnvironmentService.ts b/src/nni_manager/training_service/reusable/test/utEnvironmentService.ts new file mode 100644 index 
0000000000..d43bca5cfe --- /dev/null +++ b/src/nni_manager/training_service/reusable/test/utEnvironmentService.ts @@ -0,0 +1,76 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { EnvironmentInformation, EnvironmentService, EnvironmentStatus } from "../environment"; +import { EventEmitter } from "events"; +import { CommandChannel } from "../commandChannel"; +import { UtCommandChannel } from "./utCommandChannel"; + +export class UtEnvironmentService extends EnvironmentService { + private commandChannel: UtCommandChannel | undefined; + private allEnvironments = new Map(); + private hasMoreEnvironmentsInternal = true; + + constructor() { + super(); + } + + public get hasStorageService(): boolean { + // storage service is tested by integration testing. + return false; + } + public get environmentMaintenceLoopInterval(): number { + return 1; + } + + public testSetEnvironmentStatus(environment: EnvironmentInformation, newStatus: EnvironmentStatus): void { + environment.status = newStatus; + } + + public testReset(): void { + this.allEnvironments.clear(); + } + + public testGetEnvironments(): Map { + return this.allEnvironments; + } + + public testGetCommandChannel(): UtCommandChannel { + if (this.commandChannel === undefined) { + throw new Error(`command channel shouldn't be undefined.`); + } + return this.commandChannel; + } + + public testSetNoMoreEnvironment(hasMore: boolean): void { + this.hasMoreEnvironmentsInternal = hasMore; + } + + public get hasMoreEnvironments(): boolean { + return this.hasMoreEnvironmentsInternal; + } + + public createCommandChannel(commandEmitter: EventEmitter): CommandChannel { + this.commandChannel = new UtCommandChannel(commandEmitter) + return this.commandChannel; + } + + public async config(_key: string, _value: string): Promise { + // do nothing + } + + public async refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise { + // do nothing + } + + public async startEnvironment(environment: EnvironmentInformation): Promise { + if (!this.allEnvironments.has(environment.id)) { + this.allEnvironments.set(environment.id, environment); + environment.status = "WAITING"; + } + } + + public async stopEnvironment(environment: EnvironmentInformation): Promise { + environment.status = "USER_CANCELED"; + } +} diff --git a/src/nni_manager/training_service/reusable/trial.ts b/src/nni_manager/training_service/reusable/trial.ts index 7f1c6323a5..1db9d3fcae 100644 --- a/src/nni_manager/training_service/reusable/trial.ts +++ b/src/nni_manager/training_service/reusable/trial.ts @@ -25,7 +25,7 @@ export class TrialDetail implements TrialJobDetail { // it's used to aggregate node status for multiple node trial public nodes: Map; // assigned GPUs for multi-trial scheduled. 
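// (with the change below, undefined means the GPU scheduler has made no assignment, while an array is an explicit, possibly empty, assignment)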
- public assignedGpus: GPUInfo[] = []; + public assignedGpus: GPUInfo[] | undefined; public readonly TRIAL_METADATA_DIR = ".nni"; diff --git a/src/nni_manager/training_service/reusable/trialDispatcher.ts b/src/nni_manager/training_service/reusable/trialDispatcher.ts index ff324899c6..1b310ef9e0 100644 --- a/src/nni_manager/training_service/reusable/trialDispatcher.ts +++ b/src/nni_manager/training_service/reusable/trialDispatcher.ts @@ -9,18 +9,20 @@ import * as path from 'path'; import { Writable } from 'stream'; import { String } from 'typescript-string-operations'; import * as component from '../../common/component'; +import { NNIError, NNIErrorNames } from '../../common/errors'; import { getBasePort, getExperimentId, getPlatform } from '../../common/experimentStartupInfo'; import { getLogger, Logger } from '../../common/log'; import { NNIManagerIpConfig, TrainingService, TrialJobApplicationForm, TrialJobMetric, TrialJobStatus } from '../../common/trainingService'; -import { delay, getExperimentRootDir, getLogLevel, getVersion, mkDirPSync, uniqueString, getIPV4Address } from '../../common/utils'; +import { delay, getExperimentRootDir, getIPV4Address, getLogLevel, getVersion, mkDirPSync, uniqueString } from '../../common/utils'; import { GPU_INFO, INITIALIZED, KILL_TRIAL_JOB, NEW_TRIAL_JOB, REPORT_METRIC_DATA, SEND_TRIAL_JOB_PARAMETER, STDOUT, TRIAL_END, VERSION_CHECK } from '../../core/commands'; -import { GPUSummary } from '../../training_service/common/gpuData'; +import { ScheduleResultType } from '../../training_service/common/gpuData'; import { CONTAINER_INSTALL_NNI_SHELL_FORMAT } from '../common/containerJobData'; import { TrialConfig } from '../common/trialConfig'; import { TrialConfigMetadataKey } from '../common/trialConfigMetadataKey'; import { validateCodeDir } from '../common/util'; import { Command, CommandChannel } from './commandChannel'; -import { EnvironmentInformation, EnvironmentService, NodeInfomation, RunnerSettings } from './environment'; +import { EnvironmentInformation, EnvironmentService, NodeInfomation, RunnerSettings, TrialGpuSummary } from './environment'; +import { GpuScheduler } from './gpuScheduler'; import { MountedStorageService } from './storages/mountedStorageService'; import { StorageService } from './storageService'; import { TrialDetail } from './trial'; @@ -32,8 +34,6 @@ import { TrialDetail } from './trial'; **/ @component.Singleton class TrialDispatcher implements TrainingService { - - private readonly NNI_METRICS_PATTERN: string = `NNISDK_MEb'(?.*?)'`; private readonly log: Logger; private readonly isDeveloping: boolean = false; private stopping: boolean = false; @@ -53,6 +53,22 @@ class TrialDispatcher implements TrainingService { private readonly trials: Map; private readonly environments: Map; + // uses to accelerate trial manager loop + // true means there is updates, and trial loop should run a cycle immediately. + private shouldUpdateTrials: boolean = true; + // uses to decide environment assign strategy. + // true means use gpu scheduler to decide if there is free resource for new trial. + // false means one env run one trial in same time. + private enableGpuScheduler: boolean = false; + // uses to save if user like to reuse environment + private reuseEnvironment: boolean = true; + + private gpuScheduler: GpuScheduler; + + // uses to reduce log count. 
+ private isLoggedNoMoreEnvironment: boolean = false; + private isLoggedNoGpuAvailable: boolean = false; + constructor() { this.log = getLogger(); this.trials = new Map(); @@ -71,8 +87,9 @@ class TrialDispatcher implements TrainingService { if (logLevel == "debug" && (fs.existsSync("../../../src/nni_manager") || __dirname.endsWith("src\\nni_manager\\dist\\training_service\\reusable"))) { this.log.debug("log level is debug, and exist code folder, so set to developing mode."); this.isDeveloping = true; - this.runnerSettings.enableGpuCollector = true; } + + this.gpuScheduler = new GpuScheduler(); } public async listTrialJobs(): Promise { @@ -161,7 +178,7 @@ class TrialDispatcher implements TrainingService { const environmentService = component.get(EnvironmentService); this.commandEmitter = new EventEmitter(); - this.commandChannel = environmentService.getCommandChannel(this.commandEmitter); + this.commandChannel = environmentService.createCommandChannel(this.commandEmitter); // TODO it's a hard code of web channel, it needs to be improved. if (this.runnerSettings.nniManagerIP === "" || this.runnerSettings.nniManagerIP === null) { @@ -170,9 +187,6 @@ class TrialDispatcher implements TrainingService { this.runnerSettings.nniManagerPort = getBasePort() + 1; this.runnerSettings.commandChannel = this.commandChannel.channelName; - // for AML channel, other channels can ignore this. - await this.commandChannel.config("MetricEmitter", this.metricsEmitter); - // start channel this.commandEmitter.on("command", (command: Command): void => { this.handleCommand(command).catch((err: Error) => { @@ -251,9 +265,17 @@ class TrialDispatcher implements TrainingService { this.runnerSettings.logCollection = value; break; case TrialConfigMetadataKey.TRIAL_CONFIG: - // TODO to support more storage types by better parameters. 
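+ // the parsed trial config below drives two dispatcher switches: reuseEnvironment
+ // controls whether a finished environment is recycled for later trials, and
+ // gpuNum > 0 enables the GPU scheduler together with the runner's GPU collector.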
this.trialConfig = JSON.parse(value); + if (this.trialConfig.reuseEnvironment !== undefined) { + this.reuseEnvironment = this.trialConfig.reuseEnvironment; + } + if (this.trialConfig.gpuNum !== undefined && this.trialConfig.gpuNum > 0) { + this.log.info(`TrialDispatcher: GPU scheduler is enabled.`) + this.enableGpuScheduler = true; + } + this.runnerSettings.enableGpuCollector = this.enableGpuScheduler; + this.runnerSettings.command = this.trialConfig.command; // Validate to make sure codeDir doesn't have too many files await validateCodeDir(this.trialConfig.codeDir); @@ -275,6 +297,7 @@ class TrialDispatcher implements TrainingService { throw new Error(`TrialDispatcher: commandEmitter shouldn't be undefined in cleanUp.`); } this.stopping = true; + this.shouldUpdateTrials = true; const environmentService = component.get(EnvironmentService); const environments = [...this.environments.values()]; @@ -324,7 +347,8 @@ class TrialDispatcher implements TrainingService { this.log.debug(`set environment ${environment.id} isAlive from ${oldIsAlive} to ${environment.isAlive} due to status is ${environment.status}.`); } }); - await delay(5000); + this.shouldUpdateTrials = true; + await delay(environmentService.environmentMaintenceLoopInterval); } } @@ -332,9 +356,18 @@ class TrialDispatcher implements TrainingService { if (this.commandChannel === undefined) { throw new Error(`TrialDispatcher: commandChannel shouldn't be undefined in trialManagementLoop.`); } + const interval = 1; while (!this.stopping) { - await delay(2000); + let totalInterval = 1000; + while (totalInterval > 0) { + if (this.shouldUpdateTrials) { + this.shouldUpdateTrials = false; + break; + } + totalInterval -= interval; + await delay(interval); + } const toRefreshedTrials: TrialDetail[] = []; for (const trial of this.trials.values()) { @@ -347,7 +380,7 @@ class TrialDispatcher implements TrainingService { continue; } - const waitingTrials: TrialDetail[] = []; + let waitingTrials: TrialDetail[] = []; let liveTrialsCount = 0; for (const trial of toRefreshedTrials) { const currentStatus = trial.status; @@ -396,7 +429,7 @@ class TrialDispatcher implements TrainingService { } this.releaseEnvironment(trial); } else if (environmentStatus !== "RUNNING") { - this.log.error(`found running trial ${trial.id} on '${environment.jobId}' with '${environmentStatus}', set trial to environment status.`); + this.log.error(`found running trial ${trial.id} on '${environment.envId}' with '${environmentStatus}', set trial to environment status.`); this.releaseEnvironment(trial); trial.status = environmentStatus; } else { @@ -412,31 +445,133 @@ class TrialDispatcher implements TrainingService { break; } } + let liveEnvironmentsCount = 0; - const idleEnvironments: EnvironmentInformation[] = []; - this.environments.forEach((environment) => { + const reusableEnvironments: EnvironmentInformation[] = []; + for (const environment of this.environments.values()) { if (environment.isAlive === true) { liveEnvironmentsCount++; - if (environment.status === "RUNNING" && environment.isIdle) { - idleEnvironments.push(environment); + if (environment.status === "RUNNING" && environment.isRunnerReady) { + // if environment is not reusable and used, stop and not count as idle; + if ( + 0 === environment.runningTrialCount && + false === this.reuseEnvironment && + environment.assignedTrialCount > 0 + ) { + const environmentService = component.get(EnvironmentService); + await environmentService.stopEnvironment(environment); + continue; + } + + // if gpu scheduler is not enabled, 
and there is running trial, skip it. + if (false === this.enableGpuScheduler && environment.runningTrialCount > 0) { + continue; + } + + reusableEnvironments.push(environment); } } - }); - while (idleEnvironments.length > 0 && waitingTrials.length > 0) { - const trial = waitingTrials.shift(); - const idleEnvironment = idleEnvironments.shift(); - if (trial !== undefined && idleEnvironment != undefined) { - await this.assignEnvironment(trial, idleEnvironment); + } + + let neededEnvironmentCount = 0; + if (true === this.enableGpuScheduler) { + let noGpuAvailable: boolean = false; + while (waitingTrials.length > 0) { + // skip following trials, if first trial doesn't find available GPU. + if (true === noGpuAvailable) { + // break loop to try next time. + break; + } + const trial = waitingTrials.shift(); + if (undefined === trial) { + throw new Error(`TrialDispatcher: waiting trial shouldn't be undefined!`); + } + const gpuNum = this.trialConfig ? this.trialConfig.gpuNum : undefined; + const result = this.gpuScheduler.scheduleMachine(reusableEnvironments, gpuNum, trial); + switch (result.resultType) { + case ScheduleResultType.REQUIRE_EXCEED_TOTAL: + { + if (liveEnvironmentsCount == 0) { + this.log.debug(`TrialDispatcher: no live environment, so request one.`); + neededEnvironmentCount = 1; + waitingTrials = []; + this.isLoggedNoGpuAvailable = false; + } else if (reusableEnvironments.length > 0) { + const errorMessage: string = `TrialDispatcher: REQUIRE_EXCEED_TOTAL Required GPU number ${gpuNum} is too large, no machine can meet`; + this.log.error(errorMessage); + throw new NNIError(NNIErrorNames.RESOURCE_NOT_AVAILABLE, errorMessage); + } else { + if (false === this.isLoggedNoGpuAvailable) { + this.log.debug(`TrialDispatcher: wait GPU, live environment ${liveEnvironmentsCount}, no reusable, REQUIRE_EXCEED_TOTAL.`) + this.isLoggedNoGpuAvailable = true; + } + } + break; + } + case ScheduleResultType.TMP_NO_AVAILABLE_GPU: + { + if (false === this.isLoggedNoGpuAvailable) { + this.log.debug(`TrialDispatcher: wait GPU, live environment ${liveEnvironmentsCount}, reusable ${reusableEnvironments.length}, TMP_NO_AVAILABLE_GPU.`) + this.isLoggedNoGpuAvailable = true; + } + + // if some environment is alive, but not ready, no need to create more. 
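+ // every live environment is already ready and reusable at this point, so nothing
+ // new is still warming up; only a brand-new environment can add GPU capacity
+ // for the waiting trial.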
+ if (liveEnvironmentsCount <= reusableEnvironments.length) { + neededEnvironmentCount = 1; + this.isLoggedNoGpuAvailable = false; + this.log.info(`TrialDispatcher: ${liveEnvironmentsCount} live env, and ${reusableEnvironments.length} reusable, but no GPU available so request a new one.`); + } + noGpuAvailable = true; + } + break + case ScheduleResultType.SUCCEED: + { + const environment = result.environment; + if (undefined === environment) { + throw new Error(`TrialDispatcher: scheduled env shouldn't be undefined!`); + } + trial.assignedGpus = result.gpuIndices; + await this.allocateEnvironment(trial, environment); + this.isLoggedNoGpuAvailable = false; + } + break + default: + throw new Error(`TrialDispatcher: Unknown gpu schecduler type: ${result.resultType}`); + } + } + } else { + while (reusableEnvironments.length > 0 && waitingTrials.length > 0) { + const trial = waitingTrials.shift(); + const idleEnvironment = reusableEnvironments.shift(); + if (trial !== undefined && idleEnvironment != undefined) { + await this.allocateEnvironment(trial, idleEnvironment); + } } + neededEnvironmentCount = liveTrialsCount - liveEnvironmentsCount; } - if (liveEnvironmentsCount < liveTrialsCount) { - this.log.info(`request new environment, since live trials ${liveTrialsCount} ` + - `is more than live environments ${liveEnvironmentsCount}`); - for (let index = 0; index < liveTrialsCount - liveEnvironmentsCount; index++) { - await this.requestEnvironment(); + if (neededEnvironmentCount > 0) { + const environmentService = component.get(EnvironmentService); + let requestedCount = 0; + for (let index = 0; index < neededEnvironmentCount; index++) { + if (true === environmentService.hasMoreEnvironments) { + await this.requestEnvironment(); + requestedCount++; + this.isLoggedNoMoreEnvironment = false; + } else { + if (this.isLoggedNoMoreEnvironment === false) { + this.isLoggedNoMoreEnvironment = true; + this.log.info(`no more environment so far, so skip to request environment.`) + } + } + } + if (environmentService.hasMoreEnvironments === true || requestedCount > 0) { + this.log.info(`requested new environment, live trials: ${liveTrialsCount}, ` + + `live environments: ${liveEnvironmentsCount}, neededEnvironmentCount: ${neededEnvironmentCount}, ` + + `requestedCount: ${requestedCount}`); } } + } } @@ -462,35 +597,51 @@ class TrialDispatcher implements TrainingService { this.environments.set(environment.id, environment); if (environment.status === "FAILED") { - environment.isIdle = false; environment.isAlive = false; - throw new Error(`error on request environment ${environment.jobId}, please check log for more details.`); + throw new Error(`error on request environment ${environment.envId}, please check log for more details.`); } else { - environment.isIdle = true; environment.isAlive = true; } await this.commandChannel.open(environment); - this.log.info(`requested environment ${environment.id} and job id is ${environment.jobId}.`); + this.log.info(`requested environment ${environment.id} and job id is ${environment.envId}.`); } - private async assignEnvironment(trial: TrialDetail, environment: EnvironmentInformation): Promise { + private async allocateEnvironment(trial: TrialDetail, environment: EnvironmentInformation): Promise { if (this.commandChannel === undefined) { - throw new Error(`TrialDispatcher: commandChannel shouldn't be undefined in assignEnvironment.`); + throw new Error(`TrialDispatcher: commandChannel shouldn't be undefined in allocateEnvironment.`); + } + if (this.trialConfig === undefined) 
{ + throw new Error(`TrialDispatcher: trialConfig shouldn't be undefined in allocateEnvironment.`); } if (trial.environment) { - throw new Error(`trial ${trial.id} has assigned environment ${trial.environment.id} already, not assign to ${environment.id}!`); + throw new Error(`TrialDispatcher: trial ${trial.id} has assigned environment ${trial.environment.id} already, not assign to ${environment.id}!`); } - if (environment.isIdle == false) { - throw new Error(`environment ${environment.id} is not idle, and cannot be assigned again!`); + if (environment.runningTrialCount > 0 && false === this.enableGpuScheduler) { + throw new Error(`TrialDispatcher: environment ${environment.id} has running trial, and gpu scheduler is not enabled, it cannot be assigned again!`); } this.log.info(`assigning environment ${environment.id} to trial ${trial.id}.`); - environment.isIdle = false; + // convert assigned gpus to string for nvidia visible settings + // undefined means no constraint, [] means no gpu visible. + let gpuIndices: string | undefined = undefined; + if (undefined !== this.trialConfig.gpuNum) { + const gpuArray: number[] = []; + if (undefined !== trial.assignedGpus) { + trial.assignedGpus.map((value) => { + gpuArray.push(value.index); + }); + } + gpuIndices = gpuArray.join(','); + } + + environment.runningTrialCount++; + environment.assignedTrialCount++; trial.environment = environment; trial.settings = { trialId: trial.id, + gpuIndices: gpuIndices, sequenceId: trial.form.sequenceId, parameter: trial.form.hyperParameters, } @@ -500,13 +651,16 @@ class TrialDispatcher implements TrainingService { } private releaseEnvironment(trial: TrialDetail): void { - if (!trial.environment) { - throw new Error(`environment is not assigned to trial ${trial.id}, and cannot be released!`); + if (undefined === trial.environment) { + throw new Error(`TrialDispatcher: environment is not assigned to trial ${trial.id}, and cannot be released!`); + } + if (trial.environment.runningTrialCount <= 0) { + throw new Error(`TrialDispatcher: environment ${trial.environment.id} has no counted running trial!`); } - if (trial.environment.isIdle) { - throw new Error(`environment ${trial.environment.id} is idle already!`); + if (true === this.enableGpuScheduler) { + this.gpuScheduler.removeGpuReservation(trial); } - trial.environment.isIdle = true; + trial.environment.runningTrialCount--; trial.environment = undefined; } @@ -527,19 +681,20 @@ class TrialDispatcher implements TrainingService { } private async handleStdout(commandData: any): Promise { + const metricPattern: RegExp = /NNISDK_MEb'(?.*a?)'$/gm; const trialLogDir: string = path.join(getExperimentRootDir(), 'trials', commandData["trial"]); mkDirPSync(trialLogDir); const trialLogPath: string = path.join(trialLogDir, 'stdout_log_collection.log'); try { let skipLogging: boolean = false; if (commandData["tag"] === 'trial' && commandData["msg"] !== undefined) { - const message = commandData["msg"]; - const metricsContent: any = message.match(this.NNI_METRICS_PATTERN); - if (metricsContent && metricsContent.groups) { + const message: string = commandData["msg"]; + let metricsContent = metricPattern.exec(message); + while (metricsContent && metricsContent.groups) { const key: string = 'metrics'; const data = metricsContent.groups[key]; - const metricData = JSON.parse('"' + data.split('"').join('\\"') + '"'); - await this.handleMetricData(commandData["trial"], metricData); + await this.handleMetricData(commandData["trial"], data); + metricsContent = metricPattern.exec(message); 
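+ // metricPattern is declared with the /g flag, so each exec() call resumes after
+ // the previous match and the loop drains every metric embedded in one stdout message.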
skipLogging = true; } } @@ -561,7 +716,7 @@ class TrialDispatcher implements TrainingService { } private async handleCommand(command: Command): Promise { - this.log.debug(`TrialDispatcher: env ${command.environment.id} received command ${command.command}, data: ${command.data}`); + this.log.debug(`TrialDispatcher: env ${command.environment.id} received command ${command.command}.`); const environment = command.environment; const data = command.data; const nodeId = data["node"]; @@ -574,9 +729,7 @@ class TrialDispatcher implements TrainingService { break; case INITIALIZED: { - const oldStatus = environment.status; let isAllReady = true; - if (environment.nodeCount > 1) { let node = environment.nodes.get(nodeId); if (node === undefined) { @@ -601,9 +754,9 @@ class TrialDispatcher implements TrainingService { } // single node is always ready to set env status - if (isAllReady && oldStatus === "UNKNOWN") { - environment.status = "RUNNING"; - this.log.info(`TrialDispatcher: env ${environment.id} received initialized message, old status: ${oldStatus}, new status: ${environment.status}.`); + if (isAllReady) { + environment.isRunnerReady = true; + this.log.info(`TrialDispatcher: env ${environment.id} received initialized message and runner is ready, env status: ${environment.status}.`); } } break; @@ -621,7 +774,10 @@ class TrialDispatcher implements TrainingService { } break; case GPU_INFO: - environment.gpuSummary.set(nodeId, (data)); + { + const gpuData = (data); + environment.setGpuSummary(nodeId, gpuData); + } break; case TRIAL_END: { @@ -647,6 +803,7 @@ class TrialDispatcher implements TrainingService { } break; } + this.shouldUpdateTrials = true; } } diff --git a/src/nni_manager/yarn.lock b/src/nni_manager/yarn.lock index d624cf06db..54e1399b8e 100644 --- a/src/nni_manager/yarn.lock +++ b/src/nni_manager/yarn.lock @@ -262,6 +262,10 @@ version "2.3.1" resolved "https://registry.yarnpkg.com/@types/js-base64/-/js-base64-2.3.1.tgz#c39f14f129408a3d96a1105a650d8b2b6eeb4168" +"@types/js-yaml@^3.12.5": + version "3.12.5" + resolved "https://registry.yarnpkg.com/@types/js-yaml/-/js-yaml-3.12.5.tgz#136d5e6a57a931e1cce6f9d8126aa98a9c92a6bb" + "@types/json-schema@^7.0.3": version "7.0.3" resolved "https://registry.yarnpkg.com/@types/json-schema/-/json-schema-7.0.3.tgz#bdfd69d61e464dcc81b25159c270d75a73c1a636" @@ -277,7 +281,6 @@ "@types/minipass@*": version "2.2.0" resolved "https://registry.yarnpkg.com/@types/minipass/-/minipass-2.2.0.tgz#51ad404e8eb1fa961f75ec61205796807b6f9651" - integrity sha512-wuzZksN4w4kyfoOv/dlpov4NOunwutLA/q7uc00xU02ZyUY+aoM5PWIXEKBMnm0NHd4a+N71BMjq+x7+2Af1fg== dependencies: "@types/node" "*" @@ -430,7 +433,6 @@ "@types/tar@^4.0.3": version "4.0.3" resolved "https://registry.yarnpkg.com/@types/tar/-/tar-4.0.3.tgz#e2cce0b8ff4f285293243f5971bd7199176ac489" - integrity sha512-Z7AVMMlkI8NTWF0qGhC4QIX0zkV/+y0J8x7b/RsHrN0310+YNjoJd8UrApCiGBCWtKjxS9QhNqLi2UJNToh5hA== dependencies: "@types/minipass" "*" "@types/node" "*" @@ -1017,7 +1019,6 @@ chownr@^1.1.2, chownr@^1.1.3: chownr@^2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/chownr/-/chownr-2.0.0.tgz#15bfbe53d2eab4cf70f18a8cd68ebe5b3cb1dece" - integrity sha512-bIomtDF5KGpdogkLd9VspvFzk9KfpyyGlS8YFVZl7TGPBHL5snIOnxeshwVgPteQ9b4Eydl+pVbIyE1DcvCWgQ== ci-info@^1.5.0: version "1.6.0" @@ -1912,7 +1913,6 @@ fs-minipass@^1.2.5: fs-minipass@^2.0.0: version "2.1.0" resolved "https://registry.yarnpkg.com/fs-minipass/-/fs-minipass-2.1.0.tgz#7f5036fdbf12c63c169190cbe4199c852271f9fb" - integrity 
sha512-V/JgOLFCS+R6Vcq0slCuaeWEdNC3ouDlJMNIsacH2VtALiu9mV4LPrHc5cDl8k5aw6J8jwgWWpiTo5RYhmIzvg== dependencies: minipass "^3.0.0" @@ -2331,7 +2331,6 @@ ignore@^4.0.6: ignore@^5.1.4: version "5.1.4" resolved "https://registry.yarnpkg.com/ignore/-/ignore-5.1.4.tgz#84b7b3dbe64552b6ef0eca99f6743dbec6d97adf" - integrity sha512-MzbUSahkTW1u7JpKKjY7LCARd1fU5W2rLdxlM4kdkayuCwZImjkpluF9CM1aLewYJguPDqewLam18Y6AU69A8A== import-fresh@^3.0.0: version "3.2.1" @@ -2650,7 +2649,6 @@ istanbul-lib-source-maps@^4.0.0: istanbul-reports@^3.0.2: version "3.0.2" resolved "https://registry.yarnpkg.com/istanbul-reports/-/istanbul-reports-3.0.2.tgz#d593210e5000683750cb09fc0644e4b6e27fd53b" - integrity sha512-9tZvz7AiR3PEDNGiV9vIouQ/EAcqMXFmkcA1CDFTwOB98OZVDL0PH9glHotf5Ugp6GCOTypfzGWI/OqjWNCRUw== dependencies: html-escaper "^2.0.0" istanbul-lib-report "^3.0.0" @@ -3193,7 +3191,6 @@ minipass@^2.3.5, minipass@^2.8.6, minipass@^2.9.0: minipass@^3.0.0: version "3.1.3" resolved "https://registry.yarnpkg.com/minipass/-/minipass-3.1.3.tgz#7d42ff1f39635482e15f9cdb53184deebd5815fd" - integrity sha512-Mgd2GdMVzY+x3IJ+oHnVM+KG3lA5c8tnabyJKmHSaG2kAGpudxuOf8ToDkhumF7UzME7DecbQE9uOZhNm7PuJg== dependencies: yallist "^4.0.0" @@ -3212,7 +3209,6 @@ minizlib@^1.2.1: minizlib@^2.1.0: version "2.1.0" resolved "https://registry.yarnpkg.com/minizlib/-/minizlib-2.1.0.tgz#fd52c645301ef09a63a2c209697c294c6ce02cf3" - integrity sha512-EzTZN/fjSvifSX0SlqUERCN39o6T40AMarPbv0MrarSFtIITCBh7bi+dU8nxGFHuqs9jdIAeoYoKuQAAASsPPA== dependencies: minipass "^3.0.0" yallist "^4.0.0" @@ -3249,7 +3245,6 @@ mkdirp@^0.5.1: mkdirp@^1.0.3: version "1.0.4" resolved "https://registry.yarnpkg.com/mkdirp/-/mkdirp-1.0.4.tgz#3eb5ed62622756d79a5f0e2a221dfebad75c2f7e" - integrity sha512-vVqVZQyf3WLx2Shd0qJ9xuvqgAyKPLAiqITEtqW0oIUjzo3PePDd6fW9iFz30ef7Ysp/oiWqbhszeGWW2T6Gzw== mocha@^7.1.1: version "7.1.1" @@ -3707,7 +3702,6 @@ number-is-nan@^1.0.0: nyc@^15.0.0: version "15.0.1" resolved "https://registry.yarnpkg.com/nyc/-/nyc-15.0.1.tgz#bd4d5c2b17f2ec04370365a5ca1fc0ed26f9f93d" - integrity sha512-n0MBXYBYRqa67IVt62qW1r/d9UH/Qtr7SF1w/nQLJ9KxvWF6b2xCHImRAixHN9tnMMYHC2P14uo6KddNGwMgGg== dependencies: "@istanbuljs/load-nyc-config" "^1.0.0" "@istanbuljs/schema" "^0.1.2" @@ -5065,7 +5059,6 @@ tar@^4.4.10, tar@^4.4.12, tar@^4.4.13: tar@^6.0.2: version "6.0.2" resolved "https://registry.yarnpkg.com/tar/-/tar-6.0.2.tgz#5df17813468a6264ff14f766886c622b84ae2f39" - integrity sha512-Glo3jkRtPcvpDlAs/0+hozav78yoXKFr+c4wgw62NNMO3oo4AaJdCo21Uu7lcwr55h39W2XD1LMERc64wtbItg== dependencies: chownr "^2.0.0" fs-minipass "^2.0.0" @@ -5541,7 +5534,6 @@ yallist@^3.0.2, yallist@^3.0.3: yallist@^4.0.0: version "4.0.0" resolved "https://registry.yarnpkg.com/yallist/-/yallist-4.0.0.tgz#9bb92790d9c0effec63be73519e11a35019a3a72" - integrity sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A== yargs-parser@13.1.2, yargs-parser@^13.1.2: version "13.1.2" diff --git a/test/nni_test/nnitest/generate_ts_config.py b/test/nni_test/nnitest/generate_ts_config.py index d3ea1d60ff..c11fb0d213 100644 --- a/test/nni_test/nnitest/generate_ts_config.py +++ b/test/nni_test/nnitest/generate_ts_config.py @@ -35,6 +35,8 @@ def update_training_service_config(args): config[args.ts]['paiConfig']['host'] = args.pai_host if args.pai_token is not None: config[args.ts]['paiConfig']['token'] = args.pai_token + if args.pai_reuse is not None: + config[args.ts]['paiConfig']['reuse'] = args.pai_reuse.lower() == 'true' if args.nni_docker_image is not None: config[args.ts]['trial']['image'] = 
             config[args.ts]['trial']['image'] = args.nni_docker_image
         if args.nni_manager_nfs_mount_path is not None:
@@ -101,6 +103,7 @@ def update_training_service_config(args):
     parser.add_argument("--output_dir", type=str)
     parser.add_argument("--vc", type=str)
    parser.add_argument("--pai_token", type=str)
+    parser.add_argument("--pai_reuse", type=str)
     parser.add_argument("--pai_storage_config_name", type=str)
     parser.add_argument("--nni_manager_nfs_mount_path", type=str)
     parser.add_argument("--container_nfs_mount_path", type=str)
diff --git a/test/pipelines/pipelines-it-pai.yml b/test/pipelines/pipelines-it-pai.yml
index 6e0ea46403..57a1e1dd52 100644
--- a/test/pipelines/pipelines-it-pai.yml
+++ b/test/pipelines/pipelines-it-pai.yml
@@ -57,7 +57,7 @@ jobs:
       echo "TEST_IMG:$TEST_IMG"
       cd test
-      python3 nni_test/nnitest/generate_ts_config.py --ts pai --pai_host $(pai_host) --pai_user $(pai_user) --nni_docker_image $TEST_IMG --pai_storage_config_name $(pai_storage_config_name)\
+      python3 nni_test/nnitest/generate_ts_config.py --ts pai --pai_reuse $(pai_reuse) --pai_host $(pai_host) --pai_user $(pai_user) --nni_docker_image $TEST_IMG --pai_storage_config_name $(pai_storage_config_name)\
       --pai_token $(pai_token) --nni_manager_nfs_mount_path $(nni_manager_nfs_mount_path) --container_nfs_mount_path $(container_nfs_mount_path) --nni_manager_ip $(nni_manager_ip) --vc $(virtual_cluster)
       PATH=$HOME/.local/bin:$PATH python3 nni_test/nnitest/run_tests.py --config config/integration_tests.yml --ts pai
     displayName: 'integration test'
diff --git a/tools/nni_cmd/config_schema.py b/tools/nni_cmd/config_schema.py
index b8bed07fb9..631abcf5cc 100644
--- a/tools/nni_cmd/config_schema.py
+++ b/tools/nni_cmd/config_schema.py
@@ -14,10 +14,12 @@ def setType(key, valueType):
     '''check key type'''
     return And(valueType, error=SCHEMA_TYPE_ERROR % (key, valueType.__name__))
 
+
 def setChoice(key, *args):
     '''check choice'''
     return And(lambda n: n in args, error=SCHEMA_RANGE_ERROR % (key, str(args)))
 
+
 def setNumberRange(key, keyType, start, end):
     '''check number range'''
     return And(
@@ -25,16 +27,19 @@ def setNumberRange(key, keyType, start, end):
         And(lambda n: start <= n <= end, error=SCHEMA_RANGE_ERROR % (key, '(%s,%s)' % (start, end))),
     )
 
+
 def setPathCheck(key):
     '''check if path exist'''
     return And(os.path.exists, error=SCHEMA_PATH_ERROR % key)
 
+
 class AlgoSchema:
     """
     This class is the schema of 'tuner', 'assessor' and 'advisor' sections of experiment configuraion file.
     For example:
     AlgoSchema('tuner') creates the schema of tuner section.
""" + def __init__(self, algo_type): """ Parameters: @@ -108,6 +113,7 @@ def validate(self, data): Schema(self.algo_schema).validate(data) self.validate_extras(data, self.algo_type) + common_schema = { 'authorName': setType('authorName', str), 'experimentName': setType('experimentName', str), @@ -138,7 +144,7 @@ def validate(self, data): } common_trial_schema = { - 'trial':{ + 'trial': { 'command': setType('command', str), 'codeDir': setPathCheck('codeDir'), Optional('gpuNum'): setNumberRange('gpuNum', int, 0, 99999), @@ -147,7 +153,7 @@ def validate(self, data): } pai_yarn_trial_schema = { - 'trial':{ + 'trial': { 'command': setType('command', str), 'codeDir': setPathCheck('codeDir'), 'gpuNum': setNumberRange('gpuNum', int, 0, 99999), @@ -156,10 +162,10 @@ def validate(self, data): 'image': setType('image', str), Optional('authFile'): And(os.path.exists, error=SCHEMA_PATH_ERROR % 'authFile'), Optional('shmMB'): setType('shmMB', int), - Optional('dataDir'): And(Regex(r'hdfs://(([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(/.*)?'),\ - error='ERROR: dataDir format error, dataDir format is hdfs://xxx.xxx.xxx.xxx:xxx'), - Optional('outputDir'): And(Regex(r'hdfs://(([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(/.*)?'),\ - error='ERROR: outputDir format error, outputDir format is hdfs://xxx.xxx.xxx.xxx:xxx'), + Optional('dataDir'): And(Regex(r'hdfs://(([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(/.*)?'), + error='ERROR: dataDir format error, dataDir format is hdfs://xxx.xxx.xxx.xxx:xxx'), + Optional('outputDir'): And(Regex(r'hdfs://(([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(/.*)?'), + error='ERROR: outputDir format error, outputDir format is hdfs://xxx.xxx.xxx.xxx:xxx'), Optional('virtualCluster'): setType('virtualCluster', str), Optional('nasMode'): setChoice('nasMode', 'classic_mode', 'enas_mode', 'oneshot_mode', 'darts_mode'), Optional('portList'): [{ @@ -184,7 +190,7 @@ def validate(self, data): pai_trial_schema = { - 'trial':{ + 'trial': { 'codeDir': setPathCheck('codeDir'), 'nniManagerNFSMountPath': setPathCheck('nniManagerNFSMountPath'), 'containerNFSMountPath': setType('containerNFSMountPath', str), @@ -200,21 +206,21 @@ def validate(self, data): } pai_config_schema = { - 'paiConfig': Or({ - 'userName': setType('userName', str), - 'passWord': setType('passWord', str), - 'host': setType('host', str), - Optional('reuse'): setType('reuse', bool) - }, { + 'paiConfig': { 'userName': setType('userName', str), - 'token': setType('token', str), + Or('passWord', 'token', only_one=True): str, 'host': setType('host', str), - Optional('reuse'): setType('reuse', bool) - }) + Optional('reuse'): setType('reuse', bool), + Optional('gpuNum'): setNumberRange('gpuNum', int, 0, 99999), + Optional('cpuNum'): setNumberRange('cpuNum', int, 0, 99999), + Optional('memoryMB'): setType('memoryMB', int), + Optional('maxTrialNumPerGpu'): setType('maxTrialNumPerGpu', int), + Optional('useActiveGpu'): setType('useActiveGpu', bool), + } } dlts_trial_schema = { - 'trial':{ + 'trial': { 'command': setType('command', str), 'codeDir': setPathCheck('codeDir'), 'gpuNum': setNumberRange('gpuNum', int, 0, 99999), @@ -235,7 +241,7 @@ def validate(self, data): } aml_trial_schema = { - 'trial':{ + 'trial': { 'codeDir': setPathCheck('codeDir'), 'command': setType('command', str), 'image': setType('image', str), @@ -252,7 +258,7 @@ def validate(self, data): } kubeflow_trial_schema = { - 'trial':{ + 'trial': { 'codeDir': setPathCheck('codeDir'), Optional('nasMode'): setChoice('nasMode', 'classic_mode', 'enas_mode', 'oneshot_mode', 
         Optional('nasMode'): setChoice('nasMode', 'classic_mode', 'enas_mode', 'oneshot_mode', 'darts_mode'),
         Optional('ps'): {
@@ -273,7 +279,7 @@ def validate(self, data):
             'image': setType('image', str),
             Optional('privateRegistryAuthPath'): And(os.path.exists, error=SCHEMA_PATH_ERROR % 'privateRegistryAuthPath')
         },
-        Optional('worker'):{
+        Optional('worker'): {
             'replicas': setType('replicas', int),
             'command': setType('command', str),
             'gpuNum': setNumberRange('gpuNum', int, 0, 99999),
@@ -286,7 +292,7 @@ def validate(self, data):
 }
 
 kubeflow_config_schema = {
-    'kubeflowConfig':Or({
+    'kubeflowConfig': Or({
         'operator': setChoice('operator', 'tf-operator', 'pytorch-operator'),
         'apiVersion': setType('apiVersion', str),
         Optional('storage'): setChoice('storage', 'nfs', 'azureStorage'),
@@ -299,23 +305,23 @@ def validate(self, data):
         'apiVersion': setType('apiVersion', str),
         Optional('storage'): setChoice('storage', 'nfs', 'azureStorage'),
         'keyVault': {
-            'vaultName': And(Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),\
-                        error='ERROR: vaultName format error, vaultName support using (0-9|a-z|A-Z|-)'),
-            'name': And(Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),\
-                        error='ERROR: name format error, name support using (0-9|a-z|A-Z|-)')
+            'vaultName': And(Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),
+                             error='ERROR: vaultName format error, vaultName support using (0-9|a-z|A-Z|-)'),
+            'name': And(Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),
+                        error='ERROR: name format error, name support using (0-9|a-z|A-Z|-)')
         },
         'azureStorage': {
-            'accountName': And(Regex('([0-9]|[a-z]|[A-Z]|-){3,31}'),\
-                        error='ERROR: accountName format error, accountName support using (0-9|a-z|A-Z|-)'),
-            'azureShare': And(Regex('([0-9]|[a-z]|[A-Z]|-){3,63}'),\
-                        error='ERROR: azureShare format error, azureShare support using (0-9|a-z|A-Z|-)')
+            'accountName': And(Regex('([0-9]|[a-z]|[A-Z]|-){3,31}'),
+                               error='ERROR: accountName format error, accountName support using (0-9|a-z|A-Z|-)'),
+            'azureShare': And(Regex('([0-9]|[a-z]|[A-Z]|-){3,63}'),
+                              error='ERROR: azureShare format error, azureShare support using (0-9|a-z|A-Z|-)')
         },
         Optional('uploadRetryCount'): setNumberRange('uploadRetryCount', int, 1, 99999)
     })
 }
 
 frameworkcontroller_trial_schema = {
-    'trial':{
+    'trial': {
         'codeDir': setPathCheck('codeDir'),
         'taskRoles': [{
             'name': setType('name', str),
@@ -335,7 +341,7 @@ def validate(self, data):
 }
 
 frameworkcontroller_config_schema = {
-    'frameworkcontrollerConfig':Or({
+    'frameworkcontrollerConfig': Or({
         Optional('storage'): setChoice('storage', 'nfs', 'azureStorage'),
         Optional('serviceAccountName'): setType('serviceAccountName', str),
         'nfs': {
@@ -346,23 +352,23 @@ def validate(self, data):
         Optional('storage'): setChoice('storage', 'nfs', 'azureStorage'),
         Optional('serviceAccountName'): setType('serviceAccountName', str),
         'keyVault': {
-            'vaultName': And(Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),\
-                        error='ERROR: vaultName format error, vaultName support using (0-9|a-z|A-Z|-)'),
-            'name': And(Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),\
-                        error='ERROR: name format error, name support using (0-9|a-z|A-Z|-)')
+            'vaultName': And(Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),
+                             error='ERROR: vaultName format error, vaultName support using (0-9|a-z|A-Z|-)'),
+            'name': And(Regex('([0-9]|[a-z]|[A-Z]|-){1,127}'),
+                        error='ERROR: name format error, name support using (0-9|a-z|A-Z|-)')
        },
         'azureStorage': {
-            'accountName': And(Regex('([0-9]|[a-z]|[A-Z]|-){3,31}'),\
-                        error='ERROR: accountName format error, accountName support using (0-9|a-z|A-Z|-)'),
-            'azureShare': And(Regex('([0-9]|[a-z]|[A-Z]|-){3,63}'),\
-                        error='ERROR: azureShare format error, azureShare support using (0-9|a-z|A-Z|-)')
+            'accountName': And(Regex('([0-9]|[a-z]|[A-Z]|-){3,31}'),
+                               error='ERROR: accountName format error, accountName support using (0-9|a-z|A-Z|-)'),
+            'azureShare': And(Regex('([0-9]|[a-z]|[A-Z]|-){3,63}'),
+                              error='ERROR: azureShare format error, azureShare support using (0-9|a-z|A-Z|-)')
         },
         Optional('uploadRetryCount'): setNumberRange('uploadRetryCount', int, 1, 99999)
     })
 }
 
 machine_list_schema = {
-    'machineList':[Or(
+    'machineList': [Or(
         {
             'ip': setType('ip', str),
             Optional('port'): setNumberRange('port', int, 1, 65535),
@@ -395,6 +401,7 @@ def validate(self, data):
     'dlts': Schema({**common_schema, **dlts_trial_schema, **dlts_config_schema}),
 }
 
+
 class NNIConfigSchema:
     def validate(self, data):
         train_service = data['trainingServicePlatform']
@@ -483,19 +490,25 @@ def validate_pai_config_path(self, experiment_config):
             if not taskRoles_dict:
                 raise SchemaError('Please set taskRoles in paiConfigPath config file!')
         else:
-            pai_trial_fields_required_list = ['image', 'gpuNum', 'cpuNum', 'memoryMB', 'paiStorageConfigName', 'command']
+            pai_trial_fields_required_list = ['image', 'paiStorageConfigName', 'command']
             for trial_field in pai_trial_fields_required_list:
                 if experiment_config['trial'].get(trial_field) is None:
                     raise SchemaError('Please set {0} in trial configuration,\
                     or set additional pai configuration file path in paiConfigPath!'.format(trial_field))
+            pai_resource_fields_required_list = ['gpuNum', 'cpuNum', 'memoryMB']
+            for required_field in pai_resource_fields_required_list:
+                if experiment_config['trial'].get(required_field) is None and \
+                        experiment_config['paiConfig'].get(required_field) is None:
+                    raise SchemaError('Please set {0} in trial or paiConfig configuration,\
+                    or set additional pai configuration file path in paiConfigPath!'.format(required_field))
 
     def validate_pai_trial_conifg(self, experiment_config):
         '''validate the trial config in pai platform'''
         if experiment_config.get('trainingServicePlatform') in ['pai', 'paiYarn']:
             if experiment_config.get('trial').get('shmMB') and \
-                experiment_config['trial']['shmMB'] > experiment_config['trial']['memoryMB']:
+                    experiment_config['trial']['shmMB'] > experiment_config['trial']['memoryMB']:
                 raise SchemaError('shmMB should be no more than memoryMB!')
-            #backward compatibility
+            # backward compatibility
             warning_information = '{0} is not supported in NNI anymore, please remove the field in config file!\
             please refer https://github.com/microsoft/nni/blob/master/docs/en_US/TrainingService/PaiMode.md#run-an-experiment\
             for the practices of how to get data and output model in trial code'
@@ -508,6 +521,6 @@ def validate_pai_trial_conifg(self, experiment_config):
     def validate_eth0_device(self, experiment_config):
         '''validate whether the machine has eth0 device'''
         if experiment_config.get('trainingServicePlatform') not in ['local'] \
-            and not experiment_config.get('nniManagerIp') \
-            and 'eth0' not in netifaces.interfaces():
+                and not experiment_config.get('nniManagerIp') \
+                and 'eth0' not in netifaces.interfaces():
             raise SchemaError('This machine does not contain eth0 network device, please set nniManagerIp in config file!')
diff --git a/tools/nni_trial_tool/base_channel.py b/tools/nni_trial_tool/base_channel.py
index c1ce564ba8..b9d3392abc 100644
--- a/tools/nni_trial_tool/base_channel.py
+++ b/tools/nni_trial_tool/base_channel.py
@@ -57,7 +57,11 @@ def open(self):
 
     def close(self):
         self.is_running = False
-        self._inner_close()
+        try:
+            self._inner_close()
+        except Exception as err:
+            # ignore any error on closing
+            print("error on closing channel: %s" % err)
 
     def send(self, command, data):
         """Send command to Training Service.
diff --git a/tools/nni_trial_tool/log_utils.py b/tools/nni_trial_tool/log_utils.py
index 20848d9804..8d5b3d94c0 100644
--- a/tools/nni_trial_tool/log_utils.py
+++ b/tools/nni_trial_tool/log_utils.py
@@ -82,7 +82,11 @@ def __init__(self, syslog_host, syslog_port, tag, std_output_type, log_collectio
         '''
         constructor
         '''
-        self.logger = logging.getLogger('nni_syslog_{}'.format(tag))
+        logger_name = 'nni_syslog_{}'.format(tag)
+        # to prevent multiple trial logged in same logger
+        if trial_id is not None:
+            logger_name = '{}_{}'.format(logger_name, trial_id)
+        self.logger = logging.getLogger(logger_name)
         self.log_level = log_level
         self.logger.setLevel(self.log_level)
         self.pipeReader = None
diff --git a/tools/nni_trial_tool/trial.py b/tools/nni_trial_tool/trial.py
index e2753b1518..6f81b2427e 100644
--- a/tools/nni_trial_tool/trial.py
+++ b/tools/nni_trial_tool/trial.py
@@ -86,11 +86,17 @@ def run(self):
                 break
             time.sleep(0.1)
 
+        trial_command = self.args.trial_command
+
+        gpuIndices = self.data.get('gpuIndices')
+        if (gpuIndices is not None):
+            trial_command = 'CUDA_VISIBLE_DEVICES="%s " %s' % (gpuIndices, trial_command)
+
         self.log_pipe_stdout = self.trial_syslogger_stdout.get_pipelog_reader()
-        self.process = Popen(self.args.trial_command, shell=True, stdout=self.log_pipe_stdout,
+        self.process = Popen(trial_command, shell=True, stdout=self.log_pipe_stdout,
                              stderr=self.log_pipe_stdout, cwd=trial_code_dir, env=dict(environ))
         nni_log(LogType.Info, '{0}: spawns a subprocess (pid {1}) to run command: {2}'.
-                format(self.name, self.process.pid, shlex.split(self.args.trial_command)))
+                format(self.name, self.process.pid, shlex.split(trial_command)))
 
     def save_parameter_file(self, command_data):
         parameters = command_data["parameters"]
diff --git a/tools/nni_trial_tool/web_channel.py b/tools/nni_trial_tool/web_channel.py
index a68be47908..87901e1163 100644
--- a/tools/nni_trial_tool/web_channel.py
+++ b/tools/nni_trial_tool/web_channel.py
@@ -37,9 +37,9 @@ def _inner_open(self):
     def _inner_close(self):
         if self.client is not None:
             self.client.close()
-            if self._event_loop.is_running():
-                self._event_loop.close()
             self.client = None
+            if self._event_loop.is_running():
+                self._event_loop.stop()
             self._event_loop = None
 
     def _inner_send(self, message):
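
Note on the web_channel.py hunk above: asyncio does not allow closing an event loop that is still running, which is why the close() call inside _inner_close is replaced with stop(). The following is a minimal, self-contained sketch (not part of this patch; plain Python 3 asyncio with hypothetical names) of the stop-then-close ordering that the change relies on:

import asyncio

async def request_shutdown(loop):
    # Pretend to do some channel work, then ask the running loop to stop.
    await asyncio.sleep(0.1)
    loop.stop()  # safe while the loop is running
    # Calling loop.close() here instead would raise:
    # RuntimeError: Cannot close a running event loop

loop = asyncio.new_event_loop()
loop.create_task(request_shutdown(loop))
loop.run_forever()  # returns once loop.stop() has been called
loop.close()        # the loop is no longer running, so closing it is now safe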