Skip to content
This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

Commit

Permalink
refector file lock
Browse files Browse the repository at this point in the history
  • Loading branch information
Ning Shang committed Nov 26, 2020
1 parent 155c132 commit df491d4
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 54 deletions.
44 changes: 32 additions & 12 deletions nni/tools/nnictl/common_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import ruamel.yaml as yaml
import psutil
import filelock
import glob
from colorama import Fore

from .constants import ERROR_INFO, NORMAL_INFO, WARNING_INFO
Expand Down Expand Up @@ -98,23 +99,42 @@ def generate_folder_name():
os.makedirs(temp_dir)
return temp_dir

class SimpleFileLock(filelock.SoftFileLock):
class SimplePreemptiveLock(filelock.SoftFileLock):
'''this is a lock support check lock expiration, if you do not need check expiration, you can use SoftFileLock'''
def __init__(self, lock_file, timeout=-1, stale=-1):
super(__class__, self).__init__(lock_file, timeout)
self._stale = stale
def __init__(self, lock_file, check_interval=-1):
super(__class__, self).__init__(lock_file, check_interval)
self._lock_file_name = '{}.{}'.format(self._lock_file, os.getpid())

def __enter__(self):
count = 0
while True:
try:
if os.path.isfile(self._lock_file) and time.time() - os.stat(self._lock_file).st_mtime > self._stale:
os.remove(self._lock_file)
self.acquire()
return self
except Exception:
print_warning('[{}] fail lock file, auto try again!'.format(count))
count += 1
except TimeoutError:
print_warning('fail lock file, auto try again!')

def _acquire(self):
open_mode = os.O_WRONLY | os.O_CREAT | os.O_EXCL | os.O_TRUNC
try:
lock_file_names = glob.glob(self._lock_file + '.*')
for file_name in lock_file_names:
if os.path.exists(file_name) and time.time() - os.stat(file_name).st_mtime < self._timeout:
raise TimeoutError()
fd = os.open(self._lock_file_name, open_mode)
except (IOError, OSError, TimeoutError):
pass
else:
self._lock_file_fd = fd
return None

def _release(self):
os.close(self._lock_file_fd)
self._lock_file_fd = None
try:
os.remove(self._lock_file_name)
except OSError:
pass
return None

def get_file_lock(path: string, timeout=-1, stale=-1):
return SimpleFileLock(path + '.lock', timeout=timeout, stale=stale)
def get_file_lock(path: string, check_interval=-1):
return SimplePreemptiveLock(path + '.lock', check_interval=-1)
2 changes: 1 addition & 1 deletion nni/tools/nnictl/config_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class Experiments:
def __init__(self, home_dir=NNICTL_HOME_DIR):
os.makedirs(home_dir, exist_ok=True)
self.experiment_file = os.path.join(home_dir, '.experiment')
self.lock = get_file_lock(self.experiment_file, timeout=1, stale=2)
self.lock = get_file_lock(self.experiment_file, check_interval=2)
with self.lock:
self.experiments = self.read_file()

Expand Down
41 changes: 16 additions & 25 deletions ts/nni_manager/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ import * as lockfile from 'lockfile';
import { Deferred } from 'ts-deferred';
import { Container } from 'typescript-ioc';
import * as util from 'util';
import * as glob from 'glob';

import { Database, DataStore } from './datastore';
import { ExperimentStartupInfo, getExperimentStartupInfo, setExperimentStartupInfo } from './experimentStartupInfo';
import { ExperimentParams, Manager } from './manager';
import { ExperimentManager } from './experimentManager';
import { HyperParameters, TrainingService, TrialJobStatus } from './trainingService';
import { logLevelNameMap } from './log';
import { time } from 'console';

