Skip to content
This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

Commit

Permalink
Fix remote reuse bugs (#2981)
Browse files Browse the repository at this point in the history
  • Loading branch information
SparkSnail authored Oct 20, 2020
1 parent 058b58a commit add7ca6
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ class LinuxCommands extends OsCommands {
return `${preCommand} && ${command}`;
}
}

/**
 * Builds the shell command used to probe whether a path exists.
 *
 * The resulting command prints `True` when `test -e` succeeds and `False`
 * otherwise, so the caller can decide from stdout alone.
 *
 * NOTE(review): `filePath` is spliced into the command unquoted; a path that
 * contains whitespace or shell metacharacters would be re-interpreted by the
 * shell. Confirm that callers only pass plain paths.
 *
 * @param filePath path to probe
 * @returns the command line to execute
 */
public fileExistCommand(filePath: string): string {
    const existenceProbe = 'test -e ' + filePath;
    return existenceProbe + ' && echo True || echo False';
}
}

export { LinuxCommands };
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ class WindowsCommands extends OsCommands {
return `${preCommand} && set prePath=%path% && ${command}`;
}
}

/**
 * Builds the PowerShell command used to probe whether a file exists.
 *
 * `Test-Path ... -PathType Leaf` prints `True` or `False`; with `Leaf` the
 * check only succeeds for a file, not for a directory.
 *
 * NOTE(review): `filePath` is spliced into the command unquoted; a path that
 * contains spaces would be split into several arguments. Confirm that callers
 * only pass plain paths.
 *
 * @param filePath path to probe
 * @returns the command line to execute
 */
public fileExistCommand(filePath: string): string {
    const commandParts = ['powershell', 'Test-Path', filePath, '-PathType', 'Leaf'];
    return commandParts.join(' ');
}
}

