Skip to content

Commit

Permalink
slicer can not direct slices to a particular worker resolves terascop…
Browse files Browse the repository at this point in the history
  • Loading branch information
jsnoble committed Dec 4, 2017
1 parent 0301363 commit 8f91412
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 12 deletions.
34 changes: 24 additions & 10 deletions lib/cluster/slicer.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ module.exports = function module(contextConfig) {
logger.info(`slicer for job: ${exId} has received a pause notice`);
engineCanRun = false;
clearInterval(engine);
events.emit('job:pause');
events.emit('execution:pause');
messaging.respond(msg);
}
});
Expand All @@ -115,7 +115,7 @@ module.exports = function module(contextConfig) {
logger.info(`slicer for job: ${exId} has received a resume notice`);
engine = setInterval(engineFn, 1);
engineCanRun = true;
events.emit('job:resume');
events.emit('execution:resume');
messaging.respond(msg);
}
});
Expand Down Expand Up @@ -164,6 +164,7 @@ module.exports = function module(contextConfig) {
queueLength = messaging.getClientCounts();
}
// messaging module will join connection
events.emit('worker:ready', workerId);
workerQueue.enqueue(msg.payload);
}
});
Expand Down Expand Up @@ -472,6 +473,10 @@ module.exports = function module(contextConfig) {
}));
}

function workerSpecificSlice() {

}

function slicerEngine(slicers) {
if (!Array.isArray(slicers)) {
throw new Error(`newSlicer from module ${job.jobConfig.operations[0]._op} needs to return an array of slicers`);
Expand All @@ -482,14 +487,23 @@ module.exports = function module(contextConfig) {

engineFn = function slicerEngineExecution() {
while (workerQueue.size() && slicerQueue.size()) {
const worker = workerQueue.dequeue();
const sliceData = slicerQueue.dequeue();
messaging.send({
to: 'worker',
address: worker.worker_id,
message: 'slicer:slice:new',
payload: sliceData
});
const messageData = { to: 'worker', message: 'slicer:slice:new', payload: sliceData };

if (sliceData.request.request_worker) {
const worker = workerQueue.extract('worker_id', sliceData.request.request_worker);
if (worker) {
messageData.address = worker.worker_id;
messaging.send(messageData);
} else {
events.emit('slice:invalid', sliceData.request);
stateStore.updateState(sliceData.request, 'invalid');
}
} else {
const worker = workerQueue.dequeue();
messageData.address = worker.worker_id;
messaging.send(messageData);
}
}

const currentWorkers = workerQueue.size();
Expand Down Expand Up @@ -559,7 +573,7 @@ module.exports = function module(contextConfig) {
clearInterval(engine);
clearInterval(analyticsTimer);
pushAnalytics();
events.emit('job:stop');
events.emit('execution:stop');
Promise.resolve()
.then(() => {
if (stateStore) {
Expand Down
3 changes: 1 addition & 2 deletions lib/cluster/storage/state.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

const Promise = require('bluebird');
const parseError = require('error_parser');
const template = require('./backends/mappings/state.json');
const timeseriesIndex = require('../../utils/date_utils').timeseriesIndex;

// Module to manager job states in Elasticsearch.
Expand Down Expand Up @@ -56,7 +55,7 @@ module.exports = function module(context) {

function recoveryContext(exId, slicerId) {
const startQuery = `ex_id:${exId} AND slicer_id:${slicerId}`;
const retryQuery = `${startQuery} AND NOT state:completed`;
const retryQuery = `${startQuery} AND NOT (state:completed OR state:invalid)`;

// Look for all slices that haven't been completed so they can be retried.
return backend.refresh(indexName)
Expand Down

0 comments on commit 8f91412

Please sign in to comment.