Skip to content

Commit

Permalink
fixed message passing to asset loader resolves #670 (#671)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsnoble authored and kstaken committed Mar 11, 2018
1 parent f2e63f4 commit e2c515f
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 9 deletions.
32 changes: 29 additions & 3 deletions lib/cluster/assets_loader.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,34 @@ module.exports = function (context) {
const clusterConfig = context.sysconfig.teraslice;
const assetsDir = clusterConfig.assets_directory;
const messaging = messageModule(context, logger);
const config = context.sysconfig.teraslice;

messaging.register({ event: 'worker:shutdown', callback: shutdown });

const job = JSON.parse(process.env.job);
const exId = job.ex_id;

function shutdown() {
let counter = config.shutdown_timeout;
return new Promise((resolve) => {
const shutDownInterval = setInterval(() => {
if (counter <= 0) {
logger.error(`shut down time limit has been reached, asset_loader for execution ${exId} will exit while its loading assets ${JSON.stringify(job.assets)}`);
clearInterval(shutDownInterval);

Promise.resolve()
.then(logger.flush)
.finally(resolve(true));
} else {
if (counter % 6000 === 0) {
logger.warn(`shutdown sequence initiated, but is still processing. Will force shutdown in ${counter / 1000} seconds`);
}

counter -= 1000;
}
}, 1000);
});
}

function loadAssets(assetStore, assetsArray) {
// first step we normalize all identifiers to their proper id
Expand All @@ -36,9 +64,7 @@ module.exports = function (context) {
}

// TODO currently any job asset that doesn't exist in elasticsearch is ignored
// TODO: needs a shutdown story
const job = JSON.parse(process.env.job);
const exId = job.ex_id;

const msgId = process.env.__msgId;
// need to mock incoming message for response since this is dynamically created
const respondingData = { __source: 'cluster_master', __msgId: msgId };
Expand Down
32 changes: 26 additions & 6 deletions lib/cluster/services/messaging.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ const executionMessages = {
network: {}
};

const assetLoaderMessages = {
ipc: {
'worker:shutdown': 'worker:shutdown'
},
network: {}
};

const allMessages = Object.assign(
{},
clusterMasterMessages.network,
Expand All @@ -121,7 +128,9 @@ const allMessages = Object.assign(
assetServiceMessages.network,
assetServiceMessages.ipc,
executionMessages.network,
executionMessages.ipc
executionMessages.ipc,
assetLoaderMessages.ipc,
assetLoaderMessages.network
);


Expand Down Expand Up @@ -185,12 +194,18 @@ module.exports = function messaging(context, logger, childHookFn) {
return send(outgoingResponse);
}

function isExecutionProcess(process, exId) {
const assignment = process.assignment;
const executionProcess = assignment === 'worker' || assignment === 'execution_controller';
return process.ex_id === exId && executionProcess;
}

function sendToProcesses(msg) {
const childProcesses = context.cluster.workers;
const msgExId = msg.ex_id || _.get(msg, 'payload.ex_id');
if (msg.to === 'execution') {
_.each(childProcesses, (workerProcess) => {
if (workerProcess.ex_id === msg.ex_id) {
if (isExecutionProcess(workerProcess, msgExId)) {
// convert from a broad destination to a specific one
msg.to = workerProcess.assignment;
// needed for node state so it can show assets
Expand Down Expand Up @@ -451,6 +466,7 @@ module.exports = function messaging(context, logger, childHookFn) {
if (type === 'execution_controller') return slicerMessages;
if (type === 'worker') return workerMessages;
if (type === 'assets_service') return assetServiceMessages;
if (type === 'assets_loader') return assetLoaderMessages;
return new Error(`could not find message model for type: ${type}`);
}

Expand All @@ -473,10 +489,14 @@ module.exports = function messaging(context, logger, childHookFn) {
if (config.clients.ipcClient) {
process.on('message', (ipcMessage) => {
const msg = ipcMessage.message;
const realMsg = selfMessages.ipc[msg];
logger.debug(`process emitting ${realMsg}`, ipcMessage);
// process is an event emitter, the events are set during register
process.emit(realMsg, ipcMessage);
const realMsg = _.get(selfMessages, `ipc.${msg}`, null);
if (realMsg) {
logger.debug(`process emitting ${realMsg}`, ipcMessage);
// process is an event emitter, the events are set during register
process.emit(realMsg, ipcMessage);
} else {
logger.error(`process: ${self} has received a message: ${msg}, which is not registered in the messaging module`);
}
});
} else {
processContext.on('online', (worker) => {
Expand Down

0 comments on commit e2c515f

Please sign in to comment.