-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Support remote training service use reuse mode #2923
Changes from 44 commits
dcd2ffd
3b8b6fb
916e444
caeffb8
57c300e
65660e6
9376d6a
5fef3cf
5544ae8
f9fdfee
c5e26ef
10a04ba
aa64fe6
4ed907f
c6a5f8c
68abe2f
c2b50d2
14e9619
f69e206
a5bb753
12ef0aa
7600a0f
ddcf229
bd327d4
c4f6e66
da2d1c4
529c29f
2a386d3
169e65f
88f8c1b
870b2d0
60ff833
c4fa1c3
4e56975
a428853
3b57f94
8d106ba
492ff8e
41e3ebd
9b5b3f7
1dabc88
c68a7f3
abd660c
d998599
c8ec30a
ebc12d2
e772871
863100c
1387f38
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,320 @@ | ||
// Copyright (c) Microsoft Corporation. | ||
// Licensed under the MIT license. | ||
|
||
'use strict'; | ||
|
||
import * as fs from 'fs'; | ||
import * as path from 'path'; | ||
import { Deferred } from 'ts-deferred'; | ||
import * as component from '../../../common/component'; | ||
import { getExperimentId } from '../../../common/experimentStartupInfo'; | ||
import { getLogger, Logger } from '../../../common/log'; | ||
import { EnvironmentInformation, EnvironmentService } from '../environment'; | ||
import { ObservableTimer } from '../../../common/observableTimer'; | ||
import { | ||
getExperimentRootDir, | ||
} from '../../../common/utils'; | ||
import { TrialConfig } from '../../common/trialConfig'; | ||
import { TrialConfigMetadataKey } from '../../common/trialConfigMetadataKey'; | ||
import { execMkdir, validateCodeDir } from '../../common/util'; | ||
import { | ||
ExecutorManager, RemoteMachineMeta, | ||
} from '../../remote_machine/remoteMachineData'; | ||
import { ShellExecutor } from 'training_service/remote_machine/shellExecutor'; | ||
import { RemoteMachineEnvironmentInformation } from '../remote/remoteConfig'; | ||
|
||
|
||
@component.Singleton | ||
export class RemoteEnvironmentService extends EnvironmentService { | ||
|
||
private readonly initExecutorId = "initConnection"; | ||
private readonly machineExecutorManagerMap: Map<RemoteMachineMeta, ExecutorManager>; | ||
private readonly machineCopyExpCodeDirPromiseMap: Map<RemoteMachineMeta, Promise<void>>; | ||
private readonly environmentExecutorManagerMap: Map<string, ExecutorManager>; | ||
squirrelsc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
private readonly remoteMachineMetaOccupiedMap: Map<RemoteMachineMeta, boolean>; | ||
private trialConfig: TrialConfig | undefined; | ||
private readonly log: Logger; | ||
private sshConnectionPromises: any[]; | ||
private experimentRootDir: string; | ||
private experimentId: string; | ||
|
||
constructor() { | ||
super(); | ||
this.experimentId = getExperimentId(); | ||
this.environmentExecutorManagerMap = new Map<string, ExecutorManager>(); | ||
squirrelsc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
this.machineCopyExpCodeDirPromiseMap = new Map<RemoteMachineMeta, Promise<void>>(); | ||
this.machineExecutorManagerMap = new Map<RemoteMachineMeta, ExecutorManager>(); | ||
this.remoteMachineMetaOccupiedMap = new Map<RemoteMachineMeta, boolean>(); | ||
this.sshConnectionPromises = []; | ||
this.experimentRootDir = getExperimentRootDir(); | ||
this.experimentId = getExperimentId(); | ||
this.log = getLogger(); | ||
} | ||
|
||
public get prefetchedEnvironmentCount(): number { | ||
return this.machineExecutorManagerMap.size; | ||
} | ||
|
||
public get environmentMaintenceLoopInterval(): number { | ||
return 5000; | ||
} | ||
|
||
public get hasMoreEnvironments(): boolean { | ||
return false; | ||
} | ||
|
||
public get hasStorageService(): boolean { | ||
return false; | ||
} | ||
|
||
public async config(key: string, value: string): Promise<void> { | ||
switch (key) { | ||
case TrialConfigMetadataKey.MACHINE_LIST: | ||
await this.setupConnections(value); | ||
break; | ||
case TrialConfigMetadataKey.TRIAL_CONFIG: { | ||
const remoteMachineTrailConfig: TrialConfig = <TrialConfig>JSON.parse(value); | ||
// Parse trial config failed, throw Error | ||
if (remoteMachineTrailConfig === undefined) { | ||
throw new Error('trial config parsed failed'); | ||
} | ||
// codeDir is not a valid directory, throw Error | ||
if (!fs.lstatSync(remoteMachineTrailConfig.codeDir) | ||
.isDirectory()) { | ||
throw new Error(`codeDir ${remoteMachineTrailConfig.codeDir} is not a directory`); | ||
} | ||
try { | ||
// Validate to make sure codeDir doesn't have too many files | ||
await validateCodeDir(remoteMachineTrailConfig.codeDir); | ||
} catch (error) { | ||
this.log.error(error); | ||
return Promise.reject(new Error(error)); | ||
} | ||
|
||
this.trialConfig = remoteMachineTrailConfig; | ||
break; | ||
} | ||
default: | ||
this.log.debug(`Remote not support metadata key: '${key}', value: '${value}'`); | ||
} | ||
} | ||
|
||
private scheduleMachine(): RemoteMachineMeta | undefined { | ||
for (const [rmMeta, occupied] of this.remoteMachineMetaOccupiedMap) { | ||
if (!occupied) { | ||
this.remoteMachineMetaOccupiedMap.set(rmMeta, true); | ||
return rmMeta; | ||
} | ||
} | ||
return undefined; | ||
} | ||
|
||
private recycleMachineReservation(rmMeta: RemoteMachineMeta): void { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's called by one function only, and the name is confused with caller. don't need to be a seperated function. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated |
||
if (!this.remoteMachineMetaOccupiedMap.has(rmMeta)) { | ||
throw new Error(`${rmMeta} not initialized!`); | ||
} | ||
this.remoteMachineMetaOccupiedMap.set(rmMeta, false); | ||
} | ||
|
||
private async setupConnections(machineList: string): Promise<void> { | ||
this.log.debug(`Connecting to remote machines: ${machineList}`); | ||
//TO DO: verify if value's format is wrong, and json parse failed, how to handle error | ||
const rmMetaList: RemoteMachineMeta[] = <RemoteMachineMeta[]>JSON.parse(machineList); | ||
|
||
for (const rmMeta of rmMetaList) { | ||
this.sshConnectionPromises.push(await this.initRemoteMachineOnConnected(rmMeta)); | ||
} | ||
} | ||
|
||
private async initRemoteMachineOnConnected(rmMeta: RemoteMachineMeta): Promise<void> { | ||
rmMeta.occupiedGpuIndexMap = new Map<number, number>(); | ||
const executorManager: ExecutorManager = new ExecutorManager(rmMeta); | ||
this.log.info(`connecting to ${rmMeta.username}@${rmMeta.ip}:${rmMeta.port}`); | ||
const executor: ShellExecutor = await executorManager.getExecutor(this.initExecutorId); | ||
this.log.debug(`reached ${executor.name}`); | ||
this.machineExecutorManagerMap.set(rmMeta, executorManager); | ||
this.log.debug(`initializing ${executor.name}`); | ||
|
||
// Create root working directory after executor is ready | ||
const nniRootDir: string = executor.joinPath(executor.getTempPath(), 'nni'); | ||
await executor.createFolder(executor.getRemoteExperimentRootDir(getExperimentId())); | ||
|
||
// the directory to store temp scripts in remote machine | ||
const remoteGpuScriptCollectorDir: string = executor.getRemoteScriptsPath(getExperimentId()); | ||
|
||
// clean up previous result. | ||
await executor.createFolder(remoteGpuScriptCollectorDir, true); | ||
await executor.allowPermission(true, nniRootDir); | ||
} | ||
|
||
public async refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise<void> { | ||
environments.forEach(async (environment) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is it better to refresh all environment concurrently? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated |
||
const executor = await this.getExecutor(environment.id); | ||
const jobpidPath: string = `${environment.runnerWorkingFolder}/pid`; | ||
const trialReturnCodeFilePath: string = `${environment.runnerWorkingFolder}/code`; | ||
if (fs.existsSync(jobpidPath)) { | ||
/* eslint-disable require-atomic-updates */ | ||
try { | ||
const isAlive = await executor.isProcessAlive(jobpidPath); | ||
// if the process of jobpid is not alive any more | ||
if (!isAlive) { | ||
this.log.info(`pid in ${jobpidPath} is not alive!`); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It may need to log remote machine information, or it's hard to find which node is not alive. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated |
||
if (fs.existsSync(trialReturnCodeFilePath)) { | ||
const trialReturnCode: string = await executor.getRemoteFileContent(trialReturnCodeFilePath); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. change names There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated |
||
const match: RegExpMatchArray | null = trialReturnCode.trim() | ||
.match(/^-?(\d+)\s+(\d+)$/); | ||
if (match !== null) { | ||
const { 1: code } = match; | ||
// Update trial job's status based on result code | ||
if (parseInt(code, 10) === 0) { | ||
environment.setStatus('SUCCEEDED'); | ||
} else { | ||
environment.setStatus('FAILED'); | ||
} | ||
this.releaseEnvironmentResource(environment); | ||
} | ||
} | ||
} | ||
} catch (error) { | ||
this.releaseEnvironmentResource(environment); | ||
this.log.error(`Update job status exception, error is ${error.message}`); | ||
} | ||
} | ||
}); | ||
} | ||
|
||
/** | ||
* If a environment is finished, release the connection resource | ||
* @param environment remote machine environment job detail | ||
*/ | ||
public releaseEnvironmentResource(environment: EnvironmentInformation): void { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. private function? If it's proactive stopping, kill the trial runner. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated |
||
const executorManager = this.environmentExecutorManagerMap.get(environment.id); | ||
if (executorManager === undefined) { | ||
throw new Error(`ExecutorManager is not assigned for environment ${environment.id}`); | ||
} | ||
|
||
// Note, it still keep reference in trialExecutorManagerMap, as there may be following requests from nni manager. | ||
executorManager.releaseExecutor(environment.id); | ||
const remoteEnvironment: RemoteMachineEnvironmentInformation = environment as RemoteMachineEnvironmentInformation; | ||
if (remoteEnvironment.rmMachineMeta === undefined) { | ||
throw new Error(`${remoteEnvironment.id} rmMachineMeta not initialized!`); | ||
} | ||
this.recycleMachineReservation(remoteEnvironment.rmMachineMeta); | ||
} | ||
|
||
public async startEnvironment(environment: EnvironmentInformation): Promise<void> { | ||
if (this.sshConnectionPromises.length > 0) { | ||
await Promise.all(this.sshConnectionPromises); | ||
this.log.info('ssh connection initialized!'); | ||
// set sshConnectionPromises to [] to avoid log information duplicated | ||
this.sshConnectionPromises = []; | ||
if (this.trialConfig === undefined) { | ||
throw new Error("trial config not initialized!"); | ||
} | ||
Array.from(this.machineExecutorManagerMap.keys()).forEach(rmMeta => { | ||
// initialize remoteMachineMetaOccupiedMap, false means not occupied | ||
this.remoteMachineMetaOccupiedMap.set(rmMeta, false); | ||
}); | ||
} | ||
const remoteEnvironment: RemoteMachineEnvironmentInformation = environment as RemoteMachineEnvironmentInformation; | ||
remoteEnvironment.status = 'WAITING'; | ||
await this.prepareEnvironment(remoteEnvironment); | ||
await this.launchEnvironment(remoteEnvironment); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's called here, and inside of prepareEnvironment also. Is it expected? The logic can be clearer. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. refactored. |
||
} | ||
|
||
private async prepareEnvironment(environment: RemoteMachineEnvironmentInformation): Promise<boolean> { | ||
const deferred: Deferred<boolean> = new Deferred<boolean>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not necessary to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated. |
||
if (this.trialConfig === undefined) { | ||
throw new Error('trial config is not initialized'); | ||
} | ||
|
||
// get an executor from scheduler | ||
const rmMachineMeta: RemoteMachineMeta | undefined = this.scheduleMachine(); | ||
if (rmMachineMeta === undefined) { | ||
this.log.warning(`No available machine!`); | ||
deferred.resolve(false); | ||
} else { | ||
environment.rmMachineMeta = rmMachineMeta; | ||
const copyExpCodeDirPromise = this.machineCopyExpCodeDirPromiseMap.get( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it still useful? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, this promise object is used to copy data to remote machine, and initialized in setClusterMetadata() step. need to use this object to workaround, otherwise setClusterMetadata will cost much time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. couldn't find setClusterMetadata, is it replaced by sshConnectionPromises? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed |
||
environment.rmMachineMeta); | ||
if (copyExpCodeDirPromise !== undefined) { | ||
await copyExpCodeDirPromise; | ||
} | ||
this.allocateExecutorManagerForEnvironment(environment); | ||
const executor = await this.getExecutor(environment.id); | ||
environment.runnerWorkingFolder = | ||
executor.joinPath(executor.getRemoteExperimentRootDir(getExperimentId()), | ||
'envs', environment.id) | ||
environment.command = `cd ${environment.runnerWorkingFolder} && \ | ||
${environment.command} --job_pid_file ${environment.runnerWorkingFolder}/pid \ | ||
&& echo $? \`date +%s%3N\` >${environment.runnerWorkingFolder}/code`; | ||
|
||
await this.launchEnvironment(environment); | ||
|
||
environment.status = 'RUNNING'; | ||
environment.trackingUrl = `file://${rmMachineMeta.ip}:${environment.runnerWorkingFolder}`; | ||
deferred.resolve(true); | ||
} | ||
|
||
return deferred.promise; | ||
} | ||
|
||
/** | ||
* give environment an executor | ||
* @param environment RemoteMachineEnvironmentDetail | ||
*/ | ||
private allocateExecutorManagerForEnvironment(environment: RemoteMachineEnvironmentInformation): void { | ||
if (environment.rmMachineMeta === undefined) { | ||
throw new Error(`rmMeta not set in trial ${environment.id}`); | ||
} | ||
const executorManager: ExecutorManager | undefined = this.machineExecutorManagerMap.get(environment.rmMachineMeta); | ||
if (executorManager === undefined) { | ||
throw new Error(`executorManager not initialized`); | ||
} | ||
this.environmentExecutorManagerMap.set(environment.id, executorManager); | ||
} | ||
|
||
private async getExecutor(environmentId: string): Promise<ShellExecutor> { | ||
const executorManager = this.environmentExecutorManagerMap.get(environmentId); | ||
if (executorManager === undefined) { | ||
throw new Error(`ExecutorManager is not assigned for environment ${environmentId}`); | ||
} | ||
return await executorManager.getExecutor(environmentId); | ||
} | ||
|
||
public async stopEnvironment(environment: EnvironmentInformation): Promise<void> { | ||
const executor = await this.getExecutor(environment.id); | ||
|
||
if (environment.status === 'UNKNOWN') { | ||
environment.status = 'USER_CANCELED'; | ||
this.releaseEnvironmentResource(environment); | ||
return | ||
} | ||
|
||
const jobpidPath: string = `${environment.runnerWorkingFolder}/pid`; | ||
try { | ||
await executor.killChildProcesses(jobpidPath); | ||
this.releaseEnvironmentResource(environment); | ||
} catch (error) { | ||
this.log.error(`remoteTrainingService.cancelTrialJob: ${error}`); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the error message is for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated |
||
} | ||
} | ||
|
||
private async launchEnvironment(environment: RemoteMachineEnvironmentInformation): Promise<void> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. launchEnvironment is similar with startEnvironment. launchRunner or other name? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated |
||
if (this.trialConfig === undefined) { | ||
throw new Error('trial config is not initialized'); | ||
} | ||
const executor = await this.getExecutor(environment.id); | ||
const environmentLocalTempFolder: string = | ||
path.join(this.experimentRootDir, this.experimentId, "environment-temp") | ||
await executor.createFolder(environment.runnerWorkingFolder); | ||
await execMkdir(environmentLocalTempFolder); | ||
await fs.promises.writeFile(path.join(environmentLocalTempFolder, executor.getScriptName("run")), | ||
environment.command, { encoding: 'utf8' }); | ||
// Copy files in codeDir to remote working directory | ||
await executor.copyDirectoryToRemote(environmentLocalTempFolder, environment.runnerWorkingFolder); | ||
// Execute command in remote machine | ||
executor.executeScript(executor.joinPath(environment.runnerWorkingFolder, | ||
executor.getScriptName("run")), true, false); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is a little strange to have both "machineList" and "remoteConfig" in the same level
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
machineList is a list type field, I considered merge machineList under remoteConfig, but it may cause compatibility problem.