Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed message passing to asset loader resolves #670 #671

Merged
merged 1 commit into from
Mar 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions lib/cluster/assets_loader.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,34 @@ module.exports = function (context) {
const clusterConfig = context.sysconfig.teraslice;
const assetsDir = clusterConfig.assets_directory;
const messaging = messageModule(context, logger);
const config = context.sysconfig.teraslice;

messaging.register({ event: 'worker:shutdown', callback: shutdown });

const job = JSON.parse(process.env.job);
const exId = job.ex_id;

function shutdown() {
let counter = config.shutdown_timeout;
return new Promise((resolve) => {
const shutDownInterval = setInterval(() => {
if (counter <= 0) {
logger.error(`shut down time limit has been reached, asset_loader for execution ${exId} will exit while its loading assets ${JSON.stringify(job.assets)}`);
clearInterval(shutDownInterval);

Promise.resolve()
.then(logger.flush)
.finally(resolve(true));
} else {
if (counter % 6000 === 0) {
logger.warn(`shutdown sequence initiated, but is still processing. Will force shutdown in ${counter / 1000} seconds`);
}

counter -= 1000;
}
}, 1000);
});
}

function loadAssets(assetStore, assetsArray) {
// first step we normalize all identifiers to their proper id
Expand All @@ -36,9 +64,7 @@ module.exports = function (context) {
}

// TODO currently any job asset that doesn't exist in elasticsearch is ignored
// TODO: needs a shutdown story
const job = JSON.parse(process.env.job);
const exId = job.ex_id;

const msgId = process.env.__msgId;
// need to mock incoming message for response since this is dynamically created
const respondingData = { __source: 'cluster_master', __msgId: msgId };
Expand Down
32 changes: 26 additions & 6 deletions lib/cluster/services/messaging.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ const executionMessages = {
network: {}
};

const assetLoaderMessages = {
ipc: {
'worker:shutdown': 'worker:shutdown'
},
network: {}
};

const allMessages = Object.assign(
{},
clusterMasterMessages.network,
Expand All @@ -121,7 +128,9 @@ const allMessages = Object.assign(
assetServiceMessages.network,
assetServiceMessages.ipc,
executionMessages.network,
executionMessages.ipc
executionMessages.ipc,
assetLoaderMessages.ipc,
assetLoaderMessages.network
);


Expand Down Expand Up @@ -185,12 +194,18 @@ module.exports = function messaging(context, logger, childHookFn) {
return send(outgoingResponse);
}

function isExecutionProcess(process, exId) {
const assignment = process.assignment;
const executionProcess = assignment === 'worker' || assignment === 'execution_controller';
return process.ex_id === exId && executionProcess;
}

function sendToProcesses(msg) {
const childProcesses = context.cluster.workers;
const msgExId = msg.ex_id || _.get(msg, 'payload.ex_id');
if (msg.to === 'execution') {
_.each(childProcesses, (workerProcess) => {
if (workerProcess.ex_id === msg.ex_id) {
if (isExecutionProcess(workerProcess, msgExId)) {
// convert from a broad destination to a specific one
msg.to = workerProcess.assignment;
// needed for node state so it can show assets
Expand Down Expand Up @@ -451,6 +466,7 @@ module.exports = function messaging(context, logger, childHookFn) {
if (type === 'execution_controller') return slicerMessages;
if (type === 'worker') return workerMessages;
if (type === 'assets_service') return assetServiceMessages;
if (type === 'assets_loader') return assetLoaderMessages;
return new Error(`could not find message model for type: ${type}`);
}

Expand All @@ -473,10 +489,14 @@ module.exports = function messaging(context, logger, childHookFn) {
if (config.clients.ipcClient) {
process.on('message', (ipcMessage) => {
const msg = ipcMessage.message;
const realMsg = selfMessages.ipc[msg];
logger.debug(`process emitting ${realMsg}`, ipcMessage);
// process is an event emitter, the events are set during register
process.emit(realMsg, ipcMessage);
const realMsg = _.get(selfMessages, `ipc.${msg}`, null);
if (realMsg) {
logger.debug(`process emitting ${realMsg}`, ipcMessage);
// process is an event emitter, the events are set during register
process.emit(realMsg, ipcMessage);
} else {
logger.error(`process: ${self} has received a message: ${msg}, which is not registered in the messaging module`);
}
});
} else {
processContext.on('online', (worker) => {
Expand Down