Skip to content

Commit

Permalink
fixed race condition on job initialization resolves #524 (#526)
Browse files Browse the repository at this point in the history
* fixed race condition on job initialization resolves #524

* moved initializeJob to job runner file

* changed needsAssets to need_assets

* removed unneeded need_assets variable passing

* fixed spacing of variable
  • Loading branch information
jsnoble authored and kstaken committed Sep 7, 2017
1 parent 7f96c07 commit f43e90f
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 92 deletions.
13 changes: 10 additions & 3 deletions lib/cluster/node_master.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ module.exports = function(context) {
});

messaging.register('cluster:slicer:create', function(createSlicerMsg) {
var needAssets = jobNeedsAssets(createSlicerMsg.job);

var slicerContext = {
assignment: 'slicer',
job: createSlicerMsg.job,
Expand All @@ -152,7 +154,7 @@ module.exports = function(context) {
logger.trace('starting a slicer', slicerContext);
context.foundation.startWorkers(1, slicerContext);

if (createSlicerMsg.needsAssets) {
if (needAssets) {
logger.info(`node ${context.sysconfig._nodeName} is checking assets for job ${createSlicerMsg.ex_id}`);
context.foundation.startWorkers(1, {
assignment: 'assets_loader',
Expand All @@ -169,7 +171,7 @@ module.exports = function(context) {

messaging.register('cluster:workers:create', function(createWorkerMsg) {
var numOfCurrentWorkers = Object.keys(context.cluster.workers).length;

var needAssets = jobNeedsAssets(createWorkerMsg.job);
var newWorkers = createWorkerMsg.workers;
logger.info(`Attempting to allocate ${newWorkers} workers.`);

Expand All @@ -194,7 +196,7 @@ module.exports = function(context) {
});

//for workers on nodes that don't have the asset loading process already going
if (createWorkerMsg.needsAssets && !assetIsLoading(context, createWorkerMsg.ex_id)) {
if (needAssets && !assetIsLoading(context, createWorkerMsg.ex_id)) {
logger.info(`node ${context.sysconfig._nodeName} is checking assets for job ${createWorkerMsg.ex_id}`);
context.foundation.startWorkers(1, {
assignment: 'assets_loader',
Expand Down Expand Up @@ -344,6 +346,11 @@ module.exports = function(context) {
sendNodeState()
}
});

/**
 * Reports whether a job declares any assets that have to be loaded before it can run.
 *
 * @param {string|Object} jobStr - the job configuration; normally the JSON string carried
 *   in the cluster message, but an already-parsed job object is accepted as well
 * @returns {boolean} true only when the job has a non-empty `assets` list, false otherwise
 * @throws {SyntaxError} when jobStr is a string that is not valid JSON
 */
function jobNeedsAssets(jobStr) {
    // Only parse when we were actually handed the serialized form.
    var job = typeof jobStr === 'string' ? JSON.parse(jobStr) : jobStr;

    // `null` is valid JSON, so guard the property access; coerce to a strict boolean
    // so callers never receive `undefined` or the assets array itself.
    return Boolean(job && job.assets && job.assets.length > 0);
}

function getNodeState() {
var state = {
Expand Down
67 changes: 61 additions & 6 deletions lib/cluster/runners/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,24 @@

var Promise = require('bluebird');
var insertAnalyzers = require('../../utils/analytics').insertAnalyzers;
var parseError = require('../../utils/error_utils').parseError;

/*
* This module defines the job execution context on the worker nodes.
*/
module.exports = function(context) {
var op_runner = require('./op')(context);
var isSlicer = process.env.assignment === 'slicer';

function initialize() {
var job = getJob(process.env.job);
var max_retries = job.max_retries;
var job = getJob(process.env.job);
var needAssets = job.assets && job.assets.length > 0;
var max_retries = job.max_retries;
var assetPath = job.assets ? context.sysconfig.teraslice.assets_directory : null;
var jobAssets = job.assets ? job.assets : [];

function _instantiateJob() {
var slicer = null;
var reporter = null;
var queue = [];
var assetPath = job.assets ? context.sysconfig.teraslice.assets_directory : null;
var jobAssets = job.assets ? job.assets : [];

if (context.sysconfig.teraslice.reporter) {
throw new Error('reporters are not functional at this time, please do not set one in the configuration')
Expand Down Expand Up @@ -63,6 +65,59 @@ module.exports = function(context) {
}
}

/**
 * Instantiates the job for this process, waiting for assets to be loaded when necessary.
 *
 * An instantiation attempt is made straight away. If it fails and the job needs assets,
 * the failure is assumed to be caused by assets that are not on disk yet: the function
 * then waits for the 'worker:assets_loaded' event and retries once. If the job needs no
 * assets, the failure is final and the promise is rejected immediately.
 *
 * @param {EventEmitter} events - emitter on which 'worker:assets_loaded' is published; all
 *   listeners for that event are removed once initialization settles
 * @param {Object} logger - logger exposing an `error` method
 * @returns {Promise} resolves with the value produced by `_instantiateJob()`; rejects with
 *   the asset loader's error, the retry error's message, or `parseError(err)` when a job
 *   without assets fails to instantiate
 */
function initialize(events, logger) {
    return new Promise(function(resolve, reject) {
        // true while an instantiation attempt is in flight; the retry poller waits on it
        var gettingJob = true;
        var gettingJobInterval;

        // Run _instantiateJob inside a promise so a synchronous throw becomes a rejection
        // handled below instead of escaping the executor or crashing the timer callback.
        function attemptInstantiation() {
            return new Promise(function(done) {
                done(_instantiateJob());
            });
        }

        // Stop polling and detach from the emitter; safe to call more than once.
        function cleanup() {
            clearInterval(gettingJobInterval);
            events.removeAllListeners('worker:assets_loaded');
        }

        events.on('worker:assets_loaded', function(ipcMessage) {
            if (ipcMessage.error) {
                logger.error(`Error while loading assets, error: ${ipcMessage.error}`);
                cleanup();
                reject(ipcMessage.error);
                return;
            }

            // a repeated event must not start a second poller
            if (gettingJobInterval) {
                return;
            }

            // Poll until the initial attempt has finished failing, then retry exactly once.
            gettingJobInterval = setInterval(function() {
                if (gettingJob) {
                    return;
                }
                gettingJob = true;
                attemptInstantiation()
                    .then(function(job) {
                        cleanup();
                        resolve(job);
                    })
                    .catch(function(err) {
                        cleanup();
                        logger.error('error initializing job after loading assets', err.message);
                        reject(err.message);
                    });
            }, 100);
        });

        attemptInstantiation()
            .then(function(job) {
                cleanup();
                resolve(job);
            })
            .catch(function(err) {
                if (!needAssets) {
                    //error out if there are no assets and job cannot initialize straight away
                    cleanup();
                    reject(parseError(err));
                }
                else {
                    //assets are still loading, let the 'worker:assets_loaded' handler retry
                    gettingJob = false;
                }
            });
    });
}


function getJob(processJob) {
if (processJob) {
Expand Down
13 changes: 3 additions & 10 deletions lib/cluster/services/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,6 @@ module.exports = function(context, server) {
function allocateWorker(job) {
var ex_id = job.ex_id;
var job_id = job.job_id;
var needsAssets = job.assets && job.assets.length > 0;
var jobStr = JSON.stringify(job);
var sortedNodes = _.sortByOrder(cluster_state, 'available', 'desc');

Expand All @@ -470,8 +469,7 @@ module.exports = function(context, server) {
job_id: job_id,
workers: 1,
node_id: workerNodeID,
assignment: 'worker',
needsAssets: needsAssets
assignment: 'worker'
};

return notifyNode(workerNodeID, 'cluster:workers:create', data);
Expand All @@ -481,7 +479,6 @@ module.exports = function(context, server) {
function allocateWorkers(job, numOfWorkersRequested) {
var ex_id = job.ex_id;
var job_id = job.job_id;
var needsAssets = job.assets && job.assets.length > 0;
var jobStr = JSON.stringify(job);
var sortedNodes = _.orderBy(cluster_state, 'available', 'desc');

Expand Down Expand Up @@ -512,8 +509,7 @@ module.exports = function(context, server) {
ex_id: ex_id,
job_id: job_id,
workers: 1,
assignment: 'worker',
needsAssets: needsAssets
assignment: 'worker'
};

while (numOfWorkersRequested > 0) {
Expand All @@ -531,8 +527,7 @@ module.exports = function(context, server) {
job_id: job_id,
node_id: node_id,
workers: workerRequested,
assignment: 'worker',
needsAssets: needsAssets
assignment: 'worker'
};

results.push(
Expand Down Expand Up @@ -561,14 +556,12 @@ module.exports = function(context, server) {

var ex_id = job.ex_id;
var job_id = job.job_id;
var needsAssets = job.assets && job.assets.length > 0;
var jobStr = JSON.stringify(job);
var data = {
job: jobStr,
ex_id: ex_id,
job_id: job_id,
workers: 1,
needsAssets: needsAssets,
slicer_port: portObj.port,
node_id: slicerNodeID,
assignment: 'slicer',
Expand Down
4 changes: 1 addition & 3 deletions lib/cluster/services/jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ module.exports = function(context, cluster_service) {
.then(function(ex) {
if (options[ex._status]) {
logger.warn(`node ${data.node_id} has disconnected with active workers for job: ${data.ex_id} , enqueuing the workers`);
var needsAssets = ex.assets && ex.assets.length > 0;
var numOfWorkers = data.workers;
var jobStr = JSON.stringify(ex);

Expand All @@ -120,8 +119,7 @@ module.exports = function(context, cluster_service) {
job_id: ex.job_id,
node_id: node_id,
workers: 1,
assignment: 'worker',
needsAssets: needsAssets
assignment: 'worker'
};

while (numOfWorkers > 0) {
Expand Down
37 changes: 2 additions & 35 deletions lib/cluster/slicer.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ module.exports = function(context) {
var state_store;
var ex_id = process.env.ex_id;
var events = context.foundation.getEventEmitter();


var job_runner = require('./runners/job')(context);
var logger = context.foundation.makeLogger('slicer', 'slicer', {ex_id: ex_id, module: 'slicer'});
var messaging = messageModule(context, logger);
var queueLength = context.sysconfig.teraslice.slicer_queue_length;
Expand Down Expand Up @@ -299,7 +298,7 @@ module.exports = function(context) {
.then(function(assets_store) {
context.assets = {};
context.assets.getPath = assets_store.getPath;
return instantiateJob()
return job_runner.initialize(events, logger)
})
.then(function(_job) {
job = _job;
Expand Down Expand Up @@ -520,38 +519,6 @@ module.exports = function(context) {
________________________________________________________________
*/

/*
 * Slicer-side job instantiation (removed by this commit in favour of job_runner.initialize).
 * Returns a promise that resolves with the value of job_runner.initialize().
 * An attempt is made immediately; another attempt is made every time a
 * 'worker:assets_loaded' event arrives without an error.
 */
function instantiateJob() {
return new Promise(function(resolve, reject) {
var job_runner = require('./runners/job')(context);
// NOTE(review): never assigned or read in this function
var job;

// The listener is registered before the first attempt and is never removed here.
events.on('worker:assets_loaded', function(ipcMessage) {
if (ipcMessage.error) {
logger.error(`Error while loading assets, error: ${ipcMessage.error}`);
reject(ipcMessage.error)
}
else {
// assets reported as loaded without error: try to initialize the job
Promise.resolve(job_runner.initialize())
.then(function(job) {
resolve(job)
})
.catch(function(err) {
logger.error('error initializing job after loading assets', err.message);
// rejects with the message string only, not the error object
reject(err.message)
})
}
});

// First attempt, made without waiting for the assets event.
Promise.resolve(job_runner.initialize())
.then(function(job) {
resolve(job)
})
.catch(function(err) {
//if this errors, then we will wait for the events to fire to start job
// The rejection is swallowed here: if no 'worker:assets_loaded' event follows,
// the returned promise never settles.
});
})
}

function terminalShutdown(errEV) {
logger.error(`Terminal error: shutting down job ${ex_id}`);
messaging.send({message: 'job:error:terminal', error: errEV.err, ex_id: ex_id})
Expand Down
38 changes: 3 additions & 35 deletions lib/cluster/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ var _ = require('lodash');
var parseError = require('../utils/error_utils').parseError;
var messagingFn = require('./services/messaging');


module.exports = function(context) {
var events = context.foundation.getEventEmitter();
var cluster = context.cluster;
Expand All @@ -14,12 +13,13 @@ module.exports = function(context) {
var isDone = true;
var isShuttingDown = false;
var ID = context.sysconfig.teraslice.hostname + "__" + cluster.worker.id;
var job_runner = require('./runners/job')(context);
var analytics_store;
var queue;
var max_retries;
var job;
var recyle;
var startingMemory;


//this will be used to keep track of the previously sent message just in case of a disconnect
var sentMessage = false;
Expand Down Expand Up @@ -106,7 +106,7 @@ module.exports = function(context) {
.then(function(assets_store) {
context.assets = {};
context.assets.getPath = assets_store.getPath;
return instantiateJob()
return job_runner.initialize(events, logger)
})
.then(function(_job) {
job = _job;
Expand Down Expand Up @@ -311,38 +311,6 @@ module.exports = function(context) {

}

/*
 * Worker-side job instantiation (removed by this commit in favour of job_runner.initialize).
 * Returns a promise that resolves with the value of job_runner.initialize().
 * An attempt is made immediately; another attempt is made every time a
 * 'worker:assets_loaded' event arrives without an error.
 */
function instantiateJob() {
return new Promise(function(resolve, reject) {
var job_runner = require('./runners/job')(context);
// NOTE(review): never assigned or read in this function
var job;

// The listener is registered before the first attempt and is never removed here.
events.on('worker:assets_loaded', function(ipcMessage) {
if (ipcMessage.error) {
logger.error(`Error while loading assets, error: ${ipcMessage.error}`);
reject(ipcMessage.error)
}
else {
// assets reported as loaded without error: try to initialize the job
Promise.resolve(job_runner.initialize())
.then(function(job) {
resolve(job)
})
.catch(function(err) {
logger.error('error initializing job after loading assets', err.message);
// rejects with the message string only, not the error object
reject(err.message)
})
}
});

// First attempt, made without waiting for the assets event.
Promise.resolve(job_runner.initialize())
.then(function(job) {
resolve(job)
})
.catch(function(err) {
//if this errors, then we will wait for the events to fire to start job
// The rejection is swallowed here: if no 'worker:assets_loaded' event follows,
// the returned promise never settles.
});
})
}

function terminalShutdown(errEV) {
logger.error(`Terminal error, shutting down job ${ex_id}`);
events.emit('worker:shutdown');
Expand Down

0 comments on commit f43e90f

Please sign in to comment.