-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Support remote training service use reuse mode #2923
Changes from 41 commits
dcd2ffd
3b8b6fb
916e444
caeffb8
57c300e
65660e6
9376d6a
5fef3cf
5544ae8
f9fdfee
c5e26ef
10a04ba
aa64fe6
4ed907f
c6a5f8c
68abe2f
c2b50d2
14e9619
f69e206
a5bb753
12ef0aa
7600a0f
ddcf229
bd327d4
c4f6e66
da2d1c4
529c29f
2a386d3
169e65f
88f8c1b
870b2d0
60ff833
c4fa1c3
4e56975
a428853
3b57f94
8d106ba
492ff8e
41e3ebd
9b5b3f7
1dabc88
c68a7f3
abd660c
d998599
c8ec30a
ebc12d2
e772871
863100c
1387f38
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -125,8 +125,9 @@ export abstract class EnvironmentService { | |
public abstract get hasStorageService(): boolean; | ||
public abstract config(key: string, value: string): Promise<void>; | ||
public abstract refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise<void>; | ||
public abstract startEnvironment(environment: EnvironmentInformation): Promise<void>; | ||
public abstract stopEnvironment(environment: EnvironmentInformation): Promise<void>; | ||
public abstract startEnvironment(environment: EnvironmentInformation): Promise<void>; | ||
public abstract getInitializeEnvironmentNumber(): number; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed to use prefetchedEnvironmentCount(). I think it is better to keep these three values separate, because different platforms need different data types and values. For example, OpenPAI only needs to set hasStorageService, while remote needs to set prefetchedEnvironmentCount; keeping them separate is more flexible. |
||
|
||
// It depends on environment pressure and settings | ||
    // for example, OpenPAI relies on API calls, and there is a limit on how frequently they can be made, so it needs to be bigger. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,309 @@ | ||
// Copyright (c) Microsoft Corporation. | ||
// Licensed under the MIT license. | ||
|
||
'use strict'; | ||
|
||
import * as fs from 'fs'; | ||
import * as path from 'path'; | ||
import { Deferred } from 'ts-deferred'; | ||
import * as component from '../../../common/component'; | ||
import { getExperimentId } from '../../../common/experimentStartupInfo'; | ||
import { getLogger, Logger } from '../../../common/log'; | ||
import { EnvironmentInformation, EnvironmentService } from '../environment'; | ||
import { ObservableTimer } from '../../../common/observableTimer'; | ||
import { | ||
getExperimentRootDir, | ||
} from '../../../common/utils'; | ||
import { TrialConfig } from '../../common/trialConfig'; | ||
import { TrialConfigMetadataKey } from '../../common/trialConfigMetadataKey'; | ||
import { execMkdir, validateCodeDir } from '../../common/util'; | ||
import { MachineScheduler } from '../remote/machineScheduler'; | ||
import { | ||
ExecutorManager, RemoteMachineMeta, | ||
} from '../../remote_machine/remoteMachineData'; | ||
import { ShellExecutor } from 'training_service/remote_machine/shellExecutor'; | ||
import { RemoteMachineEnvironmentInformation } from '../remote/remoteConfig'; | ||
|
||
|
||
@component.Singleton | ||
export class RemoteEnvironmentService extends EnvironmentService { | ||
|
||
    // Executor id used for the first connection made to each machine (see initRemoteMachineOnConnected). | ||
    private readonly initExecutorId = "initConnection"; | ||
    // Remote machine -> its ExecutorManager; entries are added in initRemoteMachineOnConnected. | ||
    private readonly machineExecutorManagerMap: Map<RemoteMachineMeta, ExecutorManager>; | ||
    // NOTE(review): only read (in prepareEnvironment); nothing visible in this class adds entries to it. | ||
    private readonly machineCopyExpCodeDirPromiseMap: Map<RemoteMachineMeta, Promise<void>>; | ||
    // Environment id -> ExecutorManager of the machine the environment was scheduled on. | ||
    private readonly environmentExecutorManagerMap: Map<string, ExecutorManager>; | ||
squirrelsc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
    // Environment id -> environment; written in prepareEnvironment and never read in this class. | ||
    private readonly environmentJobsMap: Map<string, RemoteMachineEnvironmentInformation>; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. environmentJobsMap may be unused, and can be removed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed |
||
    // Trial config stored by config() for the TRIAL_CONFIG key; undefined until then. | ||
    private trialConfig: TrialConfig | undefined; | ||
    // Created in startEnvironment from the machines present in machineExecutorManagerMap. | ||
    private machineScheduler?: MachineScheduler; | ||
    private readonly log: Logger; | ||
    // Filled by setupConnections; awaited and then reset to [] in startEnvironment. | ||
    private sshConnectionPromises: any[]; | ||
    // Value of getExperimentRootDir(), used as the base of the local temp folder for run scripts. | ||
    private experimentRootDir: string; | ||
    // Value of getExperimentId(), captured in the constructor. | ||
    private experimentId: string; | ||
|
||
    // Creates the empty bookkeeping maps and captures the experiment id, experiment root dir and logger. | ||
    constructor() { | ||
        super(); | ||
        // NOTE(review): experimentId is assigned again further down with the same call; one of the two is redundant. | ||
        this.experimentId = getExperimentId(); | ||
        this.environmentJobsMap = new Map<string, RemoteMachineEnvironmentInformation>(); | ||
        this.environmentExecutorManagerMap = new Map<string, ExecutorManager>(); | ||
squirrelsc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
        this.machineCopyExpCodeDirPromiseMap = new Map<RemoteMachineMeta, Promise<void>>(); | ||
        this.machineExecutorManagerMap = new Map<RemoteMachineMeta, ExecutorManager>(); | ||
        // No connection is pending until setupConnections runs. | ||
        this.sshConnectionPromises = []; | ||
        this.experimentRootDir = getExperimentRootDir(); | ||
        this.experimentId = getExperimentId(); | ||
        this.log = getLogger(); | ||
        this.log.info('Construct remote machine training service.'); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line of log is useless. If it's needed, it means one line log should be added at higher level. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed |
||
    } | ||
|
||
public getInitializeEnvironmentNumber(): number { | ||
return this.machineExecutorManagerMap.size; | ||
} | ||
|
||
public get environmentMaintenceLoopInterval(): number { | ||
return 5000; | ||
} | ||
|
||
public get hasMoreEnvironments(): boolean { | ||
return false; | ||
} | ||
|
||
public get hasStorageService(): boolean { | ||
return false; | ||
} | ||
|
||
public async config(key: string, value: string): Promise<void> { | ||
switch (key) { | ||
case TrialConfigMetadataKey.MACHINE_LIST: | ||
await this.setupConnections(value); | ||
break; | ||
case TrialConfigMetadataKey.TRIAL_CONFIG: { | ||
const remoteMachineTrailConfig: TrialConfig = <TrialConfig>JSON.parse(value); | ||
// Parse trial config failed, throw Error | ||
if (remoteMachineTrailConfig === undefined) { | ||
throw new Error('trial config parsed failed'); | ||
} | ||
// codeDir is not a valid directory, throw Error | ||
if (!fs.lstatSync(remoteMachineTrailConfig.codeDir) | ||
.isDirectory()) { | ||
throw new Error(`codeDir ${remoteMachineTrailConfig.codeDir} is not a directory`); | ||
} | ||
try { | ||
// Validate to make sure codeDir doesn't have too many files | ||
await validateCodeDir(remoteMachineTrailConfig.codeDir); | ||
} catch (error) { | ||
this.log.error(error); | ||
return Promise.reject(new Error(error)); | ||
} | ||
|
||
this.trialConfig = remoteMachineTrailConfig; | ||
break; | ||
} | ||
default: | ||
this.log.debug(`Remote not support metadata key: '${key}', value: '${value}'`); | ||
} | ||
} | ||
|
||
private async setupConnections(machineList: string): Promise<void> { | ||
this.log.debug(`Connecting to remote machines: ${machineList}`); | ||
//TO DO: verify if value's format is wrong, and json parse failed, how to handle error | ||
const rmMetaList: RemoteMachineMeta[] = <RemoteMachineMeta[]>JSON.parse(machineList); | ||
|
||
for (const rmMeta of rmMetaList) { | ||
this.sshConnectionPromises.push(await this.initRemoteMachineOnConnected(rmMeta)); | ||
} | ||
} | ||
|
||
private async initRemoteMachineOnConnected(rmMeta: RemoteMachineMeta): Promise<void> { | ||
rmMeta.occupiedGpuIndexMap = new Map<number, number>(); | ||
const executorManager: ExecutorManager = new ExecutorManager(rmMeta); | ||
this.log.info(`connecting to ${rmMeta.username}@${rmMeta.ip}:${rmMeta.port}`); | ||
const executor: ShellExecutor = await executorManager.getExecutor(this.initExecutorId); | ||
this.log.debug(`reached ${executor.name}`); | ||
this.machineExecutorManagerMap.set(rmMeta, executorManager); | ||
this.log.debug(`initializing ${executor.name}`); | ||
|
||
// Create root working directory after executor is ready | ||
const nniRootDir: string = executor.joinPath(executor.getTempPath(), 'nni'); | ||
await executor.createFolder(executor.getRemoteExperimentRootDir(getExperimentId())); | ||
|
||
// the directory to store temp scripts in remote machine | ||
const remoteGpuScriptCollectorDir: string = executor.getRemoteScriptsPath(getExperimentId()); | ||
|
||
// clean up previous result. | ||
await executor.createFolder(remoteGpuScriptCollectorDir, true); | ||
await executor.allowPermission(true, nniRootDir); | ||
} | ||
|
||
    // For each environment: if its pid file exists and the process is no longer alive, read the | ||
    // return-code file, set status SUCCEEDED (code 0) or FAILED, and release the environment's resources. | ||
    // NOTE(review): the async forEach callbacks are not awaited, so the returned promise resolves before | ||
    // any check has finished, and a rejection from getExecutor (outside the try) is not handled here. | ||
    public async refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise<void> { | ||
        environments.forEach(async (environment) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is it better to refresh all environment concurrently? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated |
||
            const executor = await this.getExecutor(environment.id); | ||
            const jobpidPath: string = `${environment.runnerWorkingFolder}/pid`; | ||
            const trialReturnCodeFilePath: string = `${environment.runnerWorkingFolder}/code`; | ||
            // NOTE(review): fs.existsSync checks the local filesystem, while runnerWorkingFolder is built in | ||
            // prepareEnvironment from the executor's remote experiment directory - confirm this is intended. | ||
            if (fs.existsSync(jobpidPath)) { | ||
                /* eslint-disable require-atomic-updates */ | ||
                try { | ||
                    const isAlive = await executor.isProcessAlive(jobpidPath); | ||
                    // if the process of jobpid is not alive any more | ||
                    if (!isAlive) { | ||
                        this.log.info(`pid in ${jobpidPath} is not alive!`); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It may need to log remote machine information, or it's hard to find which node is not alive. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated |
||
                        if (fs.existsSync(trialReturnCodeFilePath)) { | ||
                            const trialReturnCode: string = await executor.getRemoteFileContent(trialReturnCodeFilePath); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. change names There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated |
||
                            // Expected content: "<exit code> <timestamp>", as written by the command built in | ||
                            // prepareEnvironment. A leading '-' is accepted but not captured; only group 1 is used. | ||
                            const match: RegExpMatchArray | null = trialReturnCode.trim() | ||
                                .match(/^-?(\d+)\s+(\d+)$/); | ||
                            if (match !== null) { | ||
                                const { 1: code } = match; | ||
                                // Update trial job's status based on result code | ||
                                if (parseInt(code, 10) === 0) { | ||
                                    environment.setStatus('SUCCEEDED'); | ||
                                } else { | ||
                                    environment.setStatus('FAILED'); | ||
                                } | ||
                                this.releaseTrialResource(environment); | ||
                            } | ||
                        } | ||
                    } | ||
                } catch (error) { | ||
                    // On any failure the resources are released and the error is logged; the status is left unchanged. | ||
                    this.releaseTrialResource(environment); | ||
                    this.log.error(`Update job status exception, error is ${error.message}`); | ||
                } | ||
            } | ||
        }); | ||
    } | ||
|
||
    /** | ||
     * Release the executor and the machine reservation held by an environment. | ||
     * Throws when no executor manager is assigned to the environment, when the machine | ||
     * scheduler has not been created yet, or when the environment has no rmMachineMeta. | ||
     * @param environment environment to release; it is cast to RemoteMachineEnvironmentInformation | ||
     */ | ||
    public releaseTrialResource(environment: EnvironmentInformation): void { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should be called like releaseEnvironment or stopRunner. Trial is confusing here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated. |
||
        const executorManager = this.environmentExecutorManagerMap.get(environment.id); | ||
        if (executorManager === undefined) { | ||
            throw new Error(`ExecutorManager is not assigned for environment ${environment.id}`); | ||
        } | ||
        if (this.machineScheduler === undefined) { | ||
            throw new Error(`machineScheduler is not initialized!`); | ||
        } | ||
        // Note: the entry in environmentExecutorManagerMap is kept, as there may be following requests from nni manager. | ||
        executorManager.releaseExecutor(environment.id); | ||
        const remoteEnvironment: RemoteMachineEnvironmentInformation = environment as RemoteMachineEnvironmentInformation; | ||
        if (remoteEnvironment.rmMachineMeta === undefined) { | ||
            throw new Error(`${remoteEnvironment.id} rmMachineMeta not initialized!`); | ||
        } | ||
        // Hand the machine reservation back to the scheduler. | ||
        this.machineScheduler.recycleMachineReservation(remoteEnvironment.rmMachineMeta); | ||
    } | ||
|
||
public async startEnvironment(environment: EnvironmentInformation): Promise<void> { | ||
if (this.sshConnectionPromises.length > 0) { | ||
await Promise.all(this.sshConnectionPromises); | ||
this.log.info('ssh connection initialized!'); | ||
// set sshConnectionPromises to [] to avoid log information duplicated | ||
this.sshConnectionPromises = []; | ||
if (this.trialConfig === undefined) { | ||
throw new Error("trial config not initialized!"); | ||
} | ||
this.machineScheduler = new MachineScheduler(Array.from(this.machineExecutorManagerMap.keys())); | ||
} | ||
const remoteEnvironment: RemoteMachineEnvironmentInformation = environment as RemoteMachineEnvironmentInformation; | ||
remoteEnvironment.status = 'WAITING'; | ||
await this.prepareEnvironment(remoteEnvironment); | ||
await this.launchEnvironmentOnScheduledMachine(remoteEnvironment); | ||
} | ||
|
||
    // Schedules a machine for the environment, derives its working folder and run command, launches it | ||
    // and marks it RUNNING. Resolves to false when the scheduler has no machine available, true otherwise. | ||
    // Throws when the trial config or the machine scheduler is missing. | ||
    private async prepareEnvironment(environment: RemoteMachineEnvironmentInformation): Promise<boolean> { | ||
        // NOTE(review): the Deferred only carries the boolean result; an async method could return it directly. | ||
        const deferred: Deferred<boolean> = new Deferred<boolean>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not necessary to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated. |
||
        if (this.trialConfig === undefined) { | ||
            throw new Error('trial config is not initialized'); | ||
        } | ||
        if (this.machineScheduler === undefined) { | ||
            throw new Error('machineScheduler is not initialized'); | ||
        } | ||
|
||
        // get an executor from scheduler | ||
        const rmMachineMeta: RemoteMachineMeta | undefined = this.machineScheduler.scheduleMachine(); | ||
        if (rmMachineMeta === undefined) { | ||
            this.log.warning(`No available machine!`); | ||
            deferred.resolve(false); | ||
        } else { | ||
            environment.rmMachineMeta = rmMachineMeta; | ||
            const copyExpCodeDirPromise = this.machineCopyExpCodeDirPromiseMap.get( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it still useful? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, this promise object is used to copy data to remote machine, and initialized in setClusterMetadata() step. need to use this object to workaround, otherwise setClusterMetadata will cost much time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. couldn't find setClusterMetadata, is it replaced by sshConnectionPromises? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed |
||
                environment.rmMachineMeta); | ||
            if (copyExpCodeDirPromise !== undefined) { | ||
                await copyExpCodeDirPromise; | ||
            } | ||
            // Bind the environment to the scheduled machine's executor manager before asking for an executor. | ||
            this.allocateExecutorManagerForEnvironment(environment); | ||
            const executor = await this.getExecutor(environment.id); | ||
            // Working folder: <remote experiment root>/envs/<environment id> | ||
            environment.runnerWorkingFolder = | ||
                executor.joinPath(executor.getRemoteExperimentRootDir(getExperimentId()), | ||
                    'envs', environment.id) | ||
            // Wrap the command: run it from the working folder, pass the pid file path, then write | ||
            // "<exit status> <output of date +%s%3N>" to the code file. | ||
            // NOTE(review): because of '&&', the echo only runs when the command exits with 0 - confirm this is intended. | ||
            environment.command = `cd ${environment.runnerWorkingFolder} && \ | ||
                ${environment.command} --job_pid_file ${environment.runnerWorkingFolder}/pid \ | ||
                && echo $? \`date +%s%3N\` >${environment.runnerWorkingFolder}/code`; | ||
|
||
            await this.launchEnvironmentOnScheduledMachine(environment); | ||
|
||
            environment.status = 'RUNNING'; | ||
            environment.trackingUrl = `file://${rmMachineMeta.ip}:${environment.runnerWorkingFolder}`; | ||
            // NOTE(review): environmentJobsMap is written here but never read in this class. | ||
            this.environmentJobsMap.set(environment.id, environment); | ||
            deferred.resolve(true); | ||
        } | ||
|
||
        return deferred.promise; | ||
    } | ||
|
||
/** | ||
* give environment an executor | ||
* @param environment RemoteMachineEnvironmentDetail | ||
*/ | ||
private allocateExecutorManagerForEnvironment(environment: RemoteMachineEnvironmentInformation): void { | ||
if (environment.rmMachineMeta === undefined) { | ||
throw new Error(`rmMeta not set in trial ${environment.id}`); | ||
} | ||
const executorManager: ExecutorManager | undefined = this.machineExecutorManagerMap.get(environment.rmMachineMeta); | ||
if (executorManager === undefined) { | ||
throw new Error(`executorManager not initialized`); | ||
} | ||
this.environmentExecutorManagerMap.set(environment.id, executorManager); | ||
} | ||
|
||
private async getExecutor(environmentId: string): Promise<ShellExecutor> { | ||
const executorManager = this.environmentExecutorManagerMap.get(environmentId); | ||
if (executorManager === undefined) { | ||
throw new Error(`ExecutorManager is not assigned for environment ${environmentId}`); | ||
} | ||
return await executorManager.getExecutor(environmentId); | ||
} | ||
|
||
    // Stops an environment: kills the child processes recorded in its pid file and releases its resources. | ||
    // An environment whose status is UNKNOWN is only marked USER_CANCELED and released. | ||
    // Errors from killing or releasing are logged, not rethrown. | ||
    public async stopEnvironment(environment: EnvironmentInformation): Promise<void> { | ||
        // The executor is fetched before the status check, so a missing executor manager rejects in both paths. | ||
        const executor = await this.getExecutor(environment.id); | ||
|
||
        if (environment.status === 'UNKNOWN') { | ||
            environment.status = 'USER_CANCELED'; | ||
            this.releaseTrialResource(environment); | ||
            return | ||
        } | ||
|
||
        const jobpidPath: string = `${environment.runnerWorkingFolder}/pid`; | ||
        try { | ||
            await executor.killChildProcesses(jobpidPath); | ||
            this.releaseTrialResource(environment); | ||
        } catch (error) { | ||
            // NOTE(review): the log prefix names remoteTrainingService.cancelTrialJob, not this method. | ||
            this.log.error(`remoteTrainingService.cancelTrialJob: ${error}`); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the error message is for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated |
||
        } | ||
    } | ||
|
||
    // Writes environment.command into a local "run" script, copies the local temp folder to the environment's | ||
    // remote working folder and executes the script there. Throws when the trial config is missing. | ||
    private async launchEnvironmentOnScheduledMachine(environment: RemoteMachineEnvironmentInformation): Promise<void> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the name is strange. how about prepareEnvironment or launchTrialRunner? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. prepareEnvironment is already used, this function is used to launcher an environment process, changed to use |
||
        if (this.trialConfig === undefined) { | ||
            throw new Error('trial config is not initialized'); | ||
        } | ||
        const executor = await this.getExecutor(environment.id); | ||
        // NOTE(review): this local folder does not include the environment id, so every environment | ||
        // writes its run script to the same local file - confirm launches never overlap. | ||
        const environmentLocalTempFolder: string = | ||
            path.join(this.experimentRootDir, this.experimentId, "environment-temp") | ||
        await executor.createFolder(environment.runnerWorkingFolder); | ||
        await execMkdir(environmentLocalTempFolder); | ||
        await fs.promises.writeFile(path.join(environmentLocalTempFolder, executor.getScriptName("run")), | ||
        environment.command, { encoding: 'utf8' }); | ||
        // Copy the local temp folder (holding the run script) to the remote working directory | ||
        await executor.copyDirectoryToRemote(environmentLocalTempFolder, environment.runnerWorkingFolder); | ||
        // Execute command in remote machine; the call is not awaited here | ||
        executor.executeScript(executor.joinPath(environment.runnerWorkingFolder, | ||
            executor.getScriptName("run")), true, false); | ||
    } | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a little strange to have both "machineList" and "remoteConfig" at the same level.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
machineList is a list-type field. I considered merging machineList under remoteConfig, but that may cause compatibility problems.