From fdb2d77a5db7f765b07adb9138ce56ec1c9e169f Mon Sep 17 00:00:00 2001 From: SparkSnail Date: Wed, 19 May 2021 19:16:25 +0800 Subject: [PATCH 1/3] set version check as warning --- nni/tools/trial_tool/trial_keeper.py | 3 +-- nni/tools/trial_tool/trial_runner.py | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/nni/tools/trial_tool/trial_keeper.py b/nni/tools/trial_tool/trial_keeper.py index c6e502d13e..65ea160a27 100644 --- a/nni/tools/trial_tool/trial_keeper.py +++ b/nni/tools/trial_tool/trial_keeper.py @@ -151,14 +151,13 @@ def check_version(args): nni_log(LogType.Info, 'nni_manager_version is {0}'.format(nni_manager_version)) log_entry = {} if trial_keeper_version != nni_manager_version: - nni_log(LogType.Error, 'Version does not match!') + nni_log(LogType.Warning, 'Version does not match!') error_message = 'NNIManager version is {0}, TrialKeeper version is {1}, NNI version does not match!'.format( nni_manager_version, trial_keeper_version) log_entry['tag'] = 'VCFail' log_entry['msg'] = error_message rest_post(gen_send_version_url(args.nnimanager_ip, args.nnimanager_port), json.dumps(log_entry), 10, False) - os._exit(1) else: nni_log(LogType.Info, 'Version match!') log_entry['tag'] = 'VCSuccess' diff --git a/nni/tools/trial_tool/trial_runner.py b/nni/tools/trial_tool/trial_runner.py index 9f39fe74b5..ebc6ee7dad 100644 --- a/nni/tools/trial_tool/trial_runner.py +++ b/nni/tools/trial_tool/trial_runner.py @@ -119,7 +119,7 @@ def check_version(args): nni_log(LogType.Info, '{0}: nni_manager_version is {1}'.format(args.node_id, nni_manager_version)) log_entry = {} if trial_runner_version != nni_manager_version: - nni_log(LogType.Error, '{0}: Version does not match!'.format(args.node_id)) + nni_log(LogType.Warning, '{0}: Version does not match!'.format(args.node_id)) error_message = '{0}: NNIManager version is {1}, Trial runner version is {2}, NNI version does not match!'.format( args.node_id, nni_manager_version, trial_runner_version) log_entry['tag'] = 'VCFail' @@ -127,7 +127,6 @@ def check_version(args): command_channel.send(CommandType.VersionCheck, log_entry) while not command_channel.sent(): time.sleep(1) - os._exit(1) else: nni_log(LogType.Info, '{0}: Version match!'.format(args.node_id)) log_entry['tag'] = 'VCSuccess' From 899d231b3d5d9c8018e265d7fa90189764a57cad Mon Sep 17 00:00:00 2001 From: SparkSnail Date: Fri, 9 Jul 2021 01:45:53 +0800 Subject: [PATCH 2/3] add doc --- docs/en_US/TrainingService/KubeflowMode.rst | 38 +++ nni/experiment/config/kubeflow.py | 34 +- ts/nni_manager/common/experimentConfig.ts | 30 +- ts/nni_manager/core/nnimanager.ts | 3 - .../environments/environmentServiceFactory.ts | 5 +- .../kubernetes/kubeflowEnvironmentService.ts | 290 ++++++++++++++++++ .../kubernetesEnvironmentService.ts | 260 ++++++++++++++++ .../reusable/routerTrainingService.ts | 7 +- 8 files changed, 634 insertions(+), 33 deletions(-) create mode 100644 ts/nni_manager/training_service/reusable/environments/kubernetes/kubeflowEnvironmentService.ts create mode 100644 ts/nni_manager/training_service/reusable/environments/kubernetes/kubernetesEnvironmentService.ts diff --git a/docs/en_US/TrainingService/KubeflowMode.rst b/docs/en_US/TrainingService/KubeflowMode.rst index dcfbb22eb8..eb8a31db43 100644 --- a/docs/en_US/TrainingService/KubeflowMode.rst +++ b/docs/en_US/TrainingService/KubeflowMode.rst @@ -253,3 +253,41 @@ version check NNI support version check feature in since version 0.6, `refer `__ Any problems when using NNI in Kubeflow mode, please create issues on `NNI Github repo `__. + + +Kubeflow reuse mode +---------------------- +NNI support setting reuse mode for trial jobs. In reuse mode, NNI will submit a long-running trial runner process to occupy the container, and start trial jobs as the subprocess of the trial runner process, it means k8s do not need to schedule new container again, it just reuse old container. +Currently, kubeflow reuse mode only support V2 config. +Here is the example: + +.. code-block:: yaml + + searchSpaceFile: search_space.json + trialCommand: python3 mnist.py + trialGpuNumber: 0 + trialConcurrency: 4 + maxTrialNumber: 20 + tuner: + name: TPE + classArgs: + optimize_mode: maximize + trainingService: + reuseMode: true + platform: kubeflow + worker: + command: python3 mnist.py + code_directory: . + dockerImage: msranni/nni + cpuNumber: 1 + gpuNumber: 0 + memorySize: 8192 + replicas: 1 + operator: tf-operator + storage: + storageType: azureStorage + azureAccount: {your_account} + azureShare: {your_share} + keyVaultName: {your_valut_name} + keyVaultKey: {your_valut_key} + apiVersion: v1 diff --git a/nni/experiment/config/kubeflow.py b/nni/experiment/config/kubeflow.py index aaa15085d4..f089f817fa 100644 --- a/nni/experiment/config/kubeflow.py +++ b/nni/experiment/config/kubeflow.py @@ -8,21 +8,21 @@ from .common import TrainingServiceConfig from . import util -__all__ = ['KubeflowConfig', 'KubeflowRoleConfig', 'KubeflowNfsConfig', 'KubeflowAzureStorageConfig'] +__all__ = ['KubeflowConfig', 'KubeflowRoleConfig', 'KubeflowStorageConfig', 'KubeflowNfsConfig', 'KubeflowAzureStorageConfig'] @dataclass(init=False) -class _KubeflowStorageConfig(ConfigBase): - storage: str +class KubeflowStorageConfig(ConfigBase): + storage_type: str server: Optional[str] = None path: Optional[str] = None azure_account: Optional[str] = None azure_share: Optional[str] = None - key_vault: Optional[str] = None - key_vault_secret: Optional[str] = None + key_vault_name: Optional[str] = None + key_vault_key: Optional[str] = None @dataclass(init=False) -class KubeflowNfsConfig(_KubeflowStorageConfig): +class KubeflowNfsConfig(KubeflowStorageConfig): storage: str = 'nfs' server: str path: str @@ -32,18 +32,19 @@ class KubeflowAzureStorageConfig(ConfigBase): storage: str = 'azureStorage' azure_account: str azure_share: str - key_vault: str - key_vault_secret: str + key_vault_name: str + key_vault_key: str @dataclass(init=False) class KubeflowRoleConfig(ConfigBase): replicas: int command: str - gpu_number: int + gpu_number: Optional[int] = 0 cpu_number: int memory_size: str docker_image: str = 'msranni/nni:latest' + code_directory: str @dataclass(init=False) @@ -51,18 +52,21 @@ class KubeflowConfig(TrainingServiceConfig): platform: str = 'kubeflow' operator: str api_version: str - storage: _KubeflowStorageConfig - worker: KubeflowRoleConfig - parameter_server: Optional[KubeflowRoleConfig] = None + storage: KubeflowStorageConfig + worker: Optional[KubeflowRoleConfig] = None + ps: Optional[KubeflowRoleConfig] = None + master: Optional[KubeflowRoleConfig] = None + reuse_mode: Optional[bool] = True #set reuse mode as true for v2 config def __init__(self, **kwargs): kwargs = util.case_insensitive(kwargs) - kwargs['storage'] = util.load_config(_KubeflowStorageConfig, kwargs.get('storage')) + kwargs['storage'] = util.load_config(KubeflowStorageConfig, kwargs.get('storage')) kwargs['worker'] = util.load_config(KubeflowRoleConfig, kwargs.get('worker')) - kwargs['parameterserver'] = util.load_config(KubeflowRoleConfig, kwargs.get('parameterserver')) + kwargs['ps'] = util.load_config(KubeflowRoleConfig, kwargs.get('ps')) + kwargs['master'] = util.load_config(KubeflowRoleConfig, kwargs.get('master')) super().__init__(**kwargs) _validation_rules = { 'platform': lambda value: (value == 'kubeflow', 'cannot be modified'), 'operator': lambda value: value in ['tf-operator', 'pytorch-operator'] - } + } \ No newline at end of file diff --git a/ts/nni_manager/common/experimentConfig.ts b/ts/nni_manager/common/experimentConfig.ts index fff40c547c..5412d5fa92 100644 --- a/ts/nni_manager/common/experimentConfig.ts +++ b/ts/nni_manager/common/experimentConfig.ts @@ -5,6 +5,9 @@ import * as assert from 'assert'; +import { KubeflowOperator, OperatorApiVersion } from '../training_service/kubernetes/kubeflow/kubeflowConfig' +import { KubernetesStorageKind } from '../training_service/kubernetes/kubernetesConfig'; + export interface TrainingServiceConfig { platform: string; } @@ -68,35 +71,38 @@ export interface AmlConfig extends TrainingServiceConfig { maxTrialNumberPerGpu: number; } -/* Kubeflow */ - -// FIXME: merge with shared storage config export interface KubeflowStorageConfig { - storage: string; + storageType: string; + maxTrialNumberPerGpu?: number; server?: string; path?: string; azureAccount?: string; azureShare?: string; - keyVault?: string; - keyVaultSecret?: string; + keyVaultName?: string; + keyVaultKey?: string; } export interface KubeflowRoleConfig { replicas: number; + codeDirectory: string; command: string; gpuNumber: number; cpuNumber: number; - memorySize: string; + memorySize: number; dockerImage: string; + privateRegistryAuthPath?: string; } export interface KubeflowConfig extends TrainingServiceConfig { platform: 'kubeflow'; - operator: string; - apiVersion: string; + ps?: KubeflowRoleConfig; + master?: KubeflowRoleConfig; + worker?: KubeflowRoleConfig; + maxTrialNumberPerGpu: number; + operator: KubeflowOperator; + apiVersion: OperatorApiVersion; storage: KubeflowStorageConfig; - worker: KubeflowRoleConfig; - parameterServer?: KubeflowRoleConfig; + reuseMode: boolean; } /* FrameworkController */ @@ -220,4 +226,4 @@ export function flattenConfig(config: ExperimentConfig, platform: string): T Object.assign(flattened, config.trainingService); } return flattened; -} +} \ No newline at end of file diff --git a/ts/nni_manager/core/nnimanager.ts b/ts/nni_manager/core/nnimanager.ts index 959371d09a..ea12f216b6 100644 --- a/ts/nni_manager/core/nnimanager.ts +++ b/ts/nni_manager/core/nnimanager.ts @@ -449,9 +449,6 @@ class NNIManager implements Manager { if (platform === 'local') { const module_ = await import('../training_service/local/localTrainingService'); return new module_.LocalTrainingService(config); - } else if (platform === 'kubeflow') { - const module_ = await import('../training_service/kubernetes/kubeflow/kubeflowTrainingService'); - return new module_.KubeflowTrainingService(); } else if (platform === 'frameworkcontroller') { const module_ = await import('../training_service/kubernetes/frameworkcontroller/frameworkcontrollerTrainingService'); return new module_.FrameworkControllerTrainingService(); diff --git a/ts/nni_manager/training_service/reusable/environments/environmentServiceFactory.ts b/ts/nni_manager/training_service/reusable/environments/environmentServiceFactory.ts index 24d7b8c088..aa1bcc5e6d 100644 --- a/ts/nni_manager/training_service/reusable/environments/environmentServiceFactory.ts +++ b/ts/nni_manager/training_service/reusable/environments/environmentServiceFactory.ts @@ -2,6 +2,7 @@ import { AMLEnvironmentService } from './amlEnvironmentService'; import { OpenPaiEnvironmentService } from './openPaiEnvironmentService'; import { LocalEnvironmentService } from './localEnvironmentService'; import { RemoteEnvironmentService } from './remoteEnvironmentService'; +import { KubeflowEnvironmentService } from './kubernetes/kubeflowEnvironmentService'; import { EnvironmentService } from '../environment'; import { ExperimentConfig } from '../../../common/experimentConfig'; import { ExperimentStartupInfo } from '../../../common/experimentStartupInfo'; @@ -20,6 +21,8 @@ export async function createEnvironmentService(name: string, config: ExperimentC return new AMLEnvironmentService(config, info); case 'openpai': return new OpenPaiEnvironmentService(config, info); + case 'kubeflow': + return new KubeflowEnvironmentService(config, info); } const esConfig = await getCustomEnvironmentServiceConfig(name); @@ -29,4 +32,4 @@ export async function createEnvironmentService(name: string, config: ExperimentC const esModule = importModule(esConfig.nodeModulePath); const esClass = esModule[esConfig.nodeClassName] as any; return new esClass(config, info); -} +} \ No newline at end of file diff --git a/ts/nni_manager/training_service/reusable/environments/kubernetes/kubeflowEnvironmentService.ts b/ts/nni_manager/training_service/reusable/environments/kubernetes/kubeflowEnvironmentService.ts new file mode 100644 index 0000000000..7f5974a8a0 --- /dev/null +++ b/ts/nni_manager/training_service/reusable/environments/kubernetes/kubeflowEnvironmentService.ts @@ -0,0 +1,290 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +'use strict'; + +import * as fs from 'fs'; +import * as path from 'path'; +import * as component from '../../../../common/component'; +import { ExperimentConfig, KubeflowConfig, flattenConfig } from '../../../../common/experimentConfig'; +import { ExperimentStartupInfo } from '../../../../common/experimentStartupInfo'; +import { EnvironmentInformation } from '../../environment'; +import { KubernetesEnvironmentService } from './kubernetesEnvironmentService'; +import { KubeflowOperatorClientFactory } from '../../../kubernetes/kubeflow/kubeflowApiClient'; +import { KubeflowClusterConfigAzure } from '../../../kubernetes/kubeflow/kubeflowConfig'; +import { KeyVaultConfig, AzureStorage } from '../../../kubernetes/kubernetesConfig'; + +interface FlattenKubeflowConfig extends ExperimentConfig, KubeflowConfig { } + +@component.Singleton +export class KubeflowEnvironmentService extends KubernetesEnvironmentService { + + private config: FlattenKubeflowConfig; + private createStoragePromise?: Promise; + + + constructor(config: ExperimentConfig, info: ExperimentStartupInfo) { + super(config, info); + this.experimentId = info.experimentId; + this.config = flattenConfig(config, 'kubeflow'); + // Create kubernetesCRDClient + this.kubernetesCRDClient = KubeflowOperatorClientFactory.createClient( + this.config.operator, this.config.apiVersion); + // Create storage + if (this.config.storage.storageType === 'azureStorage') { + if (this.config.storage.azureShare === undefined || + this.config.storage.azureAccount === undefined || + this.config.storage.keyVaultName === undefined || + this.config.storage.keyVaultKey === undefined) { + throw new Error("Azure storage configuration error!"); + } + + const azureStorage: AzureStorage = new AzureStorage(this.config.storage.azureShare, this.config.storage.azureAccount); + const keyValutConfig: KeyVaultConfig = new KeyVaultConfig(this.config.storage.keyVaultName, this.config.storage.keyVaultKey); + const azureKubeflowClusterConfig: KubeflowClusterConfigAzure = new KubeflowClusterConfigAzure( + this.config.operator, this.config.apiVersion, keyValutConfig, azureStorage); + this.azureStorageAccountName = azureKubeflowClusterConfig.azureStorage.accountName; + this.azureStorageShare = azureKubeflowClusterConfig.azureStorage.azureShare; + + this.createStoragePromise = this.createAzureStorage( + azureKubeflowClusterConfig.keyVault.vaultName, + azureKubeflowClusterConfig.keyVault.name + ); + } else if (this.config.storage.storageType === 'nfs') { + if (this.config.storage.server === undefined || + this.config.storage.path === undefined) { + throw new Error("NFS storage configuration error!"); + } + this.createStoragePromise = this.createNFSStorage( + this.config.storage.server, + this.config.storage.path + ); + } + } + + public get environmentMaintenceLoopInterval(): number { + return 5000; + } + + public get hasStorageService(): boolean { + return false; + } + + public get getName(): string { + return 'kubeflow'; + } + + public async startEnvironment(environment: EnvironmentInformation): Promise { + if (this.kubernetesCRDClient === undefined) { + throw new Error("kubernetesCRDClient not initialized!"); + } + if (this.createStoragePromise) { + await this.createStoragePromise; + } + environment.command = `mv envs outputs/envs && cd outputs && ${environment.command}`; + if (this.config.deprecated && this.config.deprecated.useActiveGpu !== undefined) { + environment.useActiveGpu = this.config.deprecated.useActiveGpu; + } + environment.maxTrialNumberPerGpu = this.config.maxTrialNumberPerGpu; + + const kubeflowJobName: string = `nni-exp-${this.experimentId}-env-${environment.id}`.toLowerCase(); + + await fs.promises.writeFile(path.join(this.environmentLocalTempFolder, "run.sh"), environment.command, { encoding: 'utf8' }); + + //upload script files to sotrage + const trialJobOutputUrl: string = await this.uploadFolder(this.environmentLocalTempFolder, `nni/${this.experimentId}`); + environment.trackingUrl = trialJobOutputUrl; + // Generate kubeflow job resource config object + const kubeflowJobConfig: any = await this.prepareKubeflowConfig(environment.id, kubeflowJobName); + // Create kubeflow job based on generated kubeflow job resource config + await this.kubernetesCRDClient.createKubernetesJob(kubeflowJobConfig); + } + + /** + * upload local folder to nfs or azureStroage + */ + private async uploadFolder(srcDirectory: string, destDirectory: string): Promise { + if (this.config.storage.storageType === 'azureStorage') { + if (this.azureStorageClient === undefined) { + throw new Error('azureStorageClient is not initialized'); + } + return await this.uploadFolderToAzureStorage(srcDirectory, destDirectory, 2); + } else { + // do not need to upload files to nfs server, temp folder already mounted to nfs + return `nfs://${this.config.storage.server}:${destDirectory}`; + } + } + + private async prepareKubeflowConfig(envId: string, kubeflowJobName: string): Promise { + const workerPodResources: any = {}; + if (this.config.worker !== undefined) { + workerPodResources.requests = this.generatePodResource(this.config.worker.memorySize, this.config.worker.cpuNumber, + this.config.worker.gpuNumber); + } + workerPodResources.limits = {...workerPodResources.requests}; + + const nonWorkerResources: any = {}; + if (this.config.operator === 'tf-operator') { + if (this.config.ps !== undefined) { + nonWorkerResources.requests = this.generatePodResource(this.config.ps.memorySize, this.config.ps.cpuNumber, + this.config.ps.gpuNumber); + nonWorkerResources.limits = {...nonWorkerResources.requests}; + } + } else if (this.config.operator === 'pytorch-operator') { + if (this.config.master !== undefined) { + nonWorkerResources.requests = this.generatePodResource(this.config.master.memorySize, this.config.master.cpuNumber, + this.config.master.gpuNumber); + nonWorkerResources.limits = {...nonWorkerResources.requests}; + } + } + + // Generate kubeflow job resource config object + const kubeflowJobConfig: any = await this.generateKubeflowJobConfig(envId, kubeflowJobName, workerPodResources, nonWorkerResources); + + return Promise.resolve(kubeflowJobConfig); + } + + public generatePodResource(memory: number, cpuNum: number, gpuNum: number): any { + const resources: any = { + memory: `${memory}Mi`, + cpu: `${cpuNum}` + }; + + if (gpuNum !== 0) { + resources['nvidia.com/gpu'] = `${gpuNum}`; + } + + return resources; + } + + /** + * Generate kubeflow resource config file + * @param kubeflowJobName job name + * @param workerPodResources worker pod template + * @param nonWorkerPodResources non-worker pod template, like ps or master + */ + private async generateKubeflowJobConfig(envId: string, kubeflowJobName: string, workerPodResources: any, + nonWorkerPodResources?: any): Promise { + + if (this.kubernetesCRDClient === undefined) { + throw new Error('Kubeflow operator client is not initialized'); + } + + const replicaSpecsObj: any = {}; + const replicaSpecsObjMap: Map = new Map(); + if (this.config.operator === 'tf-operator') { + if (this.config.worker) { + const privateRegistrySecretName = await this.createRegistrySecret(this.config.worker.privateRegistryAuthPath); + replicaSpecsObj.Worker = this.generateReplicaConfig(this.config.worker.replicas, + this.config.worker.dockerImage, 'run.sh', workerPodResources, privateRegistrySecretName); + } + if (this.config.ps !== undefined) { + const privateRegistrySecretName: string | undefined = await this.createRegistrySecret(this.config.ps.privateRegistryAuthPath); + replicaSpecsObj.Ps = this.generateReplicaConfig(this.config.ps.replicas, + this.config.ps.dockerImage, 'run.sh', nonWorkerPodResources, privateRegistrySecretName); + } + replicaSpecsObjMap.set(this.kubernetesCRDClient.jobKind, {tfReplicaSpecs: replicaSpecsObj}); + } else if (this.config.operator === 'pytorch-operator') { + if (this.config.worker !== undefined) { + const privateRegistrySecretName: string | undefined = await this.createRegistrySecret(this.config.worker.privateRegistryAuthPath); + replicaSpecsObj.Worker = this.generateReplicaConfig(this.config.worker.replicas, + this.config.worker.dockerImage, 'run.sh', workerPodResources, privateRegistrySecretName); + } + if (this.config.master !== undefined) { + const privateRegistrySecretName: string | undefined = await this.createRegistrySecret(this.config.master.privateRegistryAuthPath); + replicaSpecsObj.Master = this.generateReplicaConfig(this.config.master.replicas, + this.config.master.dockerImage, 'run.sh', nonWorkerPodResources, privateRegistrySecretName); + + } + replicaSpecsObjMap.set(this.kubernetesCRDClient.jobKind, {pytorchReplicaSpecs: replicaSpecsObj}); + } + + return Promise.resolve({ + apiVersion: `kubeflow.org/${this.kubernetesCRDClient.apiVersion}`, + kind: this.kubernetesCRDClient.jobKind, + metadata: { + name: kubeflowJobName, + namespace: 'default', + labels: { + app: this.NNI_KUBERNETES_TRIAL_LABEL, + expId: this.experimentId, + envId: envId + } + }, + spec: replicaSpecsObjMap.get(this.kubernetesCRDClient.jobKind) + }); + } + + /** + * Generate tf-operator's tfjobs replica config section + * @param replicaNumber replica number + * @param replicaImage image + * @param runScriptFile script file name + * @param podResources pod resource config section + */ + private generateReplicaConfig(replicaNumber: number, replicaImage: string, runScriptFile: string, + podResources: any, privateRegistrySecretName: string | undefined): any { + if (this.kubernetesCRDClient === undefined) { + throw new Error('Kubeflow operator client is not initialized'); + } + // The config spec for volume field + const volumeSpecMap: Map = new Map(); + if (this.config.storage.storageType === 'azureStorage') { + volumeSpecMap.set('nniVolumes', [ + { + name: 'nni-vol', + azureFile: { + secretName: `${this.azureStorageSecretName}`, + shareName: `${this.azureStorageShare}`, + readonly: false + } + }]); + } else { + volumeSpecMap.set('nniVolumes', [ + { + name: 'nni-vol', + nfs: { + server: `${this.config.storage.server}`, + path: `${this.config.storage.path}` + } + }]); + } + // The config spec for container field + const containersSpecMap: Map = new Map(); + containersSpecMap.set('containers', [ + { + // Kubeflow tensorflow operator requires that containers' name must be tensorflow + // TODO: change the name based on operator's type + name: this.kubernetesCRDClient.containerName, + image: replicaImage, + args: ['sh', `${path.join(this.environmentWorkingFolder, runScriptFile)}`], + volumeMounts: [ + { + name: 'nni-vol', + mountPath: this.CONTAINER_MOUNT_PATH + }], + resources: podResources + } + ]); + const spec: any = { + containers: containersSpecMap.get('containers'), + restartPolicy: 'ExitCode', + volumes: volumeSpecMap.get('nniVolumes') + } + if (privateRegistrySecretName) { + spec.imagePullSecrets = [ + { + name: privateRegistrySecretName + }] + } + return { + replicas: replicaNumber, + template: { + metadata: { + creationTimestamp: null + }, + spec: spec + } + } + } +} diff --git a/ts/nni_manager/training_service/reusable/environments/kubernetes/kubernetesEnvironmentService.ts b/ts/nni_manager/training_service/reusable/environments/kubernetes/kubernetesEnvironmentService.ts new file mode 100644 index 0000000000..a986023e5a --- /dev/null +++ b/ts/nni_manager/training_service/reusable/environments/kubernetes/kubernetesEnvironmentService.ts @@ -0,0 +1,260 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +'use strict'; + +import * as cpp from 'child-process-promise'; +import * as path from 'path'; +import * as azureStorage from 'azure-storage'; +import {Base64} from 'js-base64'; +import {String} from 'typescript-string-operations'; +import { ExperimentConfig } from '../../../../common/experimentConfig'; +import { ExperimentStartupInfo } from '../../../../common/experimentStartupInfo'; +import { getLogger, Logger } from '../../../../common/log'; +import { EnvironmentInformation, EnvironmentService } from '../../environment'; +import {GeneralK8sClient, KubernetesCRDClient} from '../../../kubernetes/kubernetesApiClient'; +import {AzureStorageClientUtility} from '../../../kubernetes/azureStorageClientUtils'; +import { KubeflowJobStatus } from '../../../kubernetes/kubeflow/kubeflowConfig'; +import {delay, uniqueString} from '../../../../common/utils'; +const fs = require('fs'); + +export class KubernetesEnvironmentService extends EnvironmentService { + protected azureStorageClient?: azureStorage.FileService; + protected azureStorageShare?: string; + protected azureStorageSecretName?: string; + protected azureStorageAccountName?: string; + protected genericK8sClient: GeneralK8sClient; + protected kubernetesCRDClient?: KubernetesCRDClient; + protected experimentRootDir: string; + protected experimentId: string; + + // experiment root dir in NFS + protected environmentLocalTempFolder: string; + protected NNI_KUBERNETES_TRIAL_LABEL: string = 'nni-kubernetes-trial'; + protected CONTAINER_MOUNT_PATH: string; + protected log: Logger = getLogger('KubernetesEnvironmentService'); + protected environmentWorkingFolder: string; + + constructor(config: ExperimentConfig, info: ExperimentStartupInfo) { + super(); + this.CONTAINER_MOUNT_PATH = '/tmp/mount'; + this.genericK8sClient = new GeneralK8sClient(); + this.experimentRootDir = info.logDir; + this.environmentLocalTempFolder = path.join(this.experimentRootDir, "environment-temp"); + this.experimentId = info.experimentId; + this.environmentWorkingFolder = path.join(this.CONTAINER_MOUNT_PATH, 'nni', this.experimentId); + } + + public get environmentMaintenceLoopInterval(): number { + return 5000; + } + + public get hasStorageService(): boolean { + return false; + } + + public get getName(): string { + return 'kubernetes'; + } + + protected async createAzureStorage(vaultName: string, valutKeyName: string): Promise { + try { + const result: any = await cpp.exec(`az keyvault secret show --name ${valutKeyName} --vault-name ${vaultName}`); + if (result.stderr) { + const errorMessage: string = result.stderr; + this.log.error(errorMessage); + + return Promise.reject(errorMessage); + } + const storageAccountKey: any = JSON.parse(result.stdout).value; + if (this.azureStorageAccountName === undefined) { + throw new Error('azureStorageAccountName not initialized!'); + } + //create storage client + this.azureStorageClient = azureStorage.createFileService(this.azureStorageAccountName, storageAccountKey); + await AzureStorageClientUtility.createShare(this.azureStorageClient, this.azureStorageShare); + //create sotrage secret + this.azureStorageSecretName = String.Format('nni-secret-{0}', uniqueString(8) + .toLowerCase()); + if (this.genericK8sClient === undefined) { + throw new Error("genericK8sClient undefined!"); + } + const namespace = this.genericK8sClient.getNamespace ? this.genericK8sClient.getNamespace : "default" + await this.genericK8sClient.createSecret( + { + apiVersion: 'v1', + kind: 'Secret', + metadata: { + name: this.azureStorageSecretName, + namespace: namespace, + labels: { + app: this.NNI_KUBERNETES_TRIAL_LABEL, + expId: this.experimentId + } + }, + type: 'Opaque', + data: { + azurestorageaccountname: Base64.encode(this.azureStorageAccountName), + azurestorageaccountkey: Base64.encode(storageAccountKey) + } + } + ); + } catch (error) { + this.log.error(error); + + return Promise.reject(error); + } + + return Promise.resolve(); + } + + /** + * upload local directory to azureStorage + * @param srcDirectory the source directory of local folder + * @param destDirectory the target directory in azure + * @param uploadRetryCount the retry time when upload failed + */ + protected async uploadFolderToAzureStorage(srcDirectory: string, destDirectory: string, uploadRetryCount: number | undefined): Promise { + if (this.azureStorageClient === undefined) { + throw new Error('azureStorageClient is not initialized'); + } + let retryCount: number = 1; + if (uploadRetryCount) { + retryCount = uploadRetryCount; + } + let uploadSuccess: boolean = false; + let folderUriInAzure = ''; + try { + do { + uploadSuccess = await AzureStorageClientUtility.uploadDirectory( + this.azureStorageClient, + `${destDirectory}`, + this.azureStorageShare, + `${srcDirectory}`); + if (!uploadSuccess) { + //wait for 5 seconds to re-upload files + await delay(5000); + this.log.info('Upload failed, Retry: upload files to azure-storage'); + } else { + folderUriInAzure = `https://${this.azureStorageAccountName}.file.core.windows.net/${this.azureStorageShare}/${destDirectory}`; + break; + } + } while (retryCount-- >= 0) + } catch (error) { + this.log.error(error); + //return a empty url when got error + return Promise.resolve(''); + } + return Promise.resolve(folderUriInAzure); + } + + protected async createNFSStorage(nfsServer: string, nfsPath: string): Promise { + await cpp.exec(`mkdir -p ${this.environmentLocalTempFolder}`); + try { + await cpp.exec(`sudo mount ${nfsServer}:${nfsPath} ${this.environmentLocalTempFolder}`); + } catch (error) { + const mountError: string = `Mount NFS ${nfsServer}:${nfsPath} to ${this.environmentLocalTempFolder} failed, error is ${error}`; + this.log.error(mountError); + + return Promise.reject(mountError); + } + + return Promise.resolve(); + } + protected async createPVCStorage(pvcPath: string): Promise { + try { + await cpp.exec(`mkdir -p ${pvcPath}`); + await cpp.exec(`sudo ln -s ${pvcPath} ${this.environmentLocalTempFolder}`); + } catch (error) { + const linkError: string = `Linking ${pvcPath} to ${this.environmentLocalTempFolder} failed, error is ${error}`; + this.log.error(linkError); + + return Promise.reject(linkError); + } + + return Promise.resolve(); + } + + protected async createRegistrySecret(filePath: string | undefined): Promise { + if (filePath === undefined || filePath === '') { + return undefined; + } + const body = fs.readFileSync(filePath).toString('base64'); + const registrySecretName = String.Format('nni-secret-{0}', uniqueString(8) + .toLowerCase()); + const namespace = this.genericK8sClient.getNamespace ? this.genericK8sClient.getNamespace : "default" + await this.genericK8sClient.createSecret( + { + apiVersion: 'v1', + kind: 'Secret', + metadata: { + name: registrySecretName, + namespace: namespace, + labels: { + app: this.NNI_KUBERNETES_TRIAL_LABEL, + expId: this.experimentId + } + }, + type: 'kubernetes.io/dockerconfigjson', + data: { + '.dockerconfigjson': body + } + } + ); + return registrySecretName; + } + + + public async refreshEnvironmentsStatus(environments: EnvironmentInformation[]): Promise { + environments.forEach(async (environment) => { + if (this.kubernetesCRDClient === undefined) { + throw new Error("kubernetesCRDClient undefined") + } + const kubeflowJobName: string = `nni-exp-${this.experimentId}-env-${environment.id}`.toLowerCase(); + const kubernetesJobInfo = await this.kubernetesCRDClient.getKubernetesJob(kubeflowJobName); + if (kubernetesJobInfo.status && kubernetesJobInfo.status.conditions) { + const latestCondition: any = kubernetesJobInfo.status.conditions[kubernetesJobInfo.status.conditions.length - 1]; + const tfJobType: KubeflowJobStatus = latestCondition.type; + switch (tfJobType) { + case 'Created': + environment.setStatus('WAITING'); + break; + case 'Running': + environment.setStatus('RUNNING'); + break; + case 'Failed': + environment.setStatus('FAILED'); + break; + case 'Succeeded': + environment.setStatus('SUCCEEDED'); + break; + default: + } + } + }); + } + + public async startEnvironment(environment: EnvironmentInformation): Promise { + throw new Error("Not implemented"); + } + + public async stopEnvironment(environment: EnvironmentInformation): Promise { + if (this.kubernetesCRDClient === undefined) { + throw new Error('kubernetesCRDClient not initialized!'); + } + try { + await this.kubernetesCRDClient.deleteKubernetesJob(new Map( + [ + ['app', this.NNI_KUBERNETES_TRIAL_LABEL], + ['expId', this.experimentId], + ['envId', environment.id] + ] + )); + } catch (err) { + const errorMessage: string = `Delete env ${environment.id} failed: ${err}`; + this.log.error(errorMessage); + + return Promise.reject(errorMessage); + } + } +} diff --git a/ts/nni_manager/training_service/reusable/routerTrainingService.ts b/ts/nni_manager/training_service/reusable/routerTrainingService.ts index bc9f413d05..858acb7ba7 100644 --- a/ts/nni_manager/training_service/reusable/routerTrainingService.ts +++ b/ts/nni_manager/training_service/reusable/routerTrainingService.ts @@ -5,11 +5,12 @@ import { getLogger, Logger } from '../../common/log'; import { MethodNotImplementedError } from '../../common/errors'; -import { ExperimentConfig, RemoteConfig, OpenpaiConfig } from '../../common/experimentConfig'; +import { ExperimentConfig, RemoteConfig, OpenpaiConfig, KubeflowConfig } from '../../common/experimentConfig'; import { TrainingService, TrialJobApplicationForm, TrialJobDetail, TrialJobMetric, LogType } from '../../common/trainingService'; import { delay } from '../../common/utils'; import { PAITrainingService } from '../pai/paiTrainingService'; import { RemoteMachineTrainingService } from '../remote_machine/remoteMachineTrainingService'; +import { KubeflowTrainingService } from '../kubernetes/kubeflow/kubeflowTrainingService'; import { TrialDispatcher } from './trialDispatcher'; @@ -29,6 +30,8 @@ class RouterTrainingService implements TrainingService { instance.internalTrainingService = new RemoteMachineTrainingService(config); } else if (platform === 'openpai' && !(config.trainingService).reuseMode) { instance.internalTrainingService = new PAITrainingService(config); + } else if (platform === 'kubeflow' && !(config.trainingService).reuseMode) { + instance.internalTrainingService = new KubeflowTrainingService(); } else { instance.internalTrainingService = await TrialDispatcher.construct(config); } @@ -125,4 +128,4 @@ class RouterTrainingService implements TrainingService { } } -export { RouterTrainingService }; +export { RouterTrainingService }; \ No newline at end of file From b5a435f7c184df7d5b7a6045f4c5f7230cf3df8f Mon Sep 17 00:00:00 2001 From: SparkSnail Date: Mon, 26 Jul 2021 12:10:18 +0800 Subject: [PATCH 3/3] remove unused file --- .../reusable/environments/kubernetes/kubernetesEnvironmentService | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 ts/nni_manager/training_service/reusable/environments/kubernetes/kubernetesEnvironmentService diff --git a/ts/nni_manager/training_service/reusable/environments/kubernetes/kubernetesEnvironmentService b/ts/nni_manager/training_service/reusable/environments/kubernetes/kubernetesEnvironmentService deleted file mode 100644 index e69de29bb2..0000000000