Skip to content

Commit

Permalink
Merge pull request #72 from Microsoft/master
Browse files Browse the repository at this point in the history
[Kubeflow Training Service] V1, merge from kubeflow branch to master …
  • Loading branch information
SparkSnail authored Nov 20, 2018
2 parents 6878c4c + 806afeb commit 34936df
Show file tree
Hide file tree
Showing 28 changed files with 1,116 additions and 135 deletions.
11 changes: 8 additions & 3 deletions src/nni_manager/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import { LocalTrainingServiceForGPU } from './training_service/local/localTraini
import {
RemoteMachineTrainingService
} from './training_service/remote_machine/remoteMachineTrainingService';
import { PAITrainingService } from './training_service/pai/paiTrainingService'

import { PAITrainingService } from './training_service/pai/paiTrainingService';
import { KubeflowTrainingService } from './training_service/kubeflow/kubeflowTrainingService';

function initStartupInfo(startExpMode: string, resumeExperimentId: string, basePort: number) {
const createNew: boolean = (startExpMode === 'new');
Expand All @@ -52,6 +52,8 @@ async function initContainer(platformMode: string): Promise<void> {
Container.bind(TrainingService).to(RemoteMachineTrainingService).scope(Scope.Singleton);
} else if (platformMode === 'pai') {
Container.bind(TrainingService).to(PAITrainingService).scope(Scope.Singleton);
} else if (platformMode === 'kubeflow') {
Container.bind(TrainingService).to(KubeflowTrainingService).scope(Scope.Singleton);
} else {
throw new Error(`Error: unsupported mode: ${mode}`);
}
Expand All @@ -76,19 +78,22 @@ if (!strPort || strPort.length === 0) {
const port: number = parseInt(strPort, 10);

const mode: string = parseArg(['--mode', '-m']);
if (!['local', 'remote', 'pai'].includes(mode)) {
if (!['local', 'remote', 'pai', 'kubeflow'].includes(mode)) {
console.log(`FATAL: unknown mode: ${mode}`);
usage();
process.exit(1);
}

const startMode: string = parseArg(['--start_mode', '-s']);
if (!['new', 'resume'].includes(startMode)) {
console.log(`FATAL: unknown start_mode: ${startMode}`);
usage();
process.exit(1);
}

const experimentId: string = parseArg(['--experiment_id', '-id']);
if (startMode === 'resume' && experimentId.trim().length < 1) {
console.log(`FATAL: cannot resume experiment, invalid experiment_id: ${experimentId}`);
usage();
process.exit(1);
}
Expand Down
1 change: 1 addition & 0 deletions src/nni_manager/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"express": "^4.16.3",
"express-joi-validator": "^2.0.0",
"node-nvidia-smi": "^1.0.0",
"node-yaml": "^3.1.1",
"rx": "^4.1.0",
"sqlite3": "^4.0.2",
"ssh2": "^0.6.1",
Expand Down
17 changes: 13 additions & 4 deletions src/nni_manager/rest_server/restHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,7 @@ class NNIRestHandler {
return router;
}

private handle_error(err: Error, res: Response): void {
this.log.info(err);

private handle_error(err: Error, res: Response, isFatal: boolean = false): void {
if (err instanceof NNIError && err.name === NNIErrorNames.NOT_FOUND) {
res.status(404);
} else {
Expand All @@ -100,6 +98,14 @@ class NNIRestHandler {
res.send({
error: err.message
});

// If it's a fatal error, exit process
if(isFatal) {
this.log.critical(err);
process.exit(1);
}

this.log.error(err);
}

// TODO add validators for request params, query, body
Expand Down Expand Up @@ -145,12 +151,14 @@ class NNIRestHandler {
experiment_id: eid
});
}).catch((err: Error) => {
// Start experiment is a step of initialization, so any exception thrown is a fatal
this.handle_error(err, res);
});
} else {
this.nniManager.resumeExperiment().then(() => {
res.send();
}).catch((err: Error) => {
// Resume experiment is a step of initialization, so any exception thrown is a fatal
this.handle_error(err, res);
});
}
Expand Down Expand Up @@ -180,7 +188,8 @@ class NNIRestHandler {
}
res.send();
} catch (err) {
this.handle_error(err, res);
// setClusterMetata is a step of initialization, so any exception thrown is a fatal
this.handle_error(err, res, true);
}
});
}
Expand Down
8 changes: 8 additions & 0 deletions src/nni_manager/rest_server/restValidationSchemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ export namespace ValidationSchemas {
userName: joi.string().min(1).required(),
passWord: joi.string().min(1).required(),
host: joi.string().min(1).required()
}),
kubeflow_config: joi.object({
operator: joi.string().min(1).required(),
nfs: joi.object({
server: joi.string().min(1).required(),
path: joi.string().min(1).required()
}).required(),
kubernetesServer: joi.string().min(1).required()
})
}
};
Expand Down
96 changes: 96 additions & 0 deletions src/nni_manager/training_service/common/clusterJobRestServer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
*
* MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

'use strict';

import * as assert from 'assert';
import { Request, Response, Router } from 'express';
import * as bodyParser from 'body-parser';
import * as component from '../../common/component';
import { getBasePort, getExperimentId } from '../../common/experimentStartupInfo';
import { RestServer } from '../../common/restServer'

