Skip to content

Commit

Permalink
fixed returns on retries to not immediatly complete resolves #549
Browse files Browse the repository at this point in the history
  • Loading branch information
jsnoble committed Sep 15, 2017
1 parent d64b72c commit e99f418
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 50 deletions.
32 changes: 5 additions & 27 deletions lib/readers/elasticsearch_date_range/slicer.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ var dateOptions = require('./../../utils/date_utils').dateOptions;
var dateFormatMS = require('./../../utils/date_utils').dateFormat;
var dateFormatS = require('./../../utils/date_utils').dateFormatSeconds;
var parseError = require('../../utils/error_utils').parseError;
var retryModule = require('../../utils/error_utils').retryModule;

function newSlicer(context, opConfig, job, retryData, logger, client) {
var events = context.foundation.getEventEmitter();
var jobConfig = job.jobConfig;
var isPersistent = jobConfig.lifecycle === 'persistent';
var slicers = [];
var numOfRetries = job.max_retries;
var time_resolution = dateOptions(opConfig.time_resolution);
var retryError = retryModule(logger, job.max_retries);


var dateFormat = time_resolution === 'ms' ? dateFormatMS : dateFormatS;

Expand Down Expand Up @@ -293,26 +295,6 @@ function newSlicer(context, opConfig, job, retryData, logger, client) {
return getIdData(idSubslicer)
}

function retryError(retry, dateObj, err, fn, msg) {
var errMessage = parseError(err);
logger.error('error while getting next slice', errMessage);
var startKey = dateObj.start.format(dateFormat);

if (!retry[startKey]) {
retry[startKey] = 1;
fn(msg)
}
else {
retry[startKey] += 1;
if (retry[startKey] > numOfRetries) {
return Promise.reject(`max_retries met for slice, start: ${startKey}`, errMessage);
}
else {
fn(msg)
}
}
}

function nextChunk(opConfig, client, jobConfig, dates, slicer_id, retryData) {
var shouldDivideByID = opConfig.subslice_by_key;
var threshold = opConfig.subslice_key_threshold;
Expand All @@ -329,7 +311,6 @@ function newSlicer(context, opConfig, job, retryData, logger, client) {
dateParams.end = moment(dateParams.start.format(dateFormat)).add(dateParams.interval[0], dateParams.interval[1]);
logger.debug('all date configurations for date slicer', dateParams);
//used to keep track of retried queries
var retry = {};

return function sliceDate(msg) {
if (dateParams.start.isSameOrAfter(dateParams.limit)) {
Expand Down Expand Up @@ -368,7 +349,7 @@ function newSlicer(context, opConfig, job, retryData, logger, client) {
}
})
.catch(function(err) {
return retryError(retry, dateParams, err, sliceDate, msg)
return retryError(dateParams.start.format(dateFormat), err, sliceDate, msg)
})
}
};
Expand Down Expand Up @@ -436,9 +417,6 @@ function newSlicer(context, opConfig, job, retryData, logger, client) {

logger.debug('all date configurations for date slicer', dateParams);

//used to keep track of retried queries
var retry = {};

//set a timer to add the next set it should process
setInterval(function() {
//keep a list of next batches in cases current batch is still running
Expand Down Expand Up @@ -495,7 +473,7 @@ function newSlicer(context, opConfig, job, retryData, logger, client) {
}
}
.catch(function(err) {
return retryError(retry, dateParams, err, sliceDate, msg)
return retryError(dateParams.start.format(dateFormat), err, sliceDate, msg)
})
);
}
Expand Down
25 changes: 3 additions & 22 deletions lib/readers/id_slicer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

var _ = require('lodash');
var parseError = require('../utils/error_utils').parseError;
var retryModule = require('../utils/error_utils').retryModule;

var base64url = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w',
'x', 'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
Expand All @@ -18,27 +19,7 @@ module.exports = function(client, job, opConfig, logger, retryData, range) {
var baseKeyArray = getKeyArray(opConfig);
var keyArray = opConfig.key_range ? opConfig.key_range : baseKeyArray.slice();
var elasticsearch = require('elasticsearch_api')(client, logger, opConfig);
var numOfRetries = job.max_retries;
var retry = {};

function retryKey(key, err, fn, msg) {
var errMessage = parseError(err);
logger.error('error while getting next slice', errMessage);

if (!retry[key]) {
retry[key] = 1;
fn(msg)
}
else {
retry[key] += 1;
if (retry[key] > numOfRetries) {
return Promise.reject(`max_retries met for slice, key: ${key}`, errMessage);
}
else {
fn(msg)
}
}
}
var retryError = retryModule(logger, job.max_retries);

function getCountForKey(query) {
return elasticsearch.search(query);
Expand Down Expand Up @@ -106,7 +87,7 @@ module.exports = function(client, job, opConfig, logger, retryData, range) {

})
.catch(function(err) {
return retryKey(key, err, getKeySlice, query)
return retryError(key, err, getKeySlice, query)
})
}

Expand Down
25 changes: 24 additions & 1 deletion lib/utils/error_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,29 @@ function parseError(err) {
return err.response ? err.response : err;
}

function retryModule(logger, numOfRetries) {
let retry = {};
return function(key, err, fn, msg) {
var errMessage = parseError(err);
logger.error('error while getting next slice', errMessage);

if (!retry[key]) {
retry[key] = 1;
return fn(msg)
}
else {
retry[key] += 1;
if (retry[key] > numOfRetries) {
return Promise.reject(`max_retries met for slice, key: ${key}`, errMessage);
}
else {
return fn(msg)
}
}
}
}

module.exports = {
parseError: parseError
parseError: parseError,
retryModule: retryModule
};

0 comments on commit e99f418

Please sign in to comment.