Skip to content

Commit

Permalink
fixed race condition on job initialization resolves terascope#524
Browse files Browse the repository at this point in the history
  • Loading branch information
jsnoble committed Sep 7, 2017
1 parent 9da9f4e commit 6d3c2a4
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 71 deletions.
6 changes: 4 additions & 2 deletions lib/cluster/node_master.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ module.exports = function(context) {
node_id: context.sysconfig._nodeName,
ex_id: createSlicerMsg.ex_id,
job_id: createSlicerMsg.job_id,
slicer_port: createSlicerMsg.slicer_port
slicer_port: createSlicerMsg.slicer_port,
needsAssets: createSlicerMsg.needsAssets
};
//used to retry a job on startup after a stop command
if (createSlicerMsg.recover_execution) {
Expand Down Expand Up @@ -190,7 +191,8 @@ module.exports = function(context) {
node_id: context.sysconfig._nodeName,
job: createWorkerMsg.job,
ex_id: createWorkerMsg.ex_id,
job_id: createWorkerMsg.job_id
job_id: createWorkerMsg.job_id,
needsAssets: createWorkerMsg.needsAssets
});

//for workers on nodes that don't have the asset loading process already going
Expand Down
35 changes: 2 additions & 33 deletions lib/cluster/slicer.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var analyzeStats = require('../utils/analytics').analyzeStats;
var dateFormat = require('../utils/date_utils').dateFormat;
var parseError = require('../utils/error_utils').parseError;
var messageModule = require('./services/messaging');
var instantiateJob = require('../utils/config').instantiateJob;


module.exports = function(context) {
Expand Down Expand Up @@ -289,7 +290,7 @@ module.exports = function(context) {
.then(function(assets_store) {
context.assets = {};
context.assets.getPath = assets_store.getPath;
return instantiateJob()
return instantiateJob(context, events, logger)
})
.then(function(_job) {
job = _job;
Expand Down Expand Up @@ -510,38 +511,6 @@ module.exports = function(context) {
________________________________________________________________
*/

function instantiateJob() {
return new Promise(function(resolve, reject) {
var job_runner = require('./runners/job')(context);
var job;

events.on('worker:assets_loaded', function(ipcMessage) {
if (ipcMessage.error) {
logger.error(`Error while loading assets, error: ${ipcMessage.error}`);
reject(ipcMessage.error)
}
else {
Promise.resolve(job_runner.initialize())
.then(function(job) {
resolve(job)
})
.catch(function(err) {
logger.error('error initializing job after loading assets', err.message);
reject(err.message)
})
}
});

Promise.resolve(job_runner.initialize())
.then(function(job) {
resolve(job)
})
.catch(function(err) {
//if this errors, then we will wait for the events to fire to start job
});
})
}

function terminalShutdown(errEV) {
logger.error(`Terminal error: shutting down job ${ex_id}`);
messaging.send({message: 'job:error:terminal', error: errEV.err, ex_id: ex_id})
Expand Down
39 changes: 4 additions & 35 deletions lib/cluster/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ var Promise = require('bluebird');
var _ = require('lodash');
var parseError = require('../utils/error_utils').parseError;
var messagingFn = require('./services/messaging');

var instantiateJob = require('../utils/config').instantiateJob;

module.exports = function(context) {
var events = context.foundation.getEventEmitter();
Expand All @@ -19,7 +19,8 @@ module.exports = function(context) {
var max_retries;
var job;
var recyle;
var startingMemory;

console.log('what is this env', typeof process.env.needsAssets)

//this will be used to keep track of the previously sent message just in case of a disconnect
var sentMessage = false;
Expand Down Expand Up @@ -106,7 +107,7 @@ module.exports = function(context) {
.then(function(assets_store) {
context.assets = {};
context.assets.getPath = assets_store.getPath;
return instantiateJob()
return instantiateJob(context, events, logger)
})
.then(function(_job) {
job = _job;
Expand Down Expand Up @@ -311,38 +312,6 @@ module.exports = function(context) {

}

function instantiateJob() {
return new Promise(function(resolve, reject) {
var job_runner = require('./runners/job')(context);
var job;

events.on('worker:assets_loaded', function(ipcMessage) {
if (ipcMessage.error) {
logger.error(`Error while loading assets, error: ${ipcMessage.error}`);
reject(ipcMessage.error)
}
else {
Promise.resolve(job_runner.initialize())
.then(function(job) {
resolve(job)
})
.catch(function(err) {
logger.error('error initializing job after loading assets', err.message);
reject(err.message)
})
}
});

Promise.resolve(job_runner.initialize())
.then(function(job) {
resolve(job)
})
.catch(function(err) {
//if this errors, then we will wait for the events to fire to start job
});
})
}

function terminalShutdown(errEV) {
logger.error(`Terminal error, shutting down job ${ex_id}`);
events.emit('worker:shutdown');
Expand Down
59 changes: 58 additions & 1 deletion lib/utils/config.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict';

var Promise = require('bluebird');


function getOpConfig(job, name) {
return job.operations.find(function(op) {
Expand Down Expand Up @@ -31,8 +33,63 @@ function getClient(context, config, type) {
}
}

function instantiateJob(context, events, logger) {
return new Promise(function(resolve, reject) {
var job_runner = require('../cluster/runners/job')(context);
var gettingJob = true;
var gettingJobInterval;
var job;

events.on('worker:assets_loaded', function(ipcMessage) {
if (ipcMessage.error) {
logger.error(`Error while loading assets, error: ${ipcMessage.error}`);
events.removeAllListeners('worker:assets_loaded');
reject(ipcMessage.error)
}
else {
gettingJobInterval = setInterval(function() {
if (!gettingJob) {
gettingJob = true;
Promise.resolve(job_runner.initialize())
.then(function(job) {
clearInterval(gettingJobInterval);
events.removeAllListeners('worker:assets_loaded');
resolve(job)
})
.catch(function(err) {
clearInterval(gettingJobInterval);
events.removeAllListeners('worker:assets_loaded');
logger.error('error initializing job after loading assets', err.message);
reject(err.message)
})
}
}, 100);

}
});

Promise.resolve(job_runner.initialize())
.then(function(job) {
events.removeAllListeners('worker:assets_loaded');
clearInterval(gettingJobInterval);
resolve(job)
})
.catch(function(err) {
//if this errors, then we will wait for the events to fire to start job
if (process.env.needsAssets === 'null') {
//error out if there are no assets and job cannot initialize straight away
reject(parseError(err))
}
else {
gettingJob = false;
}
});
})
}


module.exports = {
getClient: getClient,
getOpConfig: getOpConfig
getOpConfig: getOpConfig,
instantiateJob: instantiateJob
};

0 comments on commit 6d3c2a4

Please sign in to comment.