/**
* Cluster Job Training service Rest server, provides rest API to support Cluster job metrics update
*
*/
@component.Singleton
export abstract class ClusterJobRestServer extends RestServer{
private readonly API_ROOT_URL: string = '/api/v1/nni-pai';

private readonly expId: string = getExperimentId();

/**
* constructor to provide NNIRestServer's own rest property, e.g. port
*/
constructor() {
super();
const basePort: number = getBasePort();
assert(basePort && basePort > 1024);

this.port = basePort + 1;
}

public get clusterRestServerPort(): number {
if(!this.port) {
throw new Error('PAI Rest server port is undefined');
}
return this.port;
}

/**
* NNIRestServer's own router registration
*/
protected registerRestHandler(): void {
this.app.use(bodyParser.json());
this.app.use(this.API_ROOT_URL, this.createRestHandler());
}

private createRestHandler() : Router {
const router: Router = Router();

// tslint:disable-next-line:typedef
router.use((req: Request, res: Response, next) => {
this.log.info(`${req.method}: ${req.url}: body:\n${JSON.stringify(req.body, undefined, 4)}`);
res.setHeader('Content-Type', 'application/json');
next();
});

router.post(`/update-metrics/${this.expId}/:trialId`, (req: Request, res: Response) => {
try {
this.log.info(`Get update-metrics request, trial job id is ${req.params.trialId}`);
this.log.info(`update-metrics body is ${JSON.stringify(req.body)}`);

this.handleTrialMetrics(req.body.jobId, req.body.metrics);

res.send();
}
catch(err) {
this.log.error(`json parse metrics error: ${err}`);
res.status(500);
res.send(err.message);
}
});

return router;
}

/** Abstract method to handle trial metrics data */
protected abstract handleTrialMetrics(jobId : string, trialMetrics : any[]) : void;
}
30 changes: 30 additions & 0 deletions src/nni_manager/training_service/common/containerJobData.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
*
* MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

'use strict';

export const CONTAINER_INSTALL_NNI_SHELL_FORMAT: string =
`#!/bin/bash
if python3 -c 'import nni' > /dev/null 2>&1; then
# nni module is already installed, skip
return
else
# Install nni
python3 -m pip install --user --upgrade nni
fi`;
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ export enum TrialConfigMetadataKey {
EXPERIMENT_ID = 'experimentId',
MULTI_PHASE = 'multiPhase',
RANDOM_SCHEDULER = 'random_scheduler',
PAI_CLUSTER_CONFIG = 'pai_config'
PAI_CLUSTER_CONFIG = 'pai_config',
KUBEFLOW_CLUSTER_CONFIG = 'kubeflow_config'
}
93 changes: 93 additions & 0 deletions src/nni_manager/training_service/kubeflow/kubeflowConfig.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { TrialConfig } from "../common/trialConfig";

/**
* Copyright (c) Microsoft Corporation
* All rights reserved.
*
* MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

'use strict';


/** operator types that kubeflow supported */
export type KubeflowOperator = 'tf-operator' | 'pytorch-operator' | 'mxnet-operator' | 'caffe2-operator' | 'chainer-operator' | 'mpi-operator';
export type KubeflowOperatorPlural = 'tfjobs' | 'pytorchjobs' | 'mxjobs' | 'caffe2jobs' | 'chainerjobs' | 'mpijobs';

/**
* map from Kubeflow operator name to its plural name in K8S
*/
export const kubeflowOperatorMap : Map<KubeflowOperator, KubeflowOperatorPlural> = new Map<KubeflowOperator, KubeflowOperatorPlural>([
['tf-operator' , 'tfjobs'],
['pytorch-operator', 'pytorchjobs'],
['mxnet-operator', 'mxjobs'],
['caffe2-operator', 'caffe2jobs'],
['chainer-operator', 'chainerjobs'],
['mpi-operator', 'mpijobs']
]);

/**
* Kuberflow cluster configuration
*
*/
export class KubeflowClusterConfig {
/** Name of Kubeflow operator, like tf-operator */
public readonly operator: KubeflowOperator;
public readonly nfs: NFSConfig;
public readonly kubernetesServer: string;

/**
* Constructor
* @param userName User name of Kubeflow Cluster
* @param passWord password of Kubeflow Cluster
* @param host Host IP of Kubeflow Cluster
*/
constructor(operator: KubeflowOperator, nfs : NFSConfig, kubernetesServer : string) {
this.operator = operator;
this.nfs = nfs;
this.kubernetesServer = kubernetesServer;
}
}

/**
* NFS configuration to store Kubeflow job related files
*/
export class NFSConfig {
/** IP Adress of NFS server */
public readonly server : string;
/** exported NFS path on NFS server */
public readonly path : string;

constructor(server : string, path : string) {
this.server = server;
this.path = path;
}
}

/**
* Trial job configuration for Kubeflow
*/
export class KubeflowTrialConfig extends TrialConfig {
public readonly cpuNum: number;
public readonly memoryMB: number;
public readonly image: string;

constructor(command : string, codeDir : string, gpuNum : number, cpuNum: number, memoryMB: number, image: string) {
super(command, codeDir, gpuNum);
this.cpuNum = cpuNum;
this.memoryMB = memoryMB;
this.image = image;
}
}
Loading

0 comments on commit 34936df

Please sign in to comment.