export { WindowsCommands };
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ abstract class OsCommands {
public abstract extractFile(tarFileName: string, targetFolder: string): string;
public abstract executeScript(script: string, isFile: boolean): string;
public abstract addPreCommand(preCommand: string | undefined, command: string | undefined): string | undefined;
public abstract fileExistCommand(filePath: string): string | undefined;

public joinPath(...paths: string[]): string {
let dir: string = paths.filter((path: any) => path !== '').join(this.pathSpliter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,12 @@ class ShellExecutor {
return commandResult.exitCode == 0;
}

/**
 * Checks whether a file exists by running the OS-specific existence command
 * through this executor.
 *
 * @param filePath path to check
 * @returns `true` only when the command's stdout, once trimmed, is exactly
 *          `'True'`; `false` when stdout is undefined or holds anything else
 */
public async fileExist(filePath: string): Promise<boolean> {
    // Stays falsy when no OS command set is available; execute() receives it as-is.
    const existCommand = this.osCommands && this.osCommands.fileExistCommand(filePath);
    const { stdout } = await this.execute(existCommand);
    if (stdout === undefined) {
        return false;
    }
    return stdout.trim() === 'True';
}

public async extractFile(tarFileName: string, targetFolder: string): Promise<boolean> {
const commandText = this.osCommands && this.osCommands.extractFile(tarFileName, targetFolder);
const commandResult = await this.execute(commandText);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,40 +137,43 @@ export class RemoteEnvironmentService extends EnvironmentService {

/**
 * Refreshes the status of one environment from the runner's `pid` and `code`
 * files located under `environment.runnerWorkingFolder`.
 *
 * When the runner process is no longer alive and the `code` file matches
 * "<exit code> <number>", the environment is marked SUCCEEDED (exit code 0)
 * or FAILED (anything else) and its resources are released.
 *
 * NOTE(review): this span is a rendered diff hunk without +/- markers, so the
 * pre-change and post-change bodies appear interleaved (for example the two
 * path constants are declared twice). Read it as two variants, not as one
 * compilable function.
 *
 * @param environment environment whose status should be refreshed
 */
private async refreshEnvironment(environment: EnvironmentInformation): Promise<void> {
const executor = await this.getExecutor(environment.id);
const jobpidPath: string = `${environment.runnerWorkingFolder}/pid`;
const runnerReturnCodeFilePath: string = `${environment.runnerWorkingFolder}/code`;
// NOTE(review): assuming `fs` is Node's fs module, this tests the LOCAL filesystem
// even though the pid file is later inspected through the remote executor.
if (fs.existsSync(jobpidPath)) {
/* eslint-disable require-atomic-updates */
try {
const isAlive = await executor.isProcessAlive(jobpidPath);
// if the process of jobpid is not alive any more
if (!isAlive) {
const remoteEnvironment: RemoteMachineEnvironmentInformation = environment as RemoteMachineEnvironmentInformation;
if (remoteEnvironment.rmMachineMeta === undefined) {
throw new Error(`${remoteEnvironment.id} machine meta not initialized!`);
}
this.log.info(`pid in ${remoteEnvironment.rmMachineMeta.ip}:${jobpidPath} is not alive!`);
if (fs.existsSync(runnerReturnCodeFilePath)) {
const runnerReturnCode: string = await executor.getRemoteFileContent(runnerReturnCodeFilePath);
// Expected content: "<exit code> <number>". The optional leading '-' is matched
// but not captured, so only the magnitude of the exit code is parsed below.
const match: RegExpMatchArray | null = runnerReturnCode.trim()
.match(/^-?(\d+)\s+(\d+)$/);
if (match !== null) {
const { 1: code } = match;
// Update trial job's status based on result code
if (parseInt(code, 10) === 0) {
environment.setStatus('SUCCEEDED');
} else {
environment.setStatus('FAILED');
}
this.releaseEnvironmentResource(environment);
}
const jobpidPath: string = `${environment.runnerWorkingFolder}/pid`;
const runnerReturnCodeFilePath: string = `${environment.runnerWorkingFolder}/code`;
/* eslint-disable require-atomic-updates */
try {
// Check through the executor whether the pid file exists; nothing to refresh
// when it does not.
const pidExist = await executor.fileExist(jobpidPath);
if (!pidExist) {
return;
}
const isAlive = await executor.isProcessAlive(jobpidPath);
// Set as soon as the pid file exists, before the liveness result is examined.
environment.status = 'RUNNING';
// if the process of jobpid is not alive any more
if (!isAlive) {
const remoteEnvironment: RemoteMachineEnvironmentInformation = environment as RemoteMachineEnvironmentInformation;
if (remoteEnvironment.rmMachineMeta === undefined) {
throw new Error(`${remoteEnvironment.id} machine meta not initialized!`);
}
this.log.info(`pid in ${remoteEnvironment.rmMachineMeta.ip}:${jobpidPath} is not alive!`);
// NOTE(review): assuming `fs` is Node's fs module, this checks the LOCAL filesystem,
// while the content is fetched with executor.getRemoteFileContent. Presumably this
// should also go through the executor (as the pid check does) — confirm.
if (fs.existsSync(runnerReturnCodeFilePath)) {
const runnerReturnCode: string = await executor.getRemoteFileContent(runnerReturnCodeFilePath);
// Expected content: "<exit code> <number>". The optional leading '-' is matched
// but not captured, so only the magnitude of the exit code is parsed below.
const match: RegExpMatchArray | null = runnerReturnCode.trim()
.match(/^-?(\d+)\s+(\d+)$/);
if (match !== null) {
const { 1: code } = match;
// Update trial job's status based on result code
if (parseInt(code, 10) === 0) {
environment.setStatus('SUCCEEDED');
} else {
environment.setStatus('FAILED');
}
this.releaseEnvironmentResource(environment);
}
} catch (error) {
this.releaseEnvironmentResource(environment);
this.log.error(`Update job status exception, error is ${error.message}`);
}
}
} catch (error) {
// Failures are logged only; nothing is rethrown to the caller.
this.log.error(`Update job status exception, error is ${error.message}`);
}
}

public async refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise<void> {
Expand Down Expand Up @@ -245,6 +248,7 @@ export class RemoteEnvironmentService extends EnvironmentService {
'envs', environment.id)
environment.command = `cd ${environment.runnerWorkingFolder} && \
${environment.command} --job_pid_file ${environment.runnerWorkingFolder}/pid \
1>${environment.runnerWorkingFolder}/trialrunner_stdout 2>${environment.runnerWorkingFolder}/trialrunner_stderr \
&& echo $? \`date +%s%3N\` >${environment.runnerWorkingFolder}/code`;
return Promise.resolve(true);
}
Expand All @@ -266,7 +270,6 @@ ${environment.command} --job_pid_file ${environment.runnerWorkingFolder}/pid \
// Execute command in remote machine
executor.executeScript(executor.joinPath(environment.runnerWorkingFolder,
executor.getScriptName("run")), true, false);
environment.status = 'RUNNING';
if (environment.rmMachineMeta === undefined) {
throw new Error(`${environment.id} rmMachineMeta not initialized!`);
}
Expand Down
19 changes: 11 additions & 8 deletions src/nni_manager/training_service/reusable/trialDispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -662,19 +662,22 @@ class TrialDispatcher implements TrainingService {
trial.status = "RUNNING";
await this.commandChannel.sendCommand(trial.environment, NEW_TRIAL_JOB, trial.settings);
}


/**
 * Releases the environment resources assigned to a trial.
 *
 * Decrements the environment's `runningTrialCount`, detaches the environment
 * from the trial, and removes the trial's GPU reservation when the GPU
 * scheduler is enabled.
 *
 * NOTE(review): this span is a rendered diff hunk without +/- markers, so two
 * variants are interleaved: one that throws when no environment is assigned,
 * and one that guards with `trial.environment !== undefined` instead.
 *
 * @param trial trial whose environment should be released
 * @throws Error when the assigned environment has no counted running trial
 */
private releaseEnvironment(trial: TrialDetail): void {
if (undefined === trial.environment) {
throw new Error(`TrialDispatcher: environment is not assigned to trial ${trial.id}, and cannot be released!`);
}
if (trial.environment.runningTrialCount <= 0) {
throw new Error(`TrialDispatcher: environment ${trial.environment.id} has no counted running trial!`);
if (trial.environment !== undefined) {
// A non-positive count means the bookkeeping is already inconsistent.
if (trial.environment.runningTrialCount <= 0) {
throw new Error(`TrialDispatcher: environment ${trial.environment.id} has no counted running trial!`);
}
trial.environment.runningTrialCount--;
trial.environment = undefined;
}
// Only touch GPU reservations when the GPU scheduler is enabled.
if (true === this.enableGpuScheduler) {
this.gpuScheduler.removeGpuReservation(trial);
}
trial.environment.runningTrialCount--;
trial.environment = undefined;
}

private async handleMetricData(trialId: string, data: any): Promise<void> {
Expand Down
2 changes: 2 additions & 0 deletions test/config/training_service.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ pai:
containerNFSMountPath:
paiStorageConfigName:
remote:
remoteConfig:
reuse: false
machineList:
- ip:
passwd:
Expand Down
3 changes: 3 additions & 0 deletions test/nni_test/nnitest/generate_ts_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ def update_training_service_config(args):
config[args.ts]['machineList'][0]['port'] = args.remote_port
if args.remote_pwd is not None:
config[args.ts]['machineList'][0]['passwd'] = args.remote_pwd
if args.remote_reuse is not None:
config[args.ts]['remoteConfig']['reuse'] = args.remote_reuse.lower() == 'true'

dump_yml_content(TRAINING_SERVICE_FILE, config)

Expand Down Expand Up @@ -119,6 +121,7 @@ def update_training_service_config(args):
parser.add_argument("--remote_pwd", type=str)
parser.add_argument("--remote_host", type=str)
parser.add_argument("--remote_port", type=int)
parser.add_argument("--remote_reuse", type=str)
args = parser.parse_args()

update_training_service_config(args)
2 changes: 1 addition & 1 deletion test/pipelines/pipelines-it-remote-linux-to-linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ jobs:
- script: |
set -e
cd test
python3 nni_test/nnitest/generate_ts_config.py --ts remote --remote_user $(docker_user) --remote_host $(remote_host) \
python3 nni_test/nnitest/generate_ts_config.py --ts remote --remote_reuse $(remote_reuse) --remote_user $(docker_user) --remote_host $(remote_host) \
--remote_port $(cat port) --remote_pwd $(docker_pwd) --nni_manager_ip $(nni_manager_ip)
cat config/training_service.yml
PATH=$HOME/.local/bin:$PATH python3 nni_test/nnitest/run_tests.py --config config/integration_tests.yml --ts remote
Expand Down
13 changes: 9 additions & 4 deletions tools/nni_trial_tool/trial.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,15 @@ def is_running(self):
def kill(self, trial_id=None):
# Kill this trial's process together with its child processes, then call
# self.cleanup(). Acts only when trial_id equals self.id or is None, and
# only when a process has been recorded on this trial.
# NOTE(review): this span is a rendered diff hunk with indentation and +/-
# markers stripped; the unguarded kill sequence and the try/except-wrapped
# kill sequence are two interleaved variants of the same logic.
if trial_id == self.id or trial_id is None:
if self.process is not None:
nni_log(LogType.Info, "%s: killing trial" % self.name)
for child in psutil.Process(self.process.pid).children(True):
child.kill()
self.process.kill()
try:
nni_log(LogType.Info, "%s: killing trial" % self.name)
# children(True): the positional argument is psutil's `recursive` flag,
# so all descendants are killed before the trial process itself.
for child in psutil.Process(self.process.pid).children(True):
child.kill()
self.process.kill()
except psutil.NoSuchProcess:
# The process is already gone; logged at Info level, not as an error.
nni_log(LogType.Info, "kill trial %s failed: %s does not exist!" % (trial_id, self.process.pid))
except Exception as ex:
# Best effort: any other failure is logged and not re-raised.
nni_log(LogType.Error, "kill trial %s failed: %s " % (trial_id, str(ex)))
self.cleanup()

def cleanup(self):
Expand Down

0 comments on commit add7ca6

Please sign in to comment.