Skip to content
This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

Commit

Permalink
Refactor code storage logic for trial (#2403)
Browse files Browse the repository at this point in the history
  • Loading branch information
SparkSnail authored May 19, 2020
1 parent bd7edf3 commit d7456c1
Show file tree
Hide file tree
Showing 11 changed files with 149 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

'use strict';

import * as assert from 'assert';
import * as cpp from 'child-process-promise';
import * as fs from 'fs';
import * as path from 'path';
Expand Down Expand Up @@ -72,6 +73,11 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
this.kubernetesRestServerPort = restServer.clusterRestServerPort;
}

// wait upload of code Dir to finish
if (this.copyExpCodeDirPromise !== undefined) {
await this.copyExpCodeDirPromise;
}

const trialJobId: string = uniqueString(5);
// Set trial's NFS working folder
const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId);
Expand All @@ -81,8 +87,8 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
this.generateContainerPort();
await this.prepareRunScript(trialLocalTempFolder, trialJobId, trialWorkingFolder, form);

//upload code files
const trialJobOutputUrl: string = await this.uploadCodeFiles(trialJobId, trialLocalTempFolder);
//wait upload of script files to finish
const trialJobOutputUrl: string = await this.uploadFolder(trialLocalTempFolder, `nni/${getExperimentId()}/${trialJobId}`);
let initStatus: TrialJobStatus = 'WAITING';
if (!trialJobOutputUrl) {
initStatus = 'FAILED';
Expand Down Expand Up @@ -151,6 +157,8 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
// Validate to make sure codeDir doesn't have too many files
try {
await validateCodeDir(this.fcTrialConfig.codeDir);
//upload codeDir to storage
this.copyExpCodeDirPromise = this.uploadFolder(this.fcTrialConfig.codeDir, `nni/${getExperimentId()}/nni-code`);
} catch (error) {
this.log.error(error);

Expand All @@ -171,41 +179,31 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
}

/**
* upload code files to nfs or azureStroage
* @param trialJobId
* @param trialLocalTempFolder
* return: trialJobOutputUrl
* upload local folder to nfs or azureStroage
*/
private async uploadCodeFiles(trialJobId: string, trialLocalTempFolder: string): Promise<string> {
private async uploadFolder(srcDirectory: string, destDirectory: string): Promise<string> {
if (this.fcClusterConfig === undefined) {
throw new Error('Kubeflow Cluster config is not initialized');
}

if (this.fcTrialConfig === undefined) {
throw new Error('Kubeflow trial config is not initialized');
}

let trialJobOutputUrl: string = '';
assert(this.fcClusterConfig.storage === undefined
|| this.fcClusterConfig.storage === 'azureStorage'
|| this.fcClusterConfig.storage === 'nfs');

if (this.fcClusterConfig.storageType === 'azureStorage') {
const azureFrameworkControllerClusterConfig: FrameworkControllerClusterConfigAzure =
<FrameworkControllerClusterConfigAzure>this.fcClusterConfig;
trialJobOutputUrl = await this.uploadFilesToAzureStorage(trialJobId, trialLocalTempFolder, this.fcTrialConfig.codeDir,
azureFrameworkControllerClusterConfig.uploadRetryCount);
} else if (this.fcClusterConfig.storageType === 'nfs') {
const nfsFrameworkControllerClusterConfig: FrameworkControllerClusterConfigNFS =
<FrameworkControllerClusterConfigNFS>this.fcClusterConfig;
// Creat work dir for current trial in NFS directory
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}`);
// Copy code files from local dir to NFS mounted dir
await cpp.exec(`cp -r ${trialLocalTempFolder}/* ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}/.`);
// Copy codeDir to NFS mounted dir
await cpp.exec(`cp -r ${this.fcTrialConfig.codeDir}/* ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}/.`);
const nfsConfig: NFSConfig = nfsFrameworkControllerClusterConfig.nfs;
trialJobOutputUrl = `nfs://${nfsConfig.server}:${path.join(nfsConfig.path, 'nni', getExperimentId(), trialJobId, 'output')}`;
if (this.fcClusterConfig.storage === 'azureStorage') {
if (this.azureStorageClient === undefined) {
throw new Error('azureStorageClient is not initialized');
}
const fcClusterConfigAzure: FrameworkControllerClusterConfigAzure = <FrameworkControllerClusterConfigAzure>this.fcClusterConfig;
return await this.uploadFolderToAzureStorage(srcDirectory, destDirectory, fcClusterConfigAzure.uploadRetryCount);
} else if (this.fcClusterConfig.storage === 'nfs' || this.fcClusterConfig.storage === undefined) {
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/${destDirectory}`);
await cpp.exec(`cp -r ${srcDirectory}/* ${this.trialLocalNFSTempFolder}/${destDirectory}/.`);
const fcClusterConfigNFS: FrameworkControllerClusterConfigNFS = <FrameworkControllerClusterConfigNFS>this.fcClusterConfig;
const nfsConfig: NFSConfig = fcClusterConfigNFS.nfs;
return `nfs://${nfsConfig.server}:${destDirectory}`;
}

return Promise.resolve(trialJobOutputUrl);
return '';
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,20 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
const restServer: KubeflowJobRestServer = component.get(KubeflowJobRestServer);
this.kubernetesRestServerPort = restServer.clusterRestServerPort;
}

// upload code Dir to storage
if (this.copyExpCodeDirPromise !== undefined) {
await this.copyExpCodeDirPromise;
}

const trialJobId: string = uniqueString(5);
const trialWorkingFolder: string = path.join(this.CONTAINER_MOUNT_PATH, 'nni', getExperimentId(), trialJobId);
const kubeflowJobName: string = `nni-exp-${this.experimentId}-trial-${trialJobId}`.toLowerCase();
const trialLocalTempFolder: string = path.join(getExperimentRootDir(), 'trials-local', trialJobId);
//prepare the runscript
await this.prepareRunScript(trialLocalTempFolder, trialJobId, trialWorkingFolder, form);
//upload files to sotrage
const trialJobOutputUrl: string = await this.uploadCodeFiles(trialJobId, trialLocalTempFolder);
//upload script files to sotrage
const trialJobOutputUrl: string = await this.uploadFolder(trialLocalTempFolder, `nni/${getExperimentId()}/${trialJobId}`);
let initStatus: TrialJobStatus = 'WAITING';
if (!trialJobOutputUrl) {
initStatus = 'FAILED';
Expand Down Expand Up @@ -152,6 +158,8 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
// Validate to make sure codeDir doesn't have too many files
try {
await validateCodeDir(this.kubeflowTrialConfig.codeDir);
//upload codeDir to storage
this.copyExpCodeDirPromise = this.uploadFolder(this.kubeflowTrialConfig.codeDir, `nni/${getExperimentId()}/nni-code`);
} catch (error) {
this.log.error(error);

Expand All @@ -172,12 +180,9 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
}

/**
* upload code files to nfs or azureStroage
* @param trialJobId
* @param trialLocalTempFolder
* return: trialJobOutputUrl
* upload local folder to nfs or azureStroage
*/
private async uploadCodeFiles(trialJobId: string, trialLocalTempFolder: string): Promise<string> {
private async uploadFolder(srcDirectory: string, destDirectory: string): Promise<string> {
if (this.kubeflowClusterConfig === undefined) {
throw new Error('Kubeflow Cluster config is not initialized');
}
Expand All @@ -186,8 +191,6 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
throw new Error('Kubeflow Trial config is not initialized');
}

let trialJobOutputUrl: string = '';

assert(this.kubeflowClusterConfig.storage === undefined
|| this.kubeflowClusterConfig.storage === 'azureStorage'
|| this.kubeflowClusterConfig.storage === 'nfs');
Expand All @@ -197,20 +200,15 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
throw new Error('azureStorageClient is not initialized');
}
const azureKubeflowClusterConfig: KubeflowClusterConfigAzure = <KubeflowClusterConfigAzure>this.kubeflowClusterConfig;
trialJobOutputUrl = await this.uploadFilesToAzureStorage(trialJobId, trialLocalTempFolder, this.kubeflowTrialConfig.codeDir, azureKubeflowClusterConfig.uploadRetryCount);
return await this.uploadFolderToAzureStorage(srcDirectory, destDirectory, azureKubeflowClusterConfig.uploadRetryCount);
} else if (this.kubeflowClusterConfig.storage === 'nfs' || this.kubeflowClusterConfig.storage === undefined) {
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/${destDirectory}`);
await cpp.exec(`cp -r ${srcDirectory}/* ${this.trialLocalNFSTempFolder}/${destDirectory}/.`);
const nfsKubeflowClusterConfig: KubeflowClusterConfigNFS = <KubeflowClusterConfigNFS>this.kubeflowClusterConfig;
// Creat work dir for current trial in NFS directory
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}`);
// Copy script files from local dir to NFS mounted dir
await cpp.exec(`cp -r ${trialLocalTempFolder}/* ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}/.`);
// Copy codeDir to NFS mounted dir
await cpp.exec(`cp -r ${this.kubeflowTrialConfig.codeDir}/* ${this.trialLocalNFSTempFolder}/nni/${getExperimentId()}/${trialJobId}/.`);
const nfsConfig: NFSConfig = nfsKubeflowClusterConfig.nfs;
trialJobOutputUrl = `nfs://${nfsConfig.server}:${path.join(nfsConfig.path, 'nni', getExperimentId(), trialJobId, 'output')}`;
return `nfs://${nfsConfig.server}:${destDirectory}`;
}

return Promise.resolve(trialJobOutputUrl);
return '';
}

private async prepareRunScript(trialLocalTempFolder: string, trialJobId: string, trialWorkingFolder: string,
Expand Down
4 changes: 2 additions & 2 deletions src/nni_manager/training_service/kubernetes/kubernetesData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export class KubernetesTrialJobDetail implements TrialJobDetail {
export const kubernetesScriptFormat: string =
`#!/bin/bash
export NNI_PLATFORM={0}
export NNI_SYS_DIR=$PWD/nni/{1}
export NNI_SYS_DIR={1}
export NNI_OUTPUT_DIR={2}
export MULTI_PHASE=false
export NNI_TRIAL_JOB_ID={3}
Expand All @@ -49,7 +49,7 @@ export NNI_TRIAL_SEQ_ID={6}
{7}
mkdir -p $NNI_SYS_DIR
mkdir -p $NNI_OUTPUT_DIR
cp -rT $NNI_CODE_DIR $NNI_SYS_DIR
cp -r $NNI_CODE_DIR/. $NNI_SYS_DIR
cd $NNI_SYS_DIR
sh install_nni.sh
python3 -m nni_trial_tool.trial_keeper --trial_command '{8}' --nnimanager_ip {9} --nnimanager_port {10} \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ abstract class KubernetesTrainingService {
protected kubernetesClusterConfig?: KubernetesClusterConfig;
protected versionCheck: boolean = true;
protected logCollection: string;
protected copyExpCodeDirPromise?: Promise<string>;
protected expContainerCodeFolder: string;

constructor() {
this.log = getLogger();
Expand All @@ -57,6 +59,7 @@ abstract class KubernetesTrainingService {
this.trialLocalNFSTempFolder = path.join(getExperimentRootDir(), 'trials-nfs-tmp');
this.experimentId = getExperimentId();
this.CONTAINER_MOUNT_PATH = '/tmp/mount';
this.expContainerCodeFolder = path.join(this.CONTAINER_MOUNT_PATH, 'nni', this.experimentId, 'nni-code');
this.genericK8sClient = new GeneralK8sClient();
this.logCollection = 'none';
}
Expand Down Expand Up @@ -272,11 +275,11 @@ abstract class KubernetesTrainingService {
const runScript: string = String.Format(
kubernetesScriptFormat,
platform,
trialJobId,
trialWorkingFolder,
path.join(trialWorkingFolder, 'output', `${roleName}_output`),
trialJobId,
getExperimentId(),
trialWorkingFolder,
this.expContainerCodeFolder,
trialSequenceId,
nvidiaScript,
command,
Expand Down Expand Up @@ -329,51 +332,45 @@ abstract class KubernetesTrainingService {
);
return registrySecretName;
}

protected async uploadFilesToAzureStorage(trialJobId: string, trialLocalTempFolder: string, codeDir: string, uploadRetryCount: number | undefined): Promise<string> {

/**
* upload local directory to azureStorage
* @param srcDirectory the source directory of local folder
* @param destDirectory the target directory in azure
* @param uploadRetryCount the retry time when upload failed
*/
protected async uploadFolderToAzureStorage(srcDirectory: string, destDirectory: string, uploadRetryCount: number | undefined): Promise<string> {
if (this.azureStorageClient === undefined) {
throw new Error('azureStorageClient is not initialized');
}
let trialJobOutputUrl: string = '';
let retryCount: number = 1;
if(uploadRetryCount) {
retryCount = uploadRetryCount;
}
let resultUploadNNIScript: boolean = false;
let resultUploadCodeFile: boolean = false;
let uploadSuccess: boolean = false;
let folderUriInAzure = '';
try {
do {
//upload local files, including scripts for running the trial and configuration (e.g., hyperparameters) for the trial, to azure storage
if(!resultUploadNNIScript) {
resultUploadNNIScript = await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient,
`nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare,
`${trialLocalTempFolder}`);
}
//upload code files to azure storage
if(!resultUploadCodeFile) {
resultUploadCodeFile = await AzureStorageClientUtility.uploadDirectory(this.azureStorageClient,
`nni/${getExperimentId()}/${trialJobId}`, this.azureStorageShare,
`${codeDir}`);
}
if (resultUploadNNIScript && resultUploadCodeFile) {
trialJobOutputUrl = `https://${this.azureStorageAccountName}.file.core.windows.net/${this.azureStorageShare}` +
`/${path.join('nni', getExperimentId(), trialJobId, 'output')}`;
break;
} else {
uploadSuccess = await AzureStorageClientUtility.uploadDirectory(
this.azureStorageClient,
`${destDirectory}`,
this.azureStorageShare,
`${srcDirectory}`);
if (!uploadSuccess) {
//wait for 5 seconds to re-upload files
await delay(5000);
this.log.info('Upload failed, Retry: upload files to azure-storage');
} else {
folderUriInAzure = `https://${this.azureStorageAccountName}.file.core.windows.net/${this.azureStorageShare}/${destDirectory}`;
break;
}
} while (retryCount-- >= 0)
} catch (error) {
this.log.error(error);
//return a empty url when got error
return Promise.resolve("");
}
if(!trialJobOutputUrl) {
this.log.info(`Retry-count is used up, upload files to azureStorage for trial ${trialJobId} failed!`);
return Promise.resolve('');
}
return Promise.resolve(trialJobOutputUrl);
return Promise.resolve(folderUriInAzure);
}

}
Expand Down
39 changes: 23 additions & 16 deletions src/nni_manager/training_service/local/localTrainingService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -361,21 +361,25 @@ class LocalTrainingService implements TrainingService {
trialJobDetail: TrialJobDetail,
resource: { gpuIndices: number[] },
gpuNum: number | undefined): { key: string; value: string }[] {
const envVariables: { key: string; value: string }[] = [
{ key: 'NNI_PLATFORM', value: 'local' },
{ key: 'NNI_EXP_ID', value: this.experimentId },
{ key: 'NNI_SYS_DIR', value: trialJobDetail.workingDirectory },
{ key: 'NNI_TRIAL_JOB_ID', value: trialJobDetail.id },
{ key: 'NNI_OUTPUT_DIR', value: trialJobDetail.workingDirectory },
{ key: 'NNI_TRIAL_SEQ_ID', value: trialJobDetail.form.sequenceId.toString() },
{ key: 'MULTI_PHASE', value: this.isMultiPhase.toString() }
];
if (gpuNum !== undefined) {
envVariables.push({
key: 'CUDA_VISIBLE_DEVICES',
value: this.gpuScheduler === undefined ? '-1' : resource.gpuIndices.join(',')
});
}
if (this.localTrialConfig === undefined) {
throw new Error('localTrialConfig is not initialized!');
}
const envVariables: { key: string; value: string }[] = [
{ key: 'NNI_PLATFORM', value: 'local' },
{ key: 'NNI_EXP_ID', value: this.experimentId },
{ key: 'NNI_SYS_DIR', value: trialJobDetail.workingDirectory },
{ key: 'NNI_TRIAL_JOB_ID', value: trialJobDetail.id },
{ key: 'NNI_OUTPUT_DIR', value: trialJobDetail.workingDirectory },
{ key: 'NNI_TRIAL_SEQ_ID', value: trialJobDetail.form.sequenceId.toString() },
{ key: 'MULTI_PHASE', value: this.isMultiPhase.toString() },
{ key: 'NNI_CODE_DIR', value: this.localTrialConfig.codeDir}
];
if (gpuNum !== undefined) {
envVariables.push({
key: 'CUDA_VISIBLE_DEVICES',
value: this.gpuScheduler === undefined ? '-1' : resource.gpuIndices.join(',')
});
}

return envVariables;
}
Expand Down Expand Up @@ -473,12 +477,16 @@ class LocalTrainingService implements TrainingService {
private getScript(localTrialConfig: TrialConfig, workingDirectory: string): string[] {
const script: string[] = [];
if (process.platform === 'win32') {
script.push(`Copy-Item $env:NNI_CODE_DIR\\* -Destination $env:NNI_SYS_DIR -Recurse`);
script.push(`cd $env:NNI_SYS_DIR`);
script.push(
`cmd.exe /c ${localTrialConfig.command} 2>"${path.join(workingDirectory, 'stderr')}"`,
`$NOW_DATE = [int64](([datetime]::UtcNow)-(get-date "1/1/1970")).TotalSeconds`,
`$NOW_DATE = "$NOW_DATE" + (Get-Date -Format fff).ToString()`,
`Write $LASTEXITCODE " " $NOW_DATE | Out-File "${path.join(workingDirectory, '.nni', 'state')}" -NoNewline -encoding utf8`);
} else {
script.push(`cp -r $NNI_CODE_DIR/. $NNI_SYS_DIR`);
script.push(`cd $NNI_SYS_DIR`);
script.push(`eval ${localTrialConfig.command} 2>"${path.join(workingDirectory, 'stderr')}"`);
if (process.platform === 'darwin') {
// https://superuser.com/questions/599072/how-to-get-bash-execution-time-in-milliseconds-under-mac-os-x
Expand Down Expand Up @@ -506,7 +514,6 @@ class LocalTrainingService implements TrainingService {
if (process.platform !== 'win32') {
runScriptContent.push('#!/bin/bash');
}
runScriptContent.push(`cd '${this.localTrialConfig.codeDir}'`);
for (const variable of variables) {
runScriptContent.push(setEnvironmentVariable(variable));
}
Expand Down
Loading

0 comments on commit d7456c1

Please sign in to comment.