Skip to content

Commit

Permalink
Elasticsearch reader fix (#564)
Browse files Browse the repository at this point in the history
* added cross job validation of persistent mode with interval resolves #563

* fixed slicer to work with persistent elastic reader and fixed delaytime fn
  • Loading branch information
jsnoble authored and kstaken committed Sep 22, 2017
1 parent 63071b2 commit 7eeea45
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 5 deletions.
2 changes: 2 additions & 0 deletions lib/cluster/slicer.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ module.exports = function(context) {
});

messaging.register('worker:slice:complete', 'worker_id', function(msg, worker_id) {

slicerAnalytics.processed += 1;
//Need to join room if a restart happened
if (msg.retry) {
Expand Down Expand Up @@ -669,6 +670,7 @@ module.exports = function(context) {
events.emit('slicer:job:finished');
hasCompleted = true;
}
isProcessing = false;
}
})
.catch(function(err) {
Expand Down
5 changes: 1 addition & 4 deletions lib/readers/elasticsearch_date_range/slicer.js
Original file line number Diff line number Diff line change
Expand Up @@ -386,12 +386,9 @@ function newSlicer(context, opConfig, job, retryData, logger, client) {
function getTimes(opConfig, jobConfig) {
var end = processInterval(opConfig.interval);
var delayInterval = processInterval(opConfig.delay);

var delayTime = getMilliseconds(opConfig, end);

var delayTime = getMilliseconds(end);
var delayedEnd = moment().subtract(delayInterval[0], delayInterval[1]).format(dateFormat);
var delayedStart = moment(delayedEnd).subtract(end[0], end[1]).format(dateFormat);

var dateArray = divideRange(delayedStart, delayedEnd, {jobConfig: jobConfig});

return dateArray.map(function(dates) {
Expand Down
20 changes: 19 additions & 1 deletion lib/readers/elasticsearch_reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ var getOpConfig = require('../utils/config').getOpConfig;
var dateOptions = require('../utils/date_utils').dateOptions;
var dateMath = require('datemath-parser');
var moment = require('moment');
var _ = require('lodash');


function newSlicer(context, job, retryData, slicerAnalytics, logger) {
Expand Down Expand Up @@ -207,9 +208,26 @@ function op_validation(op) {
}
}

function _findOperation(opConfig){
var schemaObj = schema();
return _.every(schemaObj, function(config, key){
return opConfig[key] !== undefined;
});
}

function post_validation(job, sysconfig) {
if (job.lifecycle === 'persistent') {
const op = job.operations.filter(_findOperation)[0];
if (op.interval === 'auto') {
throw new Error('interval for reader must be manually set while job is in persistent mode');
}
}
}

module.exports = {
newReader: newReader,
newSlicer: newSlicer,
schema: schema,
op_validation: op_validation
op_validation: op_validation,
post_validation: post_validation
};

0 comments on commit 7eeea45

Please sign in to comment.