Skip to content
This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

Commit

Permalink
Support remoteLoggingType (#901)
Browse files Browse the repository at this point in the history
If user set remoteloggingType in config file, log content will not be transmitted from trialkeeper
  • Loading branch information
SparkSnail authored Mar 22, 2019
1 parent d10b8bc commit c297650
Show file tree
Hide file tree
Showing 17 changed files with 60 additions and 15 deletions.
4 changes: 4 additions & 0 deletions docs/en_US/ExperimentConfig.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ machineList:

__logLevel__ sets log level for the experiment, available log levels are: `trace, debug, info, warning, error, fatal`. The default value is `info`.

* __logCollection__
* Description
__logCollection__ set the way to collect log in remote, pai, kubeflow, frameworkcontroller platform. There are two ways to collect log, one way is from `http`, trial keeper will post log content back from http request in this way, but this way may slow down the speed to process logs in trialKeeper. The other way is `none`, trial keeper will not post log content back, and only post job metrics. If your log content is too big, you could consider setting this param be `none`.

* __tuner__
* Description

Expand Down
1 change: 1 addition & 0 deletions src/nni_manager/common/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ interface ExperimentParams {
multiPhase?: boolean;
multiThread?: boolean;
versionCheck?: boolean;
logCollection?: string;
tuner?: {
className: string;
builtinTunerName?: string;
Expand Down
4 changes: 4 additions & 0 deletions src/nni_manager/core/nnimanager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ class NNIManager implements Manager {
if (expParams.versionCheck !== undefined) {
this.trainingService.setClusterMetadata('version_check', expParams.versionCheck.toString());
}
// Set up logCollection config
if (expParams.logCollection !== undefined) {
this.trainingService.setClusterMetadata('log_collection', expParams.logCollection.toString());
}

const dispatcherCommand: string = getMsgDispatcherCommand(expParams.tuner, expParams.assessor, expParams.advisor,
expParams.multiPhase, expParams.multiThread);
Expand Down
1 change: 1 addition & 0 deletions src/nni_manager/rest_server/restValidationSchemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ export namespace ValidationSchemas {
multiPhase: joi.boolean(),
multiThread: joi.boolean(),
versionCheck: joi.boolean(),
logCollection: joi.string(),
advisor: joi.object({
builtinAdvisorName: joi.string().valid('Hyperband'),
codeDir: joi.string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ export enum TrialConfigMetadataKey {
KUBEFLOW_CLUSTER_CONFIG = 'kubeflow_config',
NNI_MANAGER_IP = 'nni_manager_ip',
FRAMEWORKCONTROLLER_CLUSTER_CONFIG = 'frameworkcontroller_config',
VERSION_CHECK = 'version_check'
VERSION_CHECK = 'version_check',
LOG_COLLECTION = 'log_collection'
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ class FrameworkControllerTrainingService extends KubernetesTrainingService imple
case TrialConfigMetadataKey.VERSION_CHECK:
this.versionCheck = (value === 'true' || value === 'True');
break;
case TrialConfigMetadataKey.LOG_COLLECTION:
this.logCollection = value;
break;
default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,9 @@ class KubeflowTrainingService extends KubernetesTrainingService implements Kuber
case TrialConfigMetadataKey.VERSION_CHECK:
this.versionCheck = (value === 'true' || value === 'True');
break;
case TrialConfigMetadataKey.LOG_COLLECTION:
this.logCollection = value;
break;
default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,5 @@ mkdir -p $NNI_OUTPUT_DIR
cp -rT $NNI_CODE_DIR $NNI_SYS_DIR
cd $NNI_SYS_DIR
sh install_nni.sh
python3 -m nni_trial_tool.trial_keeper --trial_command '{8}' --nnimanager_ip {9} --nnimanager_port {10} --version '{11}'`
python3 -m nni_trial_tool.trial_keeper --trial_command '{8}' --nnimanager_ip {9} --nnimanager_port {10} --version '{11}' --log_collection '{12}'`
+ `1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr`
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ abstract class KubernetesTrainingService {
protected kubernetesJobRestServer?: KubernetesJobRestServer;
protected kubernetesClusterConfig?: KubernetesClusterConfig;
protected versionCheck?: boolean = true;
protected logCollection: string;

constructor() {
this.log = getLogger();
Expand All @@ -72,6 +73,7 @@ abstract class KubernetesTrainingService {
this.nextTrialSequenceId = -1;
this.CONTAINER_MOUNT_PATH = '/tmp/mount';
this.genericK8sClient = new GeneralK8sClient();
this.logCollection = 'none';
}

public generatePodResource(memory: number, cpuNum: number, gpuNum: number) {
Expand Down Expand Up @@ -204,7 +206,8 @@ abstract class KubernetesTrainingService {
command,
nniManagerIp,
this.kubernetesRestServerPort,
version
version,
this.logCollection
);
return Promise.resolve(runScript);
}
Expand Down
2 changes: 1 addition & 1 deletion src/nni_manager/training_service/pai/paiData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export const PAI_TRIAL_COMMAND_FORMAT: string =
`export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4}
&& cd $NNI_SYS_DIR && sh install_nni.sh
&& python3 -m nni_trial_tool.trial_keeper --trial_command '{5}' --nnimanager_ip '{6}' --nnimanager_port '{7}'
--pai_hdfs_output_dir '{8}' --pai_hdfs_host '{9}' --pai_user_name {10} --nni_hdfs_exp_dir '{11}' --webhdfs_path '/webhdfs/api/v1' --version '{12}'`;
--pai_hdfs_output_dir '{8}' --pai_hdfs_host '{9}' --pai_user_name {10} --nni_hdfs_exp_dir '{11}' --webhdfs_path '/webhdfs/api/v1' --version '{12}' --log_collection '{13}'`;

export const PAI_OUTPUT_DIR_FORMAT: string =
`hdfs://{0}:9000/`;
Expand Down
8 changes: 7 additions & 1 deletion src/nni_manager/training_service/pai/paiTrainingService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class PAITrainingService implements TrainingService {
private nniManagerIpConfig?: NNIManagerIpConfig;
private copyExpCodeDirPromise?: Promise<void>;
private versionCheck?: boolean = true;
private logCollection: string;

constructor() {
this.log = getLogger();
Expand All @@ -88,6 +89,7 @@ class PAITrainingService implements TrainingService {
this.hdfsDirPattern = 'hdfs://(?<host>([0-9]{1,3}.){3}[0-9]{1,3})(:[0-9]{2,5})?(?<baseDir>/.*)?';
this.nextTrialSequenceId = -1;
this.paiTokenUpdateInterval = 7200000; //2hours
this.logCollection = 'none';
this.log.info('Construct OpenPAI training service.');
}

Expand Down Expand Up @@ -228,7 +230,8 @@ class PAITrainingService implements TrainingService {
this.hdfsOutputHost,
this.paiClusterConfig.userName,
HDFSClientUtility.getHdfsExpCodeDir(this.paiClusterConfig.userName),
version
version,
this.logCollection
).replace(/\r\n|\n|\r/gm, '');

console.log(`nniPAItrial command is ${nniPaiTrialCommand.trim()}`);
Expand Down Expand Up @@ -442,6 +445,9 @@ class PAITrainingService implements TrainingService {
case TrialConfigMetadataKey.VERSION_CHECK:
this.versionCheck = (value === 'true' || value === 'True');
break;
case TrialConfigMetadataKey.LOG_COLLECTION:
this.logCollection = value;
break;
default:
//Reject for unknown keys
throw new Error(`Uknown key: ${key}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,8 @@ export NNI_PLATFORM=remote NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={
cd $NNI_SYS_DIR
sh install_nni.sh
echo $$ >{6}
python3 -m nni_trial_tool.trial_keeper --trial_command '{7}' --nnimanager_ip '{8}' --nnimanager_port '{9}' --version '{10}' 1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr
echo $? \`date +%s%3N\` >{11}`;
python3 -m nni_trial_tool.trial_keeper --trial_command '{7}' --nnimanager_ip '{8}' --nnimanager_port '{9}' --version '{10}' --log_collection '{11}' 1>$NNI_OUTPUT_DIR/trialkeeper_stdout 2>$NNI_OUTPUT_DIR/trialkeeper_stderr
echo $? \`date +%s%3N\` >{12}`;

export const HOST_JOB_SHELL_FORMAT: string =
`#!/bin/bash
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class RemoteMachineTrainingService implements TrainingService {
private readonly remoteOS: string;
private nniManagerIpConfig?: NNIManagerIpConfig;
private versionCheck: boolean = true;
private logCollection: string;

constructor(@component.Inject timer: ObservableTimer) {
this.remoteOS = 'linux';
Expand All @@ -91,6 +92,7 @@ class RemoteMachineTrainingService implements TrainingService {
this.timer = timer;
this.log = getLogger();
this.trialSequenceId = -1;
this.logCollection = 'none';
this.log.info('Construct remote machine training service.');
}

Expand Down Expand Up @@ -376,6 +378,9 @@ class RemoteMachineTrainingService implements TrainingService {
case TrialConfigMetadataKey.VERSION_CHECK:
this.versionCheck = (value === 'true' || value === 'True');
break;
case TrialConfigMetadataKey.LOG_COLLECTION:
this.logCollection = value;
break;
default:
//Reject for unknown keys
throw new Error(`Uknown key: ${key}`);
Expand Down Expand Up @@ -598,6 +603,7 @@ class RemoteMachineTrainingService implements TrainingService {
nniManagerIp,
this.remoteRestServerPort,
version,
this.logCollection,
path.join(trialWorkingFolder, '.nni', 'code')
)

Expand Down
1 change: 1 addition & 0 deletions tools/nni_cmd/config_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
Optional('logDir'): os.path.isdir,
Optional('debug'): bool,
Optional('logLevel'): Or('trace', 'debug', 'info', 'warning', 'error', 'fatal'),
Optional('logCollection'): Or('http', 'none'),
'useAnnotation': bool,
Optional('advisor'): Or({
'builtinAdvisorName': Or('Hyperband'),
Expand Down
2 changes: 2 additions & 0 deletions tools/nni_cmd/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ def set_experiment(experiment_config, mode, port, config_file_name):
#debug mode should disable version check
if experiment_config.get('debug') is not None:
request_data['versionCheck'] = not experiment_config.get('debug')
if experiment_config.get('logCollection'):
request_data['logCollection'] = experiment_config.get('logCollection')

request_data['clusterMetaData'] = []
if experiment_config['trainingServicePlatform'] == 'local':
Expand Down
20 changes: 15 additions & 5 deletions tools/nni_trial_tool/log_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import logging.handlers
import time
import threading
import re

from datetime import datetime
from enum import Enum, unique
Expand Down Expand Up @@ -81,7 +82,7 @@ class RemoteLogger(object):
"""
NNI remote logger
"""
def __init__(self, syslog_host, syslog_port, tag, std_output_type, log_level=logging.INFO):
def __init__(self, syslog_host, syslog_port, tag, std_output_type, log_collection, log_level=logging.INFO):
'''
constructor
'''
Expand All @@ -94,12 +95,13 @@ def __init__(self, syslog_host, syslog_port, tag, std_output_type, log_level=log
self.orig_stdout = sys.__stdout__
else:
self.orig_stdout = sys.__stderr__
self.log_collection = log_collection

def get_pipelog_reader(self):
'''
Get pipe for remote logger
'''
return PipeLogReader(self.logger, logging.INFO)
return PipeLogReader(self.logger, self.log_collection, logging.INFO)

def write(self, buf):
'''
Expand All @@ -117,7 +119,7 @@ class PipeLogReader(threading.Thread):
"""
The reader thread reads log data from pipe
"""
def __init__(self, logger, log_level=logging.INFO):
def __init__(self, logger, log_collection, log_level=logging.INFO):
"""Setup the object with a logger and a loglevel
and start the thread
"""
Expand All @@ -131,6 +133,8 @@ def __init__(self, logger, log_level=logging.INFO):
self.orig_stdout = sys.__stdout__
self._is_read_completed = False
self.process_exit = False
self.log_collection = log_collection
self.log_pattern = re.compile(r'^NNISDK_MEb\'.*\'$')

def _populateQueue(stream, queue):
'''
Expand All @@ -143,8 +147,6 @@ def _populateQueue(stream, queue):
line = self.queue.get(True, 5)
try:
self.logger.log(self.log_level, line.rstrip())
self.orig_stdout.write(line.rstrip() + '\n')
self.orig_stdout.flush()
except Exception as e:
pass
except Exception as e:
Expand All @@ -165,9 +167,17 @@ def fileno(self):

def run(self):
"""Run the thread, logging everything.
If the log_collection is 'none', the log content will not be enqueued
"""
for line in iter(self.pipeReader.readline, ''):
self.orig_stdout.write(line.rstrip() + '\n')
self.orig_stdout.flush()
if self.log_collection == 'none':
# If not match metrics, do not put the line into queue
if not self.log_pattern.match(line):
continue
self.queue.put(line)

self.pipeReader.close()

def close(self):
Expand Down
6 changes: 3 additions & 3 deletions tools/nni_trial_tool/trial_keeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,9 @@ def main_loop(args):

stdout_file = open(STDOUT_FULL_PATH, 'a+')
stderr_file = open(STDERR_FULL_PATH, 'a+')

trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper', StdOutputType.Stdout)
trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper', StdOutputType.Stdout, args.log_collection)
# redirect trial keeper's stdout and stderr to syslog
trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout)
trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout, args.log_collection)
sys.stdout = sys.stderr = trial_keeper_syslogger
# backward compatibility
hdfs_host = None
Expand Down Expand Up @@ -144,6 +143,7 @@ def check_version(args):
PARSER.add_argument('--nni_hdfs_exp_dir', type=str, help='nni experiment directory in hdfs')
PARSER.add_argument('--webhdfs_path', type=str, help='the webhdfs path used in webhdfs URL')
PARSER.add_argument('--version', type=str, help='the nni version transmitted from trainingService')
PARSER.add_argument('--log_collection', type=str, help='set the way to collect log in trialkeeper')
args, unknown = PARSER.parse_known_args()
if args.trial_command is None:
exit(1)
Expand Down

0 comments on commit c297650

Please sign in to comment.