Skip to content

Commit

Permalink
allow slices to be sent to specific workers resolves #624 (#653)
Browse files Browse the repository at this point in the history
* allow slices to be sent to specific workers resolves #624

* fixed parameter names to reference execution semantics

* changed message sent back
  • Loading branch information
jsnoble authored and kstaken committed Feb 14, 2018
1 parent e4851ee commit beb2ac4
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 24 deletions.
24 changes: 17 additions & 7 deletions lib/cluster/execution_controller/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ module.exports = function module(context, messaging, exStore, stateStore, execut
executionAnalytics.increment('workers_joined');
// if there are more clients than the length of the queue, increase the queue size
_adjustSlicerQueueLength();
events.emit('worker:ready', workerId);
workerQueue.enqueue(workerOnlineData);
}
});
Expand Down Expand Up @@ -241,14 +242,23 @@ module.exports = function module(context, messaging, exStore, stateStore, execut
function _engineSetup() {
engineFn = function slicerEngineExecution() {
while (workerQueue.size() && slicerQueue.size()) {
const worker = workerQueue.dequeue();
const sliceData = slicerQueue.dequeue();
messaging.send({
to: 'worker',
address: worker.worker_id,
message: 'slicer:slice:new',
payload: sliceData
});
const messageData = { to: 'worker', message: 'slicer:slice:new', payload: sliceData };

if (sliceData.request.request_worker) {
const worker = workerQueue.extract('worker_id', sliceData.request.request_worker);
if (worker) {
messageData.address = worker.worker_id;
messaging.send(messageData);
} else {
events.emit('slice:invalid', sliceData.request);
stateStore.updateState(sliceData.request, 'invalid');
}
} else {
const worker = workerQueue.dequeue();
messageData.address = worker.worker_id;
messaging.send(messageData);
}
}

const currentWorkers = workerQueue.size();
Expand Down
8 changes: 4 additions & 4 deletions lib/cluster/runners/op.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ module.exports = function module(context) {
getClient
});

function findOp(name, assetsPath, jobAssets) {
function findOp(name, assetsPath, executionAssets) {
let filePath;
let codeName;

Expand Down Expand Up @@ -79,7 +79,7 @@ module.exports = function module(context) {
if (filePath) return filePath;

if (assetsPath && existsSync(assetsPath)) {
jobAssets.forEach((assetID) => {
executionAssets.forEach((assetID) => {
const assetOpPath = `${assetsPath}/${assetID}`;
// if the path is not found yet and the opPath exists
if (!filePath && existsSync(assetOpPath)) {
Expand All @@ -103,10 +103,10 @@ module.exports = function module(context) {
return filePath;
}

function load(opName, assetPath, jobAssets) {
function load(opName, assetPath, executionAssets) {
isString(opName);

const codePath = findOp(opName, assetPath, jobAssets);
const codePath = findOp(opName, assetPath, executionAssets);
try {
return require(codePath);
} catch (error) {
Expand Down
2 changes: 1 addition & 1 deletion lib/cluster/services/assets.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ module.exports = function (context) {
})
.catch((err) => {
const code = err.code ? err.code : 500;
sendError(res, code, err.message);
sendError(res, code, parseError(err));
});
}
};
5 changes: 2 additions & 3 deletions lib/cluster/storage/state.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ module.exports = function module(context) {
}

function recoverSlices(exId, slicerId) {
const retryQuery = `ex_id:${exId} AND slicer_id:${slicerId} AND NOT state:completed`;

const retryQuery = `ex_id:${exId} AND slicer_id:${slicerId} AND NOT (state:completed OR state:invalid)`;
// Look for all slices that haven't been completed so they can be retried.
return backend.refresh(indexName)
.then(() => backend.search(retryQuery, 0, 5000))
Expand All @@ -101,7 +100,7 @@ module.exports = function module(context) {
}

function shutdown() {
logger.info('shutting down.');
logger.info('shutting down');
return backend.shutdown();
}

Expand Down
3 changes: 1 addition & 2 deletions lib/processors/elasticsearch_bulk.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ var _ = require('lodash');
var getClient = require('../utils/config').getClient;
var getOpConfig = require('../utils/config').getOpConfig;

function newProcessor(context, opConfig, jobConfig) {
function newProcessor(context, opConfig, executionConfig) {
var context = context;
var logger;
var opConfig = opConfig;
var limit = opConfig.size;
var client;

var bulk_contexts = {};
var multisend = opConfig.multisend;
var multisend_index_append = opConfig.multisend_index_append;
Expand Down
2 changes: 1 addition & 1 deletion lib/processors/elasticsearch_index_selector.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ var _ = require('lodash');
var getOpConfig = require('../utils/config').getOpConfig;


function newProcessor(context, opConfig, jobConfig) {
function newProcessor(context, opConfig, executionConfig) {

function formattedDate(record) {
var offsets = {
Expand Down
2 changes: 1 addition & 1 deletion lib/processors/noop.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict';


function newProcessor(context, opConfig, jobConfig) {
function newProcessor(context, opConfig, executionConfig) {
return function(data) {
return data;
};
Expand Down
2 changes: 1 addition & 1 deletion lib/processors/save_file.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

var fs = require('fs');

function newProcessor(context, opConfig, jobConfig) {
function newProcessor(context, opConfig, executionConfig) {
var path = opConfig.file_path;

return function(data) {
Expand Down
2 changes: 1 addition & 1 deletion lib/processors/stdout.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

var _ = require('lodash');

function newProcessor(context, opConfig, jobConfig) {
function newProcessor(context, opConfig, executionConfig) {
var opConfig = opConfig;

return function(data) {
Expand Down
84 changes: 81 additions & 3 deletions spec/execution_controller/engine-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ describe('execution engine', () => {
let respondingMsg = null;
let executionOperationsUpdate = null;
let exStatus = null;
const updateState = {};

const messaging = {
send: (msg) => {
Expand Down Expand Up @@ -83,16 +84,18 @@ describe('execution engine', () => {
update: (exId, obj) => { executionOperationsUpdate = obj; }
};
const stateStore = {
executionStartingSlice: () => {
},
executionStartingSlice: () => {},
recoverSlices: () => {
const data = testSlices.slice();
testSlices = [];
return Promise.resolve(data);
},
createState: () => {},
count: () => Promise.resolve(0),
shutdown: () => {}
shutdown: () => {},
updateState: (slice, state, error) => {
updateState[state] = slice;
}
};
const executionContext = {
config: {
Expand Down Expand Up @@ -716,4 +719,79 @@ describe('execution engine', () => {
.catch(fail)
.finally(done);
});

it('can send slices to specific workers', (done) => {
const myEmitter = makeEmitter();
const engine = makeEngine();
const exId = 1234;
const engineTextContext = engine.__test_context(executionAnalytics, slicerAnalytics, recovery, exId);
const workerQueue = engineTextContext.workerQueue;
const slicerQueue = engineTextContext.slicerQueue;
const slice1 = { request: { some: 'slice' } };
const slice2 = Object.assign({}, slice1, { request: { request_worker: 3 } });
const slice3 = Object.assign({}, slice1, { request: { request_worker: 99 } });

const worker1 = { worker_id: 1 };
const worker2 = { worker_id: 2 };
const worker3 = { worker_id: 3 };

let invalidSlice = null;

function workerQueueList() {
const results = [];
workerQueue.each(worker => results.push(worker));
return results;
}

engineTextContext._engineSetup();

myEmitter.on('slice:invalid', data => invalidSlice = data);

workerQueue.enqueue(worker1);
workerQueue.enqueue(worker2);
workerQueue.enqueue(worker3);

waitFor(10)
.then(() => {
// We expect that no workers have been allocated yet
expect(workerQueue.size()).toEqual(3);
slicerQueue.enqueue(slice1);
return waitFor(10);
})
.then(() => {
expect(workerQueue.size()).toEqual(2);
expect(sentMsg).toEqual({
to: 'worker',
message: 'slicer:slice:new',
payload: slice1,
address: 1
});
expect(workerQueueList()).toEqual([worker2, worker3]);
slicerQueue.enqueue(slice2);
return waitFor(10);
})
.then(() => {
expect(workerQueue.size()).toEqual(1);
expect(sentMsg).toEqual({
to: 'worker',
message: 'slicer:slice:new',
payload: slice2,
address: 3
});
expect(workerQueueList()).toEqual([worker2]);
slicerQueue.enqueue(slice3);
// check that there was no invalid state records so far
expect(invalidSlice).toEqual(null);
expect(updateState).toEqual({});
return waitFor(10);
})
.then(() => {
expect(workerQueue.size()).toEqual(1);
expect(invalidSlice).toEqual(slice3.request);
expect(updateState).toEqual({ invalid: slice3.request });
expect(workerQueueList()).toEqual([worker2]);
})
.catch(fail)
.finally(done);
});
});

0 comments on commit beb2ac4

Please sign in to comment.