diff --git a/lib/cluster/execution_controller/engine.js b/lib/cluster/execution_controller/engine.js index 9374359c284..76daa93723e 100644 --- a/lib/cluster/execution_controller/engine.js +++ b/lib/cluster/execution_controller/engine.js @@ -64,6 +64,7 @@ module.exports = function module(context, messaging, exStore, stateStore, execut executionAnalytics.increment('workers_joined'); // if there are more clients than the length of the queue, increase the queue size _adjustSlicerQueueLength(); + events.emit('worker:ready', workerId); workerQueue.enqueue(workerOnlineData); } }); @@ -241,14 +242,23 @@ module.exports = function module(context, messaging, exStore, stateStore, execut function _engineSetup() { engineFn = function slicerEngineExecution() { while (workerQueue.size() && slicerQueue.size()) { - const worker = workerQueue.dequeue(); const sliceData = slicerQueue.dequeue(); - messaging.send({ - to: 'worker', - address: worker.worker_id, - message: 'slicer:slice:new', - payload: sliceData - }); + const messageData = { to: 'worker', message: 'slicer:slice:new', payload: sliceData }; + + if (sliceData.request.request_worker) { + const worker = workerQueue.extract('worker_id', sliceData.request.request_worker); + if (worker) { + messageData.address = worker.worker_id; + messaging.send(messageData); + } else { + events.emit('slice:invalid', sliceData.request); + stateStore.updateState(sliceData.request, 'invalid'); + } + } else { + const worker = workerQueue.dequeue(); + messageData.address = worker.worker_id; + messaging.send(messageData); + } } const currentWorkers = workerQueue.size(); diff --git a/lib/cluster/storage/state.js b/lib/cluster/storage/state.js index b9e7fc50ad5..38079c6fba4 100644 --- a/lib/cluster/storage/state.js +++ b/lib/cluster/storage/state.js @@ -74,8 +74,7 @@ module.exports = function module(context) { } function recoverSlices(exId, slicerId) { - const retryQuery = `ex_id:${exId} AND slicer_id:${slicerId} AND NOT state:completed`; - + const retryQuery = `ex_id:${exId} AND slicer_id:${slicerId} AND NOT (state:completed OR state:invalid)`; // Look for all slices that haven't been completed so they can be retried. return backend.refresh(indexName) .then(() => backend.search(retryQuery, 0, 5000)) @@ -101,7 +100,7 @@ module.exports = function module(context) { } function shutdown() { - logger.info('shutting down.'); + logger.info('shutting down'); return backend.shutdown(); } diff --git a/spec/execution_controller/engine-spec.js b/spec/execution_controller/engine-spec.js index 7e578bfffee..79820168bdf 100644 --- a/spec/execution_controller/engine-spec.js +++ b/spec/execution_controller/engine-spec.js @@ -51,6 +51,7 @@ describe('execution engine', () => { let respondingMsg = null; let executionOperationsUpdate = null; let exStatus = null; + const updateState = {}; const messaging = { send: (msg) => { @@ -83,8 +84,7 @@ describe('execution engine', () => { update: (exId, obj) => { executionOperationsUpdate = obj; } }; const stateStore = { - executionStartingSlice: () => { - }, + executionStartingSlice: () => {}, recoverSlices: () => { const data = testSlices.slice(); testSlices = []; @@ -92,7 +92,10 @@ describe('execution engine', () => { }, createState: () => {}, count: () => Promise.resolve(0), - shutdown: () => {} + shutdown: () => {}, + updateState: (slice, state, error) => { + updateState[state] = slice; + } }; const executionContext = { config: { @@ -716,4 +719,79 @@ describe('execution engine', () => { .catch(fail) .finally(done); }); + + it('can send slices to specific workers', (done) => { + const myEmitter = makeEmitter(); + const engine = makeEngine(); + const exId = 1234; + const engineTextContext = engine.__test_context(executionAnalytics, slicerAnalytics, recovery, exId); + const workerQueue = engineTextContext.workerQueue; + const slicerQueue = engineTextContext.slicerQueue; + const slice1 = { request: { some: 'slice' } }; + const slice2 = Object.assign({}, slice1, { request: { request_worker: 3 } }); + const slice3 = Object.assign({}, slice1, { request: { request_worker: 99 } }); + + const worker1 = { worker_id: 1 }; + const worker2 = { worker_id: 2 }; + const worker3 = { worker_id: 3 }; + + let invalidSlice = null; + + function workerQueueList() { + const results = []; + workerQueue.each(worker => results.push(worker)); + return results; + } + + engineTextContext._engineSetup(); + + myEmitter.on('slice:invalid', data => invalidSlice = data); + + workerQueue.enqueue(worker1); + workerQueue.enqueue(worker2); + workerQueue.enqueue(worker3); + + waitFor(10) + .then(() => { + // We expect that no workers have been allocated yet + expect(workerQueue.size()).toEqual(3); + slicerQueue.enqueue(slice1); + return waitFor(10); + }) + .then(() => { + expect(workerQueue.size()).toEqual(2); + expect(sentMsg).toEqual({ + to: 'worker', + message: 'slicer:slice:new', + payload: slice1, + address: 1 + }); + expect(workerQueueList()).toEqual([worker2, worker3]); + slicerQueue.enqueue(slice2); + return waitFor(10); + }) + .then(() => { + expect(workerQueue.size()).toEqual(1); + expect(sentMsg).toEqual({ + to: 'worker', + message: 'slicer:slice:new', + payload: slice2, + address: 3 + }); + expect(workerQueueList()).toEqual([worker2]); + slicerQueue.enqueue(slice3); + // check that there was no invalid state records so far + expect(invalidSlice).toEqual(null); + expect(updateState).toEqual({}); + return waitFor(10); + }) + .then(() => { + expect(workerQueue.size()).toEqual(1); + expect(invalidSlice).toEqual(slice3.request); + expect(updateState).toEqual({ invalid: slice3.request }); + expect(workerQueueList()).toEqual([worker2]); + }) + .catch(fail) + .finally(done); + }); });