function getExperimentRootDir(): string {
return getExperimentStartupInfo()
Expand Down Expand Up @@ -424,40 +426,29 @@ function unixPathJoin(...paths: any[]): string {
return dir;
}

/**
* lock a file
*/
async function withLock(func: Function, filePath: string, lockOpts: {[key: string]: any}, ...args: any): Promise<any> {
const lockPath = path.join(path.dirname(filePath), path.basename(filePath) + '.lock');
const deferred = new Deferred<any>();
lockfile.lock(lockPath, lockOpts, (err: any) => {
if (err) {
deferred.reject(err);
}
try {
const result = func(...args);
lockfile.unlockSync(lockPath);
deferred.resolve(result);
} catch (err) {
deferred.reject(err);
}
});
return deferred.promise;
}

/**
* lock a file sync
*/
function withLockSync(func: Function, filePath: string, lockOpts: {[key: string]: any}, ...args: any): any {
const lockPath = path.join(path.dirname(filePath), path.basename(filePath) + '.lock');
lockfile.lockSync(lockPath, lockOpts);
const lockName = path.join(path.dirname(filePath), path.basename(filePath) + `.lock.${process.pid}`);
if (typeof lockOpts.stale === 'number'){
const lockPath = path.join(path.dirname(filePath), path.basename(filePath) + '.lock.*');
const lockFileNames: string[] = glob.sync(lockPath);
const canLock: boolean = lockFileNames.map((fileName) => {
return fs.existsSync(fileName) && Date.now() - fs.statSync(fileName).mtimeMs > lockOpts.stale;
}).filter(isExpired=>isExpired === false).length === 0;
if (!canLock) {
throw new Error('File has been locked.');
}
}
lockfile.lockSync(lockName, lockOpts);
const result = func(...args);
lockfile.unlockSync(lockPath);
lockfile.unlockSync(lockName);
return result;
}

export {
countFilesRecursively, validateFileNameRecursively, generateParamFileName, getMsgDispatcherCommand, getCheckpointDir, getExperimentsInfoPath,
getLogDir, getExperimentRootDir, getJobCancelStatus, getDefaultDatabaseDir, getIPV4Address, unixPathJoin, withLock, withLockSync,
getLogDir, getExperimentRootDir, getJobCancelStatus, getDefaultDatabaseDir, getIPV4Address, unixPathJoin, withLockSync,
mkDirP, mkDirPSync, delay, prepareUnitTest, parseArg, cleanupUnitTest, uniqueString, randomInt, randomSelect, getLogLevel, getVersion, getCmdPy, getTunerProc, isAlive, killPid, getNewLine
};
37 changes: 23 additions & 14 deletions ts/nni_manager/core/nniExperimentsManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import * as path from 'path';
import * as assert from 'assert';

import { getLogger, Logger } from '../common/log';
import { isAlive, withLock, withLockSync, getExperimentsInfoPath, delay } from '../common/utils';
import { isAlive, withLockSync, getExperimentsInfoPath, delay } from '../common/utils';
import { ExperimentManager } from '../common/experimentManager';
import { Deferred } from 'ts-deferred';

Expand All @@ -35,7 +35,7 @@ class NNIExperimentsManager implements ExperimentManager {
}

public async getExperimentsInfo(): Promise<JSON> {
const fileInfo = await this.readExperimentsInfo();
const fileInfo: FileInfo = await this.withLockIterated(this.readExperimentsInfo, 100);
const experimentsInformation = JSON.parse(fileInfo.buffer.toString());
const expIdList: Array<string> = Object.keys(experimentsInformation).filter((expId) => {
return experimentsInformation[expId]['status'] !== 'STOPPED';
Expand All @@ -44,7 +44,7 @@ class NNIExperimentsManager implements ExperimentManager {
return this.checkCrashed(expId, experimentsInformation[expId]['pid']);
}))).filter(crashedInfo => crashedInfo.isCrashed);
if (updateList.length > 0){
const result = await this.withLock(this.updateAllStatus, updateList.map(crashedInfo => crashedInfo.experimentId), fileInfo.mtime);
const result = await this.withLockIterated(this.updateAllStatus, 100, updateList.map(crashedInfo => crashedInfo.experimentId), fileInfo.mtime);
if (result !== undefined) {
return JSON.parse(JSON.stringify(Object.keys(result).map(key=>result[key])));
} else {
Expand Down Expand Up @@ -83,27 +83,36 @@ class NNIExperimentsManager implements ExperimentManager {
} catch (err) {
this.log.error(err);
this.log.debug(`Experiment Manager: Retry set key value: ${experimentId} {${key}: ${value}}`);
if (err.code === 'EEXIST') {
this.profileUpdateTimer[key] = setTimeout(this.setExperimentInfo.bind(this), 1000, experimentId, key, value);
if (err.code === 'EEXIST' || err.message === 'File has been locked.') {
this.profileUpdateTimer[key] = setTimeout(this.setExperimentInfo.bind(this), 100, experimentId, key, value);
}
}
}

private async withLock (func: Function, ...args: any): Promise<any> {
return withLock(func.bind(this), this.experimentsPath, {stale: 2 * 1000, retries: 100, retryWait: 100}, ...args);
private async withLockIterated (func: Function, retry: number, ...args: any): Promise<any> {
if (retry < 0) {
throw new Error('Lock file out of retries.');
}
try {
return this.withLockSync(func, ...args);
} catch(err) {
if (err.code === 'EEXIST' || err.message === 'File has been locked.') {
// retry wait is 100ms
delay(100);
return await this.withLockIterated(func, retry - 1, ...args);
}
throw err;
}
}

private withLockSync (func: Function, ...args: any): any {
return withLockSync(func.bind(this), this.experimentsPath, {stale: 2 * 1000}, ...args);
}

private async readExperimentsInfo(): Promise<FileInfo> {
return this.withLock((path: string) => {
const buffer: Buffer = fs.readFileSync(path);
const mtime: number = fs.statSync(path).mtimeMs;
return {buffer: buffer, mtime: mtime};
},
this.experimentsPath);
private readExperimentsInfo(): FileInfo {
const buffer: Buffer = fs.readFileSync(this.experimentsPath);
const mtime: number = fs.statSync(this.experimentsPath).mtimeMs;
return {buffer: buffer, mtime: mtime};
}

private async checkCrashed(expId: string, pid: number): Promise<CrashedInfo> {
Expand Down
4 changes: 2 additions & 2 deletions ts/nni_manager/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import { PAIYarnTrainingService } from './training_service/pai/paiYarn/paiYarnTr
import { DLTSTrainingService } from './training_service/dlts/dltsTrainingService';

function initStartupInfo(
startExpMode: string, ExperimentId: string, basePort: number, platform: string,
startExpMode: string, experimentId: string, basePort: number, platform: string,
logDirectory: string, experimentLogLevel: string, readonly: boolean): void {
const createNew: boolean = (startExpMode === ExperimentStartUpMode.NEW);
setExperimentStartupInfo(createNew, ExperimentId, basePort, platform, logDirectory, experimentLogLevel, readonly);
setExperimentStartupInfo(createNew, experimentId, basePort, platform, logDirectory, experimentLogLevel, readonly);
}

async function initContainer(foreground: boolean, platformMode: string, logFileName?: string): Promise<void> {
Expand Down

0 comments on commit df491d4

Please sign in to comment.