Skip to content
This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

Kubeflow training service: Check nfs configuration and throw error if NFS mount failed #380

Merged
merged 41 commits into from
Nov 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
bc8ee41
Merge pull request #2 from Microsoft/master
yds05 Nov 8, 2018
aedb000
Change base image from devel to runtime, to reduce docker image size
Nov 8, 2018
69e9de0
Support running multiple experiment for PAI
Nov 9, 2018
53c2cd9
Fix a bug regarding recursive references between paiRestServer and
Nov 12, 2018
bd5dcd7
Initial version for Kubeflow Training Service
Nov 12, 2018
1e40376
Merge pull request #3 from Microsoft/master
yds05 Nov 12, 2018
575cbb9
Merge branch 'master' into kubeflow
Nov 12, 2018
fd052ca
simple refactor
Nov 12, 2018
b5d4e9d
Merge pull request #4 from Microsoft/master
yds05 Nov 13, 2018
c83a2ce
Merge branch 'master' into kubeflow
Nov 13, 2018
7e04718
Remove unused 51189 const variable
Nov 13, 2018
3eaea4f
Support launch kubeflow training service through nnictl
Nov 13, 2018
7d4e3ca
Enable kubeflow platform in NNI sdk
Nov 13, 2018
5f3f069
Add shell command to launch kubeflow trial job
Nov 14, 2018
4bcda91
Test for ubuntu nFS logic
Nov 14, 2018
2e11a68
Fix trial nfs folder small bug
Nov 14, 2018
33d24c6
fix small bug
Nov 14, 2018
a2164ee
fix small bug
Nov 14, 2018
f63e16b
Move get_sequence_id() from reading sequence_id file to read env param
Nov 15, 2018
2b29427
Add cancelJob support for Kubeflow training service
Nov 15, 2018
e4e27e7
Add some delay to check kubeflow job status
Nov 15, 2018
8abd518
Fix a bug caused by error checking result.stderr
Nov 15, 2018
9bba10f
Add cleanup function for kubeflow training service
Nov 15, 2018
7256a82
Move mount operation from submitJob to setClusterMetadata
Nov 15, 2018
6efec43
Change set trial sequence id, change from upload sequence_id file to
Nov 16, 2018
634a464
Change NNI_OUT_DIR to nfs path
Nov 16, 2018
01171c4
Add file server log path for Kubeflow trial job
Nov 16, 2018
baedae3
Correct nfs log url path
Nov 16, 2018
b376d5b
Code refactor: move /tmp/nfs to const var, and use path join for nfs …
Nov 16, 2018
226fc16
Add install and check NNI package
Nov 16, 2018
d7f4556
Merge pull request #6 from Microsoft/master
yds05 Nov 16, 2018
7da87c4
Merge pull request #7 from yds05/master
yds05 Nov 16, 2018
8f71617
Fix nni stop (#368)
SparkSnail Nov 16, 2018
6e44618
Replace hard-coded tfjobs with kubeflow job plural name
Nov 19, 2018
a8a97c6
Fix wrongly import location in kubeflowconfig.ts
Nov 19, 2018
83264fb
Fix typos and remove unnecessary ut config
Nov 19, 2018
0794ce6
Merge pull request #9 from Microsoft/master
yds05 Nov 19, 2018
96c311e
Add error handling in setClusterMetadata for Kubeflow training service
Nov 19, 2018
85a4066
Merge branch 'kubeflow' of https://github.com/yds05/nni into kubeflow
Nov 19, 2018
108f73c
fix a small bug, do not check stderr of mount command
Nov 19, 2018
6e6c828
Merge branch 'kubeflow' into kubeflow
yds05 Nov 19, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/nni_manager/common/restServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { Deferred } from 'ts-deferred';
import { getLogger, Logger } from './log';
import { getBasePort } from './experimentStartupInfo';


/**
* Abstraction class to create a RestServer
* The module who wants to use a RestServer could <b>extends</b> this abstract class
Expand Down Expand Up @@ -90,6 +91,10 @@ export abstract class RestServer {
} else {
this.startTask.promise.then(
() => { // Started
//Stops the server from accepting new connections and keeps existing connections.
//This function is asynchronous, the server is finally closed when all connections
//are ended and the server emits a 'close' event.
//Refer to https://nodejs.org/docs/latest/api/net.html#net_server_close_callback
this.server.close().on('close', () => {
this.log.info('Rest server stopped.');
this.stopTask.resolve();
Expand All @@ -103,7 +108,7 @@ export abstract class RestServer {
}
);
}

this.stopTask.resolve()
return this.stopTask.promise;
}

Expand Down
1 change: 0 additions & 1 deletion src/nni_manager/core/nnimanager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ class NNIManager implements Manager {

public async stopExperiment(): Promise<void> {
this.status.status = 'STOPPING';
await this.experimentDoneCleanUp();
}

public async getMetricData(trialJobId?: string, metricType?: MetricType): Promise<MetricDataRecord[]> {
Expand Down
20 changes: 15 additions & 5 deletions src/nni_manager/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,20 @@ mkDirP(getLogDir()).then(async () => {
});

process.on('SIGTERM', async () => {
const ds: DataStore = component.get(DataStore);
await ds.close();
const restServer: NNIRestServer = component.get(NNIRestServer);
await restServer.stop();
const log: Logger = getLogger();
log.close();
let hasError: boolean = false;
try{
const nniManager: Manager = component.get(Manager);
await nniManager.stopExperiment();
const ds: DataStore = component.get(DataStore);
await ds.close();
const restServer: NNIRestServer = component.get(NNIRestServer);
await restServer.stop();
}catch(err){
hasError = true;
log.error(`${err.stack}`);
}finally{
await log.close();
process.exit(hasError?1:0);
}
})
30 changes: 13 additions & 17 deletions src/nni_manager/rest_server/restHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class NNIRestHandler {
this.getExperimentProfile(router);
this.updateExperimentProfile(router);
this.startExperiment(router);
this.stopExperiment(router);
this.getTrialJobStatistics(router);
this.setClusterMetaData(router);
this.listTrialJobs(router);
Expand All @@ -90,9 +89,7 @@ class NNIRestHandler {
return router;
}

private handle_error(err: Error, res: Response): void {
this.log.info(err);

private handle_error(err: Error, res: Response, isFatal: boolean = false): void {
if (err instanceof NNIError && err.name === NNIErrorNames.NOT_FOUND) {
res.status(404);
} else {
Expand All @@ -101,6 +98,14 @@ class NNIRestHandler {
res.send({
error: err.message
});

// If it's a fatal error, exit process
if(isFatal) {
this.log.critical(err);
process.exit(1);
}

this.log.error(err);
}

// TODO add validators for request params, query, body
Expand Down Expand Up @@ -146,30 +151,20 @@ class NNIRestHandler {
experiment_id: eid
});
}).catch((err: Error) => {
// Start experiment is a step of initialization, so any exception thrown is fatal
this.handle_error(err, res);
});
} else {
this.nniManager.resumeExperiment().then(() => {
res.send();
}).catch((err: Error) => {
// Resume experiment is a step of initialization, so any exception thrown is fatal
this.handle_error(err, res);
});
}
});
}

private stopExperiment(router: Router): void {
router.delete('/experiment', async (req: Request, res: Response) => {
try {
await this.tb.cleanUp();
await this.nniManager.stopExperiment();
res.send();
} catch (err) {
this.handle_error(err, res);
}
});
}

private getTrialJobStatistics(router: Router): void {
router.get('/job-statistics', (req: Request, res: Response) => {
this.nniManager.getTrialJobStatistics().then((statistics: TrialJobStatistics[]) => {
Expand All @@ -193,7 +188,8 @@ class NNIRestHandler {
}
res.send();
} catch (err) {
this.handle_error(err, res);
// setClusterMetaData is a step of initialization, so any exception thrown is fatal
this.handle_error(err, res, true);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,21 @@ class KubeflowTrainingService implements TrainingService {
switch (key) {
case TrialConfigMetadataKey.KUBEFLOW_CLUSTER_CONFIG:
this.kubeflowClusterConfig = <KubeflowClusterConfig>JSON.parse(value);
//Check and mount NFS mount point here
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}`);
const nfsServer: string = this.kubeflowClusterConfig.nfs.server;
const nfsPath: string = this.kubeflowClusterConfig.nfs.path;
try {
await cpp.exec(`sudo mount ${nfsServer}:${nfsPath} ${this.trialLocalNFSTempFolder}`);
} catch(error) {
this.log.error(`Mount NFS ${nfsServer}:${nfsPath} to ${this.trialLocalNFSTempFolder} failed, error is ${error}`);

// If NFS config section is valid in config file, proceed to mount and config NFS
if(this.kubeflowClusterConfig.nfs) {
//Check and mount NFS mount point here
await cpp.exec(`mkdir -p ${this.trialLocalNFSTempFolder}`);
const nfsServer: string = this.kubeflowClusterConfig.nfs.server;
const nfsPath: string = this.kubeflowClusterConfig.nfs.path;

try {
await cpp.exec(`sudo mount ${nfsServer}:${nfsPath} ${this.trialLocalNFSTempFolder}`);
} catch(error) {
const mountError: string = `Mount NFS ${nfsServer}:${nfsPath} to ${this.trialLocalNFSTempFolder} failed, error is ${error}`;
this.log.error(mountError);
throw new Error(mountError);
}
}

this.kubeflowJobPlural = kubeflowOperatorMap.get(this.kubeflowClusterConfig.operator);
Expand Down
15 changes: 1 addition & 14 deletions tools/nni_cmd/nnictl_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,18 +179,6 @@ def stop_experiment(args):
print_normal('Experiment is not running...')
experiment_config.update_experiment(experiment_id, 'status', 'stopped')
return
running, _ = check_rest_server_quick(rest_port)
stop_rest_result = True
if running:
response = rest_delete(experiment_url(rest_port), 20)
if not response or not check_response(response):
if response:
print_error(response.text)
else:
print_error('No response from restful server!')
stop_rest_result = False
#sleep to wait rest handler done
time.sleep(3)
rest_pid = nni_config.get_config('restServerPid')
if rest_pid:
stop_rest_cmds = ['kill', str(rest_pid)]
Expand All @@ -204,8 +192,7 @@ def stop_experiment(args):
except Exception as exception:
print_error(exception)
nni_config.set_config('tensorboardPidList', [])
if stop_rest_result:
print_normal('Stop experiment success!')
print_normal('Stop experiment success!')
experiment_config.update_experiment(experiment_id, 'status', 'stopped')
time_now = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
experiment_config.update_experiment(experiment_id, 'endTime', str(time_now))
Expand Down